You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
169 lines
3.5 KiB
Go
169 lines
3.5 KiB
Go
package queue
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
const (
|
|
consumers = 4
|
|
rounds = 100
|
|
)
|
|
|
|
func TestQueue(t *testing.T) {
|
|
producer := newMockedProducer(rounds)
|
|
consumer := newMockedConsumer()
|
|
consumer.wait.Add(consumers)
|
|
q := NewQueue(func() (Producer, error) {
|
|
return producer, nil
|
|
}, func() (Consumer, error) {
|
|
return consumer, nil
|
|
})
|
|
q.AddListener(new(mockedListener))
|
|
q.SetName("mockqueue")
|
|
q.SetNumConsumer(consumers)
|
|
q.SetNumProducer(1)
|
|
q.pause()
|
|
q.resume()
|
|
go func() {
|
|
producer.wait.Wait()
|
|
q.Stop()
|
|
}()
|
|
q.Start()
|
|
assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
|
|
}
|
|
|
|
func TestQueue_Broadcast(t *testing.T) {
|
|
producer := newMockedProducer(rounds)
|
|
consumer := newMockedConsumer()
|
|
consumer.wait.Add(consumers)
|
|
q := NewQueue(func() (Producer, error) {
|
|
return producer, nil
|
|
}, func() (Consumer, error) {
|
|
return consumer, nil
|
|
})
|
|
q.AddListener(new(mockedListener))
|
|
q.SetName("mockqueue")
|
|
q.SetNumConsumer(consumers)
|
|
q.SetNumProducer(1)
|
|
q.Broadcast("message")
|
|
go func() {
|
|
producer.wait.Wait()
|
|
q.Stop()
|
|
}()
|
|
q.Start()
|
|
consumer.wait.Wait()
|
|
assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
|
|
assert.Equal(t, int32(consumers), atomic.LoadInt32(&consumer.events))
|
|
}
|
|
|
|
func TestQueue_PauseResume(t *testing.T) {
|
|
producer := newMockedProducer(rounds)
|
|
consumer := newMockedConsumer()
|
|
consumer.wait.Add(consumers)
|
|
q := NewQueue(func() (Producer, error) {
|
|
return producer, nil
|
|
}, func() (Consumer, error) {
|
|
return consumer, nil
|
|
})
|
|
q.AddListener(new(mockedListener))
|
|
q.SetName("mockqueue")
|
|
q.SetNumConsumer(consumers)
|
|
q.SetNumProducer(1)
|
|
go func() {
|
|
producer.wait.Wait()
|
|
q.Stop()
|
|
}()
|
|
q.Start()
|
|
producer.listener.OnProducerPause()
|
|
assert.Equal(t, int32(0), atomic.LoadInt32(&q.active))
|
|
producer.listener.OnProducerResume()
|
|
assert.Equal(t, int32(1), atomic.LoadInt32(&q.active))
|
|
assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
|
|
}
|
|
|
|
func TestQueue_ConsumeError(t *testing.T) {
|
|
producer := newMockedProducer(rounds)
|
|
consumer := newMockedConsumer()
|
|
consumer.consumeErr = errors.New("consume error")
|
|
consumer.wait.Add(consumers)
|
|
q := NewQueue(func() (Producer, error) {
|
|
return producer, nil
|
|
}, func() (Consumer, error) {
|
|
return consumer, nil
|
|
})
|
|
q.AddListener(new(mockedListener))
|
|
q.SetName("mockqueue")
|
|
q.SetNumConsumer(consumers)
|
|
q.SetNumProducer(1)
|
|
go func() {
|
|
producer.wait.Wait()
|
|
q.Stop()
|
|
}()
|
|
q.Start()
|
|
assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
|
|
}
|
|
|
|
type mockedConsumer struct {
|
|
count int32
|
|
events int32
|
|
consumeErr error
|
|
wait sync.WaitGroup
|
|
}
|
|
|
|
func newMockedConsumer() *mockedConsumer {
|
|
return new(mockedConsumer)
|
|
}
|
|
|
|
func (c *mockedConsumer) Consume(string) error {
|
|
atomic.AddInt32(&c.count, 1)
|
|
return c.consumeErr
|
|
}
|
|
|
|
func (c *mockedConsumer) OnEvent(any) {
|
|
if atomic.AddInt32(&c.events, 1) <= consumers {
|
|
c.wait.Done()
|
|
}
|
|
}
|
|
|
|
type mockedProducer struct {
|
|
total int32
|
|
count int32
|
|
listener ProduceListener
|
|
wait sync.WaitGroup
|
|
}
|
|
|
|
func newMockedProducer(total int32) *mockedProducer {
|
|
p := new(mockedProducer)
|
|
p.total = total
|
|
p.wait.Add(int(total))
|
|
return p
|
|
}
|
|
|
|
func (p *mockedProducer) AddListener(listener ProduceListener) {
|
|
p.listener = listener
|
|
}
|
|
|
|
func (p *mockedProducer) Produce() (string, bool) {
|
|
if atomic.AddInt32(&p.count, 1) <= p.total {
|
|
p.wait.Done()
|
|
return "item", true
|
|
}
|
|
|
|
time.Sleep(time.Second)
|
|
return "", false
|
|
}
|
|
|
|
type mockedListener struct{}
|
|
|
|
func (l *mockedListener) OnPause() {
|
|
}
|
|
|
|
func (l *mockedListener) OnResume() {
|
|
}
|