You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
252 lines
5.2 KiB
Go
252 lines
5.2 KiB
Go
package queue
|
|
|
|
import (
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/tal-tech/go-zero/core/logx"
|
|
"github.com/tal-tech/go-zero/core/rescue"
|
|
"github.com/tal-tech/go-zero/core/stat"
|
|
"github.com/tal-tech/go-zero/core/threading"
|
|
"github.com/tal-tech/go-zero/core/timex"
|
|
)
|
|
|
|
const queueName = "queue"
|
|
|
|
type (
|
|
// A Queue is a message queue.
|
|
Queue struct {
|
|
name string
|
|
metrics *stat.Metrics
|
|
producerFactory ProducerFactory
|
|
producerRoutineGroup *threading.RoutineGroup
|
|
consumerFactory ConsumerFactory
|
|
consumerRoutineGroup *threading.RoutineGroup
|
|
producerCount int
|
|
consumerCount int
|
|
active int32
|
|
channel chan string
|
|
quit chan struct{}
|
|
listeners []Listener
|
|
eventLock sync.Mutex
|
|
eventChannels []chan interface{}
|
|
}
|
|
|
|
// A Listener interface represents a listener that can be notified with queue events.
|
|
Listener interface {
|
|
OnPause()
|
|
OnResume()
|
|
}
|
|
|
|
// A Poller interface wraps the method Poll.
|
|
Poller interface {
|
|
Name() string
|
|
Poll() string
|
|
}
|
|
|
|
// A Pusher interface wraps the method Push.
|
|
Pusher interface {
|
|
Name() string
|
|
Push(string) error
|
|
}
|
|
)
|
|
|
|
// NewQueue returns a Queue.
|
|
func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue {
|
|
q := &Queue{
|
|
metrics: stat.NewMetrics(queueName),
|
|
producerFactory: producerFactory,
|
|
producerRoutineGroup: threading.NewRoutineGroup(),
|
|
consumerFactory: consumerFactory,
|
|
consumerRoutineGroup: threading.NewRoutineGroup(),
|
|
producerCount: runtime.NumCPU(),
|
|
consumerCount: runtime.NumCPU() << 1,
|
|
channel: make(chan string),
|
|
quit: make(chan struct{}),
|
|
}
|
|
q.SetName(queueName)
|
|
|
|
return q
|
|
}
|
|
|
|
// AddListener adds a litener to q.
|
|
func (q *Queue) AddListener(listener Listener) {
|
|
q.listeners = append(q.listeners, listener)
|
|
}
|
|
|
|
// Broadcast broadcasts message to all event channels.
|
|
func (q *Queue) Broadcast(message interface{}) {
|
|
go func() {
|
|
q.eventLock.Lock()
|
|
defer q.eventLock.Unlock()
|
|
|
|
for _, channel := range q.eventChannels {
|
|
channel <- message
|
|
}
|
|
}()
|
|
}
|
|
|
|
// SetName sets the name of q.
|
|
func (q *Queue) SetName(name string) {
|
|
q.name = name
|
|
q.metrics.SetName(name)
|
|
}
|
|
|
|
// SetNumConsumer sets the number of consumers.
|
|
func (q *Queue) SetNumConsumer(count int) {
|
|
q.consumerCount = count
|
|
}
|
|
|
|
// SetNumProducer sets the number of producers.
|
|
func (q *Queue) SetNumProducer(count int) {
|
|
q.producerCount = count
|
|
}
|
|
|
|
// Start starts q.
|
|
func (q *Queue) Start() {
|
|
q.startProducers(q.producerCount)
|
|
q.startConsumers(q.consumerCount)
|
|
|
|
q.producerRoutineGroup.Wait()
|
|
close(q.channel)
|
|
q.consumerRoutineGroup.Wait()
|
|
}
|
|
|
|
// Stop stops q.
|
|
func (q *Queue) Stop() {
|
|
close(q.quit)
|
|
}
|
|
|
|
func (q *Queue) consume(eventChan chan interface{}) {
|
|
var consumer Consumer
|
|
|
|
for {
|
|
var err error
|
|
if consumer, err = q.consumerFactory(); err != nil {
|
|
logx.Errorf("Error on creating consumer: %v", err)
|
|
time.Sleep(time.Second)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case message, ok := <-q.channel:
|
|
if ok {
|
|
q.consumeOne(consumer, message)
|
|
} else {
|
|
logx.Info("Task channel was closed, quitting consumer...")
|
|
return
|
|
}
|
|
case event := <-eventChan:
|
|
consumer.OnEvent(event)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *Queue) consumeOne(consumer Consumer, message string) {
|
|
threading.RunSafe(func() {
|
|
startTime := timex.Now()
|
|
defer func() {
|
|
duration := timex.Since(startTime)
|
|
q.metrics.Add(stat.Task{
|
|
Duration: duration,
|
|
})
|
|
logx.WithDuration(duration).Infof("%s", message)
|
|
}()
|
|
|
|
if err := consumer.Consume(message); err != nil {
|
|
logx.Errorf("Error occurred while consuming %v: %v", message, err)
|
|
}
|
|
})
|
|
}
|
|
|
|
func (q *Queue) pause() {
|
|
for _, listener := range q.listeners {
|
|
listener.OnPause()
|
|
}
|
|
}
|
|
|
|
func (q *Queue) produce() {
|
|
var producer Producer
|
|
|
|
for {
|
|
var err error
|
|
if producer, err = q.producerFactory(); err != nil {
|
|
logx.Errorf("Error on creating producer: %v", err)
|
|
time.Sleep(time.Second)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
atomic.AddInt32(&q.active, 1)
|
|
producer.AddListener(routineListener{
|
|
queue: q,
|
|
})
|
|
|
|
for {
|
|
select {
|
|
case <-q.quit:
|
|
logx.Info("Quitting producer")
|
|
return
|
|
default:
|
|
if v, ok := q.produceOne(producer); ok {
|
|
q.channel <- v
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *Queue) produceOne(producer Producer) (string, bool) {
|
|
// avoid panic quit the producer, just log it and continue
|
|
defer rescue.Recover()
|
|
|
|
return producer.Produce()
|
|
}
|
|
|
|
func (q *Queue) resume() {
|
|
for _, listener := range q.listeners {
|
|
listener.OnResume()
|
|
}
|
|
}
|
|
|
|
func (q *Queue) startConsumers(number int) {
|
|
for i := 0; i < number; i++ {
|
|
eventChan := make(chan interface{})
|
|
q.eventLock.Lock()
|
|
q.eventChannels = append(q.eventChannels, eventChan)
|
|
q.eventLock.Unlock()
|
|
q.consumerRoutineGroup.Run(func() {
|
|
q.consume(eventChan)
|
|
})
|
|
}
|
|
}
|
|
|
|
func (q *Queue) startProducers(number int) {
|
|
for i := 0; i < number; i++ {
|
|
q.producerRoutineGroup.Run(func() {
|
|
q.produce()
|
|
})
|
|
}
|
|
}
|
|
|
|
type routineListener struct {
|
|
queue *Queue
|
|
}
|
|
|
|
func (rl routineListener) OnProducerPause() {
|
|
if atomic.AddInt32(&rl.queue.active, -1) <= 0 {
|
|
rl.queue.pause()
|
|
}
|
|
}
|
|
|
|
func (rl routineListener) OnProducerResume() {
|
|
if atomic.AddInt32(&rl.queue.active, 1) == 1 {
|
|
rl.queue.resume()
|
|
}
|
|
}
|