From 6d9602fa35faa08ee79ca81c137e41e80060d432 Mon Sep 17 00:00:00 2001 From: kevin Date: Thu, 6 Aug 2020 13:55:09 +0800 Subject: [PATCH] remove packages --- core/queue/balancedqueuepusher.go | 44 --- core/queue/balancedqueuepusher_test.go | 43 --- core/queue/consumer.go | 10 - core/queue/messagequeue.go | 6 - core/queue/multiqueuepusher.go | 31 -- core/queue/multiqueuepusher_test.go | 39 -- core/queue/producer.go | 15 - core/queue/queue.go | 239 ------------ core/queue/queue_test.go | 94 ----- core/queue/util.go | 12 - core/queue/util_test.go | 78 ---- dq/config.go | 15 - dq/connection.go | 65 ---- dq/consumer.go | 100 ----- dq/consumernode.go | 95 ----- dq/producer.go | 156 -------- dq/producernode.go | 98 ----- dq/vars.go | 15 - example/beanstalk/consumer/consumer.go | 42 --- example/beanstalk/producer/producer.go | 40 -- example/jobqueue/jobqueue.go | 11 - example/kmq/consumer/config.json | 12 - example/kmq/consumer/queue.go | 20 - example/kmq/producer/produce.go | 51 --- example/queue/poll/poller.go | 86 ----- example/queue/push/pusher.go | 31 -- example/redis/cluster.go | 62 ---- example/sqlc/user.go | 142 -------- go.mod | 5 - go.sum | 18 - kq/config.go | 21 -- kq/pusher.go | 101 ----- kq/queue.go | 229 ------------ rq/config.go | 18 - rq/etc/config.json | 19 - rq/internal/conf.go | 19 - rq/internal/const.go | 7 - rq/internal/hashchange.go | 39 -- rq/internal/message.go | 6 - rq/internal/redisqueue_test.go | 82 ----- rq/internal/redisqueueproducer.go | 166 --------- rq/internal/redisqueuepusher.go | 78 ---- rq/internal/update/incrementalupdater.go | 179 --------- rq/internal/update/serverchange.go | 106 ------ rq/pusher.go | 445 ----------------------- rq/queue.go | 338 ----------------- rq/queue_test.go | 62 ---- 47 files changed, 3590 deletions(-) delete mode 100644 core/queue/balancedqueuepusher.go delete mode 100644 core/queue/balancedqueuepusher_test.go delete mode 100644 core/queue/consumer.go delete mode 100644 core/queue/messagequeue.go delete mode 100644 core/queue/multiqueuepusher.go delete mode 100644 core/queue/multiqueuepusher_test.go delete mode 100644 core/queue/producer.go delete mode 100644 core/queue/queue.go delete mode 100644 core/queue/queue_test.go delete mode 100644 core/queue/util.go delete mode 100644 core/queue/util_test.go delete mode 100644 dq/config.go delete mode 100644 dq/connection.go delete mode 100644 dq/consumer.go delete mode 100644 dq/consumernode.go delete mode 100644 dq/producer.go delete mode 100644 dq/producernode.go delete mode 100644 dq/vars.go delete mode 100644 example/beanstalk/consumer/consumer.go delete mode 100644 example/beanstalk/producer/producer.go delete mode 100644 example/jobqueue/jobqueue.go delete mode 100644 example/kmq/consumer/config.json delete mode 100644 example/kmq/consumer/queue.go delete mode 100644 example/kmq/producer/produce.go delete mode 100644 example/queue/poll/poller.go delete mode 100644 example/queue/push/pusher.go delete mode 100644 example/redis/cluster.go delete mode 100644 example/sqlc/user.go delete mode 100644 kq/config.go delete mode 100644 kq/pusher.go delete mode 100644 kq/queue.go delete mode 100644 rq/config.go delete mode 100644 rq/etc/config.json delete mode 100644 rq/internal/conf.go delete mode 100644 rq/internal/const.go delete mode 100644 rq/internal/hashchange.go delete mode 100644 rq/internal/message.go delete mode 100644 rq/internal/redisqueue_test.go delete mode 100644 rq/internal/redisqueueproducer.go delete mode 100644 rq/internal/redisqueuepusher.go delete mode 100644 rq/internal/update/incrementalupdater.go delete mode 100644 rq/internal/update/serverchange.go delete mode 100644 rq/pusher.go delete mode 100644 rq/queue.go delete mode 100644 rq/queue_test.go diff --git a/core/queue/balancedqueuepusher.go b/core/queue/balancedqueuepusher.go deleted file mode 100644 index 0c53c9e9..00000000 --- a/core/queue/balancedqueuepusher.go +++ /dev/null @@ -1,44 +0,0 @@ -package queue - -import ( - "errors" - "sync/atomic" - - "zero/core/logx" -) - -var ErrNoAvailablePusher = errors.New("no available pusher") - -type BalancedQueuePusher struct { - name string - pushers []QueuePusher - index uint64 -} - -func NewBalancedQueuePusher(pushers []QueuePusher) QueuePusher { - return &BalancedQueuePusher{ - name: generateName(pushers), - pushers: pushers, - } -} - -func (pusher *BalancedQueuePusher) Name() string { - return pusher.name -} - -func (pusher *BalancedQueuePusher) Push(message string) error { - size := len(pusher.pushers) - - for i := 0; i < size; i++ { - index := atomic.AddUint64(&pusher.index, 1) % uint64(size) - target := pusher.pushers[index] - - if err := target.Push(message); err != nil { - logx.Error(err) - } else { - return nil - } - } - - return ErrNoAvailablePusher -} diff --git a/core/queue/balancedqueuepusher_test.go b/core/queue/balancedqueuepusher_test.go deleted file mode 100644 index 89aaa3bb..00000000 --- a/core/queue/balancedqueuepusher_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package queue - -import ( - "fmt" - "strconv" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestBalancedQueuePusher(t *testing.T) { - const numPushers = 100 - var pushers []QueuePusher - var mockedPushers []*mockedPusher - for i := 0; i < numPushers; i++ { - p := &mockedPusher{ - name: "pusher:" + strconv.Itoa(i), - } - pushers = append(pushers, p) - mockedPushers = append(mockedPushers, p) - } - - pusher := NewBalancedQueuePusher(pushers) - assert.True(t, len(pusher.Name()) > 0) - - for i := 0; i < numPushers*1000; i++ { - assert.Nil(t, pusher.Push("item")) - } - - var counts []int - for _, p := range mockedPushers { - counts = append(counts, p.count) - } - mean := calcMean(counts) - variance := calcVariance(mean, counts) - assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance)) -} - -func TestBalancedQueuePusher_NoAvailable(t *testing.T) { - pusher := NewBalancedQueuePusher(nil) - assert.True(t, len(pusher.Name()) == 0) - assert.Equal(t, ErrNoAvailablePusher, pusher.Push("item")) -} diff --git a/core/queue/consumer.go b/core/queue/consumer.go deleted file mode 100644 index 8f12d97f..00000000 --- a/core/queue/consumer.go +++ /dev/null @@ -1,10 +0,0 @@ -package queue - -type ( - Consumer interface { - Consume(string) error - OnEvent(event interface{}) - } - - ConsumerFactory func() (Consumer, error) -) diff --git a/core/queue/messagequeue.go b/core/queue/messagequeue.go deleted file mode 100644 index 569ae566..00000000 --- a/core/queue/messagequeue.go +++ /dev/null @@ -1,6 +0,0 @@ -package queue - -type MessageQueue interface { - Start() - Stop() -} diff --git a/core/queue/multiqueuepusher.go b/core/queue/multiqueuepusher.go deleted file mode 100644 index f57b98af..00000000 --- a/core/queue/multiqueuepusher.go +++ /dev/null @@ -1,31 +0,0 @@ -package queue - -import "zero/core/errorx" - -type MultiQueuePusher struct { - name string - pushers []QueuePusher -} - -func NewMultiQueuePusher(pushers []QueuePusher) QueuePusher { - return &MultiQueuePusher{ - name: generateName(pushers), - pushers: pushers, - } -} - -func (pusher *MultiQueuePusher) Name() string { - return pusher.name -} - -func (pusher *MultiQueuePusher) Push(message string) error { - var batchError errorx.BatchError - - for _, each := range pusher.pushers { - if err := each.Push(message); err != nil { - batchError.Add(err) - } - } - - return batchError.Err() -} diff --git a/core/queue/multiqueuepusher_test.go b/core/queue/multiqueuepusher_test.go deleted file mode 100644 index 8af5200c..00000000 --- a/core/queue/multiqueuepusher_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package queue - -import ( - "fmt" - "math" - "strconv" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMultiQueuePusher(t *testing.T) { - const numPushers = 100 - var pushers []QueuePusher - var mockedPushers []*mockedPusher - for i := 0; i < numPushers; i++ { - p := &mockedPusher{ - name: "pusher:" + strconv.Itoa(i), - } - pushers = append(pushers, p) - mockedPushers = append(mockedPushers, p) - } - - pusher := NewMultiQueuePusher(pushers) - assert.True(t, len(pusher.Name()) > 0) - - for i := 0; i < 1000; i++ { - _ = pusher.Push("item") - } - - var counts []int - for _, p := range mockedPushers { - counts = append(counts, p.count) - } - mean := calcMean(counts) - variance := calcVariance(mean, counts) - assert.True(t, math.Abs(mean-1000*(1-failProba)) < 10) - assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance)) -} diff --git a/core/queue/producer.go b/core/queue/producer.go deleted file mode 100644 index c0ca935d..00000000 --- a/core/queue/producer.go +++ /dev/null @@ -1,15 +0,0 @@ -package queue - -type ( - Producer interface { - AddListener(listener ProduceListener) - Produce() (string, bool) - } - - ProduceListener interface { - OnProducerPause() - OnProducerResume() - } - - ProducerFactory func() (Producer, error) -) diff --git a/core/queue/queue.go b/core/queue/queue.go deleted file mode 100644 index 40905716..00000000 --- a/core/queue/queue.go +++ /dev/null @@ -1,239 +0,0 @@ -package queue - -import ( - "runtime" - "sync" - "sync/atomic" - "time" - - "zero/core/logx" - "zero/core/rescue" - "zero/core/stat" - "zero/core/threading" - "zero/core/timex" -) - -const queueName = "queue" - -type ( - Queue struct { - name string - metrics *stat.Metrics - producerFactory ProducerFactory - producerRoutineGroup *threading.RoutineGroup - consumerFactory ConsumerFactory - consumerRoutineGroup *threading.RoutineGroup - producerCount int - consumerCount int - active int32 - channel chan string - quit chan struct{} - listeners []QueueListener - eventLock sync.Mutex - eventChannels []chan interface{} - } - - QueueListener interface { - OnPause() - OnResume() - } - - QueuePoller interface { - Name() string - Poll() string - } - - QueuePusher interface { - Name() string - Push(string) error - } -) - -func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue { - queue := &Queue{ - metrics: stat.NewMetrics(queueName), - producerFactory: producerFactory, - producerRoutineGroup: threading.NewRoutineGroup(), - consumerFactory: consumerFactory, - consumerRoutineGroup: threading.NewRoutineGroup(), - producerCount: runtime.NumCPU(), - consumerCount: runtime.NumCPU() << 1, - channel: make(chan string), - quit: make(chan struct{}), - } - queue.SetName(queueName) - - return queue -} - -func (queue *Queue) AddListener(listener QueueListener) { - queue.listeners = append(queue.listeners, listener) -} - -func (queue *Queue) Broadcast(message interface{}) { - go func() { - queue.eventLock.Lock() - defer queue.eventLock.Unlock() - - for _, channel := range queue.eventChannels { - channel <- message - } - }() -} - -func (queue *Queue) SetName(name string) { - queue.name = name - queue.metrics.SetName(name) -} - -func (queue *Queue) SetNumConsumer(count int) { - queue.consumerCount = count -} - -func (queue *Queue) SetNumProducer(count int) { - queue.producerCount = count -} - -func (queue *Queue) Start() { - queue.startProducers(queue.producerCount) - queue.startConsumers(queue.consumerCount) - - queue.producerRoutineGroup.Wait() - close(queue.channel) - queue.consumerRoutineGroup.Wait() -} - -func (queue *Queue) Stop() { - close(queue.quit) -} - -func (queue *Queue) consume(eventChan chan interface{}) { - var consumer Consumer - - for { - var err error - if consumer, err = queue.consumerFactory(); err != nil { - logx.Errorf("Error on creating consumer: %v", err) - time.Sleep(time.Second) - } else { - break - } - } - - for { - select { - case message, ok := <-queue.channel: - if ok { - queue.consumeOne(consumer, message) - } else { - logx.Info("Task channel was closed, quitting consumer...") - return - } - case event := <-eventChan: - consumer.OnEvent(event) - } - } -} - -func (queue *Queue) consumeOne(consumer Consumer, message string) { - threading.RunSafe(func() { - startTime := timex.Now() - defer func() { - duration := timex.Since(startTime) - queue.metrics.Add(stat.Task{ - Duration: duration, - }) - logx.WithDuration(duration).Infof("%s", message) - }() - - if err := consumer.Consume(message); err != nil { - logx.Errorf("Error occurred while consuming %v: %v", message, err) - } - }) -} - -func (queue *Queue) pause() { - for _, listener := range queue.listeners { - listener.OnPause() - } -} - -func (queue *Queue) produce() { - var producer Producer - - for { - var err error - if producer, err = queue.producerFactory(); err != nil { - logx.Errorf("Error on creating producer: %v", err) - time.Sleep(time.Second) - } else { - break - } - } - - atomic.AddInt32(&queue.active, 1) - producer.AddListener(routineListener{ - queue: queue, - }) - - for { - select { - case <-queue.quit: - logx.Info("Quitting producer") - return - default: - if v, ok := queue.produceOne(producer); ok { - queue.channel <- v - } - } - } -} - -func (queue *Queue) produceOne(producer Producer) (string, bool) { - // avoid panic quit the producer, just log it and continue - defer rescue.Recover() - - return producer.Produce() -} - -func (queue *Queue) resume() { - for _, listener := range queue.listeners { - listener.OnResume() - } -} - -func (queue *Queue) startConsumers(number int) { - for i := 0; i < number; i++ { - eventChan := make(chan interface{}) - queue.eventLock.Lock() - queue.eventChannels = append(queue.eventChannels, eventChan) - queue.eventLock.Unlock() - queue.consumerRoutineGroup.Run(func() { - queue.consume(eventChan) - }) - } -} - -func (queue *Queue) startProducers(number int) { - for i := 0; i < number; i++ { - queue.producerRoutineGroup.Run(func() { - queue.produce() - }) - } -} - -type routineListener struct { - queue *Queue -} - -func (rl routineListener) OnProducerPause() { - if atomic.AddInt32(&rl.queue.active, -1) <= 0 { - rl.queue.pause() - } -} - -func (rl routineListener) OnProducerResume() { - if atomic.AddInt32(&rl.queue.active, 1) == 1 { - rl.queue.resume() - } -} diff --git a/core/queue/queue_test.go b/core/queue/queue_test.go deleted file mode 100644 index 32279094..00000000 --- a/core/queue/queue_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package queue - -import ( - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -const ( - consumers = 4 - rounds = 100 -) - -func TestQueue(t *testing.T) { - producer := newMockedProducer(rounds) - consumer := newMockedConsumer() - consumer.wait.Add(consumers) - q := NewQueue(func() (Producer, error) { - return producer, nil - }, func() (Consumer, error) { - return consumer, nil - }) - q.AddListener(new(mockedListener)) - q.SetName("mockqueue") - q.SetNumConsumer(consumers) - q.SetNumProducer(1) - q.pause() - q.resume() - go func() { - producer.wait.Wait() - q.Stop() - }() - q.Start() - assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count)) -} - -type mockedConsumer struct { - count int32 - events int32 - wait sync.WaitGroup -} - -func newMockedConsumer() *mockedConsumer { - return new(mockedConsumer) -} - -func (c *mockedConsumer) Consume(string) error { - atomic.AddInt32(&c.count, 1) - return nil -} - -func (c *mockedConsumer) OnEvent(interface{}) { - if atomic.AddInt32(&c.events, 1) <= consumers { - c.wait.Done() - } -} - -type mockedProducer struct { - total int32 - count int32 - wait sync.WaitGroup -} - -func newMockedProducer(total int32) *mockedProducer { - p := new(mockedProducer) - p.total = total - p.wait.Add(int(total)) - return p -} - -func (p *mockedProducer) AddListener(listener ProduceListener) { -} - -func (p *mockedProducer) Produce() (string, bool) { - if atomic.AddInt32(&p.count, 1) <= p.total { - p.wait.Done() - return "item", true - } else { - time.Sleep(time.Second) - return "", false - } -} - -type mockedListener struct { -} - -func (l *mockedListener) OnPause() { -} - -func (l *mockedListener) OnResume() { -} diff --git a/core/queue/util.go b/core/queue/util.go deleted file mode 100644 index f4377f12..00000000 --- a/core/queue/util.go +++ /dev/null @@ -1,12 +0,0 @@ -package queue - -import "strings" - -func generateName(pushers []QueuePusher) string { - names := make([]string, len(pushers)) - for i, pusher := range pushers { - names[i] = pusher.Name() - } - - return strings.Join(names, ",") -} diff --git a/core/queue/util_test.go b/core/queue/util_test.go deleted file mode 100644 index 6537866e..00000000 --- a/core/queue/util_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package queue - -import ( - "errors" - "math" - "testing" - - "zero/core/logx" - "zero/core/mathx" - - "github.com/stretchr/testify/assert" -) - -var ( - proba = mathx.NewProba() - failProba = 0.01 -) - -func init() { - logx.Disable() -} - -func TestGenerateName(t *testing.T) { - pushers := []QueuePusher{ - &mockedPusher{name: "first"}, - &mockedPusher{name: "second"}, - &mockedPusher{name: "third"}, - } - - assert.Equal(t, "first,second,third", generateName(pushers)) -} - -func TestGenerateNameNil(t *testing.T) { - var pushers []QueuePusher - assert.Equal(t, "", generateName(pushers)) -} - -func calcMean(vals []int) float64 { - if len(vals) == 0 { - return 0 - } - - var result float64 - for _, val := range vals { - result += float64(val) - } - return result / float64(len(vals)) -} - -func calcVariance(mean float64, vals []int) float64 { - if len(vals) == 0 { - return 0 - } - - var result float64 - for _, val := range vals { - result += math.Pow(float64(val)-mean, 2) - } - return result / float64(len(vals)) -} - -type mockedPusher struct { - name string - count int -} - -func (p *mockedPusher) Name() string { - return p.name -} - -func (p *mockedPusher) Push(s string) error { - if proba.TrueOnProba(failProba) { - return errors.New("dummy") - } - - p.count++ - return nil -} diff --git a/dq/config.go b/dq/config.go deleted file mode 100644 index 2eabbd83..00000000 --- a/dq/config.go +++ /dev/null @@ -1,15 +0,0 @@ -package dq - -import "zero/core/stores/redis" - -type ( - Beanstalk struct { - Endpoint string - Tube string - } - - DqConf struct { - Beanstalks []Beanstalk - Redis redis.RedisConf - } -) diff --git a/dq/connection.go b/dq/connection.go deleted file mode 100644 index 638bd587..00000000 --- a/dq/connection.go +++ /dev/null @@ -1,65 +0,0 @@ -package dq - -import ( - "sync" - - "github.com/beanstalkd/go-beanstalk" -) - -type connection struct { - lock sync.RWMutex - endpoint string - tube string - conn *beanstalk.Conn -} - -func newConnection(endpint, tube string) *connection { - return &connection{ - endpoint: endpint, - tube: tube, - } -} - -func (c *connection) Close() error { - c.lock.Lock() - conn := c.conn - c.conn = nil - defer c.lock.Unlock() - - if conn != nil { - return conn.Close() - } - - return nil -} - -func (c *connection) get() (*beanstalk.Conn, error) { - c.lock.RLock() - conn := c.conn - c.lock.RUnlock() - if conn != nil { - return conn, nil - } - - c.lock.Lock() - defer c.lock.Unlock() - - var err error - c.conn, err = beanstalk.Dial("tcp", c.endpoint) - if err != nil { - return nil, err - } - - c.conn.Tube.Name = c.tube - return c.conn, err -} - -func (c *connection) reset() { - c.lock.Lock() - defer c.lock.Unlock() - - if c.conn != nil { - c.conn.Close() - c.conn = nil - } -} diff --git a/dq/consumer.go b/dq/consumer.go deleted file mode 100644 index 861a3916..00000000 --- a/dq/consumer.go +++ /dev/null @@ -1,100 +0,0 @@ -package dq - -import ( - "strconv" - "time" - - "zero/core/hash" - "zero/core/logx" - "zero/core/service" - "zero/core/stores/redis" -) - -const ( - expiration = 3600 // seconds - guardValue = "1" - tolerance = time.Minute * 30 -) - -var maxCheckBytes = getMaxTimeLen() - -type ( - Consume func(body []byte) - - Consumer interface { - Consume(consume Consume) - } - - consumerCluster struct { - nodes []*consumerNode - red *redis.Redis - } -) - -func NewConsumer(c DqConf) Consumer { - var nodes []*consumerNode - for _, node := range c.Beanstalks { - nodes = append(nodes, newConsumerNode(node.Endpoint, node.Tube)) - } - return &consumerCluster{ - nodes: nodes, - red: c.Redis.NewRedis(), - } -} - -func (c *consumerCluster) Consume(consume Consume) { - guardedConsume := func(body []byte) { - key := hash.Md5Hex(body) - body, ok := c.unwrap(body) - if !ok { - logx.Errorf("discarded: %q", string(body)) - return - } - - ok, err := c.red.SetnxEx(key, guardValue, expiration) - if err != nil { - logx.Error(err) - } else if ok { - consume(body) - } - } - - group := service.NewServiceGroup() - for _, node := range c.nodes { - group.Add(consumeService{ - c: node, - consume: guardedConsume, - }) - } - group.Start() -} - -func (c *consumerCluster) unwrap(body []byte) ([]byte, bool) { - var pos = -1 - for i := 0; i < maxCheckBytes; i++ { - if body[i] == timeSep { - pos = i - break - } - } - if pos < 0 { - return nil, false - } - - val, err := strconv.ParseInt(string(body[:pos]), 10, 64) - if err != nil { - logx.Error(err) - return nil, false - } - - t := time.Unix(0, val) - if t.Add(tolerance).Before(time.Now()) { - return nil, false - } - - return body[pos+1:], true -} - -func getMaxTimeLen() int { - return len(strconv.FormatInt(time.Now().UnixNano(), 10)) + 2 -} diff --git a/dq/consumernode.go b/dq/consumernode.go deleted file mode 100644 index faeb9b37..00000000 --- a/dq/consumernode.go +++ /dev/null @@ -1,95 +0,0 @@ -package dq - -import ( - "time" - - "zero/core/logx" - "zero/core/syncx" - - "github.com/beanstalkd/go-beanstalk" -) - -type ( - consumerNode struct { - conn *connection - tube string - on *syncx.AtomicBool - } - - consumeService struct { - c *consumerNode - consume Consume - } -) - -func newConsumerNode(endpoint, tube string) *consumerNode { - return &consumerNode{ - conn: newConnection(endpoint, tube), - tube: tube, - on: syncx.ForAtomicBool(true), - } -} - -func (c *consumerNode) dispose() { - c.on.Set(false) -} - -func (c *consumerNode) consumeEvents(consume Consume) { - for c.on.True() { - conn, err := c.conn.get() - if err != nil { - logx.Error(err) - time.Sleep(time.Second) - continue - } - - // because getting conn takes at most one second, reserve tasks at most 5 seconds, - // if don't check on/off here, the conn might not be closed due to - // graceful shutdon waits at most 5.5 seconds. - if !c.on.True() { - break - } - - conn.Tube.Name = c.tube - conn.TubeSet.Name[c.tube] = true - id, body, err := conn.Reserve(reserveTimeout) - if err == nil { - conn.Delete(id) - consume(body) - continue - } - - // the error can only be beanstalk.NameError or beanstalk.ConnError - switch cerr := err.(type) { - case beanstalk.ConnError: - switch cerr.Err { - case beanstalk.ErrTimeout: - // timeout error on timeout, just continue the loop - case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline, - beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig, - beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong: - // won't reset - logx.Error(err) - default: - // beanstalk.ErrOOM, beanstalk.ErrUnknown and other errors - logx.Error(err) - c.conn.reset() - time.Sleep(time.Second) - } - default: - logx.Error(err) - } - } - - if err := c.conn.Close(); err != nil { - logx.Error(err) - } -} - -func (cs consumeService) Start() { - cs.c.consumeEvents(cs.consume) -} - -func (cs consumeService) Stop() { - cs.c.dispose() -} diff --git a/dq/producer.go b/dq/producer.go deleted file mode 100644 index 3b1efd23..00000000 --- a/dq/producer.go +++ /dev/null @@ -1,156 +0,0 @@ -package dq - -import ( - "bytes" - "log" - "math/rand" - "strconv" - "strings" - "time" - - "zero/core/errorx" - "zero/core/fx" - "zero/core/logx" -) - -const ( - replicaNodes = 3 - minWrittenNodes = 2 -) - -type ( - Producer interface { - At(body []byte, at time.Time) (string, error) - Close() error - Delay(body []byte, delay time.Duration) (string, error) - Revoke(ids string) error - } - - producerCluster struct { - nodes []Producer - } -) - -func init() { - rand.Seed(time.Now().UnixNano()) -} - -func NewProducer(beanstalks []Beanstalk) Producer { - if len(beanstalks) < minWrittenNodes { - log.Fatalf("nodes must be equal or greater than %d", minWrittenNodes) - } - - var nodes []Producer - for _, node := range beanstalks { - nodes = append(nodes, NewProducerNode(node.Endpoint, node.Tube)) - } - return &producerCluster{nodes: nodes} -} - -func (p *producerCluster) At(body []byte, at time.Time) (string, error) { - return p.insert(func(node Producer) (string, error) { - return node.At(p.wrap(body, at), at) - }) -} - -func (p *producerCluster) Close() error { - var be errorx.BatchError - for _, node := range p.nodes { - if err := node.Close(); err != nil { - be.Add(err) - } - } - return be.Err() -} - -func (p *producerCluster) Delay(body []byte, delay time.Duration) (string, error) { - return p.insert(func(node Producer) (string, error) { - return node.Delay(p.wrap(body, time.Now().Add(delay)), delay) - }) -} - -func (p *producerCluster) Revoke(ids string) error { - var be errorx.BatchError - - fx.From(func(source chan<- interface{}) { - for _, node := range p.nodes { - source <- node - } - }).Map(func(item interface{}) interface{} { - node := item.(Producer) - return node.Revoke(ids) - }).ForEach(func(item interface{}) { - if item != nil { - be.Add(item.(error)) - } - }) - - return be.Err() -} - -func (p *producerCluster) cloneNodes() []Producer { - return append([]Producer(nil), p.nodes...) -} - -func (p *producerCluster) getWriteNodes() []Producer { - if len(p.nodes) <= replicaNodes { - return p.nodes - } - - nodes := p.cloneNodes() - rand.Shuffle(len(nodes), func(i, j int) { - nodes[i], nodes[j] = nodes[j], nodes[i] - }) - return nodes[:replicaNodes] -} - -func (p *producerCluster) insert(fn func(node Producer) (string, error)) (string, error) { - type idErr struct { - id string - err error - } - var ret []idErr - fx.From(func(source chan<- interface{}) { - for _, node := range p.getWriteNodes() { - source <- node - } - }).Map(func(item interface{}) interface{} { - node := item.(Producer) - id, err := fn(node) - return idErr{ - id: id, - err: err, - } - }).ForEach(func(item interface{}) { - ret = append(ret, item.(idErr)) - }) - - var ids []string - var be errorx.BatchError - for _, val := range ret { - if val.err != nil { - be.Add(val.err) - } else { - ids = append(ids, val.id) - } - } - - jointId := strings.Join(ids, idSep) - if len(ids) >= minWrittenNodes { - return jointId, nil - } - - if err := p.Revoke(jointId); err != nil { - logx.Error(err) - } - - return "", be.Err() -} - -func (p *producerCluster) wrap(body []byte, at time.Time) []byte { - var builder bytes.Buffer - builder.WriteString(strconv.FormatInt(at.UnixNano(), 10)) - builder.WriteByte(timeSep) - builder.Write(body) - return builder.Bytes() -} diff --git a/dq/producernode.go b/dq/producernode.go deleted file mode 100644 index 730e890e..00000000 --- a/dq/producernode.go +++ /dev/null @@ -1,98 +0,0 @@ -package dq - -import ( - "errors" - "fmt" - "strconv" - "strings" - "time" - - "github.com/beanstalkd/go-beanstalk" -) - -var ErrTimeBeforeNow = errors.New("can't schedule task to past time") - -type producerNode struct { - endpoint string - tube string - conn *connection -} - -func NewProducerNode(endpoint, tube string) Producer { - return &producerNode{ - endpoint: endpoint, - tube: tube, - conn: newConnection(endpoint, tube), - } -} - -func (p *producerNode) At(body []byte, at time.Time) (string, error) { - now := time.Now() - if at.Before(now) { - return "", ErrTimeBeforeNow - } - - duration := at.Sub(now) - return p.Delay(body, duration) -} - -func (p *producerNode) Close() error { - return p.conn.Close() -} - -func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) { - conn, err := p.conn.get() - if err != nil { - return "", err - } - - id, err := conn.Put(body, PriNormal, delay, defaultTimeToRun) - if err == nil { - return fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id), nil - } - - // the error can only be beanstalk.NameError or beanstalk.ConnError - // just return when the error is beanstalk.NameError, don't reset - switch cerr := err.(type) { - case beanstalk.ConnError: - switch cerr.Err { - case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline, - beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig, - beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong: - // won't reset - default: - // beanstalk.ErrOOM, beanstalk.ErrTimeout, beanstalk.ErrUnknown and other errors - p.conn.reset() - } - } - - return "", err -} - -func (p *producerNode) Revoke(jointId string) error { - ids := strings.Split(jointId, idSep) - for _, id := range ids { - fields := strings.Split(id, "/") - if len(fields) < 3 { - continue - } - if fields[0] != p.endpoint || fields[1] != p.tube { - continue - } - - conn, err := p.conn.get() - if err != nil { - return err - } - - n, err := strconv.ParseUint(fields[2], 10, 64) - if err != nil { - return err - } - - return conn.Delete(n) - } - - // if not in this beanstalk, ignore - return nil -} diff --git a/dq/vars.go b/dq/vars.go deleted file mode 100644 index 667f26dd..00000000 --- a/dq/vars.go +++ /dev/null @@ -1,15 +0,0 @@ -package dq - -import "time" - -const ( - PriHigh = 1 - PriNormal = 2 - PriLow = 3 - - defaultTimeToRun = time.Second * 5 - reserveTimeout = time.Second * 5 - - idSep = "," - timeSep = '/' -) diff --git a/example/beanstalk/consumer/consumer.go b/example/beanstalk/consumer/consumer.go deleted file mode 100644 index 13321953..00000000 --- a/example/beanstalk/consumer/consumer.go +++ /dev/null @@ -1,42 +0,0 @@ -package main - -import ( - "fmt" - - "zero/core/stores/redis" - "zero/dq" -) - -func main() { - consumer := dq.NewConsumer(dq.DqConf{ - Beanstalks: []dq.Beanstalk{ - { - Endpoint: "localhost:11300", - Tube: "tube", - }, - { - Endpoint: "localhost:11301", - Tube: "tube", - }, - { - Endpoint: "localhost:11302", - Tube: "tube", - }, - { - Endpoint: "localhost:11303", - Tube: "tube", - }, - { - Endpoint: "localhost:11304", - Tube: "tube", - }, - }, - Redis: redis.RedisConf{ - Host: "localhost:6379", - Type: redis.NodeType, - }, - }) - consumer.Consume(func(body []byte) { - fmt.Println(string(body)) - }) -} diff --git a/example/beanstalk/producer/producer.go b/example/beanstalk/producer/producer.go deleted file mode 100644 index 7d614526..00000000 --- a/example/beanstalk/producer/producer.go +++ /dev/null @@ -1,40 +0,0 @@ -package main - -import ( - "fmt" - "strconv" - "time" - - "zero/dq" -) - -func main() { - producer := dq.NewProducer([]dq.Beanstalk{ - { - Endpoint: "localhost:11300", - Tube: "tube", - }, - { - Endpoint: "localhost:11301", - Tube: "tube", - }, - { - Endpoint: "localhost:11302", - Tube: "tube", - }, - { - Endpoint: "localhost:11303", - Tube: "tube", - }, - { - Endpoint: "localhost:11304", - Tube: "tube", - }, - }) - for i := 0; i < 5; i++ { - _, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*10)) - if err != nil { - fmt.Println(err) - } - } -} diff --git a/example/jobqueue/jobqueue.go b/example/jobqueue/jobqueue.go deleted file mode 100644 index 71b6a3d3..00000000 --- a/example/jobqueue/jobqueue.go +++ /dev/null @@ -1,11 +0,0 @@ -package main - -import "zero/core/threading" - -func main() { - q := threading.NewTaskRunner(5) - q.Schedule(func() { - panic("hello") - }) - select {} -} diff --git a/example/kmq/consumer/config.json b/example/kmq/consumer/config.json deleted file mode 100644 index e5d36c2c..00000000 --- a/example/kmq/consumer/config.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "Name": "kmq", - "Brokers": [ - "172.16.56.64:19092", - "172.16.56.65:19092", - "172.16.56.66:19092" - ], - "Group": "adhoc", - "Topic": "kevin", - "Offset": "first", - "NumProducers": 1 -} \ No newline at end of file diff --git a/example/kmq/consumer/queue.go b/example/kmq/consumer/queue.go deleted file mode 100644 index fed25418..00000000 --- a/example/kmq/consumer/queue.go +++ /dev/null @@ -1,20 +0,0 @@ -package main - -import ( - "fmt" - - "zero/core/conf" - "zero/kq" -) - -func main() { - var c kq.KqConf - conf.MustLoad("config.json", &c) - - q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error { - fmt.Printf("=> %s\n", v) - return nil - })) - defer q.Stop() - q.Start() -} diff --git a/example/kmq/producer/produce.go b/example/kmq/producer/produce.go deleted file mode 100644 index 006883b2..00000000 --- a/example/kmq/producer/produce.go +++ /dev/null @@ -1,51 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "log" - "math/rand" - "strconv" - "time" - - "zero/core/cmdline" - "zero/kq" -) - -type message struct { - Key string `json:"key"` - Value string `json:"value"` - Payload string `json:"message"` -} - -func main() { - pusher := kq.NewPusher([]string{ - "172.16.56.64:19092", - "172.16.56.65:19092", - "172.16.56.66:19092", - }, "kevin") - - ticker := time.NewTicker(time.Millisecond) - for round := 0; round < 3; round++ { - select { - case <-ticker.C: - count := rand.Intn(100) - m := message{ - Key: strconv.FormatInt(time.Now().UnixNano(), 10), - Value: fmt.Sprintf("%d,%d", round, count), - Payload: fmt.Sprintf("%d,%d", round, count), - } - body, err := json.Marshal(m) - if err != nil { - log.Fatal(err) - } - - fmt.Println(string(body)) - if err := pusher.Push(string(body)); err != nil { - log.Fatal(err) - } - } - } - - cmdline.EnterToContinue() -} diff --git a/example/queue/poll/poller.go b/example/queue/poll/poller.go deleted file mode 100644 index 44474cd8..00000000 --- a/example/queue/poll/poller.go +++ /dev/null @@ -1,86 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "log" - "sync" - "time" - - "zero/core/discov" - "zero/core/lang" - "zero/core/logx" - "zero/core/service" - "zero/core/stores/redis" - "zero/rq" -) - -var ( - redisHost = flag.String("redis", "localhost:6379", "") - redisType = flag.String("type", "node", "") - redisKey = flag.String("key", "queue", "") - producers = flag.Int("producers", 1, "") - dropBefore = flag.Int64("drop", 0, "messages before seconds to drop") -) - -type Consumer struct { - lock sync.Mutex - resources map[string]interface{} -} - -func NewConsumer() *Consumer { - return &Consumer{ - resources: make(map[string]interface{}), - } -} - -func (c *Consumer) Consume(msg string) error { - fmt.Println("=>", msg) - c.lock.Lock() - defer c.lock.Unlock() - - c.resources[msg] = lang.Placeholder - - return nil -} - -func (c *Consumer) OnEvent(event interface{}) { - fmt.Printf("event: %+v\n", event) -} - -func main() { - flag.Parse() - - consumer := NewConsumer() - q, err := rq.NewMessageQueue(rq.RmqConf{ - ServiceConf: service.ServiceConf{ - Name: "queue", - Log: logx.LogConf{ - Path: "logs", - KeepDays: 3, - Compress: true, - }, - }, - Redis: redis.RedisKeyConf{ - RedisConf: redis.RedisConf{ - Host: *redisHost, - Type: *redisType, - }, - Key: *redisKey, - }, - Etcd: discov.EtcdConf{ - Hosts: []string{ - "localhost:2379", - }, - Key: "queue", - }, - DropBefore: *dropBefore, - NumProducers: *producers, - }, rq.WithHandler(consumer), rq.WithRenewId(time.Now().UnixNano())) - if err != nil { - log.Fatal(err) - } - defer q.Stop() - - q.Start() -} diff --git a/example/queue/push/pusher.go b/example/queue/push/pusher.go deleted file mode 100644 index da224271..00000000 --- a/example/queue/push/pusher.go +++ /dev/null @@ -1,31 +0,0 @@ -package main - -import ( - "log" - "strconv" - "time" - - "zero/core/discov" - "zero/rq" - - "github.com/google/gops/agent" -) - -func main() { - if err := agent.Listen(agent.Options{}); err != nil { - log.Fatal(err) - } - - pusher, err := rq.NewPusher([]string{"localhost:2379"}, "queue", rq.WithConsistentStrategy( - func(msg string) (string, string, error) { - return msg, msg, nil - }, discov.BalanceWithId()), rq.WithServerSensitive()) - if err != nil { - log.Fatal(err) - } - - for i := 0; ; i++ { - pusher.Push(strconv.Itoa(i)) - time.Sleep(time.Second) - } -} diff --git a/example/redis/cluster.go b/example/redis/cluster.go deleted file mode 100644 index 5cf76859..00000000 --- a/example/redis/cluster.go +++ /dev/null @@ -1,62 +0,0 @@ -package main - -import ( - "flag" - "log" - - "zero/core/logx" - "zero/core/queue" - "zero/core/service" - "zero/core/stores/redis" - "zero/rq" -) - -var ( - host = flag.String("s", "10.24.232.63:7002", "server address") - mode = flag.String("m", "queue", "cluster test mode") -) - -type bridgeHandler struct { - pusher queue.QueuePusher -} - -func newBridgeHandler() rq.ConsumeHandler { - return bridgeHandler{} -} - -func (h bridgeHandler) Consume(str string) error { - logx.Info("=>", str) - return nil -} - -func main() { - flag.Parse() - - if *mode == "queue" { - mq, err := rq.NewMessageQueue(rq.RmqConf{ - ServiceConf: service.ServiceConf{ - Log: logx.LogConf{ - Path: "logs", - }, - }, - Redis: redis.RedisKeyConf{ - RedisConf: redis.RedisConf{ - Host: *host, - Type: "cluster", - }, - Key: "notexist", - }, - NumProducers: 1, - }, rq.WithHandler(newBridgeHandler())) - if err != nil { - log.Fatal(err) - } - defer mq.Stop() - - mq.Start() - } else { - rds := redis.NewRedis(*host, "cluster") - rds.Llen("notexist") - select {} - } -} diff --git a/example/sqlc/user.go b/example/sqlc/user.go deleted file mode 100644 index 84f892fd..00000000 --- a/example/sqlc/user.go +++ /dev/null @@ -1,142 +0,0 @@ -package main - -import ( - "database/sql" - "fmt" - - "zero/core/stores/cache" - "zero/core/stores/sqlc" - "zero/core/stores/sqlx" - "zero/kq" -) - -var ( - userRows = "id, mobile, name, sex" - - cacheUserMobilePrefix = "cache#user#mobile#" - cacheUserIdPrefix = "cache#user#id#" - - ErrNotFound = sqlc.ErrNotFound -) - -type ( - User struct { - Id int64 `db:"id" json:"id,omitempty"` - Mobile string `db:"mobile" json:"mobile,omitempty"` - Name string `db:"name" json:"name,omitempty"` - Sex int `db:"sex" json:"sex,omitempty"` - } - - UserModel struct { - sqlc.CachedConn - // sqlx.SqlConn - table string - - // kafka use kq not kmq - push *kq.Pusher - } -) - -func NewUserModel(db sqlx.SqlConn, c cache.CacheConf, table string, pusher *kq.Pusher) *UserModel { - return &UserModel{ - CachedConn: sqlc.NewConn(db, c), - table: table, - push: pusher, - } -} - -func (um *UserModel) FindOne(id int64) (*User, error) { - key := fmt.Sprintf("%s%d", cacheUserIdPrefix, id) - var user User - err := um.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error { - query := fmt.Sprintf("SELECT %s FROM user WHERE id=?", userRows) - return conn.QueryRow(v, query, id) - }) - switch err { - case nil: - return &user, nil - case sqlc.ErrNotFound: - return nil, ErrNotFound - default: - return nil, err - } -} - -func (um *UserModel) FindByMobile(mobile string) (*User, error) { - var user User - key := fmt.Sprintf("%s%s", cacheUserMobilePrefix, mobile) - err := um.QueryRowIndex(&user, key, func(primary interface{}) string { - return fmt.Sprintf("%s%d", cacheUserIdPrefix, primary.(int64)) - }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) { - query := fmt.Sprintf("SELECT %s FROM user WHERE mobile=?", userRows) - if err := conn.QueryRow(&user, query, mobile); err != nil { - return nil, err - } - return user.Id, nil - }, func(conn sqlx.SqlConn, v interface{}, primary interface{}) error { - return conn.QueryRow(v, "SELECT * FROM user WHERE id=?", primary) - }) - switch err { - case nil: - return &user, nil - case sqlc.ErrNotFound: - return nil, ErrNotFound - default: - return nil, err - } -} - -// Count for no cache -func (um *UserModel) Count() (int64, error) { - var count int64 - err := um.QueryRowNoCache(&count, "SELECT count(1) FROM user") - if err != nil { - return 0, err - } - return count, nil -} - -// Query rows -func (um *UserModel) FindByName(name string) ([]*User, error) { - var users []*User - query := fmt.Sprintf("SELECT %s FROM user WHERE name=?", userRows) - err := um.QueryRowsNoCache(&userRows, query, name) - if err != nil { - return nil, err - } - return users, nil -} - -func (um *UserModel) UpdateSexById(sex int, id int64) error { - key := fmt.Sprintf("%s%d", cacheUserIdPrefix, id) - _, err := um.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) { - query := fmt.Sprintf("UPDATE user SET sex=? WHERE id=?") - return conn.Exec(query, sex, id) - }, key) - return err -} - -func (um *UserModel) UpdateMobileById(mobile string, id int64) error { - idKey := fmt.Sprintf("%s%d", cacheUserIdPrefix, id) - mobileKey := fmt.Sprintf("%s%s", cacheUserMobilePrefix, mobile) - _, err := um.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) { - query := fmt.Sprintf("UPDATE user SET mobile=? WHERE id=?") - return conn.Exec(query, mobile, id) - }, idKey, mobileKey) - return err -} - -func (um *UserModel) Update(u *User) error { - oldUser, err := um.FindOne(u.Id) - if err != nil { - return err - } - - idKey := fmt.Sprintf("%s%d", cacheUserIdPrefix, oldUser.Id) - mobileKey := fmt.Sprintf("%s%s", cacheUserMobilePrefix, oldUser.Mobile) - _, err = um.Exec(func(conn sqlx.SqlConn) (result sql.Result, err error) { - query := fmt.Sprintf("UPDATE user SET mobile=?, name=?, sex=? WHERE id=?") - return conn.Exec(query, u.Mobile, u.Name, u.Sex, u.Id) - }, idKey, mobileKey) - return err -} diff --git a/go.mod b/go.mod index 64ad3177..d167eb21 100644 --- a/go.mod +++ b/go.mod @@ -4,10 +4,8 @@ go 1.14 require ( github.com/DATA-DOG/go-sqlmock v1.4.1 - github.com/DataDog/zstd v1.4.5 // indirect github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 // indirect github.com/alicebob/miniredis v2.5.0+incompatible - github.com/beanstalkd/go-beanstalk v0.1.0 github.com/coreos/bbolt v1.3.1-coreos.6 // indirect github.com/coreos/etcd v3.3.18+incompatible github.com/coreos/go-semver v0.2.0 // indirect @@ -24,7 +22,6 @@ require ( github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/mock v1.4.3 github.com/golang/protobuf v1.4.2 - github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf // indirect github.com/gomodule/redigo v2.0.0+incompatible // indirect github.com/google/btree v1.0.0 // indirect github.com/google/gops v0.3.7 @@ -49,13 +46,11 @@ require ( github.com/pierrec/lz4 v2.5.1+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.5.1 - github.com/segmentio/kafka-go v0.3.5 github.com/soheilhy/cmux v0.1.4 // indirect github.com/spaolacci/murmur3 v1.1.0 github.com/stretchr/testify v1.5.1 github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect github.com/urfave/cli v1.22.4 - github.com/xdg/stringprep v1.0.1-0.20180714160509-73f8eece6fdc // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb // indirect go.etcd.io/etcd v3.3.17+incompatible diff --git a/go.sum b/go.sum index 93aa7a72..4c3d5327 100644 --- a/go.sum +++ b/go.sum @@ -2,9 +2,6 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DATA-DOG/go-sqlmock v1.4.1 h1:ThlnYciV1iM/V0OSF/dtkqWb6xo5qITT1TJBG1MRDJM= github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= -github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= -github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= -github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/StackExchange/wmi v0.0.0-20170410192909-ea383cf3ba6e/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= @@ -16,8 +13,6 @@ github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGn github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI= github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk= github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= -github.com/beanstalkd/go-beanstalk v0.1.0 h1:IiNwYbAoVBDs5xEOmleGoX+DRD3Moz99EpATbl8672w= -github.com/beanstalkd/go-beanstalk v0.1.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -52,8 +47,6 @@ github.com/dchest/siphash v1.2.1 h1:4cLinnzVJDKxTCl9B01807Yiy+W7ZzVHj/KIroQRvT4= github.com/dchest/siphash v1.2.1/go.mod h1:q+IRvb2gOSrUnYoPqHiyHXS0FOBBOdl6tONBlVnOnt4= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= @@ -100,9 +93,6 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf h1:gFVkHXmVAhEbxZVDln5V9GKrLaluNoFHDbrZwAWZgws= -github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= @@ -222,8 +212,6 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/segmentio/kafka-go v0.3.5 h1:2JVT1inno7LxEASWj+HflHh5sWGfM0gkRiLAxkXhGG4= -github.com/segmentio/kafka-go v0.3.5/go.mod h1:OT5KXBPbaJJTcvokhWR2KFmm0niEx3mnccTwjmLvSi4= github.com/shirou/gopsutil v0.0.0-20180427012116-c95755e4bcd7/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 h1:udFKJ0aHUL60LboW/A+DfgoHVedieIzIXE8uylPue0U= github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc= @@ -248,11 +236,6 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 h1:lYIiVD github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/urfave/cli v1.22.4 h1:u7tSpNPPswAFymm8IehJhy4uJMlUuU/GmqSkvJ1InXA= github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= -github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= -github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= -github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= -github.com/xdg/stringprep v1.0.1-0.20180714160509-73f8eece6fdc h1:vIp1tjhVogU0yBy7w96P027ewvNPeH6gzuNcoc+NReU= -github.com/xdg/stringprep v1.0.1-0.20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6 h1:YdYsPAZ2pC6Tow/nPZOPQ96O3hm/ToAkGsPLzedXERk= @@ -275,7 +258,6 @@ go.uber.org/zap v1.12.0 h1:dySoUQPFBGj6xwjmBzageVL8jGi8uxc6bEmJQjA06bw= go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= diff --git a/kq/config.go b/kq/config.go deleted file mode 100644 index 773887b4..00000000 --- a/kq/config.go +++ /dev/null @@ -1,21 +0,0 @@ -package kq - -import "zero/core/service" - -const ( - firstOffset = "first" - lastOffset = "last" -) - -type KqConf struct { - service.ServiceConf - Brokers []string - Group string - Topic string - Offset string `json:",options=first|last,default=last"` - NumConns int `json:",default=1"` - NumProducers int `json:",default=8"` - NumConsumers int `json:",default=8"` - MinBytes int `json:",default=10240"` // 10K - MaxBytes int `json:",default=10485760"` // 10M -} diff --git a/kq/pusher.go b/kq/pusher.go deleted file mode 100644 index f95bab0f..00000000 --- a/kq/pusher.go +++ /dev/null @@ -1,101 +0,0 @@ -package kq - -import ( - "context" - "strconv" - "time" - - "zero/core/executors" - "zero/core/logx" - - "github.com/segmentio/kafka-go" - "github.com/segmentio/kafka-go/snappy" -) - -type ( - PushOption func(options *chunkOptions) - - Pusher struct { - produer *kafka.Writer - topic string - executor *executors.ChunkExecutor - } - - chunkOptions struct { - chunkSize int - flushInterval time.Duration - } -) - -func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher { - producer := kafka.NewWriter(kafka.WriterConfig{ - Brokers: addrs, - Topic: topic, - Balancer: &kafka.LeastBytes{}, - CompressionCodec: snappy.NewCompressionCodec(), - }) - - pusher := &Pusher{ - produer: producer, - topic: topic, - } - pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) { - chunk := make([]kafka.Message, len(tasks)) - for i := range tasks { - chunk[i] = tasks[i].(kafka.Message) - } - if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil { - logx.Error(err) - } - }, newOptions(opts)...) - - return pusher -} - -func (p *Pusher) Close() error { - return p.produer.Close() -} - -func (p *Pusher) Name() string { - return p.topic -} - -func (p *Pusher) Push(v string) error { - msg := kafka.Message{ - Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), - Value: []byte(v), - } - if p.executor != nil { - return p.executor.Add(msg, len(v)) - } else { - return p.produer.WriteMessages(context.Background(), msg) - } -} - -func WithChunkSize(chunkSize int) PushOption { - return func(options *chunkOptions) { - options.chunkSize = chunkSize - } -} - -func WithFlushInterval(interval time.Duration) PushOption { - return func(options *chunkOptions) { - options.flushInterval = interval - } -} - -func newOptions(opts []PushOption) []executors.ChunkOption { - var options chunkOptions - for _, opt := range opts { - opt(&options) - } - - var chunkOpts []executors.ChunkOption - if options.chunkSize > 0 { - chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize)) - } - if options.flushInterval > 0 { - chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval)) - } - return chunkOpts -} diff --git a/kq/queue.go b/kq/queue.go deleted file mode 100644 index 823ceda4..00000000 --- a/kq/queue.go +++ /dev/null @@ -1,229 +0,0 @@ -package kq - -import ( - "context" - "io" - "log" - "time" - - "zero/core/logx" - "zero/core/queue" - "zero/core/service" - "zero/core/stat" - "zero/core/threading" - "zero/core/timex" - - "github.com/segmentio/kafka-go" - _ "github.com/segmentio/kafka-go/gzip" - _ "github.com/segmentio/kafka-go/lz4" - _ "github.com/segmentio/kafka-go/snappy" -) - -const ( - defaultCommitInterval = time.Second - defaultMaxWait = time.Second -) - -type ( - ConsumeHandle func(key, value string) error - - ConsumeHandler interface { - Consume(key, value string) error - } - - queueOptions struct { - commitInterval time.Duration - maxWait time.Duration - metrics *stat.Metrics - } - - QueueOption func(*queueOptions) - - kafkaQueue struct { - c KqConf - consumer *kafka.Reader - handler ConsumeHandler - channel chan kafka.Message - producerRoutines *threading.RoutineGroup - consumerRoutines *threading.RoutineGroup - metrics *stat.Metrics - } - - kafkaQueues struct { - queues []queue.MessageQueue - group *service.ServiceGroup - } -) - -func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue { - q, err := NewQueue(c, handler, opts...) - if err != nil { - log.Fatal(err) - } - - return q -} - -func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error) { - if err := c.SetUp(); err != nil { - return nil, err - } - - var options queueOptions - for _, opt := range opts { - opt(&options) - } - ensureQueueOptions(c, &options) - - if c.NumConns < 1 { - c.NumConns = 1 - } - q := kafkaQueues{ - group: service.NewServiceGroup(), - } - for i := 0; i < c.NumConns; i++ { - q.queues = append(q.queues, newKafkaQueue(c, handler, options)) - } - - return q, nil -} - -func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue { - var offset int64 - if c.Offset == firstOffset { - offset = kafka.FirstOffset - } else { - offset = kafka.LastOffset - } - consumer := kafka.NewReader(kafka.ReaderConfig{ - Brokers: c.Brokers, - GroupID: c.Group, - Topic: c.Topic, - StartOffset: offset, - MinBytes: c.MinBytes, // 10KB - MaxBytes: c.MaxBytes, // 10MB - MaxWait: options.maxWait, - CommitInterval: options.commitInterval, - }) - - return &kafkaQueue{ - c: c, - consumer: consumer, - handler: handler, - channel: make(chan kafka.Message), - producerRoutines: threading.NewRoutineGroup(), - consumerRoutines: threading.NewRoutineGroup(), - metrics: options.metrics, - } -} - -func (q *kafkaQueue) Start() { - q.startConsumers() - q.startProducers() - - q.producerRoutines.Wait() - close(q.channel) - q.consumerRoutines.Wait() -} - -func (q *kafkaQueue) Stop() { - q.consumer.Close() - logx.Close() -} - -func (q *kafkaQueue) consumeOne(key, val string) error { - startTime := timex.Now() - err := q.handler.Consume(key, val) - q.metrics.Add(stat.Task{ - Duration: timex.Since(startTime), - }) - return err -} - -func (q *kafkaQueue) startConsumers() { - for i := 0; i < q.c.NumConsumers; i++ { - q.consumerRoutines.Run(func() { - for msg := range q.channel { - if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil { - logx.Errorf("Error on consuming: %s, error: %v", string(msg.Value), err) - } - } - }) - } -} - -func (q *kafkaQueue) startProducers() { - for i := 0; i < q.c.NumProducers; i++ { - q.producerRoutines.Run(func() { - for { - msg, err := q.consumer.ReadMessage(context.Background()) - // io.EOF means consumer closed - // io.ErrClosedPipe means committing messages on the consumer, - // kafka will refire the messages on uncommitted messages, ignore - if err == io.EOF || err == io.ErrClosedPipe { - return - } - if err != nil { - logx.Errorf("Error on reading mesage, %q", err.Error()) - continue - } - q.channel <- msg - } - }) - } -} - -func (q kafkaQueues) Start() { - for _, each := range q.queues { - q.group.Add(each) - } - q.group.Start() -} - -func (q kafkaQueues) Stop() { - q.group.Stop() -} - -func WithCommitInterval(interval time.Duration) QueueOption { - return func(options *queueOptions) { - options.commitInterval = interval - } -} - -func WithHandle(handle ConsumeHandle) ConsumeHandler { - return innerConsumeHandler{ - handle: handle, - } -} - -func WithMaxWait(wait time.Duration) QueueOption { - return func(options *queueOptions) { - options.maxWait = wait - } -} - -func WithMetrics(metrics *stat.Metrics) QueueOption { - return func(options *queueOptions) { - options.metrics = metrics - } -} - -type innerConsumeHandler struct { - handle ConsumeHandle -} - -func (ch innerConsumeHandler) Consume(k, v string) error { - return ch.handle(k, v) -} - -func ensureQueueOptions(c KqConf, options *queueOptions) { - if options.commitInterval == 0 { - options.commitInterval = defaultCommitInterval - } - if options.maxWait == 0 { - options.maxWait = defaultMaxWait - } - if options.metrics == nil { - options.metrics = stat.NewMetrics(c.Name) - } -} diff --git a/rq/config.go b/rq/config.go deleted file mode 100644 index 0a4223ae..00000000 --- a/rq/config.go +++ /dev/null @@ -1,18 +0,0 @@ -package rq - -import ( - "zero/core/discov" - "zero/core/service" - "zero/core/stores/redis" -) - -type RmqConf struct { - service.ServiceConf - Redis redis.RedisKeyConf - Etcd discov.EtcdConf `json:",optional"` - NumProducers int `json:",optional"` - NumConsumers int `json:",optional"` - Timeout int64 `json:",optional"` - DropBefore int64 `json:",optional"` - ServerSensitive bool `json:",default=false"` -} diff --git a/rq/etc/config.json b/rq/etc/config.json deleted file mode 100644 index f8951987..00000000 --- a/rq/etc/config.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "Log": { - "Access": "logs/access.log", - "Error": "logs/error.log", - "Stat": "logs/stat.log" - }, - "MetricsUrl": "http://localhost:2222/add", - "Redis": { - "Host": "localhost:6379", - "Type": "node", - "Key": "reqs" - }, - "Etcd": { - "Hosts": [ - "localhost:2379" - ], - "EtcdKey": "rq" - } -} diff --git a/rq/internal/conf.go b/rq/internal/conf.go deleted file mode 100644 index db294632..00000000 --- a/rq/internal/conf.go +++ /dev/null @@ -1,19 +0,0 @@ -package internal - -import ( - "zero/core/queue" - "zero/core/stores/redis" -) - -type RedisKeyConf struct { - redis.RedisConf - Key string `json:",optional"` -} - -func (rkc RedisKeyConf) NewProducer(opts ...ProducerOption) (queue.Producer, error) { - return newProducer(rkc.NewRedis(), rkc.Key, opts...) -} - -func (rkc RedisKeyConf) NewPusher(opts ...PusherOption) queue.QueuePusher { - return NewPusher(rkc.NewRedis(), rkc.Key, opts...) -} diff --git a/rq/internal/const.go b/rq/internal/const.go deleted file mode 100644 index c444ea2d..00000000 --- a/rq/internal/const.go +++ /dev/null @@ -1,7 +0,0 @@ -package internal - -const ( - Delimeter = "/" - ServerSensitivePrefix = '*' - TimedQueueType = "timed" -) diff --git a/rq/internal/hashchange.go b/rq/internal/hashchange.go deleted file mode 100644 index 5409705f..00000000 --- a/rq/internal/hashchange.go +++ /dev/null @@ -1,39 +0,0 @@ -package internal - -import ( - "math/rand" - - "zero/core/hash" -) - -type HashChange struct { - id int64 - oldHash *hash.ConsistentHash - newHash *hash.ConsistentHash -} - -func NewHashChange(oldHash, newHash *hash.ConsistentHash) HashChange { - return HashChange{ - id: rand.Int63(), - oldHash: oldHash, - newHash: newHash, - } -} - -func (hc HashChange) GetId() int64 { - return hc.id -} - -func (hc HashChange) ShallEvict(key interface{}) bool { - oldTarget, oldOk := hc.oldHash.Get(key) - if !oldOk { - return false - } - - newTarget, newOk := hc.newHash.Get(key) - if !newOk { - return false - } - - return oldTarget != newTarget -} diff --git a/rq/internal/message.go b/rq/internal/message.go deleted file mode 100644 index 292d96b7..00000000 --- a/rq/internal/message.go +++ /dev/null @@ -1,6 +0,0 @@ -package internal - -type TimedMessage struct { - Time int64 `json:"time"` - Payload string `json:"payload"` -} diff --git a/rq/internal/redisqueue_test.go b/rq/internal/redisqueue_test.go deleted file mode 100644 index ff9025d9..00000000 --- a/rq/internal/redisqueue_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package internal - -import ( - "strconv" - "sync" - "testing" - - "zero/core/logx" - "zero/core/queue" - "zero/core/stores/redis" - - "github.com/alicebob/miniredis" - "github.com/stretchr/testify/assert" -) - -func init() { - logx.Disable() -} - -func TestRedisQueue(t *testing.T) { - const ( - total = 1000 - key = "queue" - ) - r, err := miniredis.Run() - assert.Nil(t, err) - - c := RedisKeyConf{ - RedisConf: redis.RedisConf{ - Host: r.Addr(), - Type: redis.NodeType, - }, - Key: key, - } - - pusher := NewPusher(c.NewRedis(), key, WithTime()) - assert.True(t, len(pusher.Name()) > 0) - for i := 0; i < total; i++ { - err := pusher.Push(strconv.Itoa(i)) - assert.Nil(t, err) - } - - consumer := new(mockedConsumer) - consumer.wait.Add(total) - q := queue.NewQueue(func() (queue.Producer, error) { - return c.NewProducer(TimeSensitive(5)) - }, func() (queue.Consumer, error) { - return consumer, nil - }) - q.SetNumProducer(1) - q.SetNumConsumer(1) - go func() { - q.Start() - }() - consumer.wait.Wait() - q.Stop() - - var expect int - for i := 0; i < total; i++ { - expect ^= i - } - assert.Equal(t, expect, consumer.xor) -} - -type mockedConsumer struct { - wait sync.WaitGroup - xor int -} - -func (c *mockedConsumer) Consume(s string) error { - val, err := strconv.Atoi(s) - if err != nil { - return err - } - - c.xor ^= val - c.wait.Done() - return nil -} - -func (c *mockedConsumer) OnEvent(event interface{}) { -} diff --git a/rq/internal/redisqueueproducer.go b/rq/internal/redisqueueproducer.go deleted file mode 100644 index 4f7c993a..00000000 --- a/rq/internal/redisqueueproducer.go +++ /dev/null @@ -1,166 +0,0 @@ -package internal - -import ( - "fmt" - "sync" - "time" - - "zero/core/jsonx" - "zero/core/logx" - "zero/core/queue" - "zero/core/stores/redis" -) - -const ( - logIntervalMillis = 1000 - retryRedisInterval = time.Second -) - -type ( - ProducerOption func(p queue.Producer) queue.Producer - - RedisQueueProducer struct { - name string - store *redis.Redis - key string - redisNode redis.ClosableNode - listeners []queue.ProduceListener - } -) - -func NewProducerFactory(store *redis.Redis, key string, opts ...ProducerOption) queue.ProducerFactory { - return func() (queue.Producer, error) { - return newProducer(store, key, opts...) - } -} - -func (p *RedisQueueProducer) AddListener(listener queue.ProduceListener) { - p.listeners = append(p.listeners, listener) -} - -func (p *RedisQueueProducer) Name() string { - return p.name -} - -func (p *RedisQueueProducer) Produce() (string, bool) { - lessLogger := logx.NewLessLogger(logIntervalMillis) - - for { - value, ok, err := p.store.BlpopEx(p.redisNode, p.key) - if err == nil { - return value, ok - } else if err == redis.Nil { - // timed out without elements popped - continue - } else { - lessLogger.Errorf("Error on blpop: %v", err) - p.waitForRedisAvailable() - } - } -} - -func newProducer(store *redis.Redis, key string, opts ...ProducerOption) (queue.Producer, error) { - redisNode, err := redis.CreateBlockingNode(store) - if err != nil { - return nil, err - } - - var producer queue.Producer = &RedisQueueProducer{ - name: fmt.Sprintf("%s/%s/%s", store.Type, store.Addr, key), - store: store, - key: key, - redisNode: redisNode, - } - - for _, opt := range opts { - producer = opt(producer) - } - - return producer, nil -} - -func (p *RedisQueueProducer) resetRedisConnection() error { - if p.redisNode != nil { - p.redisNode.Close() - p.redisNode = nil - } - - redisNode, err := redis.CreateBlockingNode(p.store) - if err != nil { - return err - } - - p.redisNode = redisNode - return nil -} - -func (p *RedisQueueProducer) waitForRedisAvailable() { - var paused bool - var pauseOnce sync.Once - - for { - if err := p.resetRedisConnection(); err != nil { - pauseOnce.Do(func() { - paused = true - for _, listener := range p.listeners { - listener.OnProducerPause() - } - }) - logx.Errorf("Error occurred while connect to redis: %v", err) - time.Sleep(retryRedisInterval) - } else { - break - } - } - - if paused { - for _, listener := range p.listeners { - listener.OnProducerResume() - } - } -} - -func TimeSensitive(seconds int64) ProducerOption { - return func(p queue.Producer) queue.Producer { - if seconds > 0 { - return autoDropQueueProducer{ - seconds: seconds, - producer: p, - } - } - - return p - } -} - -type autoDropQueueProducer struct { - seconds int64 // seconds before to drop - producer queue.Producer -} - -func (p autoDropQueueProducer) AddListener(listener queue.ProduceListener) { - p.producer.AddListener(listener) -} - -func (p autoDropQueueProducer) Produce() (string, bool) { - lessLogger := logx.NewLessLogger(logIntervalMillis) - - for { - content, ok := p.producer.Produce() - if !ok { - return "", false - } - - var timedMsg TimedMessage - if err := jsonx.UnmarshalFromString(content, &timedMsg); err != nil { - lessLogger.Errorf("invalid timedMessage: %s, error: %s", content, err.Error()) - continue - } - - if timedMsg.Time+p.seconds < time.Now().Unix() { - lessLogger.Errorf("expired timedMessage: %s", content) - } - - return timedMsg.Payload, true - } -} diff --git a/rq/internal/redisqueuepusher.go b/rq/internal/redisqueuepusher.go deleted file mode 100644 index 2b218d60..00000000 --- a/rq/internal/redisqueuepusher.go +++ /dev/null @@ -1,78 +0,0 @@ -package internal - -import ( - "fmt" - "time" - - "zero/core/jsonx" - "zero/core/logx" - "zero/core/queue" - "zero/core/stores/redis" -) - -type ( - PusherOption func(p queue.QueuePusher) queue.QueuePusher - - RedisQueuePusher struct { - name string - store *redis.Redis - key string - } -) - -func NewPusher(store *redis.Redis, key string, opts ...PusherOption) queue.QueuePusher { - var pusher queue.QueuePusher = &RedisQueuePusher{ - name: fmt.Sprintf("%s/%s/%s", store.Type, store.Addr, key), - store: store, - key: key, - } - - for _, opt := range opts { - pusher = opt(pusher) - } - - return pusher -} - -func (saver *RedisQueuePusher) Name() string { - return saver.name -} - -func (saver *RedisQueuePusher) Push(message string) error { - _, err := saver.store.Rpush(saver.key, message) - if nil != err { - return err - } - - logx.Infof("<= %s", message) - return nil -} - -func WithTime() PusherOption { - return func(p queue.QueuePusher) queue.QueuePusher { - return timedQueuePusher{ - pusher: p, - } - } -} - -type timedQueuePusher struct { - pusher queue.QueuePusher -} - -func (p timedQueuePusher) Name() string { - return p.pusher.Name() -} - -func (p timedQueuePusher) Push(message string) error { - tm := TimedMessage{ - Time: time.Now().Unix(), - Payload: message, - } - - if content, err := jsonx.Marshal(tm); err != nil { - return err - } else { - return p.pusher.Push(string(content)) - } -} diff --git a/rq/internal/update/incrementalupdater.go b/rq/internal/update/incrementalupdater.go deleted file mode 100644 index d8187095..00000000 --- a/rq/internal/update/incrementalupdater.go +++ /dev/null @@ -1,179 +0,0 @@ -package update - -import ( - "sync" - "time" - - "zero/core/hash" - "zero/core/stringx" -) - -const ( - incrementalStep = 5 - stepDuration = time.Second * 3 -) - -type ( - updateEvent struct { - keys []string - newKey string - servers []string - } - - UpdateFunc func(change ServerChange) - - IncrementalUpdater struct { - lock sync.Mutex - started bool - taskChan chan updateEvent - updates ServerChange - updateFn UpdateFunc - pendingEvents []updateEvent - } -) - -func NewIncrementalUpdater(updateFn UpdateFunc) *IncrementalUpdater { - return &IncrementalUpdater{ - taskChan: make(chan updateEvent), - updates: ServerChange{ - Current: Snapshot{ - Keys: make([]string, 0), - WeightedKeys: make([]weightedKey, 0), - }, - Servers: make([]string, 0), - }, - updateFn: updateFn, - } -} - -func (ru *IncrementalUpdater) Update(keys []string, servers []string, newKey string) { - ru.lock.Lock() - defer ru.lock.Unlock() - - if !ru.started { - go ru.run() - ru.started = true - } - - ru.taskChan <- updateEvent{ - keys: keys, - newKey: newKey, - servers: servers, - } -} - -// Return true if incremental update is done -func (ru *IncrementalUpdater) advance() bool { - previous := ru.updates.Current - keys := make([]string, 0) - weightedKeys := make([]weightedKey, 0) - servers := ru.updates.Servers - for _, key := range ru.updates.Current.Keys { - keys = append(keys, key) - } - for _, wkey := range ru.updates.Current.WeightedKeys { - weight := wkey.Weight + incrementalStep - if weight >= hash.TopWeight { - keys = append(keys, wkey.Key) - } else { - weightedKeys = append(weightedKeys, weightedKey{ - Key: wkey.Key, - Weight: weight, - }) - } - } - - for _, event := range ru.pendingEvents { - // ignore reload events - if len(event.newKey) == 0 || len(event.servers) == 0 { - continue - } - - // anyway, add the servers, just to avoid missing notify any server - servers = stringx.Union(servers, event.servers) - if keyExists(keys, weightedKeys, event.newKey) { - continue - } - - weightedKeys = append(weightedKeys, weightedKey{ - Key: event.newKey, - Weight: incrementalStep, - }) - } - - // clear pending events - ru.pendingEvents = ru.pendingEvents[:0] - - change := ServerChange{ - Previous: previous, - Current: Snapshot{ - Keys: keys, - WeightedKeys: weightedKeys, - }, - Servers: servers, - } - ru.updates = change - ru.updateFn(change) - - return len(weightedKeys) == 0 -} - -func (ru *IncrementalUpdater) run() { - defer func() { - ru.lock.Lock() - ru.started = false - ru.lock.Unlock() - }() - - ticker := time.NewTicker(stepDuration) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if ru.advance() { - return - } - case event := <-ru.taskChan: - ru.updateKeys(event) - } - } -} - -func (ru *IncrementalUpdater) updateKeys(event updateEvent) { - isWeightedKey := func(key string) bool { - for _, wkey := range ru.updates.Current.WeightedKeys { - if wkey.Key == key { - return true - } - } - - return false - } - - keys := make([]string, 0, len(event.keys)) - for _, key := range event.keys { - if !isWeightedKey(key) { - keys = append(keys, key) - } - } - - ru.updates.Current.Keys = keys - ru.pendingEvents = append(ru.pendingEvents, event) -} - -func keyExists(keys []string, weightedKeys []weightedKey, key string) bool { - for _, each := range keys { - if key == each { - return true - } - } - - for _, wkey := range weightedKeys { - if wkey.Key == key { - return true - } - } - - return false -} diff --git a/rq/internal/update/serverchange.go b/rq/internal/update/serverchange.go deleted file mode 100644 index 04577833..00000000 --- a/rq/internal/update/serverchange.go +++ /dev/null @@ -1,106 +0,0 @@ -package update - -import ( - "crypto/md5" - "errors" - "fmt" - "io" - "sort" - - "zero/core/hash" - "zero/core/jsonx" - "zero/rq/internal" -) - -var ErrInvalidServerChange = errors.New("not a server change message") - -type ( - weightedKey struct { - Key string - Weight int - } - - Snapshot struct { - Keys []string - WeightedKeys []weightedKey - } - - ServerChange struct { - Previous Snapshot - Current Snapshot - Servers []string - } -) - -func (s Snapshot) GetCode() string { - keys := append([]string(nil), s.Keys...) - sort.Strings(keys) - weightedKeys := append([]weightedKey(nil), s.WeightedKeys...) - sort.SliceStable(weightedKeys, func(i, j int) bool { - return weightedKeys[i].Key < weightedKeys[j].Key - }) - - digest := md5.New() - for _, key := range keys { - io.WriteString(digest, fmt.Sprintf("%s\n", key)) - } - for _, wkey := range weightedKeys { - io.WriteString(digest, fmt.Sprintf("%s:%d\n", wkey.Key, wkey.Weight)) - } - - return fmt.Sprintf("%x", digest.Sum(nil)) -} - -func (sc ServerChange) CreateCurrentHash() *hash.ConsistentHash { - curHash := hash.NewConsistentHash() - - for _, key := range sc.Current.Keys { - curHash.Add(key) - } - for _, wkey := range sc.Current.WeightedKeys { - curHash.AddWithWeight(wkey.Key, wkey.Weight) - } - - return curHash -} - -func (sc ServerChange) CreatePrevHash() *hash.ConsistentHash { - prevHash := hash.NewConsistentHash() - - for _, key := range sc.Previous.Keys { - prevHash.Add(key) - } - for _, wkey := range sc.Previous.WeightedKeys { - prevHash.AddWithWeight(wkey.Key, wkey.Weight) - } - - return prevHash -} - -func (sc ServerChange) GetCode() string { - return sc.Current.GetCode() -} - -func IsServerChange(message string) bool { - return len(message) > 0 && message[0] == internal.ServerSensitivePrefix -} - -func (sc ServerChange) Marshal() (string, error) { - body, err := jsonx.Marshal(sc) - if err != nil { - return "", err - } - - return string(append([]byte{internal.ServerSensitivePrefix}, body...)), nil -} - -func UnmarshalServerChange(body string) (ServerChange, error) { - if len(body) == 0 { - return ServerChange{}, ErrInvalidServerChange - } - - var change ServerChange - err := jsonx.UnmarshalFromString(body[1:], &change) - - return change, err -} diff --git a/rq/pusher.go b/rq/pusher.go deleted file mode 100644 index 154dad62..00000000 --- a/rq/pusher.go +++ /dev/null @@ -1,445 +0,0 @@ -package rq - -import ( - "context" - "errors" - "fmt" - "strings" - "sync" - "time" - - "zero/core/discov" - "zero/core/errorx" - "zero/core/jsonx" - "zero/core/lang" - "zero/core/logx" - "zero/core/queue" - "zero/core/stores/redis" - "zero/core/threading" - "zero/rq/internal" - "zero/rq/internal/update" -) - -const ( - retryTimes = 3 - etcdRedisFields = 4 -) - -var ErrPusherTypeError = errors.New("not a QueuePusher instance") - -type ( - KeyFn func(string) (key, payload string, err error) - KeysFn func(string) (ctx context.Context, keys []string, err error) - AssembleFn func(context.Context, []string) (payload string, err error) - PusherOption func(*Pusher) error - - // just push once or do it retryTimes, it's a choice. - // because only when at least a server is alive, and - // pushing to the server failed, we'll return with an error - // if waken up, but the server is going down very quickly, - // we're going to wait again. so it's safe to push once. - pushStrategy interface { - addListener(listener discov.Listener) - push(string) error - } - - batchConsistentStrategy struct { - keysFn KeysFn - assembleFn AssembleFn - subClient *discov.BatchConsistentSubClient - } - - consistentStrategy struct { - keyFn KeyFn - subClient *discov.ConsistentSubClient - } - - roundRobinStrategy struct { - subClient *discov.RoundRobinSubClient - } - - serverListener struct { - updater *update.IncrementalUpdater - } - - Pusher struct { - name string - endpoints []string - key string - failovers sync.Map - strategy pushStrategy - serverSensitive bool - } -) - -func NewPusher(endpoints []string, key string, opts ...PusherOption) (*Pusher, error) { - pusher := &Pusher{ - name: getName(key), - endpoints: endpoints, - key: key, - } - - if len(opts) == 0 { - opts = []PusherOption{WithRoundRobinStrategy()} - } - - for _, opt := range opts { - if err := opt(pusher); err != nil { - return nil, err - } - } - - if pusher.serverSensitive { - listener := new(serverListener) - listener.updater = update.NewIncrementalUpdater(listener.update) - pusher.strategy.addListener(listener) - } - - return pusher, nil -} - -func (pusher *Pusher) Name() string { - return pusher.name -} - -func (pusher *Pusher) Push(message string) error { - return pusher.strategy.push(message) -} - -func (pusher *Pusher) close(server string, conn interface{}) error { - logx.Errorf("dropped redis node: %s", server) - - return pusher.failover(server) -} - -func (pusher *Pusher) dial(server string) (interface{}, error) { - pusher.failovers.Delete(server) - - p, err := newPusher(server) - if err != nil { - return nil, err - } - - logx.Infof("new redis node: %s", server) - - return p, nil -} - -func (pusher *Pusher) failover(server string) error { - pusher.failovers.Store(server, lang.Placeholder) - - rds, key, option, err := newRedisWithKey(server) - if err != nil { - return err - } - - threading.GoSafe(func() { - defer pusher.failovers.Delete(server) - - for { - _, ok := pusher.failovers.Load(server) - if !ok { - logx.Infof("redis queue (%s) revived", server) - return - } - - message, err := rds.Lpop(key) - if err != nil { - logx.Error(err) - return - } - - if len(message) == 0 { - logx.Infof("repush redis queue (%s) done", server) - return - } - - if option == internal.TimedQueueType { - message, err = unwrapTimedMessage(message) - if err != nil { - logx.Errorf("invalid timedMessage: %s, error: %s", message, err.Error()) - return - } - } - - if err = pusher.strategy.push(message); err != nil { - logx.Error(err) - return - } - } - }) - - return nil -} - -func UnmarshalPusher(server string) (queue.QueuePusher, error) { - store, key, option, err := newRedisWithKey(server) - if err != nil { - return nil, err - } - - if option == internal.TimedQueueType { - return internal.NewPusher(store, key, internal.WithTime()), nil - } - - return internal.NewPusher(store, key), nil -} - -func WithBatchConsistentStrategy(keysFn KeysFn, assembleFn AssembleFn, opts ...discov.BalanceOption) PusherOption { - return func(pusher *Pusher) error { - subClient, err := discov.NewBatchConsistentSubClient(pusher.endpoints, pusher.key, pusher.dial, - pusher.close, opts...) - if err != nil { - return err - } - - pusher.strategy = batchConsistentStrategy{ - keysFn: keysFn, - assembleFn: assembleFn, - subClient: subClient, - } - - return nil - } -} - -func WithConsistentStrategy(keyFn KeyFn, opts ...discov.BalanceOption) PusherOption { - return func(pusher *Pusher) error { - subClient, err := discov.NewConsistentSubClient(pusher.endpoints, pusher.key, pusher.dial, pusher.close, opts...) - if err != nil { - return err - } - - pusher.strategy = consistentStrategy{ - keyFn: keyFn, - subClient: subClient, - } - - return nil - } -} - -func WithRoundRobinStrategy() PusherOption { - return func(pusher *Pusher) error { - subClient, err := discov.NewRoundRobinSubClient(pusher.endpoints, pusher.key, pusher.dial, pusher.close) - if err != nil { - return err - } - - pusher.strategy = roundRobinStrategy{ - subClient: subClient, - } - - return nil - } -} - -func WithServerSensitive() PusherOption { - return func(pusher *Pusher) error { - pusher.serverSensitive = true - return nil - } -} - -func (bcs batchConsistentStrategy) addListener(listener discov.Listener) { - bcs.subClient.AddListener(listener) -} - -func (bcs batchConsistentStrategy) balance(keys []string) map[interface{}][]string { - // we need to make sure the servers are available, otherwise wait forever - for { - if mapping, ok := bcs.subClient.Next(keys); ok { - return mapping - } else { - bcs.subClient.WaitForServers() - // make sure we don't flood logs too much in extreme conditions - time.Sleep(time.Second) - } - } -} - -func (bcs batchConsistentStrategy) push(message string) error { - ctx, keys, err := bcs.keysFn(message) - if err != nil { - return err - } - - var batchError errorx.BatchError - mapping := bcs.balance(keys) - for conn, connKeys := range mapping { - payload, err := bcs.assembleFn(ctx, connKeys) - if err != nil { - batchError.Add(err) - continue - } - - for i := 0; i < retryTimes; i++ { - if err = bcs.pushOnce(conn, payload); err != nil { - batchError.Add(err) - } else { - break - } - } - } - - return batchError.Err() -} - -func (bcs batchConsistentStrategy) pushOnce(server interface{}, payload string) error { - pusher, ok := server.(queue.QueuePusher) - if ok { - return pusher.Push(payload) - } else { - return ErrPusherTypeError - } -} - -func (cs consistentStrategy) addListener(listener discov.Listener) { - cs.subClient.AddListener(listener) -} - -func (cs consistentStrategy) push(message string) error { - var batchError errorx.BatchError - - key, payload, err := cs.keyFn(message) - if err != nil { - return err - } - - for i := 0; i < retryTimes; i++ { - if err = cs.pushOnce(key, payload); err != nil { - batchError.Add(err) - } else { - return nil - } - } - - return batchError.Err() -} - -func (cs consistentStrategy) pushOnce(key, payload string) error { - // we need to make sure the servers are available, otherwise wait forever - for { - if server, ok := cs.subClient.Next(key); ok { - pusher, ok := server.(queue.QueuePusher) - if ok { - return pusher.Push(payload) - } else { - return ErrPusherTypeError - } - } else { - cs.subClient.WaitForServers() - // make sure we don't flood logs too much in extreme conditions - time.Sleep(time.Second) - } - } -} - -func (rrs roundRobinStrategy) addListener(listener discov.Listener) { - rrs.subClient.AddListener(listener) -} - -func (rrs roundRobinStrategy) push(message string) error { - var batchError errorx.BatchError - - for i := 0; i < retryTimes; i++ { - if err := rrs.pushOnce(message); err != nil { - batchError.Add(err) - } else { - return nil - } - } - - return batchError.Err() -} - -func (rrs roundRobinStrategy) pushOnce(message string) error { - if server, ok := rrs.subClient.Next(); ok { - pusher, ok := server.(queue.QueuePusher) - if ok { - return pusher.Push(message) - } else { - return ErrPusherTypeError - } - } else { - rrs.subClient.WaitForServers() - return rrs.pushOnce(message) - } -} - -func getName(key string) string { - return fmt.Sprintf("etcd:%s", key) -} - -func newPusher(server string) (queue.QueuePusher, error) { - if rds, key, option, err := newRedisWithKey(server); err != nil { - return nil, err - } else if option == internal.TimedQueueType { - return internal.NewPusher(rds, key, internal.WithTime()), nil - } else { - return internal.NewPusher(rds, key), nil - } -} - -func newRedisWithKey(server string) (rds *redis.Redis, key, option string, err error) { - fields := strings.Split(server, internal.Delimeter) - if len(fields) < etcdRedisFields { - err = fmt.Errorf("wrong redis queue: %s, should be ip:port/type/password/key/[option]", server) - return - } - - addr := fields[0] - tp := fields[1] - pass := fields[2] - key = fields[3] - - if len(fields) > etcdRedisFields { - option = fields[4] - } - - rds = redis.NewRedis(addr, tp, pass) - return -} - -func (sl *serverListener) OnUpdate(keys []string, servers []string, newKey string) { - sl.updater.Update(keys, servers, newKey) -} - -func (sl *serverListener) OnReload() { - sl.updater.Update(nil, nil, "") -} - -func (sl *serverListener) update(change update.ServerChange) { - content, err := change.Marshal() - if err != nil { - logx.Error(err) - } - - if err = broadcast(change.Servers, content); err != nil { - logx.Error(err) - } -} - -func broadcast(servers []string, message string) error { - var be errorx.BatchError - - for _, server := range servers { - q, err := UnmarshalPusher(server) - if err != nil { - be.Add(err) - } else { - q.Push(message) - } - } - - return be.Err() -} - -func unwrapTimedMessage(message string) (string, error) { - var tm internal.TimedMessage - if err := jsonx.UnmarshalFromString(message, &tm); err != nil { - return "", err - } - - return tm.Payload, nil -} diff --git a/rq/queue.go b/rq/queue.go deleted file mode 100644 index d72a5875..00000000 --- a/rq/queue.go +++ /dev/null @@ -1,338 +0,0 @@ -package rq - -import ( - "errors" - "fmt" - "log" - "strings" - "sync" - "time" - - "zero/core/discov" - "zero/core/logx" - "zero/core/queue" - "zero/core/service" - "zero/core/stores/redis" - "zero/core/stringx" - "zero/core/threading" - "zero/rq/internal" - "zero/rq/internal/update" -) - -const keyLen = 6 - -var ( - ErrTimeout = errors.New("timeout error") - - eventHandlerPlaceholder = dummyEventHandler(0) -) - -type ( - ConsumeHandle func(string) error - - ConsumeHandler interface { - Consume(string) error - } - - EventHandler interface { - OnEvent(event interface{}) - } - - QueueOption func(queue *MessageQueue) - - queueOptions struct { - renewId int64 - } - - MessageQueue struct { - c RmqConf - redisQueue *queue.Queue - consumerFactory queue.ConsumerFactory - options queueOptions - eventLock sync.Mutex - lastEvent string - } -) - -func MustNewMessageQueue(c RmqConf, factory queue.ConsumerFactory, opts ...QueueOption) queue.MessageQueue { - q, err := NewMessageQueue(c, factory, opts...) - if err != nil { - log.Fatal(err) - } - - return q -} - -func NewMessageQueue(c RmqConf, factory queue.ConsumerFactory, opts ...QueueOption) (queue.MessageQueue, error) { - if err := c.SetUp(); err != nil { - return nil, err - } - - q := &MessageQueue{ - c: c, - } - - if len(q.c.Redis.Key) == 0 { - if len(q.c.Name) == 0 { - q.c.Redis.Key = stringx.Randn(keyLen) - } else { - q.c.Redis.Key = fmt.Sprintf("%s-%s", q.c.Name, stringx.Randn(keyLen)) - } - } - if q.c.Timeout > 0 { - factory = wrapWithTimeout(factory, time.Duration(q.c.Timeout)*time.Millisecond) - } - factory = wrapWithServerSensitive(q, factory) - q.consumerFactory = factory - q.redisQueue = q.buildQueue() - - for _, opt := range opts { - opt(q) - } - - return q, nil -} - -func (q *MessageQueue) Start() { - serviceGroup := service.NewServiceGroup() - serviceGroup.Add(q.redisQueue) - q.maybeAppendRenewer(serviceGroup, q.redisQueue) - serviceGroup.Start() -} - -func (q *MessageQueue) Stop() { - logx.Close() -} - -func (q *MessageQueue) buildQueue() *queue.Queue { - inboundStore := redis.NewRedis(q.c.Redis.Host, q.c.Redis.Type, q.c.Redis.Pass) - producerFactory := internal.NewProducerFactory(inboundStore, q.c.Redis.Key, - internal.TimeSensitive(q.c.DropBefore)) - mq := queue.NewQueue(producerFactory, q.consumerFactory) - - if len(q.c.Name) > 0 { - mq.SetName(q.c.Name) - } - if q.c.NumConsumers > 0 { - mq.SetNumConsumer(q.c.NumConsumers) - } - if q.c.NumProducers > 0 { - mq.SetNumProducer(q.c.NumProducers) - } - - return mq -} - -func (q *MessageQueue) compareAndSetEvent(event string) bool { - q.eventLock.Lock() - defer q.eventLock.Unlock() - - if q.lastEvent == event { - return false - } - - q.lastEvent = event - return true -} - -func (q *MessageQueue) maybeAppendRenewer(group *service.ServiceGroup, mq *queue.Queue) { - if len(q.c.Etcd.Hosts) > 0 || len(q.c.Etcd.Key) > 0 { - etcdValue := MarshalQueue(q.c.Redis) - if q.c.DropBefore > 0 { - etcdValue = strings.Join([]string{etcdValue, internal.TimedQueueType}, internal.Delimeter) - } - keepAliver := discov.NewRenewer(q.c.Etcd.Hosts, q.c.Etcd.Key, etcdValue, q.options.renewId) - mq.AddListener(pauseResumeHandler{ - Renewer: keepAliver, - }) - group.Add(keepAliver) - } -} - -func MarshalQueue(rds redis.RedisKeyConf) string { - return strings.Join([]string{ - rds.Host, - rds.Type, - rds.Pass, - rds.Key, - }, internal.Delimeter) -} - -func WithHandle(handle ConsumeHandle) queue.ConsumerFactory { - return WithHandler(innerConsumerHandler{handle}) -} - -func WithHandler(handler ConsumeHandler, eventHandlers ...EventHandler) queue.ConsumerFactory { - return func() (queue.Consumer, error) { - if len(eventHandlers) < 1 { - return eventConsumer{ - consumeHandler: handler, - eventHandler: eventHandlerPlaceholder, - }, nil - } else { - return eventConsumer{ - consumeHandler: handler, - eventHandler: eventHandlers[0], - }, nil - } - } -} - -func WithHandlerFactory(factory func() (ConsumeHandler, error)) queue.ConsumerFactory { - return func() (queue.Consumer, error) { - if handler, err := factory(); err != nil { - return nil, err - } else { - return eventlessHandler{handler}, nil - } - } -} - -func WithRenewId(id int64) QueueOption { - return func(mq *MessageQueue) { - mq.options.renewId = id - } -} - -func wrapWithServerSensitive(mq *MessageQueue, factory queue.ConsumerFactory) queue.ConsumerFactory { - return func() (queue.Consumer, error) { - consumer, err := factory() - if err != nil { - return nil, err - } - - return &serverSensitiveConsumer{ - mq: mq, - consumer: consumer, - }, nil - } -} - -func wrapWithTimeout(factory queue.ConsumerFactory, dt time.Duration) queue.ConsumerFactory { - return func() (queue.Consumer, error) { - consumer, err := factory() - if err != nil { - return nil, err - } - - return &timeoutConsumer{ - consumer: consumer, - dt: dt, - timer: time.NewTimer(dt), - }, nil - } -} - -type innerConsumerHandler struct { - handle ConsumeHandle -} - -func (h innerConsumerHandler) Consume(v string) error { - return h.handle(v) -} - -type serverSensitiveConsumer struct { - mq *MessageQueue - consumer queue.Consumer -} - -func (c *serverSensitiveConsumer) Consume(msg string) error { - if update.IsServerChange(msg) { - change, err := update.UnmarshalServerChange(msg) - if err != nil { - return err - } - - code := change.GetCode() - if !c.mq.compareAndSetEvent(code) { - return nil - } - - oldHash := change.CreatePrevHash() - newHash := change.CreateCurrentHash() - hashChange := internal.NewHashChange(oldHash, newHash) - c.mq.redisQueue.Broadcast(hashChange) - - return nil - } - - return c.consumer.Consume(msg) -} - -func (c *serverSensitiveConsumer) OnEvent(event interface{}) { - c.consumer.OnEvent(event) -} - -type timeoutConsumer struct { - consumer queue.Consumer - dt time.Duration - timer *time.Timer -} - -func (c *timeoutConsumer) Consume(msg string) error { - done := make(chan error) - threading.GoSafe(func() { - if err := c.consumer.Consume(msg); err != nil { - done <- err - } - close(done) - }) - - c.timer.Reset(c.dt) - select { - case err, ok := <-done: - c.timer.Stop() - if ok { - return err - } else { - return nil - } - case <-c.timer.C: - return ErrTimeout - } -} - -func (c *timeoutConsumer) OnEvent(event interface{}) { - c.consumer.OnEvent(event) -} - -type pauseResumeHandler struct { - discov.Renewer -} - -func (pr pauseResumeHandler) OnPause() { - pr.Pause() -} - -func (pr pauseResumeHandler) OnResume() { - pr.Resume() -} - -type eventConsumer struct { - consumeHandler ConsumeHandler - eventHandler EventHandler -} - -func (ec eventConsumer) Consume(msg string) error { - return ec.consumeHandler.Consume(msg) -} - -func (ec eventConsumer) OnEvent(event interface{}) { - ec.eventHandler.OnEvent(event) -} - -type eventlessHandler struct { - handler ConsumeHandler -} - -func (h eventlessHandler) Consume(msg string) error { - return h.handler.Consume(msg) -} - -func (h eventlessHandler) OnEvent(event interface{}) { -} - -type dummyEventHandler int - -func (eh dummyEventHandler) OnEvent(event interface{}) { -} diff --git a/rq/queue_test.go b/rq/queue_test.go deleted file mode 100644 index 8722048b..00000000 --- a/rq/queue_test.go +++ /dev/null @@ -1,62 +0,0 @@ -package rq - -import ( - "strconv" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestQueueWithTimeout(t *testing.T) { - consumer, err := wrapWithTimeout(WithHandle(func(string) error { - time.Sleep(time.Minute) - return nil - }), 100)() - if err != nil { - t.Fatal(err) - } - - assert.Equal(t, ErrTimeout, consumer.Consume("any")) -} - -func TestQueueWithoutTimeout(t *testing.T) { - consumer, err := wrapWithTimeout(WithHandle(func(string) error { - return nil - }), 3600000)() - if err != nil { - t.Fatal(err) - } - - assert.Nil(t, consumer.Consume("any")) -} - -func BenchmarkQueue(b *testing.B) { - b.ReportAllocs() - - consumer, err := WithHandle(func(string) error { - return nil - })() - if err != nil { - b.Fatal(err) - } - - for i := 0; i < b.N; i++ { - consumer.Consume(strconv.Itoa(i)) - } -} - -func BenchmarkQueueWithTimeout(b *testing.B) { - b.ReportAllocs() - - consumer, err := wrapWithTimeout(WithHandle(func(string) error { - return nil - }), 1000)() - if err != nil { - b.Fatal(err) - } - - for i := 0; i < b.N; i++ { - consumer.Consume(strconv.Itoa(i)) - } -}