还在篮子里

grpc 源码结构详解 · 语雀

grpc 源码结构详解

DialOptions

DialOptions 是最重要的一环,负责配置每一次 rpc 请求的时候的一应选择。

结构

先来看看这个的结构

链接

 1// dialOptions configure a Dial call. dialOptions are set by the DialOption
 2// values passed to Dial.
 3type dialOptions struct {
 4    unaryInt  UnaryClientInterceptor
 5    streamInt StreamClientInterceptor
 6
 7    chainUnaryInts  []UnaryClientInterceptor
 8    chainStreamInts []StreamClientInterceptor
 9
10    cp          Compressor
11    dc          Decompressor
12    bs          backoff.Strategy
13    block       bool
14    insecure    bool
15    timeout     time.Duration
16    scChan      <-chan ServiceConfig
17    authority   string
18    copts       transport.ConnectOptions
19    callOptions []CallOption
20    // This is used by v1 balancer dial option WithBalancer to support v1
21    // balancer, and also by WithBalancerName dial option.
22    balancerBuilder balancer.Builder
23    // This is to support grpclb.
24    resolverBuilder             resolver.Builder
25    channelzParentID            int64
26    disableServiceConfig        bool
27    disableRetry                bool
28    disableHealthCheck          bool
29    healthCheckFunc             internal.HealthChecker
30    minConnectTimeout           func() time.Duration
31    defaultServiceConfig        *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
32    defaultServiceConfigRawJSON *string
33}

由于命名非常规范,加上注释很容易看懂每一个 field 配置的哪一条属性。如果掠过看的 大概有 压缩解压器,超时阻塞设置,认证安全转发,负载均衡,服务持久化的信息存储 ,配置,心跳检测等。

其一应函数方法都是设置 其中字段的。

如何设置

这里是 grpc 设计较好的地方,通过函数设置,同时设有生成函数的函数。什么意思呢?首先结合图来理解,这也是整个 grpc 设置的精华部分

grpc-setOperation.svg

这里的意思是 , DialOptions 是一个导出接口,实现函数是 apply 同时接受参数 dialOptions 来修改它。

而实际上,是使用 newFuncDialOption 函数包装一个 修改 dialOptions 的方法给 funcDialOption 结构体,在实际 Dial 调用的时候 是使用闭包 调用 funcDialOption 结构体的 apply 方法。

可以在这里看一下 Dial 方法的源码(Dial 调用的是 DialContext

起作用的就是 opt.apply()

 1func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
 2    cc := &ClientConn{
 3        target:            target,
 4        csMgr:             &connectivityStateManager{},
 5        conns:             make(map[*addrConn]struct{}),
 6        dopts:             defaultDialOptions(),
 7        blockingpicker:    newPickerWrapper(),
 8        czData:            new(channelzData),
 9        firstResolveEvent: grpcsync.NewEvent(),
10    }
11    ···
12    for _, opt := range opts {
13        opt.apply(&cc.dopts)
14    }
15    ···
16}

这里的 options 可以说是 client 发起 rpc 请求的核心中转站。

另一个重要的接口,同时也集中在 dialOptions 结构体中初始化处理的是

callOptions []CallOption

CallOption

CallOption 是一个接口,定义在 rpc_util 包内

结构

 1// CallOption configures a Call before it starts or extracts information from
 2// a Call after it completes.
 3type CallOption interface {
 4    // before is called before the call is sent to any server.  If before
 5    // returns a non-nil error, the RPC fails with that error.
 6    before(*callInfo) error
 7
 8    // after is called after the call has completed.  after cannot return an
 9    // error, so any failures should be reported via output parameters.
10    after(*callInfo)
11}

操作的是 callInfo 结构里的数据,其被包含在 dialOptions  结构体中,

即每一次 dial 的时候进行调用。

callInfo

同时它自身定义很有意思,操作的是 callInfo  结构体

 1// callInfo contains all related configuration and information about an RPC.
 2type callInfo struct {
 3    compressorType        string
 4    failFast              bool
 5    stream                ClientStream
 6    maxReceiveMessageSize *int
 7    maxSendMessageSize    *int
 8    creds                 credentials.PerRPCCredentials
 9    contentSubtype        string
10    codec                 baseCodec
11    maxRetryRPCBufferSize int
12}

可以看到 callInfo 中字段用来表示 单次调用中独有的自定义选项如 压缩,流控,认证,编解码器等。

一个实现

简单看一个 CallOption 接口的实现

 1// Header returns a CallOptions that retrieves the header metadata
 2// for a unary RPC.
 3func Header(md *metadata.MD) CallOption {
 4    return HeaderCallOption{HeaderAddr: md}
 5}
 6
 7// HeaderCallOption is a CallOption for collecting response header metadata.
 8// The metadata field will be populated *after* the RPC completes.
 9// This is an EXPERIMENTAL API.
10type HeaderCallOption struct {
11    HeaderAddr *metadata.MD
12}
13
14func (o HeaderCallOption) before(c *callInfo) error { return nil }
15func (o HeaderCallOption) after(c *callInfo) {
16    if c.stream != nil {
17        *o.HeaderAddr, _ = c.stream.Header()
18    }
19}

重点看到,实际操作是在 before 和 after 方法中执行,它们会在 Client 发起请求的时候自动执行,顾名思义,一个在调用前执行,一个在调用后执行。

