You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
102 lines
2.1 KiB
Go
102 lines
2.1 KiB
Go
4 years ago
|
package kq
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"strconv"
|
||
|
"time"
|
||
|
|
||
|
"zero/core/executors"
|
||
|
"zero/core/logx"
|
||
|
|
||
|
"github.com/segmentio/kafka-go"
|
||
|
"github.com/segmentio/kafka-go/snappy"
|
||
|
)
|
||
|
|
||
|
type (
|
||
|
PushOption func(options *chunkOptions)
|
||
|
|
||
|
Pusher struct {
|
||
|
produer *kafka.Writer
|
||
|
topic string
|
||
|
executor *executors.ChunkExecutor
|
||
|
}
|
||
|
|
||
|
chunkOptions struct {
|
||
|
chunkSize int
|
||
|
flushInterval time.Duration
|
||
|
}
|
||
|
)
|
||
|
|
||
|
func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
|
||
|
producer := kafka.NewWriter(kafka.WriterConfig{
|
||
|
Brokers: addrs,
|
||
|
Topic: topic,
|
||
|
Balancer: &kafka.LeastBytes{},
|
||
|
CompressionCodec: snappy.NewCompressionCodec(),
|
||
|
})
|
||
|
|
||
|
pusher := &Pusher{
|
||
|
produer: producer,
|
||
|
topic: topic,
|
||
|
}
|
||
|
pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
|
||
|
chunk := make([]kafka.Message, len(tasks))
|
||
|
for i := range tasks {
|
||
|
chunk[i] = tasks[i].(kafka.Message)
|
||
|
}
|
||
|
if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
|
||
|
logx.Error(err)
|
||
|
}
|
||
|
}, newOptions(opts)...)
|
||
|
|
||
|
return pusher
|
||
|
}
|
||
|
|
||
|
func (p *Pusher) Close() error {
|
||
|
return p.produer.Close()
|
||
|
}
|
||
|
|
||
|
func (p *Pusher) Name() string {
|
||
|
return p.topic
|
||
|
}
|
||
|
|
||
|
func (p *Pusher) Push(v string) error {
|
||
|
msg := kafka.Message{
|
||
|
Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
|
||
|
Value: []byte(v),
|
||
|
}
|
||
|
if p.executor != nil {
|
||
|
return p.executor.Add(msg, len(v))
|
||
|
} else {
|
||
|
return p.produer.WriteMessages(context.Background(), msg)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func WithChunkSize(chunkSize int) PushOption {
|
||
|
return func(options *chunkOptions) {
|
||
|
options.chunkSize = chunkSize
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func WithFlushInterval(interval time.Duration) PushOption {
|
||
|
return func(options *chunkOptions) {
|
||
|
options.flushInterval = interval
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func newOptions(opts []PushOption) []executors.ChunkOption {
|
||
|
var options chunkOptions
|
||
|
for _, opt := range opts {
|
||
|
opt(&options)
|
||
|
}
|
||
|
|
||
|
var chunkOpts []executors.ChunkOption
|
||
|
if options.chunkSize > 0 {
|
||
|
chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
|
||
|
}
|
||
|
if options.flushInterval > 0 {
|
||
|
chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
|
||
|
}
|
||
|
return chunkOpts
|
||
|
}
|