You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-zero/kq/queue.go

230 lines
4.8 KiB
Go

package kq
import (
"context"
"io"
"log"
"time"
"zero/core/logx"
"zero/core/queue"
"zero/core/service"
"zero/core/stat"
"zero/core/threading"
"zero/core/timex"
"github.com/segmentio/kafka-go"
_ "github.com/segmentio/kafka-go/gzip"
_ "github.com/segmentio/kafka-go/lz4"
_ "github.com/segmentio/kafka-go/snappy"
)
const (
defaultCommitInterval = time.Second
defaultMaxWait = time.Second
)
type (
ConsumeHandle func(key, value string) error
ConsumeHandler interface {
Consume(key, value string) error
}
queueOptions struct {
commitInterval time.Duration
maxWait time.Duration
metrics *stat.Metrics
}
QueueOption func(*queueOptions)
kafkaQueue struct {
c KqConf
consumer *kafka.Reader
handler ConsumeHandler
channel chan kafka.Message
producerRoutines *threading.RoutineGroup
consumerRoutines *threading.RoutineGroup
metrics *stat.Metrics
}
kafkaQueues struct {
queues []queue.MessageQueue
group *service.ServiceGroup
}
)
func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue {
q, err := NewQueue(c, handler, opts...)
if err != nil {
log.Fatal(err)
}
return q
}
func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error) {
if err := c.SetUp(); err != nil {
return nil, err
}
var options queueOptions
for _, opt := range opts {
opt(&options)
}
ensureQueueOptions(c, &options)
if c.NumConns < 1 {
c.NumConns = 1
}
q := kafkaQueues{
group: service.NewServiceGroup(),
}
for i := 0; i < c.NumConns; i++ {
q.queues = append(q.queues, newKafkaQueue(c, handler, options))
}
return q, nil
}
func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue {
var offset int64
if c.Offset == firstOffset {
offset = kafka.FirstOffset
} else {
offset = kafka.LastOffset
}
consumer := kafka.NewReader(kafka.ReaderConfig{
Brokers: c.Brokers,
GroupID: c.Group,
Topic: c.Topic,
StartOffset: offset,
MinBytes: c.MinBytes, // 10KB
MaxBytes: c.MaxBytes, // 10MB
MaxWait: options.maxWait,
CommitInterval: options.commitInterval,
})
return &kafkaQueue{
c: c,
consumer: consumer,
handler: handler,
channel: make(chan kafka.Message),
producerRoutines: threading.NewRoutineGroup(),
consumerRoutines: threading.NewRoutineGroup(),
metrics: options.metrics,
}
}
func (q *kafkaQueue) Start() {
q.startConsumers()
q.startProducers()
q.producerRoutines.Wait()
close(q.channel)
q.consumerRoutines.Wait()
}
func (q *kafkaQueue) Stop() {
q.consumer.Close()
logx.Close()
}
func (q *kafkaQueue) consumeOne(key, val string) error {
startTime := timex.Now()
err := q.handler.Consume(key, val)
q.metrics.Add(stat.Task{
Duration: timex.Since(startTime),
})
return err
}
func (q *kafkaQueue) startConsumers() {
for i := 0; i < q.c.NumConsumers; i++ {
q.consumerRoutines.Run(func() {
for msg := range q.channel {
if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil {
logx.Errorf("Error on consuming: %s, error: %v", string(msg.Value), err)
}
}
})
}
}
func (q *kafkaQueue) startProducers() {
for i := 0; i < q.c.NumProducers; i++ {
q.producerRoutines.Run(func() {
for {
msg, err := q.consumer.ReadMessage(context.Background())
// io.EOF means consumer closed
// io.ErrClosedPipe means committing messages on the consumer,
// kafka will refire the messages on uncommitted messages, ignore
if err == io.EOF || err == io.ErrClosedPipe {
return
}
if err != nil {
logx.Errorf("Error on reading mesage, %q", err.Error())
continue
}
q.channel <- msg
}
})
}
}
func (q kafkaQueues) Start() {
for _, each := range q.queues {
q.group.Add(each)
}
q.group.Start()
}
func (q kafkaQueues) Stop() {
q.group.Stop()
}
func WithCommitInterval(interval time.Duration) QueueOption {
return func(options *queueOptions) {
options.commitInterval = interval
}
}
func WithHandle(handle ConsumeHandle) ConsumeHandler {
return innerConsumeHandler{
handle: handle,
}
}
func WithMaxWait(wait time.Duration) QueueOption {
return func(options *queueOptions) {
options.maxWait = wait
}
}
func WithMetrics(metrics *stat.Metrics) QueueOption {
return func(options *queueOptions) {
options.metrics = metrics
}
}
type innerConsumeHandler struct {
handle ConsumeHandle
}
func (ch innerConsumeHandler) Consume(k, v string) error {
return ch.handle(k, v)
}
func ensureQueueOptions(c KqConf, options *queueOptions) {
if options.commitInterval == 0 {
options.commitInterval = defaultCommitInterval
}
if options.maxWait == 0 {
options.maxWait = defaultMaxWait
}
if options.metrics == nil {
options.metrics = stat.NewMetrics(c.Name)
}
}