实现注意

这里可以看出,这里也是通过函数返回一个拥有这两个方法的结构体,注意这一个设计,可以作为你自己的 Option 设计的时候的参考。

两种方法

有两种方法让 Client 接受你的 CallOption 设置

  1. 在 Client 使用方法的时候直接作为 参数传递,将刚才所说的函数-返回一个实现了 CallOption 接口的结构体。
  2. 在 生成 Client 的时候就传递设置。具体如下
    1. 通过 dialOptions.go 中的 函数 grpc.WithDefaultCallOptions()
    2. 这个函数会将 CallOption 设置到 dialOptions 中的字段 []CallOption 中。
1// WithDefaultCallOptions returns a DialOption which sets the default
2// CallOptions for calls over the connection.
3func WithDefaultCallOptions(cos ...CallOption) DialOption {
4    return newFuncDialOption(func(o *dialOptions) {
5        o.callOptions = append(o.callOptions, cos...)
6    })
7}

有没有感觉有点不好理解?给你们一个实例

  1. 使用的第一种方法
1response, err := myclient.MyCall(ctx, request, grpc.CallContentSubtype("mycodec"))
  1. 使用第二种方法
1myclient := grpc.Dial(ctx, target, grpc.WithDefaultCallOptions(grpc.CallContentSubtype("mycodec")))

这里假设 我们设置了一个 mycodec 的译码器。马上下面解释它的设计。

值得注意的是, 我好像只提到了在 Client 调用时设置,callOption  只在客户端设置的情况是不是让大家感到困惑。

实际上 gRPC server 端会自动检测 callOption 的设置,并检测自己是否支持此项选择,如果不支持则会返回失败。也就是说,在 Server 端注册的所有 Codec 译码器之后,Client 直接使用相应的设置就好了。

Codec

在 gRPC 中 Codec 有两个接口定义,一个是 baseCodec 包含正常的 Marshal 和 Unmarshal 方法,另一个是拥有名字的 Codec 定义在 encoding 包内,这是由于在注册 registry 的时候会使用到这个方法。

接口

1type Codec interface {
2    // Marshal returns the wire format of v.
3    Marshal(v interface{}) ([]byte, error)
4    // Unmarshal parses the wire format into v.
5    Unmarshal(data []byte, v interface{}) error
6    // String returns the name of the Codec implementation.  This is unused by
7    // gRPC.
8    String() string
9}

就是这个方法

 1// RegisterCodec registers the provided Codec for use with all gRPC clients and
 2// servers.
 3//
 4// The Codec will be stored and looked up by result of its Name() method, which
 5// should match the content-subtype of the encoding handled by the Codec.  This
 6// is case-insensitive, and is stored and looked up as lowercase.  If the
 7// result of calling Name() is an empty string, RegisterCodec will panic. See
 8// Content-Type on
 9// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
10// more details.
11//
12// NOTE: this function must only be called during initialization time (i.e. in
13// an init() function), and is not thread-safe.  If multiple Compressors are
14// registered with the same name, the one registered last will take effect.
15func RegisterCodec(codec Codec) {
16    if codec == nil {
17        panic("cannot register a nil Codec")
18    }
19    if codec.Name() == "" {
20        panic("cannot register Codec with empty string result for Name()")
21    }
22    contentSubtype := strings.ToLower(codec.Name())
23    registeredCodecs[contentSubtype] = codec
24}

Compressor

同时 encoding 包中还定义了 Compressor 接口,参照 Codec 理解即可。

 1// Compressor is used for compressing and decompressing when sending or
 2// receiving messages.
 3type Compressor interface {
 4    // Compress writes the data written to wc to w after compressing it.  If an
 5    // error occurs while initializing the compressor, that error is returned
 6    // instead.
 7    Compress(w io.Writer) (io.WriteCloser, error)
 8    // Decompress reads data from r, decompresses it, and provides the
 9    // uncompressed data via the returned io.Reader.  If an error occurs while
10    // initializing the decompressor, that error is returned instead.
11    Decompress(r io.Reader) (io.Reader, error)
12    // Name is the name of the compression codec and is used to set the content
13    // coding header.  The result must be static; the result cannot change
14    // between calls.
15    Name() string
16}

MetaData

这个包对应 context 中的 Value field 也就是 key-value 形式的存储

在其他包中简写是 MD

结构

1type MD map[string][]string

函数

实现了完善的存储功能,从单一读写到批量(采用 pair 模式,…string 作为参数,len(string)%2==1 时会报错,由于会有孤立的没有配对的元信息。

另外几个函数是实现了从 context 中的读取和写入(这里的写入是 使用 context.WithValue 方法,即生成 parent context 的 copy。

注意⚠️

  • 值得注意的是,在 MetaData 结构体中, value 的结构是 []string 。
  • 同时 key 不可以以 “grpc-” 开头,这是因为在 grpc 的 internal 包中已经保留了。
  • 更为重要的是 在 context 中的读取方式,其实是 MetaData 结构对应的是 context Value 中的 value 值,而 key 值设为 一个空结构体同时区分输入输入
    • type mdIncomingKey struct{}
    • type mdOutgoingKey struct{}

Refer

gRPC 官方也有 MetaData 的解释

来源: grpc 源码结构详解 · 语雀

发表评论

电子邮件地址不会被公开。 必填项已用*标注