kratos源码解析(二)--中间件
kratos提供了一个通用的Middleware类型.使得一个这样类型的中间件可以同时用在grpc和http服务中使用
// Middleware is HTTP/gRPC transport middleware.
type Middleware func(Handler) Handler
在官方文档里面有说到
自定义中间件
需要实现
Middleware
接口。 中间件中您可以使用tr, ok := transport.FromServerContext(ctx)
获得Transporter实例以便访问接口相关的元信息
但实际上,对于kratos来说,中间件并不是只有实现Middleware这个接口这个途径.(说时候,kratos的文档真的太随便了,很多设计和功能等都没在文档中体现)
kratos服务中间件详解
我们现在只看kratos-grpc服务的话,中间件可以分为两种.
-
前面提及的,kratos定义的中间件类型,传入一个handler,然后返回一个handler.
// Handler defines the handler invoked by Middleware. type Handler func(ctx context.Context, req interface{}) (intertypeface{}, error) // Middleware is HTTP/gRPC transport middleware. type Middleware func(Handler) Handler
-
被嵌入的golang-grpc-server支持的中间件类型
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error) type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
那么问题来了.kratos-app启动的grpc服务,本质上还是运行golang-grpc-server服务,那到底是个怎样的流程呢?
type Middleware func(Handler) Handler 详解
首先我们可以回到前面说的这个方法:
// 这也是整个kratos调用的grpc服务稳定第一个中间件
func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
......
// 下面是关键,简单来说,这个grpc.UnaryServerInterceptor把kratos-app里面初始化后带有的Middleware和最终调用的handler给串联起来.
if next := s.middleware.Match(tr.Operation()); len(next) > 0 {
h = middleware.Chain(next...)(h)
}
reply, err := h(ctx, req)
if len(replyHeader) > 0 {
_ = grpc.SetHeader(ctx, replyHeader)
}
return reply, err
}
}
上面源码说明,kratos调用的grpc-server中注册了一个grpc.UnaryServerInterceptor.这一个首先运行的中间件主要做了下面的事情:
- 把请求的ctx和kratos的ctx合并起来(前面文章有说)
- 把app信息和请求的metadata等信息放到transport中,然后把transport放到前面合并的ctx中继续往下传递(前面文章有说)
- 最后初始化在kratos-grpc-server的多个Middleware中选出匹配的和最终要执行的handler串联起来成一个handler.
- 最后才是执行这个被串联出来的handler
同时值得注意的是,这个Middleware是会在每个请求方法执行前才串联生成,而不是在初始化server时候就串联.因为kratos的Middleware在前面说过,是支持对对特定路由定制中间件的.而实现这个的主要原因,是这个接口:
// Matcher is a middleware matcher.
type Matcher interface {
// 添加对应的默认Middleware
Use(ms ...middleware.Middleware)
// 添加selector对应的Middleware
Add(selector string, ms ...middleware.Middleware)
// 给出对应的请求operation匹配对应的路由
Match(operation string) []middleware.Middleware
}
// 而在kratos-grpc-server中,存储所有Middleware的,就是这个接口
// Server is a gRPC server wrapper.
type Server struct {
...
middleware matcher.Matcher
...
}
// 而实现这个接口的,是下面这个接口体
// kratos实现这个比较简单,具体源码不贴了.简单来说就是基本的字符串匹配.
type matcher struct {
prefix []string
defaults []middleware.Middleware
matchs map[string][]middleware.Middleware
}
grpc-server带的UnaryServerInterceptor
上面我们可以看到,kratos的Middleware在kratos-grpc-server的unaryServerInterceptor中被串联执行的.
而如果我们有其他的grpc.UnaryServerInterceptor想执行,也是可以的.关键是下面的代码:
// 初始化配置中,可以传入grpc.UnaryServerInterceptor
func UnaryInterceptor(in ...grpc.UnaryServerInterceptor) ServerOption {
return func(s *Server) {
s.unaryInts = in
}
}
func StreamInterceptor(in ...grpc.StreamServerInterceptor) ServerOption {
return func(s *Server) {
s.streamInts = in
}
}
func NewServer(opts ...ServerOption) *Server {
...
// 生成最终的grpc-server时候,除了前面说的kratos的unaryServerInterceptor外,会把传入的grpc.UnaryServerInterceptor给添加上.
if len(srv.unaryInts) > 0 {
unaryInts = append(unaryInts, srv.unaryInts...)
}
if len(srv.streamInts) > 0 {
streamInts = append(streamInts, srv.streamInts...)
}
grpcOpts := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(unaryInts...),
grpc.ChainStreamInterceptor(streamInts...),
}
...
if len(srv.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, srv.grpcOpts...)
}
srv.Server = grpc.NewServer(grpcOpts...)
...
}
所以,在上面的代码中可以看出.kratos有效的兼容了自身定义的Middleware和grpc.UnaryServerInterceptor.
补充:
我们可以看看,grpc的拦截器的grpc.UnaryServerInterceptor一些有意思的地方.
grpc.UnaryServerInterceptor的附加理解
grpc在配置的时,中间件会有两个配置项
type serverOptions struct {
...
unaryInt UnaryServerInterceptor
chainUnaryInts []UnaryServerInterceptor
...
}
一个是单个的拦截器,一个是拦截器列表.那么最终server怎么把这两个配置项目组合起来呢?采用的是一个比较巧妙的用法,我们先看拦截器的签名:
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
我们可以看到,在grpc-server里面,调用拦截器的只有一个地方
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
...
//在最后执行handler的时候,把server的配置项unaryInt传了进去,并没有考虑chainUnaryInts
reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
...
}
而我们可以回头看到,是在NewServer的时候,把多个拦截器组合起来:
// chainUnaryServerInterceptors chains all unary server interceptors into one.
func chainUnaryServerInterceptors(s *Server) {
// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
// be executed before any other chained interc
interceptors := s.opts.chainUnaryInts
if s.opts.unaryInt != nil {
interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
}
var chainedInt UnaryServerInterceptor
if len(interceptors) == 0 {
chainedInt = nil
} else if len(interceptors) == 1 {
chainedInt = interceptors[0]
} else {
chainedInt = chainUnaryInterceptors(interceptors)
}
s.opts.unaryInt = chainedInt
}
// 关键函数
// 利用了闭包和递归调用的特性
func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
// the struct ensures the variables are allocated together, rather than separately, since we
// know they should be garbage collected together. This saves 1 allocation and decreases
// time/call by about 10% on the microbenchmark.
// 首先我们可以想到,把多个UnaryServerInterceptor组合起来的关键是,要把前一个UnaryServerInterceptor转化成handler
// 所以state.next方法就是一个handler类型的函数,state.i是当前执行UnaryServerInterceptor所在的索引项
var state struct {
// 注意,i初始值为0
i int
next UnaryHandler
}
state.next = func(ctx context.Context, req interface{}) (interface{}, error) {
if state.i == len(interceptors)-1 {
return interceptors[state.i](ctx, req, info, handler)
}
// 丛外到内开始递归调用
state.i++
return interceptors[state.i-1](ctx, req, info, state.next)
}
return state.next(ctx, req)
}
}
上面这个函数看起来有可能一时之间比较难懂,我们可以看看一种写法
// ChainUnaryServer creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainUnaryServer(one, two, three) will execute one before two before three, and three
// will see context changes of one and two.
// 这种看起来好明白一点,简单来说也是利用了闭包的特性,用for循环代替了递归调用的那段过程
func ChainUnaryServer(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
n := len(interceptors)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
chainer := func(currentInter grpc.UnaryServerInterceptor, currentHandler grpc.UnaryHandler) grpc.UnaryHandler {
return func(currentCtx context.Context, currentReq interface{}) (interface{}, error) {
return currentInter(currentCtx, currentReq, info, currentHandler)
}
}
chainedHandler := handler
for i := n - 1; i >= 0; i-- {
chainedHandler = chainer(interceptors[i], chainedHandler)
}
return chainedHandler(ctx, req)
}
}
但无论如何,go-grpc-server的多个中间件始终都要组合成一个来调用的.其原理说也简单,本质上还是一个handler的嵌套.
转载自:https://juejin.cn/post/7142489716880277535