• 还在篮子里

    grpc 源码结构详解 · 语雀

    grpc 源码结构详解

    DialOptions

    DialOptions 是最重要的一环,负责配置每一次 rpc 请求的时候的一应选择。

     

    结构

    先来看看这个的结构

    链接

    // dialOptions configure a Dial call. dialOptions are set by the DialOption
    // values passed to Dial.
    type dialOptions struct {
    	unaryInt  UnaryClientInterceptor
    	streamInt StreamClientInterceptor
    
    	chainUnaryInts  []UnaryClientInterceptor
    	chainStreamInts []StreamClientInterceptor
    
    	cp          Compressor
    	dc          Decompressor
    	bs          backoff.Strategy
    	block       bool
    	insecure    bool
    	timeout     time.Duration
    	scChan      <-chan ServiceConfig
    	authority   string
    	copts       transport.ConnectOptions
    	callOptions []CallOption
    	// This is used by v1 balancer dial option WithBalancer to support v1
    	// balancer, and also by WithBalancerName dial option.
    	balancerBuilder balancer.Builder
    	// This is to support grpclb.
    	resolverBuilder             resolver.Builder
    	channelzParentID            int64
    	disableServiceConfig        bool
    	disableRetry                bool
    	disableHealthCheck          bool
    	healthCheckFunc             internal.HealthChecker
    	minConnectTimeout           func() time.Duration
    	defaultServiceConfig        *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
    	defaultServiceConfigRawJSON *string
    }

    由于命名非常规范,加上注释很容易看懂每一个 field 配置的哪一条属性。如果掠过看的 大概有 压缩解压器,超时阻塞设置,认证安全转发,负载均衡,服务持久化的信息存储 ,配置,心跳检测等。

     

    其一应函数方法都是设置 其中字段的。

     

    如何设置

    这里是 grpc 设计较好的地方,通过函数设置,同时设有生成函数的函数。什么意思呢?首先结合图来理解,这也是整个 grpc 设置的精华部分

     

    grpc-setOperation.svg

    这里的意思是 , DialOptions 是一个导出接口,实现函数是 apply 同时接受参数 dialOptions 来修改它。

    而实际上,是使用 newFuncDialOption 函数包装一个 修改 dialOptions 的方法给 funcDialOption 结构体,在实际 Dial 调用的时候 是使用闭包 调用 funcDialOption 结构体的 apply 方法。

    可以在这里看一下 Dial 方法的源码(Dial 调用的是 DialContext

    起作用的就是 opt.apply()

    func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    	cc := &ClientConn{
    		target:            target,
    		csMgr:             &connectivityStateManager{},
    		conns:             make(map[*addrConn]struct{}),
    		dopts:             defaultDialOptions(),
    		blockingpicker:    newPickerWrapper(),
    		czData:            new(channelzData),
    		firstResolveEvent: grpcsync.NewEvent(),
    	}
    	···
    	for _, opt := range opts {
    		opt.apply(&cc.dopts)
    	}
        ···
    }

     

    这里的 options 可以说是 client 发起 rpc 请求的核心中转站。

    另一个重要的接口,同时也集中在 dialOptions 结构体中初始化处理的是

    callOptions []CallOption

     

    CallOption

    CallOption 是一个接口,定义在 rpc_util 包内

     

    结构

    // CallOption configures a Call before it starts or extracts information from
    // a Call after it completes.
    type CallOption interface {
    	// before is called before the call is sent to any server.  If before
    	// returns a non-nil error, the RPC fails with that error.
    	before(*callInfo) error
    
    	// after is called after the call has completed.  after cannot return an
    	// error, so any failures should be reported via output parameters.
    	after(*callInfo)
    }

     

    操作的是 callInfo 结构里的数据,其被包含在 dialOptions  结构体中,

    即每一次 dial 的时候进行调用。

     

    callInfo

    同时它自身定义很有意思,操作的是 callInfo  结构体

    // callInfo contains all related configuration and information about an RPC.
    type callInfo struct {
    	compressorType        string
    	failFast              bool
    	stream                ClientStream
    	maxReceiveMessageSize *int
    	maxSendMessageSize    *int
    	creds                 credentials.PerRPCCredentials
    	contentSubtype        string
    	codec                 baseCodec
    	maxRetryRPCBufferSize int
    }

    可以看到 callInfo 中字段用来表示 单次调用中独有的自定义选项如 压缩,流控,认证,编解码器等。

     

    一个实现

    简单看一个 CallOption 接口的实现

    // Header returns a CallOptions that retrieves the header metadata
    // for a unary RPC.
    func Header(md *metadata.MD) CallOption {
    	return HeaderCallOption{HeaderAddr: md}
    }
    
    // HeaderCallOption is a CallOption for collecting response header metadata.
    // The metadata field will be populated *after* the RPC completes.
    // This is an EXPERIMENTAL API.
    type HeaderCallOption struct {
    	HeaderAddr *metadata.MD
    }
    
    func (o HeaderCallOption) before(c *callInfo) error { return nil }
    func (o HeaderCallOption) after(c *callInfo) {
    	if c.stream != nil {
    		*o.HeaderAddr, _ = c.stream.Header()
    	}
    }

    重点看到,实际操作是在 before 和 after 方法中执行,它们会在 Client 发起请求的时候自动执行,顾名思义,一个在调用前执行,一个在调用后执行。

     

    实现注意

    这里可以看出,这里也是通过函数返回一个拥有这两个方法的结构体,注意这一个设计,可以作为你自己的 Option 设计的时候的参考。

     

    两种方法

    有两种方法让 Client 接受你的 CallOption 设置

    1. 在 Client 使用方法的时候直接作为 参数传递,将刚才所说的函数-返回一个实现了 CallOption 接口的结构体。
    2. 在 生成 Client 的时候就传递设置。具体如下
    1. 通过 dialOptions.go 中的 函数 grpc.WithDefaultCallOptions()
    2. 这个函数会将 CallOption 设置到 dialOptions 中的字段 []CallOption 中。
    // WithDefaultCallOptions returns a DialOption which sets the default
    // CallOptions for calls over the connection.
    func WithDefaultCallOptions(cos ...CallOption) DialOption {
    	return newFuncDialOption(func(o *dialOptions) {
    		o.callOptions = append(o.callOptions, cos...)
    	})
    }

     

    有没有感觉有点不好理解?给你们一个实例

    1. 使用的第一种方法
    response, err := myclient.MyCall(ctx, request, grpc.CallContentSubtype("mycodec"))
    1. 使用第二种方法
    myclient := grpc.Dial(ctx, target, grpc.WithDefaultCallOptions(grpc.CallContentSubtype("mycodec")))

    这里假设 我们设置了一个 mycodec 的译码器。马上下面解释它的设计。

     

    值得注意的是, 我好像只提到了在 Client 调用时设置,callOption  只在客户端设置的情况是不是让大家感到困惑。

    实际上 gRPC server 端会自动检测 callOption 的设置,并检测自己是否支持此项选择,如果不支持则会返回失败。也就是说,在 Server 端注册的所有 Codec 译码器之后,Client 直接使用相应的设置就好了。

     

    Codec

    在 gRPC 中 Codec 有两个接口定义,一个是 baseCodec 包含正常的 Marshal 和 Unmarshal 方法,另一个是拥有名字的 Codec 定义在 encoding 包内,这是由于在注册 registry 的时候会使用到这个方法。

     

    接口

    type Codec interface {
    	// Marshal returns the wire format of v.
    	Marshal(v interface{}) ([]byte, error)
    	// Unmarshal parses the wire format into v.
    	Unmarshal(data []byte, v interface{}) error
    	// String returns the name of the Codec implementation.  This is unused by
    	// gRPC.
    	String() string
    }

    就是这个方法

    // RegisterCodec registers the provided Codec for use with all gRPC clients and
    // servers.
    //
    // The Codec will be stored and looked up by result of its Name() method, which
    // should match the content-subtype of the encoding handled by the Codec.  This
    // is case-insensitive, and is stored and looked up as lowercase.  If the
    // result of calling Name() is an empty string, RegisterCodec will panic. See
    // Content-Type on
    // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
    // more details.
    //
    // NOTE: this function must only be called during initialization time (i.e. in
    // an init() function), and is not thread-safe.  If multiple Compressors are
    // registered with the same name, the one registered last will take effect.
    func RegisterCodec(codec Codec) {
    	if codec == nil {
    		panic("cannot register a nil Codec")
    	}
    	if codec.Name() == "" {
    		panic("cannot register Codec with empty string result for Name()")
    	}
    	contentSubtype := strings.ToLower(codec.Name())
    	registeredCodecs[contentSubtype] = codec
    }

     

    Compressor

    同时 encoding 包中还定义了 Compressor 接口,参照 Codec 理解即可。

    // Compressor is used for compressing and decompressing when sending or
    // receiving messages.
    type Compressor interface {
    	// Compress writes the data written to wc to w after compressing it.  If an
    	// error occurs while initializing the compressor, that error is returned
    	// instead.
    	Compress(w io.Writer) (io.WriteCloser, error)
    	// Decompress reads data from r, decompresses it, and provides the
    	// uncompressed data via the returned io.Reader.  If an error occurs while
    	// initializing the decompressor, that error is returned instead.
    	Decompress(r io.Reader) (io.Reader, error)
    	// Name is the name of the compression codec and is used to set the content
    	// coding header.  The result must be static; the result cannot change
    	// between calls.
    	Name() string
    }
    

     

    MetaData

    这个包对应 context 中的 Value field 也就是 key-value 形式的存储

     

    在其他包中简写是 MD

     

    结构

    type MD map[string][]string

     

    函数

    实现了完善的存储功能,从单一读写到批量(采用 pair 模式,…string 作为参数,len(string)%2==1 时会报错,由于会有孤立的没有配对的元信息。

     

    另外几个函数是实现了从 context 中的读取和写入(这里的写入是 使用 context.WithValue 方法,即生成 parent context 的 copy。

     

    注意⚠️

    • 值得注意的是,在 MetaData 结构体中, value 的结构是 []string 。
    • 同时 key 不可以以 “grpc-” 开头,这是因为在 grpc 的 internal 包中已经保留了。
    • 更为重要的是 在 context 中的读取方式,其实是 MetaData 结构对应的是 context Value 中的 value 值,而 key 值设为 一个空结构体同时区分输入输入
    • type mdIncomingKey struct{}
    • type mdOutgoingKey struct{}

     

    Refer

    gRPC 官方也有 MetaData 的解释

    来源: grpc 源码结构详解 · 语雀

  • 还在篮子里

    做事最忌认真 · 语雀

    做事最忌认真

    小时候的小聪明

    小时看的书多,名人们写的家书、鼓励励志的故事、一些方法论中常常会嘱咐认真做事

    初时奉之若宝,不过口头常挂。儿童心性本就如此。于是就念叨,要认真要认真,然后一有可以偷懒的间隙,必然紧紧抓住,仿佛这才是宝藏一般。

    渐渐尝到甜头,觉得能够轻松糊弄过去的东西就不去做了。模模糊糊的做事情,就模模糊糊的丧失了准确性。然后开始对认真嗤之以鼻了。认为认真是傻头傻脑的人才去做。老实人容易被欺负,不如耍耍小聪明,落得个轻松自在,还能把周围伙伴玩的团团转。

    儿时教育到时常会说,不认真骗的是自己。可叫儿童只顾玩时耍乐,那还信这认真踏实的道理?故直到现在,才能知道认真才是真正的答卷。

    人的价值 – 留下印记

    人这一生应该留下些什么东西,有什么是能经得住时间考验的呢?多少的历史应该回答了你,人心。倾注了人的情感的东西,会流传下来,或许会被后人歪曲,但它至少活着。

    不考虑身后的事,在生活中,但凡认真做的事,没有一件说出来不是理直气壮的。各位可以想想,当自己认真学习了之后考试分数依旧不够理想,初时和父母争辩的时候是否会有一种我已经做到最好的感觉。

    这里谈到做到最好的小哲学问题,我认为人做事做到最好,就像是瓶子里的水全部倒出来,用来填满某一件事,最后你倒到一滴都不剩,自然认为自己做到最好了。但是当这种情况发生的时候,你就渐渐发现,你会换一个更大的瓶子装水了。

    —不超越自己旧的极限,就无法到达新的境界。(等真的无法再进一步的时候,再说我明白了人类是有极限的,jojo!)

    做事一旦真的认真,不仅是自己认可它,最重要的是做这件事的经历你会愿意留下来。

    想想自己出去玩的场景,或者自己想去学习的一些兴趣,如果是在其中认真做事,是否想起来会有一种珍惜之感呢?

    你愿意留下来的,就是丰富你人生的。

    认真带来的超越

    最重要的是,做事认真,很难得有做的不比别人好的。因为人本身的惰性,单单认真你就能超出很多水准,别人可能遇到困难,随意糊弄就走了,进一步的风景都没看到,只能吃不到葡萄空凭臆想了。而你选择了直面而不是虚晃一枪。已经能在环境中脱颖而出。

    所以会说,成功的人做什么事都成功。因为他拥有成功的特质。(当然成功其实很难定义)

    我们用成事代替。

    现在需要成事,不合作不可能,和人合作最重要的问题就是信任。

    单单凭这认真二字,你能获得的信任就不可估量。

    不难理解小时看的书说,精诚所至,金石为开 了 ;这里有人会觉得有幸存者偏差,认为别人做事做好了,是因为恰好成事了之后被人们所知,亦或者恰好有其他因素。

    我想说幸存者偏差人家是 较低概率,而认真做事之后成事总是较高概率,难道这也应该归在幸存者上吗?不过是不去做事的人一块遮羞布罢了。

    注重长期主义的复利

    小的事认真对待,自然大事可以做到认真。

    因为认真有一条对人自身很有利的属性,在上文也提到了。就是积累。

    认真做的事,每一件事都是积累。

    像所谓千里之行;合抱之木;锲而不舍;所讲都是积累的力量。

    有些孩子会想,我从小学了五六年英语,为什么我英语依旧不行。难道我们同龄人上的课差距很大吗?那为什么同校,同班的人也差距这么大。

    关键就在认真二字。对待的仔细了,就是实打实的积累,才会顾及多方面,在英语上可能就是 听说读写。对待的仔细了,就知道自己想要的是什么,针对性的覆盖,就会增强积累相应的能力。

    许多人后悔当初自己没有好好学习,或许应该想想,自己当初是否对事认真,到最后对自己认真,做到对自己负责。或许他们有些人会羞愧难当。可惜,救不了。如果羞愧有用,时间的历史不知要改写多少。

    “聪明人下笨功夫”

    要知道什么地方,要长期的增长,要怎么对的起自己。

    认真倾注生命 – 对于代码或者更多的事

    我是个程序员,清楚一件事。

    代码是永远留存不下来的,它像生命一样有一个生命周期。

    它的生命周期与创作者的能力、时势和倾注程度正相关。同时又符合着 规模的 3/4 幂律(一条关于规模缩放的数学定理)

    认真就像在代码中可调控的生命力。你灌注的越多,它活得越久(它有价值的时间越久)

    更新

    加了小标题,和我对代码的想法(还有小链接)