You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-zero/dq/producernode.go

99 lines
2.1 KiB
Go

4 years ago
package dq
import (
"errors"
"fmt"
"strconv"
"strings"
"time"
4 years ago
"github.com/beanstalkd/go-beanstalk"
4 years ago
)
var ErrTimeBeforeNow = errors.New("can't schedule task to past time")
type producerNode struct {
endpoint string
tube string
conn *connection
}
func NewProducerNode(endpoint, tube string) Producer {
return &producerNode{
endpoint: endpoint,
tube: tube,
conn: newConnection(endpoint, tube),
}
}
func (p *producerNode) At(body []byte, at time.Time) (string, error) {
now := time.Now()
if at.Before(now) {
return "", ErrTimeBeforeNow
}
duration := at.Sub(now)
return p.Delay(body, duration)
}
func (p *producerNode) Close() error {
return p.conn.Close()
}
func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) {
conn, err := p.conn.get()
if err != nil {
return "", err
}
id, err := conn.Put(body, PriNormal, delay, defaultTimeToRun)
if err == nil {
return fmt.Sprintf("%s/%s/%d", p.endpoint, p.tube, id), nil
}
// the error can only be beanstalk.NameError or beanstalk.ConnError
// just return when the error is beanstalk.NameError, don't reset
switch cerr := err.(type) {
case beanstalk.ConnError:
switch cerr.Err {
case beanstalk.ErrBadChar, beanstalk.ErrBadFormat, beanstalk.ErrBuried, beanstalk.ErrDeadline,
beanstalk.ErrDraining, beanstalk.ErrEmpty, beanstalk.ErrInternal, beanstalk.ErrJobTooBig,
beanstalk.ErrNoCRLF, beanstalk.ErrNotFound, beanstalk.ErrNotIgnored, beanstalk.ErrTooLong:
// won't reset
default:
// beanstalk.ErrOOM, beanstalk.ErrTimeout, beanstalk.ErrUnknown and other errors
p.conn.reset()
}
}
return "", err
}
func (p *producerNode) Revoke(jointId string) error {
ids := strings.Split(jointId, idSep)
for _, id := range ids {
fields := strings.Split(id, "/")
if len(fields) < 3 {
continue
}
if fields[0] != p.endpoint || fields[1] != p.tube {
continue
}
conn, err := p.conn.get()
if err != nil {
return err
}
n, err := strconv.ParseUint(fields[2], 10, 64)
if err != nil {
return err
}
return conn.Delete(n)
}
// if not in this beanstalk, ignore
return nil
}