|
|
@ -2,6 +2,7 @@ package queue
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"errors"
|
|
|
|
|
|
|
|
"math"
|
|
|
|
"sync"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
"sync/atomic"
|
|
|
|
"testing"
|
|
|
|
"testing"
|
|
|
@ -39,7 +40,7 @@ func TestQueue(t *testing.T) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func TestQueue_Broadcast(t *testing.T) {
|
|
|
|
func TestQueue_Broadcast(t *testing.T) {
|
|
|
|
producer := newMockedProducer(rounds)
|
|
|
|
producer := newMockedProducer(math.MaxInt32)
|
|
|
|
consumer := newMockedConsumer()
|
|
|
|
consumer := newMockedConsumer()
|
|
|
|
consumer.wait.Add(consumers)
|
|
|
|
consumer.wait.Add(consumers)
|
|
|
|
q := NewQueue(func() (Producer, error) {
|
|
|
|
q := NewQueue(func() (Producer, error) {
|
|
|
@ -51,14 +52,14 @@ func TestQueue_Broadcast(t *testing.T) {
|
|
|
|
q.SetName("mockqueue")
|
|
|
|
q.SetName("mockqueue")
|
|
|
|
q.SetNumConsumer(consumers)
|
|
|
|
q.SetNumConsumer(consumers)
|
|
|
|
q.SetNumProducer(1)
|
|
|
|
q.SetNumProducer(1)
|
|
|
|
q.Broadcast("message")
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
go func() {
|
|
|
|
producer.wait.Wait()
|
|
|
|
time.Sleep(time.Millisecond * 100)
|
|
|
|
q.Stop()
|
|
|
|
q.Stop()
|
|
|
|
}()
|
|
|
|
}()
|
|
|
|
q.Start()
|
|
|
|
go q.Start()
|
|
|
|
|
|
|
|
time.Sleep(time.Millisecond * 50)
|
|
|
|
|
|
|
|
q.Broadcast("message")
|
|
|
|
consumer.wait.Wait()
|
|
|
|
consumer.wait.Wait()
|
|
|
|
assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
|
|
|
|
|
|
|
|
assert.Equal(t, int32(consumers), atomic.LoadInt32(&consumer.events))
|
|
|
|
assert.Equal(t, int32(consumers), atomic.LoadInt32(&consumer.events))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|