diff --git a/core/service/servicegroup.go b/core/service/servicegroup.go index 0968a627..98d6ff2d 100644 --- a/core/service/servicegroup.go +++ b/core/service/servicegroup.go @@ -25,29 +25,29 @@ type ( Stopper } - // A Group is a group of services. - Group struct { + // A ServiceGroup is a group of services. + ServiceGroup struct { services []Service stopOnce func() } ) -// NewGroup returns a Group. -func NewGroup() *Group { - sg := new(Group) +// NewServiceGroup returns a ServiceGroup. +func NewServiceGroup() *ServiceGroup { + sg := new(ServiceGroup) sg.stopOnce = syncx.Once(sg.doStop) return sg } // Add adds service into sg. -func (sg *Group) Add(service Service) { +func (sg *ServiceGroup) Add(service Service) { sg.services = append(sg.services, service) } -// Start starts the Group. +// Start starts the ServiceGroup. // There should not be any logic code after calling this method, because this method is a blocking one. // Also, quitting this method will close the logx output. -func (sg *Group) Start() { +func (sg *ServiceGroup) Start() { proc.AddShutdownListener(func() { log.Println("Shutting down...") sg.stopOnce() @@ -56,12 +56,12 @@ func (sg *Group) Start() { sg.doStart() } -// Stop stops the Group. -func (sg *Group) Stop() { +// Stop stops the ServiceGroup. +func (sg *ServiceGroup) Stop() { sg.stopOnce() } -func (sg *Group) doStart() { +func (sg *ServiceGroup) doStart() { routineGroup := threading.NewRoutineGroup() for i := range sg.services { @@ -74,7 +74,7 @@ func (sg *Group) doStart() { routineGroup.Wait() } -func (sg *Group) doStop() { +func (sg *ServiceGroup) doStop() { for _, service := range sg.services { service.Stop() } diff --git a/core/service/servicegroup_test.go b/core/service/servicegroup_test.go index e431cad5..7f2165dd 100644 --- a/core/service/servicegroup_test.go +++ b/core/service/servicegroup_test.go @@ -41,7 +41,7 @@ func TestServiceGroup(t *testing.T) { multipliers := []int{2, 3, 5, 7} want := 1 - group := NewGroup() + group := NewServiceGroup() for _, multiplier := range multipliers { want *= multiplier service := newMockedService(multiplier) @@ -68,7 +68,7 @@ func TestServiceGroup_WithStart(t *testing.T) { var wait sync.WaitGroup var lock sync.Mutex wait.Add(len(multipliers)) - group := NewGroup() + group := NewServiceGroup() for _, multiplier := range multipliers { var mul = multiplier group.Add(WithStart(func() { @@ -95,7 +95,7 @@ func TestServiceGroup_WithStarter(t *testing.T) { var wait sync.WaitGroup var lock sync.Mutex wait.Add(len(multipliers)) - group := NewGroup() + group := NewServiceGroup() for _, multiplier := range multipliers { var mul = multiplier group.Add(WithStarter(mockedStarter{ diff --git a/core/stat/alert.go b/core/stat/alert.go index 61993954..afa43d42 100644 --- a/core/stat/alert.go +++ b/core/stat/alert.go @@ -37,6 +37,7 @@ func init() { } } +// Report reports given message. func Report(msg string) { lock.RLock() fn := reporter @@ -63,6 +64,7 @@ func Report(msg string) { } } +// SetReporter sets the given reporter. func SetReporter(fn func(string)) { lock.Lock() defer lock.Unlock() diff --git a/core/stat/alert_polyfill.go b/core/stat/alert_polyfill.go index cef947bc..d76e6c53 100644 --- a/core/stat/alert_polyfill.go +++ b/core/stat/alert_polyfill.go @@ -2,8 +2,10 @@ package stat +// Report reports given message. func Report(string) { } +// SetReporter sets the given reporter. func SetReporter(func(string)) { } diff --git a/core/stat/metrics.go b/core/stat/metrics.go index d8981e39..f3c8b430 100644 --- a/core/stat/metrics.go +++ b/core/stat/metrics.go @@ -7,20 +7,23 @@ import ( "github.com/tal-tech/go-zero/core/executors" "github.com/tal-tech/go-zero/core/logx" + "github.com/tal-tech/go-zero/core/syncx" ) var ( - LogInterval = time.Minute - + logInterval = time.Minute writerLock sync.Mutex reportWriter Writer = nil + logEnabled = syncx.ForAtomicBool(true) ) type ( + // Writer interface wraps the Write method. Writer interface { Write(report *StatReport) error } + // A StatReport is a stat report entry. StatReport struct { Name string `json:"name"` Timestamp int64 `json:"tm"` @@ -34,18 +37,26 @@ type ( Top99p9th float32 `json:"t99p9"` } + // A Metrics is used to log and report stat reports. Metrics struct { executor *executors.PeriodicalExecutor container *metricsContainer } ) +// DisableLog disables logs of stats. +func DisableLog() { + logEnabled.Set(false) +} + +// SetReportWriter sets the report writer. func SetReportWriter(writer Writer) { writerLock.Lock() reportWriter = writer writerLock.Unlock() } +// NewMetrics returns a Metrics. func NewMetrics(name string) *Metrics { container := &metricsContainer{ name: name, @@ -53,21 +64,24 @@ func NewMetrics(name string) *Metrics { } return &Metrics{ - executor: executors.NewPeriodicalExecutor(LogInterval, container), + executor: executors.NewPeriodicalExecutor(logInterval, container), container: container, } } +// Add adds task to m. func (m *Metrics) Add(task Task) { m.executor.Add(task) } +// AddDrop adds a drop to m. func (m *Metrics) AddDrop() { m.executor.Add(Task{ Drop: true, }) } +// SetName sets the name of m. func (m *Metrics) SetName(name string) { m.executor.Sync(func() { m.container.name = name @@ -113,7 +127,7 @@ func (c *metricsContainer) Execute(v interface{}) { Name: c.name, Timestamp: time.Now().Unix(), Pid: c.pid, - ReqsPerSecond: float32(size) / float32(LogInterval/time.Second), + ReqsPerSecond: float32(size) / float32(logInterval/time.Second), Drops: drops, } @@ -192,10 +206,12 @@ func getTopDuration(tasks []Task) float32 { func log(report *StatReport) { writeReport(report) - logx.Statf("(%s) - qps: %.1f/s, drops: %d, avg time: %.1fms, med: %.1fms, "+ - "90th: %.1fms, 99th: %.1fms, 99.9th: %.1fms", - report.Name, report.ReqsPerSecond, report.Drops, report.Average, report.Median, - report.Top90th, report.Top99th, report.Top99p9th) + if logEnabled.True() { + logx.Statf("(%s) - qps: %.1f/s, drops: %d, avg time: %.1fms, med: %.1fms, "+ + "90th: %.1fms, 99th: %.1fms, 99.9th: %.1fms", + report.Name, report.ReqsPerSecond, report.Drops, report.Average, report.Median, + report.Top90th, report.Top99th, report.Top99p9th) + } } func writeReport(report *StatReport) { diff --git a/core/stat/metrics_test.go b/core/stat/metrics_test.go index a8dd93f4..4da547ed 100644 --- a/core/stat/metrics_test.go +++ b/core/stat/metrics_test.go @@ -6,9 +6,14 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/tal-tech/go-zero/core/logx" ) func TestMetrics(t *testing.T) { + logx.Disable() + DisableLog() + defer logEnabled.Set(true) + counts := []int{1, 5, 10, 100, 1000, 1000} for _, count := range counts { m := NewMetrics("foo") diff --git a/core/stat/remotewriter.go b/core/stat/remotewriter.go index 66091a05..a4c5ba71 100644 --- a/core/stat/remotewriter.go +++ b/core/stat/remotewriter.go @@ -12,12 +12,15 @@ import ( const httpTimeout = time.Second * 5 +// ErrWriteFailed is an error that indicates failed to submit a StatReport. var ErrWriteFailed = errors.New("submit failed") +// A RemoteWriter is a writer to write StatReport. type RemoteWriter struct { endpoint string } +// NewRemoteWriter returns a RemoteWriter. func NewRemoteWriter(endpoint string) Writer { return &RemoteWriter{ endpoint: endpoint, diff --git a/core/stat/task.go b/core/stat/task.go index 41c1ee22..637e0b76 100644 --- a/core/stat/task.go +++ b/core/stat/task.go @@ -2,6 +2,7 @@ package stat import "time" +// A Task is a task that is reported to Metrics. type Task struct { Drop bool Duration time.Duration diff --git a/core/stat/usage.go b/core/stat/usage.go index 7d50fbc8..b6a5e3df 100644 --- a/core/stat/usage.go +++ b/core/stat/usage.go @@ -44,6 +44,7 @@ func init() { }() } +// CpuUsage returns current cpu usage. func CpuUsage() int64 { return atomic.LoadInt64(&cpuUsage) }