configurable for load and stat statistics logs (#980)

master
Kevin Wan 3 years ago committed by GitHub
parent 4bee60eb7f
commit db95b3f0e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -31,6 +31,8 @@ var (
// default to be enabled // default to be enabled
enabled = syncx.ForAtomicBool(true) enabled = syncx.ForAtomicBool(true)
// default to be enabled
logEnabled = syncx.ForAtomicBool(true)
// make it a variable for unit test // make it a variable for unit test
systemOverloadChecker = func(cpuThreshold int64) bool { systemOverloadChecker = func(cpuThreshold int64) bool {
return stat.CpuUsage() >= cpuThreshold return stat.CpuUsage() >= cpuThreshold
@ -80,6 +82,11 @@ func Disable() {
enabled.Set(false) enabled.Set(false)
} }
// DisableLog disables the stat logs for load shedding.
func DisableLog() {
logEnabled.Set(false)
}
// NewAdaptiveShedder returns an adaptive shedder. // NewAdaptiveShedder returns an adaptive shedder.
// opts can be used to customize the Shedder. // opts can be used to customize the Shedder.
func NewAdaptiveShedder(opts ...ShedderOption) Shedder { func NewAdaptiveShedder(opts ...ShedderOption) Shedder {

@ -25,6 +25,7 @@ func init() {
} }
func TestAdaptiveShedder(t *testing.T) { func TestAdaptiveShedder(t *testing.T) {
DisableLog()
shedder := NewAdaptiveShedder(WithWindow(bucketDuration), WithBuckets(buckets), WithCpuThreshold(100)) shedder := NewAdaptiveShedder(WithWindow(bucketDuration), WithBuckets(buckets), WithCpuThreshold(100))
var wg sync.WaitGroup var wg sync.WaitGroup
var drop int64 var drop int64

@ -48,20 +48,15 @@ func (s *SheddingStat) IncrementDrop() {
atomic.AddInt64(&s.drop, 1) atomic.AddInt64(&s.drop, 1)
} }
func (s *SheddingStat) reset() snapshot { func (s *SheddingStat) loop(c <-chan time.Time) {
return snapshot{ for range c {
Total: atomic.SwapInt64(&s.total, 0), st := s.reset()
Pass: atomic.SwapInt64(&s.pass, 0),
Drop: atomic.SwapInt64(&s.drop, 0), if !logEnabled.True() {
continue
} }
}
func (s *SheddingStat) run() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
c := stat.CpuUsage() c := stat.CpuUsage()
st := s.reset()
if st.Drop == 0 { if st.Drop == 0 {
logx.Statf("(%s) shedding_stat [1m], cpu: %d, total: %d, pass: %d, drop: %d", logx.Statf("(%s) shedding_stat [1m], cpu: %d, total: %d, pass: %d, drop: %d",
s.name, c, st.Total, st.Pass, st.Drop) s.name, c, st.Total, st.Pass, st.Drop)
@ -71,3 +66,18 @@ func (s *SheddingStat) run() {
} }
} }
} }
func (s *SheddingStat) reset() snapshot {
return snapshot{
Total: atomic.SwapInt64(&s.total, 0),
Pass: atomic.SwapInt64(&s.pass, 0),
Drop: atomic.SwapInt64(&s.drop, 0),
}
}
func (s *SheddingStat) run() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
s.loop(ticker.C)
}

@ -2,6 +2,7 @@ package load
import ( import (
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -22,3 +23,32 @@ func TestSheddingStat(t *testing.T) {
assert.Equal(t, int64(5), result.Pass) assert.Equal(t, int64(5), result.Pass)
assert.Equal(t, int64(7), result.Drop) assert.Equal(t, int64(7), result.Drop)
} }
func TestLoopTrue(t *testing.T) {
ch := make(chan time.Time, 1)
ch <- time.Now()
close(ch)
st := new(SheddingStat)
logEnabled.Set(true)
st.loop(ch)
}
func TestLoopTrueAndDrop(t *testing.T) {
ch := make(chan time.Time, 1)
ch <- time.Now()
close(ch)
st := new(SheddingStat)
st.IncrementDrop()
logEnabled.Set(true)
st.loop(ch)
}
func TestLoopFalseAndDrop(t *testing.T) {
ch := make(chan time.Time, 1)
ch <- time.Now()
close(ch)
st := new(SheddingStat)
st.IncrementDrop()
logEnabled.Set(false)
st.loop(ch)
}

@ -38,9 +38,11 @@ func init() {
atomic.StoreInt64(&cpuUsage, usage) atomic.StoreInt64(&cpuUsage, usage)
}) })
case <-allTicker.C: case <-allTicker.C:
if logEnabled.True() {
printUsage() printUsage()
} }
} }
}
}() }()
} }

Loading…
Cancel
Save