diff --git a/core/load/adaptiveshedder_test.go b/core/load/adaptiveshedder_test.go index 48e1343f..7be62b7e 100644 --- a/core/load/adaptiveshedder_test.go +++ b/core/load/adaptiveshedder_test.go @@ -135,6 +135,7 @@ func TestAdaptiveShedderShouldDrop(t *testing.T) { passCounter: passCounter, rtCounter: rtCounter, windows: buckets, + dropTime: syncx.NewAtomicDuration(), droppedRecently: syncx.NewAtomicBool(), } // cpu >= 800, inflight < maxPass @@ -160,6 +161,40 @@ func TestAdaptiveShedderShouldDrop(t *testing.T) { } shedder.avgFlying = 80 assert.False(t, shedder.shouldDrop()) + + // cpu >= 800, inflight < maxPass + systemOverloadChecker = func(int64) bool { + return true + } + shedder.avgFlying = 80 + shedder.flying = 80 + _, err := shedder.Allow() + assert.NotNil(t, err) +} + +func TestAdaptiveShedderStillHot(t *testing.T) { + logx.Disable() + passCounter := newRollingWindow() + rtCounter := newRollingWindow() + for i := 0; i < 10; i++ { + if i > 0 { + time.Sleep(bucketDuration) + } + passCounter.Add(float64((i + 1) * 100)) + for j := i*10 + 1; j <= i*10+10; j++ { + rtCounter.Add(float64(j)) + } + } + shedder := &adaptiveShedder{ + passCounter: passCounter, + rtCounter: rtCounter, + windows: buckets, + dropTime: syncx.NewAtomicDuration(), + droppedRecently: syncx.ForAtomicBool(true), + } + assert.False(t, shedder.stillHot()) + shedder.dropTime.Set(-coolOffDuration * 2) + assert.False(t, shedder.stillHot()) } func BenchmarkAdaptiveShedder_Allow(b *testing.B) { diff --git a/core/load/sheddergroup_test.go b/core/load/sheddergroup_test.go index f58f1588..1e3b82e9 100644 --- a/core/load/sheddergroup_test.go +++ b/core/load/sheddergroup_test.go @@ -13,3 +13,8 @@ func TestGroup(t *testing.T) { assert.NotNil(t, limiter) }) } + +func TestShedderClose(t *testing.T) { + var nop nopCloser + assert.Nil(t, nop.Close()) +} diff --git a/core/syncx/limit.go b/core/syncx/limit.go index 04fa3aaa..d36c3fc9 100644 --- a/core/syncx/limit.go +++ b/core/syncx/limit.go @@ -6,18 +6,22 @@ import ( "github.com/tal-tech/go-zero/core/lang" ) -var ErrReturn = errors.New("discarding limited token, resource pool is full, someone returned multiple times") +// ErrLimitReturn indicates that the more than borrowed elements were returned. +var ErrLimitReturn = errors.New("discarding limited token, resource pool is full, someone returned multiple times") +// Limit controls the concurrent requests. type Limit struct { pool chan lang.PlaceholderType } +// NewLimit creates a Limit that can borrow n elements from it concurrently. func NewLimit(n int) Limit { return Limit{ pool: make(chan lang.PlaceholderType, n), } } +// Borrow borrows an element from Limit in blocking mode. func (l Limit) Borrow() { l.pool <- lang.Placeholder } @@ -28,10 +32,12 @@ func (l Limit) Return() error { case <-l.pool: return nil default: - return ErrReturn + return ErrLimitReturn } } +// TryBorrow tries to borrow an element from Limit, in non-blocking mode. +// If success, true returned, false for otherwise. func (l Limit) TryBorrow() bool { select { case l.pool <- lang.Placeholder: diff --git a/core/syncx/limit_test.go b/core/syncx/limit_test.go index 338e23c5..1465dbb0 100644 --- a/core/syncx/limit_test.go +++ b/core/syncx/limit_test.go @@ -13,5 +13,5 @@ func TestLimit(t *testing.T) { assert.False(t, limit.TryBorrow()) assert.Nil(t, limit.Return()) assert.Nil(t, limit.Return()) - assert.Equal(t, ErrReturn, limit.Return()) + assert.Equal(t, ErrLimitReturn, limit.Return()) } diff --git a/core/syncx/timeoutlimit_test.go b/core/syncx/timeoutlimit_test.go index a1e54704..5f6e3fe2 100644 --- a/core/syncx/timeoutlimit_test.go +++ b/core/syncx/timeoutlimit_test.go @@ -29,5 +29,5 @@ func TestTimeoutLimit(t *testing.T) { assert.Equal(t, ErrTimeout, limit.Borrow(time.Millisecond*100)) assert.Nil(t, limit.Return()) assert.Nil(t, limit.Return()) - assert.Equal(t, ErrReturn, limit.Return()) + assert.Equal(t, ErrLimitReturn, limit.Return()) } diff --git a/rest/handler/maxconnshandler.go b/rest/handler/maxconnshandler.go index d4a51233..6fffc880 100644 --- a/rest/handler/maxconnshandler.go +++ b/rest/handler/maxconnshandler.go @@ -28,7 +28,7 @@ func MaxConns(n int) func(http.Handler) http.Handler { next.ServeHTTP(w, r) } else { - internal.Errorf(r, "Concurrent connections over %d, rejected with code %d", + internal.Errorf(r, "concurrent connections over %d, rejected with code %d", n, http.StatusServiceUnavailable) w.WriteHeader(http.StatusServiceUnavailable) }