diff --git a/doc/images/resilience.jpg b/doc/images/resilience.jpg new file mode 100644 index 00000000..058e4e76 Binary files /dev/null and b/doc/images/resilience.jpg differ diff --git a/readme.md b/readme.md new file mode 100644 index 00000000..56578aac --- /dev/null +++ b/readme.md @@ -0,0 +1,70 @@ +# go-zero开源项目介绍 + +> 部门:晓黑板研发部 +> +> git地址: + +## 1. go-zero框架背景 + +18年初,晓黑板后端在经过频繁的宕机后,决定从`Java+MongoDB`的单体架构迁移到微服务架构,经过仔细思考和对比,我们决定: + +* 基于Go语言 + * 高效的性能 + * 简洁的语法 + * 广泛验证的工程效率 + * 极致的部署体验 + * 极低的服务端资源成本 +* 自研微服务框架 + * 个人有过很多微服务框架自研经验 + * 需要有更快速的问题定位能力 + * 更便捷的增加新特性 + +## 2. go-zero框架设计思考 + +对于微服务框架的设计,我们期望保障微服务稳定性的同时,也要特别注重研发效率。所以设计之初,我们就有如下一些准则: + +* 保持简单 +* 高可用 +* 高并发 +* 易扩展 +* 弹性设计,面向故障编程 +* 尽可能对业务开发友好,封装复杂度 +* 尽可能约束做一件事只有一种方式 + +我们经历不到半年时间,彻底完成了从`Java+MongoDB`到`Golang+MySQL`为主的微服务体系迁移,并于18年8月底完全上线,稳定保障了晓黑板后续增长,确保了整个服务的高可用。 + +## 3. go-zero项目实现和特点 + +go-zero是一个集成了各种工程实践的包含web和rpc框架,有如下主要特点: + +* 强大的工具支持,尽可能少的代码编写 +* 极简的接口 +* 完全兼容net/http +* 支持中间件,方便扩展 +* 高性能 +* 面向故障编程,弹性设计 +* 内建服务发现、负载均衡 +* 内建限流、熔断、降载,且自动触发,自动恢复 +* API参数自动校验 +* 超时级联控制 +* 自动缓存控制 +* 链路跟踪、统计报警等 +* 高并发支撑,稳定保障了晓黑板疫情期间每天的流量洪峰 + +如下图,我们从多个层面保障了整体服务的高可用: + +![弹性设计](/Users/kevin/Develop/go/opensource/go-zero/doc/images/resilience.jpg) + +## 4. go-zero框架收益 + +* 保障大并发服务端的稳定性,经受了充分的实战检验 +* 极简的API定义 +* 一键生成Go, iOS, Android, Dart, TypeScript, JavaScript代码,并可直接运行 +* 服务端自动校验参数合法性 + +## 5. go-zero近期开发计划 + +* 升级grpc到最新版 +* 加入Power of Two Choices Peak Exponentially Weighted Moving Average负载均衡算法 +* 自动生成API mock server,便于客户端开发 +* 自动生成服务端功能测试 \ No newline at end of file diff --git a/stash/config/config.go b/stash/config/config.go deleted file mode 100644 index e0a0d507..00000000 --- a/stash/config/config.go +++ /dev/null @@ -1,41 +0,0 @@ -package config - -import ( - "time" - - "zero/kq" -) - -type ( - Condition struct { - Key string - Value string - Type string `json:",default=match,options=match|contains"` - Op string `json:",default=and,options=and|or"` - } - - ElasticSearchConf struct { - Hosts []string - DailyIndexPrefix string - TimeZone string `json:",optional"` - MaxChunkBytes int `json:",default=1048576"` - Compress bool `json:",default=false"` - } - - Filter struct { - Action string `json:",options=drop|remove_field"` - Conditions []Condition `json:",optional"` - Fields []string `json:",optional"` - } - - Config struct { - Input struct { - Kafka kq.KqConf - } - Filters []Filter - Output struct { - ElasticSearch ElasticSearchConf - } - GracePeriod time.Duration `json:",default=10s"` - } -) diff --git a/stash/es/index.go b/stash/es/index.go deleted file mode 100644 index ed150f6f..00000000 --- a/stash/es/index.go +++ /dev/null @@ -1,82 +0,0 @@ -package es - -import ( - "context" - "sync" - "time" - - "zero/core/fx" - "zero/core/logx" - "zero/core/syncx" - - "github.com/olivere/elastic" -) - -const sharedCallsKey = "ensureIndex" - -type ( - IndexFormat func(time.Time) string - IndexFunc func() string - - Index struct { - client *elastic.Client - indexFormat IndexFormat - index string - lock sync.RWMutex - sharedCalls syncx.SharedCalls - } -) - -func NewIndex(client *elastic.Client, indexFormat IndexFormat) *Index { - return &Index{ - client: client, - indexFormat: indexFormat, - sharedCalls: syncx.NewSharedCalls(), - } -} - -func (idx *Index) GetIndex(t time.Time) string { - index := idx.indexFormat(t) - if err := idx.ensureIndex(index); err != nil { - logx.Error(err) - } - return index -} - -func (idx *Index) ensureIndex(index string) error { - idx.lock.RLock() - if index == idx.index { - idx.lock.RUnlock() - return nil - } - idx.lock.RUnlock() - - _, err := idx.sharedCalls.Do(sharedCallsKey, func() (i interface{}, err error) { - idx.lock.Lock() - defer idx.lock.Unlock() - - existsService := elastic.NewIndicesExistsService(idx.client) - existsService.Index([]string{index}) - exist, err := existsService.Do(context.Background()) - if err != nil { - return nil, err - } - if exist { - idx.index = index - return nil, nil - } - - createService := idx.client.CreateIndex(index) - if err := fx.DoWithRetries(func() error { - // is it necessary to check the result? - _, err := createService.Do(context.Background()) - return err - }); err != nil { - return nil, err - } - - idx.index = index - return nil, nil - }) - return err -} diff --git a/stash/es/writer.go b/stash/es/writer.go deleted file mode 100644 index ac4e0fa6..00000000 --- a/stash/es/writer.go +++ /dev/null @@ -1,65 +0,0 @@ -package es - -import ( - "context" - "time" - - "zero/core/executors" - "zero/core/logx" - "zero/stash/config" - - "github.com/olivere/elastic" -) - -const docType = "doc" - -type ( - Writer struct { - client *elastic.Client - indexer *Index - inserter *executors.ChunkExecutor - } - - valueWithTime struct { - t time.Time - val string - } -) - -func NewWriter(c config.ElasticSearchConf, indexer *Index) (*Writer, error) { - client, err := elastic.NewClient( - elastic.SetSniff(false), - elastic.SetURL(c.Hosts...), - elastic.SetGzip(c.Compress), - ) - if err != nil { - return nil, err - } - - writer := Writer{ - client: client, - indexer: indexer, - } - writer.inserter = executors.NewChunkExecutor(writer.execute, executors.WithChunkBytes(c.MaxChunkBytes)) - return &writer, nil -} - -func (w *Writer) Write(t time.Time, val string) error { - return w.inserter.Add(valueWithTime{ - t: t, - val: val, - }, len(val)) -} - -func (w *Writer) execute(vals []interface{}) { - var bulk = w.client.Bulk() - for _, val := range vals { - pair := val.(valueWithTime) - req := elastic.NewBulkIndexRequest().Index(w.indexer.GetIndex(pair.t)).Type(docType).Doc(pair.val) - bulk.Add(req) - } - _, err := bulk.Do(context.Background()) - if err != nil { - logx.Error(err) - } -} diff --git a/stash/etc/config.json b/stash/etc/config.json deleted file mode 100644 index 35a610d2..00000000 --- a/stash/etc/config.json +++ /dev/null @@ -1,75 +0,0 @@ -{ - "Input": { - "Kafka": { - "Name": "easystash", - "Brokers": [ - "172.16.186.156:19092", - "172.16.186.157:19092", - "172.16.186.158:19092", - "172.16.186.159:19092", - "172.16.186.160:19092", - "172.16.186.161:19092" - ], - "Topic": "k8slog", - "Group": "pro", - "NumProducers": 16, - "MetricsUrl": "http://localhost:2222/add" - } - }, - "Filters": [ - { - "Action": "drop", - "Conditions": [ - { - "Key": "k8s_container_name", - "Value": "-rpc", - "Type": "contains" - }, - { - "Key": "level", - "Value": "info", - "Type": "match", - "Op": "and" - } - ] - }, - { - "Action": "remove_field", - "Fields": [ - "message", - "_source", - "_type", - "_score", - "_id", - "@version", - "topic", - "index", - "beat", - "docker_container", - "offset", - "prospector", - "source", - "stream" - ] - } - ], - "Output": { - "ElasticSearch": { - "Hosts": [ - "172.16.141.14:9200", - "172.16.141.15:9200", - "172.16.141.16:9200", - "172.16.141.17:9200", - "172.16.140.195:9200", - "172.16.140.196:9200", - "172.16.140.197:9200", - "172.16.140.198:9200", - "172.16.140.199:9200", - "172.16.140.200:9200", - "172.16.140.201:9200", - "172.16.140.202:9200" - ], - "DailyIndexPrefix": "k8s_pro-" - } - } -} \ No newline at end of file diff --git a/stash/filter/filters.go b/stash/filter/filters.go deleted file mode 100644 index c1a59f30..00000000 --- a/stash/filter/filters.go +++ /dev/null @@ -1,104 +0,0 @@ -package filter - -import ( - "strings" - - "zero/stash/config" - - "github.com/globalsign/mgo/bson" -) - -const ( - filterDrop = "drop" - filterRemoveFields = "remove_field" - opAnd = "and" - opOr = "or" - typeContains = "contains" - typeMatch = "match" -) - -type FilterFunc func(map[string]interface{}) map[string]interface{} - -func CreateFilters(c config.Config) []FilterFunc { - var filters []FilterFunc - - for _, f := range c.Filters { - switch f.Action { - case filterDrop: - filters = append(filters, DropFilter(f.Conditions)) - case filterRemoveFields: - filters = append(filters, RemoveFieldFilter(f.Fields)) - } - } - - return filters -} - -func DropFilter(conds []config.Condition) FilterFunc { - return func(m map[string]interface{}) map[string]interface{} { - var qualify bool - for _, cond := range conds { - var qualifyOnce bool - switch cond.Type { - case typeMatch: - qualifyOnce = cond.Value == m[cond.Key] - case typeContains: - if val, ok := m[cond.Key].(string); ok { - qualifyOnce = strings.Contains(val, cond.Value) - } - } - - switch cond.Op { - case opAnd: - if !qualifyOnce { - return m - } else { - qualify = true - } - case opOr: - if qualifyOnce { - qualify = true - } - } - } - - if qualify { - return nil - } else { - return m - } - } -} - -func RemoveFieldFilter(fields []string) FilterFunc { - return func(m map[string]interface{}) map[string]interface{} { - for _, field := range fields { - delete(m, field) - } - return m - } -} - -func AddUriFieldFilter(inField, outFirld string) FilterFunc { - return func(m map[string]interface{}) map[string]interface{} { - if val, ok := m[inField].(string); ok { - var datas []string - idx := strings.Index(val, "?") - if idx < 0 { - datas = strings.Split(val, "/") - } else { - datas = strings.Split(val[:idx], "/") - } - - for i, data := range datas { - if bson.IsObjectIdHex(data) { - datas[i] = "*" - } - } - - m[outFirld] = strings.Join(datas, "/") - } - - return m - } -} diff --git a/stash/handler/handler.go b/stash/handler/handler.go deleted file mode 100644 index c278542f..00000000 --- a/stash/handler/handler.go +++ /dev/null @@ -1,64 +0,0 @@ -package handler - -import ( - "time" - - "zero/stash/es" - "zero/stash/filter" - - jsoniter "github.com/json-iterator/go" -) - -const ( - timestampFormat = "2006-01-02T15:04:05.000Z" - timestampKey = "@timestamp" -) - -type MessageHandler struct { - writer *es.Writer - filters []filter.FilterFunc -} - -func NewHandler(writer *es.Writer) *MessageHandler { - return &MessageHandler{ - writer: writer, - } -} - -func (mh *MessageHandler) AddFilters(filters ...filter.FilterFunc) { - for _, f := range filters { - mh.filters = append(mh.filters, f) - } -} - -func (mh *MessageHandler) Consume(_, val string) error { - m := make(map[string]interface{}) - if err := jsoniter.Unmarshal([]byte(val), &m); err != nil { - return err - } - - for _, proc := range mh.filters { - if m = proc(m); m == nil { - return nil - } - } - - bs, err := jsoniter.Marshal(m) - if err != nil { - return err - } - - return mh.writer.Write(mh.getTime(m), string(bs)) -} - -func (mh *MessageHandler) getTime(m map[string]interface{}) time.Time { - if ti, ok := m[timestampKey]; ok { - if ts, ok := ti.(string); ok { - if t, err := time.Parse(timestampFormat, ts); err == nil { - return t - } - } - } - - return time.Now() -} diff --git a/stash/stash.go b/stash/stash.go deleted file mode 100644 index b0eddf9c..00000000 --- a/stash/stash.go +++ /dev/null @@ -1,57 +0,0 @@ -package main - -import ( - "flag" - "time" - - "zero/core/conf" - "zero/core/lang" - "zero/core/proc" - "zero/kq" - "zero/stash/config" - "zero/stash/es" - "zero/stash/filter" - "zero/stash/handler" - - "github.com/olivere/elastic" -) - -const dateFormat = "2006.01.02" - -var configFile = flag.String("f", "etc/config.json", "Specify the config file") - -func main() { - flag.Parse() - - var c config.Config - conf.MustLoad(*configFile, &c) - proc.SetTimeoutToForceQuit(c.GracePeriod) - - client, err := elastic.NewClient( - elastic.SetSniff(false), - elastic.SetURL(c.Output.ElasticSearch.Hosts...), - ) - lang.Must(err) - - indexFormat := c.Output.ElasticSearch.DailyIndexPrefix + dateFormat - var loc *time.Location - if len(c.Output.ElasticSearch.TimeZone) > 0 { - loc, err = time.LoadLocation(c.Output.ElasticSearch.TimeZone) - lang.Must(err) - } else { - loc = time.Local - } - indexer := es.NewIndex(client, func(t time.Time) string { - return t.In(loc).Format(indexFormat) - }) - - filters := filter.CreateFilters(c) - writer, err := es.NewWriter(c.Output.ElasticSearch, indexer) - lang.Must(err) - - handle := handler.NewHandler(writer) - handle.AddFilters(filters...) - handle.AddFilters(filter.AddUriFieldFilter("url", "uri")) - q := kq.MustNewQueue(c.Input.Kafka, handle) - q.Start() -}