From 6fdee77fa9ff2cf5a69d6a8c7075281b341b850a Mon Sep 17 00:00:00 2001 From: kevin Date: Thu, 13 Aug 2020 17:00:53 +0800 Subject: [PATCH] add queue package --- core/queue/balancedqueuepusher.go | 44 +++++ core/queue/balancedqueuepusher_test.go | 43 +++++ core/queue/consumer.go | 10 ++ core/queue/messagequeue.go | 6 + core/queue/multiqueuepusher.go | 31 ++++ core/queue/multiqueuepusher_test.go | 39 ++++ core/queue/producer.go | 15 ++ core/queue/queue.go | 239 +++++++++++++++++++++++++ core/queue/queue_test.go | 94 ++++++++++ core/queue/util.go | 12 ++ core/queue/util_test.go | 77 ++++++++ 11 files changed, 610 insertions(+) create mode 100644 core/queue/balancedqueuepusher.go create mode 100644 core/queue/balancedqueuepusher_test.go create mode 100644 core/queue/consumer.go create mode 100644 core/queue/messagequeue.go create mode 100644 core/queue/multiqueuepusher.go create mode 100644 core/queue/multiqueuepusher_test.go create mode 100644 core/queue/producer.go create mode 100644 core/queue/queue.go create mode 100644 core/queue/queue_test.go create mode 100644 core/queue/util.go create mode 100644 core/queue/util_test.go diff --git a/core/queue/balancedqueuepusher.go b/core/queue/balancedqueuepusher.go new file mode 100644 index 00000000..1e40cf30 --- /dev/null +++ b/core/queue/balancedqueuepusher.go @@ -0,0 +1,44 @@ +package queue + +import ( + "errors" + "sync/atomic" + + "github.com/tal-tech/go-zero/core/logx" +) + +var ErrNoAvailablePusher = errors.New("no available pusher") + +type BalancedQueuePusher struct { + name string + pushers []Pusher + index uint64 +} + +func NewBalancedQueuePusher(pushers []Pusher) Pusher { + return &BalancedQueuePusher{ + name: generateName(pushers), + pushers: pushers, + } +} + +func (pusher *BalancedQueuePusher) Name() string { + return pusher.name +} + +func (pusher *BalancedQueuePusher) Push(message string) error { + size := len(pusher.pushers) + + for i := 0; i < size; i++ { + index := atomic.AddUint64(&pusher.index, 1) % uint64(size) + target := pusher.pushers[index] + + if err := target.Push(message); err != nil { + logx.Error(err) + } else { + return nil + } + } + + return ErrNoAvailablePusher +} diff --git a/core/queue/balancedqueuepusher_test.go b/core/queue/balancedqueuepusher_test.go new file mode 100644 index 00000000..ff246132 --- /dev/null +++ b/core/queue/balancedqueuepusher_test.go @@ -0,0 +1,43 @@ +package queue + +import ( + "fmt" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBalancedQueuePusher(t *testing.T) { + const numPushers = 100 + var pushers []Pusher + var mockedPushers []*mockedPusher + for i := 0; i < numPushers; i++ { + p := &mockedPusher{ + name: "pusher:" + strconv.Itoa(i), + } + pushers = append(pushers, p) + mockedPushers = append(mockedPushers, p) + } + + pusher := NewBalancedQueuePusher(pushers) + assert.True(t, len(pusher.Name()) > 0) + + for i := 0; i < numPushers*1000; i++ { + assert.Nil(t, pusher.Push("item")) + } + + var counts []int + for _, p := range mockedPushers { + counts = append(counts, p.count) + } + mean := calcMean(counts) + variance := calcVariance(mean, counts) + assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance)) +} + +func TestBalancedQueuePusher_NoAvailable(t *testing.T) { + pusher := NewBalancedQueuePusher(nil) + assert.True(t, len(pusher.Name()) == 0) + assert.Equal(t, ErrNoAvailablePusher, pusher.Push("item")) +} diff --git a/core/queue/consumer.go b/core/queue/consumer.go new file mode 100644 index 00000000..8f12d97f --- /dev/null +++ b/core/queue/consumer.go @@ -0,0 +1,10 @@ +package queue + +type ( + Consumer interface { + Consume(string) error + OnEvent(event interface{}) + } + + ConsumerFactory func() (Consumer, error) +) diff --git a/core/queue/messagequeue.go b/core/queue/messagequeue.go new file mode 100644 index 00000000..569ae566 --- /dev/null +++ b/core/queue/messagequeue.go @@ -0,0 +1,6 @@ +package queue + +type MessageQueue interface { + Start() + Stop() +} diff --git a/core/queue/multiqueuepusher.go b/core/queue/multiqueuepusher.go new file mode 100644 index 00000000..dd811dc1 --- /dev/null +++ b/core/queue/multiqueuepusher.go @@ -0,0 +1,31 @@ +package queue + +import "github.com/tal-tech/go-zero/core/errorx" + +type MultiQueuePusher struct { + name string + pushers []Pusher +} + +func NewMultiQueuePusher(pushers []Pusher) Pusher { + return &MultiQueuePusher{ + name: generateName(pushers), + pushers: pushers, + } +} + +func (pusher *MultiQueuePusher) Name() string { + return pusher.name +} + +func (pusher *MultiQueuePusher) Push(message string) error { + var batchError errorx.BatchError + + for _, each := range pusher.pushers { + if err := each.Push(message); err != nil { + batchError.Add(err) + } + } + + return batchError.Err() +} diff --git a/core/queue/multiqueuepusher_test.go b/core/queue/multiqueuepusher_test.go new file mode 100644 index 00000000..5721154f --- /dev/null +++ b/core/queue/multiqueuepusher_test.go @@ -0,0 +1,39 @@ +package queue + +import ( + "fmt" + "math" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMultiQueuePusher(t *testing.T) { + const numPushers = 100 + var pushers []Pusher + var mockedPushers []*mockedPusher + for i := 0; i < numPushers; i++ { + p := &mockedPusher{ + name: "pusher:" + strconv.Itoa(i), + } + pushers = append(pushers, p) + mockedPushers = append(mockedPushers, p) + } + + pusher := NewMultiQueuePusher(pushers) + assert.True(t, len(pusher.Name()) > 0) + + for i := 0; i < 1000; i++ { + _ = pusher.Push("item") + } + + var counts []int + for _, p := range mockedPushers { + counts = append(counts, p.count) + } + mean := calcMean(counts) + variance := calcVariance(mean, counts) + assert.True(t, math.Abs(mean-1000*(1-failProba)) < 10) + assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance)) +} diff --git a/core/queue/producer.go b/core/queue/producer.go new file mode 100644 index 00000000..c0ca935d --- /dev/null +++ b/core/queue/producer.go @@ -0,0 +1,15 @@ +package queue + +type ( + Producer interface { + AddListener(listener ProduceListener) + Produce() (string, bool) + } + + ProduceListener interface { + OnProducerPause() + OnProducerResume() + } + + ProducerFactory func() (Producer, error) +) diff --git a/core/queue/queue.go b/core/queue/queue.go new file mode 100644 index 00000000..ec917bec --- /dev/null +++ b/core/queue/queue.go @@ -0,0 +1,239 @@ +package queue + +import ( + "runtime" + "sync" + "sync/atomic" + "time" + + "github.com/tal-tech/go-zero/core/logx" + "github.com/tal-tech/go-zero/core/rescue" + "github.com/tal-tech/go-zero/core/stat" + "github.com/tal-tech/go-zero/core/threading" + "github.com/tal-tech/go-zero/core/timex" +) + +const queueName = "queue" + +type ( + Queue struct { + name string + metrics *stat.Metrics + producerFactory ProducerFactory + producerRoutineGroup *threading.RoutineGroup + consumerFactory ConsumerFactory + consumerRoutineGroup *threading.RoutineGroup + producerCount int + consumerCount int + active int32 + channel chan string + quit chan struct{} + listeners []Listener + eventLock sync.Mutex + eventChannels []chan interface{} + } + + Listener interface { + OnPause() + OnResume() + } + + Poller interface { + Name() string + Poll() string + } + + Pusher interface { + Name() string + Push(string) error + } +) + +func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue { + queue := &Queue{ + metrics: stat.NewMetrics(queueName), + producerFactory: producerFactory, + producerRoutineGroup: threading.NewRoutineGroup(), + consumerFactory: consumerFactory, + consumerRoutineGroup: threading.NewRoutineGroup(), + producerCount: runtime.NumCPU(), + consumerCount: runtime.NumCPU() << 1, + channel: make(chan string), + quit: make(chan struct{}), + } + queue.SetName(queueName) + + return queue +} + +func (queue *Queue) AddListener(listener Listener) { + queue.listeners = append(queue.listeners, listener) +} + +func (queue *Queue) Broadcast(message interface{}) { + go func() { + queue.eventLock.Lock() + defer queue.eventLock.Unlock() + + for _, channel := range queue.eventChannels { + channel <- message + } + }() +} + +func (queue *Queue) SetName(name string) { + queue.name = name + queue.metrics.SetName(name) +} + +func (queue *Queue) SetNumConsumer(count int) { + queue.consumerCount = count +} + +func (queue *Queue) SetNumProducer(count int) { + queue.producerCount = count +} + +func (queue *Queue) Start() { + queue.startProducers(queue.producerCount) + queue.startConsumers(queue.consumerCount) + + queue.producerRoutineGroup.Wait() + close(queue.channel) + queue.consumerRoutineGroup.Wait() +} + +func (queue *Queue) Stop() { + close(queue.quit) +} + +func (queue *Queue) consume(eventChan chan interface{}) { + var consumer Consumer + + for { + var err error + if consumer, err = queue.consumerFactory(); err != nil { + logx.Errorf("Error on creating consumer: %v", err) + time.Sleep(time.Second) + } else { + break + } + } + + for { + select { + case message, ok := <-queue.channel: + if ok { + queue.consumeOne(consumer, message) + } else { + logx.Info("Task channel was closed, quitting consumer...") + return + } + case event := <-eventChan: + consumer.OnEvent(event) + } + } +} + +func (queue *Queue) consumeOne(consumer Consumer, message string) { + threading.RunSafe(func() { + startTime := timex.Now() + defer func() { + duration := timex.Since(startTime) + queue.metrics.Add(stat.Task{ + Duration: duration, + }) + logx.WithDuration(duration).Infof("%s", message) + }() + + if err := consumer.Consume(message); err != nil { + logx.Errorf("Error occurred while consuming %v: %v", message, err) + } + }) +} + +func (queue *Queue) pause() { + for _, listener := range queue.listeners { + listener.OnPause() + } +} + +func (queue *Queue) produce() { + var producer Producer + + for { + var err error + if producer, err = queue.producerFactory(); err != nil { + logx.Errorf("Error on creating producer: %v", err) + time.Sleep(time.Second) + } else { + break + } + } + + atomic.AddInt32(&queue.active, 1) + producer.AddListener(routineListener{ + queue: queue, + }) + + for { + select { + case <-queue.quit: + logx.Info("Quitting producer") + return + default: + if v, ok := queue.produceOne(producer); ok { + queue.channel <- v + } + } + } +} + +func (queue *Queue) produceOne(producer Producer) (string, bool) { + // avoid panic quit the producer, just log it and continue + defer rescue.Recover() + + return producer.Produce() +} + +func (queue *Queue) resume() { + for _, listener := range queue.listeners { + listener.OnResume() + } +} + +func (queue *Queue) startConsumers(number int) { + for i := 0; i < number; i++ { + eventChan := make(chan interface{}) + queue.eventLock.Lock() + queue.eventChannels = append(queue.eventChannels, eventChan) + queue.eventLock.Unlock() + queue.consumerRoutineGroup.Run(func() { + queue.consume(eventChan) + }) + } +} + +func (queue *Queue) startProducers(number int) { + for i := 0; i < number; i++ { + queue.producerRoutineGroup.Run(func() { + queue.produce() + }) + } +} + +type routineListener struct { + queue *Queue +} + +func (rl routineListener) OnProducerPause() { + if atomic.AddInt32(&rl.queue.active, -1) <= 0 { + rl.queue.pause() + } +} + +func (rl routineListener) OnProducerResume() { + if atomic.AddInt32(&rl.queue.active, 1) == 1 { + rl.queue.resume() + } +} diff --git a/core/queue/queue_test.go b/core/queue/queue_test.go new file mode 100644 index 00000000..32279094 --- /dev/null +++ b/core/queue/queue_test.go @@ -0,0 +1,94 @@ +package queue + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +const ( + consumers = 4 + rounds = 100 +) + +func TestQueue(t *testing.T) { + producer := newMockedProducer(rounds) + consumer := newMockedConsumer() + consumer.wait.Add(consumers) + q := NewQueue(func() (Producer, error) { + return producer, nil + }, func() (Consumer, error) { + return consumer, nil + }) + q.AddListener(new(mockedListener)) + q.SetName("mockqueue") + q.SetNumConsumer(consumers) + q.SetNumProducer(1) + q.pause() + q.resume() + go func() { + producer.wait.Wait() + q.Stop() + }() + q.Start() + assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count)) +} + +type mockedConsumer struct { + count int32 + events int32 + wait sync.WaitGroup +} + +func newMockedConsumer() *mockedConsumer { + return new(mockedConsumer) +} + +func (c *mockedConsumer) Consume(string) error { + atomic.AddInt32(&c.count, 1) + return nil +} + +func (c *mockedConsumer) OnEvent(interface{}) { + if atomic.AddInt32(&c.events, 1) <= consumers { + c.wait.Done() + } +} + +type mockedProducer struct { + total int32 + count int32 + wait sync.WaitGroup +} + +func newMockedProducer(total int32) *mockedProducer { + p := new(mockedProducer) + p.total = total + p.wait.Add(int(total)) + return p +} + +func (p *mockedProducer) AddListener(listener ProduceListener) { +} + +func (p *mockedProducer) Produce() (string, bool) { + if atomic.AddInt32(&p.count, 1) <= p.total { + p.wait.Done() + return "item", true + } else { + time.Sleep(time.Second) + return "", false + } +} + +type mockedListener struct { +} + +func (l *mockedListener) OnPause() { +} + +func (l *mockedListener) OnResume() { +} diff --git a/core/queue/util.go b/core/queue/util.go new file mode 100644 index 00000000..126680cc --- /dev/null +++ b/core/queue/util.go @@ -0,0 +1,12 @@ +package queue + +import "strings" + +func generateName(pushers []Pusher) string { + names := make([]string, len(pushers)) + for i, pusher := range pushers { + names[i] = pusher.Name() + } + + return strings.Join(names, ",") +} diff --git a/core/queue/util_test.go b/core/queue/util_test.go new file mode 100644 index 00000000..44846848 --- /dev/null +++ b/core/queue/util_test.go @@ -0,0 +1,77 @@ +package queue + +import ( + "errors" + "math" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tal-tech/go-zero/core/logx" + "github.com/tal-tech/go-zero/core/mathx" +) + +var ( + proba = mathx.NewProba() + failProba = 0.01 +) + +func init() { + logx.Disable() +} + +func TestGenerateName(t *testing.T) { + pushers := []Pusher{ + &mockedPusher{name: "first"}, + &mockedPusher{name: "second"}, + &mockedPusher{name: "third"}, + } + + assert.Equal(t, "first,second,third", generateName(pushers)) +} + +func TestGenerateNameNil(t *testing.T) { + var pushers []Pusher + assert.Equal(t, "", generateName(pushers)) +} + +func calcMean(vals []int) float64 { + if len(vals) == 0 { + return 0 + } + + var result float64 + for _, val := range vals { + result += float64(val) + } + return result / float64(len(vals)) +} + +func calcVariance(mean float64, vals []int) float64 { + if len(vals) == 0 { + return 0 + } + + var result float64 + for _, val := range vals { + result += math.Pow(float64(val)-mean, 2) + } + return result / float64(len(vals)) +} + +type mockedPusher struct { + name string + count int +} + +func (p *mockedPusher) Name() string { + return p.name +} + +func (p *mockedPusher) Push(s string) error { + if proba.TrueOnProba(failProba) { + return errors.New("dummy") + } + + p.count++ + return nil +}