print message when starting api server

master
kevin 4 years ago
parent a50bcb90a6
commit a561884fcf

@ -1,4 +1,4 @@
# 进程内共享调用SharedCalls # 防止缓存击穿之进程内共享调用
go-zero微服务框架中提供了许多开箱即用的工具好的工具不仅能提升服务的性能而且还能提升代码的鲁棒性避免出错实现代码风格的统一方便他人阅读等等。 go-zero微服务框架中提供了许多开箱即用的工具好的工具不仅能提升服务的性能而且还能提升代码的鲁棒性避免出错实现代码风格的统一方便他人阅读等等。
@ -8,68 +8,51 @@ go-zero微服务框架中提供了许多开箱即用的工具好的工具不
并发场景下可能会有多个线程协程同时请求同一份资源如果每个请求都要走一遍资源的请求过程除了比较低效之外还会对资源服务造成并发的压力。举一个具体例子比如缓存失效多个请求同时到达某服务请求某资源该资源在缓存中已经失效此时这些请求会继续访问DB做查询会引起数据库压力瞬间增大。而使用SharedCalls可以使得同时多个请求只需要发起一次拿结果的调用其他请求"坐享其成",这种设计有效减少了资源服务的并发压力,可以有效防止缓存击穿。 并发场景下可能会有多个线程协程同时请求同一份资源如果每个请求都要走一遍资源的请求过程除了比较低效之外还会对资源服务造成并发的压力。举一个具体例子比如缓存失效多个请求同时到达某服务请求某资源该资源在缓存中已经失效此时这些请求会继续访问DB做查询会引起数据库压力瞬间增大。而使用SharedCalls可以使得同时多个请求只需要发起一次拿结果的调用其他请求"坐享其成",这种设计有效减少了资源服务的并发压力,可以有效防止缓存击穿。
例如高并发情况下多个请求同时查询某用户信息以下是利用SharedCalls工具包实现的用户信息获取方法注意⚠go-zero框架中已经提供了针对此场景的代码实现以下代码只为说明SharedCalls的使用方式具体可参看[sqlc](https://github.com/tal-tech/go-zero/blob/master/core/stores/sqlc/cachedsql.go)和[mongoc](https://github.com/tal-tech/go-zero/blob/master/core/stores/mongoc/cachedcollection.go)等处代码) 高并发场景下当某个热点key缓存失效后多个请求会同时从数据库加载该资源并保存到缓存如果不做防范可能会导致数据库被直接打死。针对这种场景go-zero框架中已经提供了实现具体可参看[sqlc](https://github.com/tal-tech/go-zero/blob/master/core/stores/sqlc/cachedsql.go)和[mongoc](https://github.com/tal-tech/go-zero/blob/master/core/stores/mongoc/cachedcollection.go)等实现代码。
为了简化演示代码我们通过多个线程同时去获取一个id来模拟缓存的场景。如下
```go ```go
// 用户信息结构体 func main() {
type UserInfo struct { const round = 5
UserId int var wg sync.WaitGroup
Name string barrier := syncx.NewSharedCalls()
Age int
} wg.Add(round)
for i := 0; i < round; i++ {
// 多个线程同时执行
go func() {
defer wg.Done()
// 可以看到多个线程在同一个key上去请求资源获取资源的实际函数只会被调用一次
val, err := barrier.Do("once", func() (interface{}, error) {
// sleep 1秒为了让多个线程同时取once这个key上的数据
time.Sleep(time.Second)
// 生成了一个随机的id
return stringx.RandId(), nil
})
if err != nil {
fmt.Println(err)
} else {
fmt.Println(val)
}
}()
}
// 用户信息model对象 wg.Wait()
type UserModel struct {
db *sql.DB
rds *redis.Client
barrier syncx.SharedCalls
} }
```
// 利用SharedCalls实现的用户信息获取方法 运行,打印结果为:
func (um *UserModel) GetUserInfoEffectively(userId int) (*UserInfo, error) {
// SharedCalls.Do方法提供两个参数第一个参数是资源获取期间的唯一标识
// 第二个参数是指定一个fun()(interface{},error) 方法来真正获取资源
val, err := um.barrier.Do(fmt.Sprintf("uId:{%d}:applying", userId), func() (interface{}, error) {
userInfo := &UserInfo{}
// 从redis中获取用户信息
userInfoStr, err := um.rds.Get(fmt.Sprintf("uId:{%d}", userId)).Result()
if err != nil && err != redis.Nil {
return nil, err
}
// 缓存不为空,解析缓存中的用户数据
if len(userInfoStr) > 0 {
if err := json.Unmarshal(([]byte)(userInfoStr),userInfo);err!=nil {
return nil, err
}
} else {
// 缓存为空从db中获取用户数据
if err := um.db.QueryRow("select id,name,age from users where id=?", userId).Scan(&userInfo.UserId,&userInfo.Name, &userInfo.Age); err != nil {
return nil, err
}
// 将用户信息保存到缓存
userInfoBytes, err := json.Marshal(userInfo)
if err == nil {
um.rds.Set(fmt.Sprintf("uId:{%d}", userId), userInfoBytes, 5*time.Second)
}
}
return userInfo, nil
})
// 判断获取用户过程中是否发生错误
if err != nil {
return nil, err
}
// 返回用户信息
return val.(*UserInfo), nil
}
``` ```
837c577b1008a0db
837c577b1008a0db
837c577b1008a0db
837c577b1008a0db
837c577b1008a0db
```
可以看出只要是同一个key上的同时发起的请求都会共享同一个结果对获取DB数据进缓存等场景特别有用可以有效防止缓存击穿。
## 关键源码分析 ## 关键源码分析
@ -78,8 +61,8 @@ func (um *UserModel) GetUserInfoEffectively(userId int) (*UserInfo, error) {
```go ```go
// SharedCalls接口提供了Do和DoEx两种方法 // SharedCalls接口提供了Do和DoEx两种方法
type SharedCalls interface { type SharedCalls interface {
Do(key string, fn func() (interface{}, error)) (interface{}, error) Do(key string, fn func() (interface{}, error)) (interface{}, error)
DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error) DoEx(key string, fn func() (interface{}, error)) (interface{}, bool, error)
} }
``` ```
@ -88,14 +71,14 @@ func (um *UserModel) GetUserInfoEffectively(userId int) (*UserInfo, error) {
```go ```go
// call代表对指定资源的一次请求 // call代表对指定资源的一次请求
type call struct { type call struct {
wg sync.WaitGroup // 用于协调各个请求goroutine之间的资源共享 wg sync.WaitGroup // 用于协调各个请求goroutine之间的资源共享
val interface{} // 用于保存请求的返回值 val interface{} // 用于保存请求的返回值
err error // 用于保存请求过程中发生的错误 err error // 用于保存请求过程中发生的错误
} }
type sharedGroup struct { type sharedGroup struct {
calls map[string]*call calls map[string]*call
lock sync.Mutex lock sync.Mutex
} }
``` ```
@ -108,49 +91,41 @@ func (um *UserModel) GetUserInfoEffectively(userId int) (*UserInfo, error) {
```go ```go
// 当多个请求同时使用Do方法请求资源时 // 当多个请求同时使用Do方法请求资源时
func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) { func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
// 先申请加锁 // 先申请加锁
g.lock.Lock() g.lock.Lock()
// 根据key获取对应的call结果,并用变量c保存 // 根据key获取对应的call结果,并用变量c保存
if c, ok := g.calls[key]; ok { if c, ok := g.calls[key]; ok {
// 拿到call以后释放锁此处call可能还没有实际数据只是一个空的内存占位 // 拿到call以后释放锁此处call可能还没有实际数据只是一个空的内存占位
g.lock.Unlock() g.lock.Unlock()
// 调用wg.Wait判断是否有其他goroutine正在申请资源如果阻塞说明有其他goroutine正在获取资源 // 调用wg.Wait判断是否有其他goroutine正在申请资源如果阻塞说明有其他goroutine正在获取资源
c.wg.Wait() c.wg.Wait()
// 当wg.Wait不再阻塞表示资源获取已经结束可以直接返回结果 // 当wg.Wait不再阻塞表示资源获取已经结束可以直接返回结果
return c.val, c.err return c.val, c.err
} }
// 没有拿到结果则调用makeCall方法去获取资源注意此处仍然是锁住的可以保证只有一个goroutine可以调用makecall // 没有拿到结果则调用makeCall方法去获取资源注意此处仍然是锁住的可以保证只有一个goroutine可以调用makecall
c := g.makeCall(key, fn) c := g.makeCall(key, fn)
// 返回调用结果
// 返回调用结果
return c.val, c.err return c.val, c.err
} }
``` ```
- sharedGroup的DoEx方法 - sharedGroup的DoEx方法
- 和Do方法类似只是返回值中增加了布尔值表示值是调用makeCall方法直接获取的还是取的共享成果 - 和Do方法类似只是返回值中增加了布尔值表示值是调用makeCall方法直接获取的还是取的共享成果
```go ```go
func (g *sharedGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) { func (g *sharedGroup) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) {
g.lock.Lock() g.lock.Lock()
if c, ok := g.calls[key]; ok { if c, ok := g.calls[key]; ok {
g.lock.Unlock() g.lock.Unlock()
c.wg.Wait() c.wg.Wait()
return c.val, false, c.err return c.val, false, c.err
} }
c := g.makeCall(key, fn) c := g.makeCall(key, fn)
return c.val, true, c.err return c.val, true, c.err
} }
``` ```
@ -159,24 +134,18 @@ func (um *UserModel) GetUserInfoEffectively(userId int) (*UserInfo, error) {
- 该方法由Do和DoEx方法调用是真正发起资源请求的方法。 - 该方法由Do和DoEx方法调用是真正发起资源请求的方法。
```go ```go
//进入makeCall的一定只有一个goroutine因为要拿锁锁住的 // 进入makeCall的一定只有一个goroutine因为要拿锁锁住的
func (g *sharedGroup) makeCall(key string, fn func() (interface{}, error)) *call { func (g *sharedGroup) makeCall(key string, fn func() (interface{}, error)) *call {
// 创建call结构用于保存本次请求的结果 // 创建call结构用于保存本次请求的结果
c := new(call) c := new(call)
// wg加1用于通知其他请求资源的goroutine等待本次资源获取的结束 // wg加1用于通知其他请求资源的goroutine等待本次资源获取的结束
c.wg.Add(1) c.wg.Add(1)
// 将用于保存结果的call放入map中以供其他goroutine获取 // 将用于保存结果的call放入map中以供其他goroutine获取
g.calls[key] = c g.calls[key] = c
// 释放锁这样其他请求的goroutine才能获取call的内存占位 // 释放锁这样其他请求的goroutine才能获取call的内存占位
g.lock.Unlock() g.lock.Unlock()
defer func() { defer func() {
// delete key first, done later. can't reverse the order, because if reverse, // delete key first, done later. can't reverse the order, because if reverse,
// another Do call might wg.Wait() without get notified with wg.Done() // another Do call might wg.Wait() without get notified with wg.Done()
g.lock.Lock() g.lock.Lock()
@ -185,16 +154,13 @@ func (um *UserModel) GetUserInfoEffectively(userId int) (*UserInfo, error) {
// 调用wg.Done通知其他goroutine可以返回结果这样本批次所有请求完成结果的共享 // 调用wg.Done通知其他goroutine可以返回结果这样本批次所有请求完成结果的共享
c.wg.Done() c.wg.Done()
}() }()
// 调用fn方法将结果填入变量c中 // 调用fn方法将结果填入变量c中
c.val, c.err = fn() c.val, c.err = fn()
return c return c
} }
``` ```
## 最后 ## 最后

@ -1,10 +1,12 @@
package main package main
import ( import (
"flag"
"fmt"
"bookstore/api/internal/config" "bookstore/api/internal/config"
"bookstore/api/internal/handler" "bookstore/api/internal/handler"
"bookstore/api/internal/svc" "bookstore/api/internal/svc"
"flag"
"github.com/tal-tech/go-zero/core/conf" "github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/rest" "github.com/tal-tech/go-zero/rest"
@ -23,5 +25,7 @@ func main() {
defer server.Stop() defer server.Stop()
handler.RegisterHandlers(server, ctx) handler.RegisterHandlers(server, ctx)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
server.Start() server.Start()
} }

@ -4,15 +4,16 @@
package main package main
import ( import (
"flag"
"fmt"
"bookstore/rpc/add/internal/config" "bookstore/rpc/add/internal/config"
"bookstore/rpc/add/internal/server" "bookstore/rpc/add/internal/server"
"bookstore/rpc/add/internal/svc" "bookstore/rpc/add/internal/svc"
add "bookstore/rpc/add/pb" add "bookstore/rpc/add/pb"
"flag"
"fmt"
"log"
"github.com/tal-tech/go-zero/core/conf" "github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/rpcx" "github.com/tal-tech/go-zero/rpcx"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -30,9 +31,7 @@ func main() {
s, err := rpcx.NewServer(c.RpcServerConf, func(grpcServer *grpc.Server) { s, err := rpcx.NewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
add.RegisterAdderServer(grpcServer, adderSrv) add.RegisterAdderServer(grpcServer, adderSrv)
}) })
if err != nil { logx.Must(err)
log.Fatal(err)
}
fmt.Printf("Starting rpc server at %s...\n", c.ListenOn) fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
s.Start() s.Start()

@ -4,15 +4,16 @@
package main package main
import ( import (
"flag"
"fmt"
"bookstore/rpc/check/internal/config" "bookstore/rpc/check/internal/config"
"bookstore/rpc/check/internal/server" "bookstore/rpc/check/internal/server"
"bookstore/rpc/check/internal/svc" "bookstore/rpc/check/internal/svc"
check "bookstore/rpc/check/pb" check "bookstore/rpc/check/pb"
"flag"
"fmt"
"log"
"github.com/tal-tech/go-zero/core/conf" "github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/rpcx" "github.com/tal-tech/go-zero/rpcx"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -30,9 +31,7 @@ func main() {
s, err := rpcx.NewServer(c.RpcServerConf, func(grpcServer *grpc.Server) { s, err := rpcx.NewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
check.RegisterCheckerServer(grpcServer, checkerSrv) check.RegisterCheckerServer(grpcServer, checkerSrv)
}) })
if err != nil { logx.Must(err)
log.Fatal(err)
}
fmt.Printf("Starting rpc server at %s...\n", c.ListenOn) fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
s.Start() s.Start()

@ -0,0 +1,34 @@
package main
import (
"fmt"
"sync"
"time"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/syncx"
)
func main() {
const round = 5
var wg sync.WaitGroup
barrier := syncx.NewSharedCalls()
wg.Add(round)
for i := 0; i < round; i++ {
go func() {
defer wg.Done()
val, err := barrier.Do("once", func() (interface{}, error) {
time.Sleep(time.Second)
return stringx.RandId(), nil
})
if err != nil {
fmt.Println(err)
} else {
fmt.Println(val)
}
}()
}
wg.Wait()
}

@ -16,11 +16,11 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"log"
{{.imports}} {{.imports}}
"github.com/tal-tech/go-zero/core/conf" "github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/rpcx" "github.com/tal-tech/go-zero/rpcx"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -38,9 +38,7 @@ func main() {
s, err := rpcx.NewServer(c.RpcServerConf, func(grpcServer *grpc.Server) { s, err := rpcx.NewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
{{.registers}} {{.registers}}
}) })
if err != nil { logx.Must(err)
log.Fatal(err)
}
fmt.Printf("Starting rpc server at %s...\n", c.ListenOn) fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
s.Start() s.Start()

Loading…
Cancel
Save