package kq import ( "context" "io" "log" "time" "zero/core/logx" "zero/core/queue" "zero/core/service" "zero/core/stat" "zero/core/threading" "zero/core/timex" "github.com/segmentio/kafka-go" _ "github.com/segmentio/kafka-go/gzip" _ "github.com/segmentio/kafka-go/lz4" _ "github.com/segmentio/kafka-go/snappy" ) const ( defaultCommitInterval = time.Second defaultMaxWait = time.Second ) type ( ConsumeHandle func(key, value string) error ConsumeHandler interface { Consume(key, value string) error } queueOptions struct { commitInterval time.Duration maxWait time.Duration metrics *stat.Metrics } QueueOption func(*queueOptions) kafkaQueue struct { c KqConf consumer *kafka.Reader handler ConsumeHandler channel chan kafka.Message producerRoutines *threading.RoutineGroup consumerRoutines *threading.RoutineGroup metrics *stat.Metrics } kafkaQueues struct { queues []queue.MessageQueue group *service.ServiceGroup } ) func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue { q, err := NewQueue(c, handler, opts...) if err != nil { log.Fatal(err) } return q } func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error) { if err := c.SetUp(); err != nil { return nil, err } var options queueOptions for _, opt := range opts { opt(&options) } ensureQueueOptions(c, &options) if c.NumConns < 1 { c.NumConns = 1 } q := kafkaQueues{ group: service.NewServiceGroup(), } for i := 0; i < c.NumConns; i++ { q.queues = append(q.queues, newKafkaQueue(c, handler, options)) } return q, nil } func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue { var offset int64 if c.Offset == firstOffset { offset = kafka.FirstOffset } else { offset = kafka.LastOffset } consumer := kafka.NewReader(kafka.ReaderConfig{ Brokers: c.Brokers, GroupID: c.Group, Topic: c.Topic, StartOffset: offset, MinBytes: c.MinBytes, // 10KB MaxBytes: c.MaxBytes, // 10MB MaxWait: options.maxWait, CommitInterval: options.commitInterval, }) return &kafkaQueue{ c: c, consumer: consumer, handler: handler, channel: make(chan kafka.Message), producerRoutines: threading.NewRoutineGroup(), consumerRoutines: threading.NewRoutineGroup(), metrics: options.metrics, } } func (q *kafkaQueue) Start() { q.startConsumers() q.startProducers() q.producerRoutines.Wait() close(q.channel) q.consumerRoutines.Wait() } func (q *kafkaQueue) Stop() { q.consumer.Close() logx.Close() } func (q *kafkaQueue) consumeOne(key, val string) error { startTime := timex.Now() err := q.handler.Consume(key, val) q.metrics.Add(stat.Task{ Duration: timex.Since(startTime), }) return err } func (q *kafkaQueue) startConsumers() { for i := 0; i < q.c.NumConsumers; i++ { q.consumerRoutines.Run(func() { for msg := range q.channel { if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil { logx.Errorf("Error on consuming: %s, error: %v", string(msg.Value), err) } } }) } } func (q *kafkaQueue) startProducers() { for i := 0; i < q.c.NumProducers; i++ { q.producerRoutines.Run(func() { for { msg, err := q.consumer.ReadMessage(context.Background()) // io.EOF means consumer closed // io.ErrClosedPipe means committing messages on the consumer, // kafka will refire the messages on uncommitted messages, ignore if err == io.EOF || err == io.ErrClosedPipe { return } if err != nil { logx.Errorf("Error on reading mesage, %q", err.Error()) continue } q.channel <- msg } }) } } func (q kafkaQueues) Start() { for _, each := range q.queues { q.group.Add(each) } q.group.Start() } func (q kafkaQueues) Stop() { q.group.Stop() } func WithCommitInterval(interval time.Duration) QueueOption { return func(options *queueOptions) { options.commitInterval = interval } } func WithHandle(handle ConsumeHandle) ConsumeHandler { return innerConsumeHandler{ handle: handle, } } func WithMaxWait(wait time.Duration) QueueOption { return func(options *queueOptions) { options.maxWait = wait } } func WithMetrics(metrics *stat.Metrics) QueueOption { return func(options *queueOptions) { options.metrics = metrics } } type innerConsumeHandler struct { handle ConsumeHandle } func (ch innerConsumeHandler) Consume(k, v string) error { return ch.handle(k, v) } func ensureQueueOptions(c KqConf, options *queueOptions) { if options.commitInterval == 0 { options.commitInterval = defaultCommitInterval } if options.maxWait == 0 { options.maxWait = defaultMaxWait } if options.metrics == nil { options.metrics = stat.NewMetrics(c.Name) } }