diff --git a/core/redisqueue/conf.go b/rq/internal/conf.go similarity index 95% rename from core/redisqueue/conf.go rename to rq/internal/conf.go index 4661260c..db294632 100644 --- a/core/redisqueue/conf.go +++ b/rq/internal/conf.go @@ -1,4 +1,4 @@ -package redisqueue +package internal import ( "zero/core/queue" diff --git a/rq/constant/const.go b/rq/internal/const.go similarity index 85% rename from rq/constant/const.go rename to rq/internal/const.go index ae4d22e9..c444ea2d 100644 --- a/rq/constant/const.go +++ b/rq/internal/const.go @@ -1,4 +1,4 @@ -package constant +package internal const ( Delimeter = "/" diff --git a/rq/hashchange.go b/rq/internal/hashchange.go similarity index 97% rename from rq/hashchange.go rename to rq/internal/hashchange.go index 2f68b5e2..5409705f 100644 --- a/rq/hashchange.go +++ b/rq/internal/hashchange.go @@ -1,4 +1,4 @@ -package rq +package internal import ( "math/rand" diff --git a/core/redisqueue/message.go b/rq/internal/message.go similarity index 83% rename from core/redisqueue/message.go rename to rq/internal/message.go index 75bbb9d0..292d96b7 100644 --- a/core/redisqueue/message.go +++ b/rq/internal/message.go @@ -1,4 +1,4 @@ -package redisqueue +package internal type TimedMessage struct { Time int64 `json:"time"` diff --git a/core/redisqueue/redisqueue_test.go b/rq/internal/redisqueue_test.go similarity index 98% rename from core/redisqueue/redisqueue_test.go rename to rq/internal/redisqueue_test.go index fbce6a62..ff9025d9 100644 --- a/core/redisqueue/redisqueue_test.go +++ b/rq/internal/redisqueue_test.go @@ -1,4 +1,4 @@ -package redisqueue +package internal import ( "strconv" diff --git a/core/redisqueue/redisqueueproducer.go b/rq/internal/redisqueueproducer.go similarity index 99% rename from core/redisqueue/redisqueueproducer.go rename to rq/internal/redisqueueproducer.go index 6496353f..4f7c993a 100644 --- a/core/redisqueue/redisqueueproducer.go +++ b/rq/internal/redisqueueproducer.go @@ -1,4 +1,4 @@ -package redisqueue +package internal import ( "fmt" diff --git a/core/redisqueue/redisqueuepusher.go b/rq/internal/redisqueuepusher.go similarity index 98% rename from core/redisqueue/redisqueuepusher.go rename to rq/internal/redisqueuepusher.go index 79008af9..2b218d60 100644 --- a/core/redisqueue/redisqueuepusher.go +++ b/rq/internal/redisqueuepusher.go @@ -1,4 +1,4 @@ -package redisqueue +package internal import ( "fmt" diff --git a/rq/update/incrementalupdater.go b/rq/internal/update/incrementalupdater.go similarity index 100% rename from rq/update/incrementalupdater.go rename to rq/internal/update/incrementalupdater.go diff --git a/rq/update/serverchange.go b/rq/internal/update/serverchange.go similarity index 93% rename from rq/update/serverchange.go rename to rq/internal/update/serverchange.go index 44ea40c7..04577833 100644 --- a/rq/update/serverchange.go +++ b/rq/internal/update/serverchange.go @@ -9,7 +9,7 @@ import ( "zero/core/hash" "zero/core/jsonx" - "zero/rq/constant" + "zero/rq/internal" ) var ErrInvalidServerChange = errors.New("not a server change message") @@ -82,7 +82,7 @@ func (sc ServerChange) GetCode() string { } func IsServerChange(message string) bool { - return len(message) > 0 && message[0] == constant.ServerSensitivePrefix + return len(message) > 0 && message[0] == internal.ServerSensitivePrefix } func (sc ServerChange) Marshal() (string, error) { @@ -91,7 +91,7 @@ func (sc ServerChange) Marshal() (string, error) { return "", err } - return string(append([]byte{constant.ServerSensitivePrefix}, body...)), nil + return string(append([]byte{internal.ServerSensitivePrefix}, body...)), nil } func UnmarshalServerChange(body string) (ServerChange, error) { diff --git a/rq/pusher.go b/rq/pusher.go index 14fe4d79..154dad62 100644 --- a/rq/pusher.go +++ b/rq/pusher.go @@ -14,11 +14,10 @@ import ( "zero/core/lang" "zero/core/logx" "zero/core/queue" - "zero/core/redisqueue" "zero/core/stores/redis" "zero/core/threading" - "zero/rq/constant" - "zero/rq/update" + "zero/rq/internal" + "zero/rq/internal/update" ) const ( @@ -155,7 +154,7 @@ func (pusher *Pusher) failover(server string) error { return } - if option == constant.TimedQueueType { + if option == internal.TimedQueueType { message, err = unwrapTimedMessage(message) if err != nil { logx.Errorf("invalid timedMessage: %s, error: %s", message, err.Error()) @@ -179,11 +178,11 @@ func UnmarshalPusher(server string) (queue.QueuePusher, error) { return nil, err } - if option == constant.TimedQueueType { - return redisqueue.NewPusher(store, key, redisqueue.WithTime()), nil + if option == internal.TimedQueueType { + return internal.NewPusher(store, key, internal.WithTime()), nil } - return redisqueue.NewPusher(store, key), nil + return internal.NewPusher(store, key), nil } func WithBatchConsistentStrategy(keysFn KeysFn, assembleFn AssembleFn, opts ...discov.BalanceOption) PusherOption { @@ -375,15 +374,15 @@ func getName(key string) string { func newPusher(server string) (queue.QueuePusher, error) { if rds, key, option, err := newRedisWithKey(server); err != nil { return nil, err - } else if option == constant.TimedQueueType { - return redisqueue.NewPusher(rds, key, redisqueue.WithTime()), nil + } else if option == internal.TimedQueueType { + return internal.NewPusher(rds, key, internal.WithTime()), nil } else { - return redisqueue.NewPusher(rds, key), nil + return internal.NewPusher(rds, key), nil } } func newRedisWithKey(server string) (rds *redis.Redis, key, option string, err error) { - fields := strings.Split(server, constant.Delimeter) + fields := strings.Split(server, internal.Delimeter) if len(fields) < etcdRedisFields { err = fmt.Errorf("wrong redis queue: %s, should be ip:port/type/password/key/[option]", server) return @@ -437,7 +436,7 @@ func broadcast(servers []string, message string) error { } func unwrapTimedMessage(message string) (string, error) { - var tm redisqueue.TimedMessage + var tm internal.TimedMessage if err := jsonx.UnmarshalFromString(message, &tm); err != nil { return "", err } diff --git a/rq/queue.go b/rq/queue.go index e6a093a0..d72a5875 100644 --- a/rq/queue.go +++ b/rq/queue.go @@ -11,13 +11,12 @@ import ( "zero/core/discov" "zero/core/logx" "zero/core/queue" - "zero/core/redisqueue" "zero/core/service" "zero/core/stores/redis" "zero/core/stringx" "zero/core/threading" - "zero/rq/constant" - "zero/rq/update" + "zero/rq/internal" + "zero/rq/internal/update" ) const keyLen = 6 @@ -107,8 +106,8 @@ func (q *MessageQueue) Stop() { func (q *MessageQueue) buildQueue() *queue.Queue { inboundStore := redis.NewRedis(q.c.Redis.Host, q.c.Redis.Type, q.c.Redis.Pass) - producerFactory := redisqueue.NewProducerFactory(inboundStore, q.c.Redis.Key, - redisqueue.TimeSensitive(q.c.DropBefore)) + producerFactory := internal.NewProducerFactory(inboundStore, q.c.Redis.Key, + internal.TimeSensitive(q.c.DropBefore)) mq := queue.NewQueue(producerFactory, q.consumerFactory) if len(q.c.Name) > 0 { @@ -140,7 +139,7 @@ func (q *MessageQueue) maybeAppendRenewer(group *service.ServiceGroup, mq *queue if len(q.c.Etcd.Hosts) > 0 || len(q.c.Etcd.Key) > 0 { etcdValue := MarshalQueue(q.c.Redis) if q.c.DropBefore > 0 { - etcdValue = strings.Join([]string{etcdValue, constant.TimedQueueType}, constant.Delimeter) + etcdValue = strings.Join([]string{etcdValue, internal.TimedQueueType}, internal.Delimeter) } keepAliver := discov.NewRenewer(q.c.Etcd.Hosts, q.c.Etcd.Key, etcdValue, q.options.renewId) mq.AddListener(pauseResumeHandler{ @@ -156,7 +155,7 @@ func MarshalQueue(rds redis.RedisKeyConf) string { rds.Type, rds.Pass, rds.Key, - }, constant.Delimeter) + }, internal.Delimeter) } func WithHandle(handle ConsumeHandle) queue.ConsumerFactory { @@ -251,7 +250,7 @@ func (c *serverSensitiveConsumer) Consume(msg string) error { oldHash := change.CreatePrevHash() newHash := change.CreateCurrentHash() - hashChange := NewHashChange(oldHash, newHash) + hashChange := internal.NewHashChange(oldHash, newHash) c.mq.redisQueue.Broadcast(hashChange) return nil