Metrics 插件
- 示例: metrics
- Metrics 插件使用流行的go-metrics 来计算服务的指标。
- 它包含多个统计指标:
- serviceCounter
- clientMeter
- “service_”+servicePath+”.”+serviceMethod+”Read Qps”
- “service_”+servicePath+”.”+serviceMethod+”Write Qps”
- “service_”+servicePath+”.”+serviceMethod+”_CallTime”
- 你可以将metrics输出到graphite中,通过grafana来监控。
限流
- 实例: rate-limiting
- 限流是一种保护错误,避免服务被突发的或者大量的请求所拖垮。
- 这个插件使用 juju/ratelimit来限流。
- 使用
func NewRateLimitingPlugin(fillInterval time.Duration, capacity int64) *RateLimitingPlugin
t来创建这个插件。
别名
- 示例: alias
- 这个插件可以为一个服务方法设置一个别名。
- 下面的代码使用
Arith
的别名a.b.c.d
, 为Mul
设置别名Times
。func main() { flag.Parse() a := serverplugin.NewAliasPlugin() a.Alias("a.b.c.d", "Times", "Arith", "Mul") s := server.NewServer() s.Plugins.Add(a) s.RegisterName("Arith", new(example.Arith), "") err := s.Serve("reuseport", *addr) if err != nil { panic(err) } }
- 客户端可以使用别名来调用服务:
func main() { flag.Parse() d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "") option := client.DefaultOption option.ReadTimeout = 10 * time.Second xclient := client.NewXClient("a.b.c.d", client.Failtry, client.RandomSelect, d, option) defer xclient.Close() args := &example.Args{ A: 10, B: 20, } reply := &example.Reply{} err := xclient.Call(context.Background(), "Times", args, reply) if err != nil { log.Fatalf("failed to call: %v", err) } log.Printf("%d * %d = %d", args.A, args.B, reply.C) }
身份认证
- 示例: auth
- 出于安全的考虑, 很多场景下只有授权的客户端才可以调用服务。
- 客户端必须设置一个
token
, 这个token
可以从其他的 OAuth/OAuth2 服务器获得,或者由服务提供者分配。 - 服务接收到请求后,需要验证这个
token
。如果是token
是从 OAuth2服务器中申请到,则服务需要到OAuth2服务中去验证, 如果是自己分配的,则需要和自己的记录进行对别。 - 因为rpcx提供的是一个身份验证的框架,所以具体的身份验证需要自己集成和验证。
func main() { flag.Parse() s := server.NewServer() s.RegisterName("Arith", new(example.Arith), "") s.AuthFunc = auth s.Serve("reuseport", *addr) } func auth(ctx context.Context, req *protocol.Message, token string) error { if token == "bearer tGzv3JOkF0XG5Qx2TlKWIA" { return nil } return errors.New("invalid token") }
- 服务器必须定义
AuthFunc
来验证token。在上面的例子中, 只有token为bearer tGzv3JOkF0XG5Qx2TlKWIA
才是合法的客户端。 - 客户端必须设置这个
toekn
:func main() { flag.Parse() d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "") option := client.DefaultOption option.ReadTimeout = 10 * time.Second xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option) defer xclient.Close() //xclient.Auth("bearer tGzv3JOkF0XG5Qx2TlKWIA") xclient.Auth("bearer abcdefg1234567") args := &example.Args{ A: 10, B: 20, } reply := &example.Reply{} ctx := context.WithValue(context.Background(), share.ReqMetaDataKey, make(map[string]string)) err := xclient.Call(ctx, "Mul", args, reply) if err != nil { log.Fatalf("failed to call: %v", err) } log.Printf("%d * %d = %d", args.A, args.B, reply.C) }
- 注意: 你必须设置
map[string]string
为share.ReqMetaDataKey
的值,否则调用会出错
插件开发
rpcx为服务器和客户端定义了几个插件接口,在一些处理点上可以调用插件。
Server Plugin
- 示例: trace
- go doc
type PostConnAcceptPlugin type PostConnAcceptPlugin interface { HandleConnAccept(net.Conn) (net.Conn, bool) } type PostReadRequestPlugin type PostReadRequestPlugin interface { PostReadRequest(ctx context.Context, r *protocol.Message, e error) error } PostReadRequestPlugin represents . type PostWriteResponsePlugin type PostWriteResponsePlugin interface { PostWriteResponse(context.Context, *protocol.Message, *protocol.Message, error) error } PostWriteResponsePlugin represents . type PreReadRequestPlugin type PreReadRequestPlugin interface { PreReadRequest(ctx context.Context) error } PreReadRequestPlugin represents . type PreWriteResponsePlugin type PreWriteResponsePlugin interface { PreWriteResponse(context.Context, *protocol.Message) error } PreWriteResponsePlugin represents .
Client Plugin
- go doc
type PluginContainer type PluginContainer interface { Add(plugin Plugin) Remove(plugin Plugin) All() []Plugin DoPreCall(ctx context.Context, servicePath, serviceMethod string, args interface{}) error DoPostCall(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, err error) error } type PostCallPlugin type PostCallPlugin interface { DoPostCall(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, err error) error } PostCallPlugin is invoked after the client calls a server. type PreCallPlugin type PreCallPlugin interface { DoPreCall(ctx context.Context, servicePath, serviceMethod string, args interface{}) error } PreCallPlugin is invoked before the client calls a server.