likes
comments
collection
share

go-sdk的最佳实践

作者站长头像
站长
· 阅读数 9

概要

Go作为一门简单的语言,拿来写命令行工具和sdk都具有清晰易懂的有点,本文选择了kafka的sdk sarama和ali的openAPI http sdk进行简单的展示,代表了tcp和http 2大最常用的通信协议如何在sdk中被设计使用。

sarama(kafka)

sarama是kafka的go-sdk,它支持sync和async的发送方式,其中async的producer充分利用了Go的channel机制,整体实现非常清晰,层次分明,其实是拿来学习Go语言使用的良好案例。

AsyncProducer 根据kafka的特点,将producer抽象为了多层,每一层专注自己的封装即可,例如

  1. AsyncProducer 本身是conf等管理能力的实体,没有过多的sdk能力,比如它的input chan就是无缓存式的;
  2. topicProducer 专注于topic层面的管理,它的input chan开始引入了缓存;
  3. partitionProducer 则起到了实际和kafaka server的交互,强耦合了kafka设计的通信协议;

通过学习asyncProducer,可以很容易的接受几个原则:

  1. sdk类具有统一的conf结构,将所有配置集中化,避免用户在查找配置项时被搞得云里雾里;
  2. Go的channel机制可以把异步功能实现得非常优雅;
  3. 强耦合通信协议的层次或许有利有弊,当中间件的架构发生更改时,sdk可能也会面临大的需改。

asyncProducer

type AsyncProducer interface {
	Input() chan<- *ProducerMessage
	Errors() <-chan *ProducerError
}

func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
	return newAsyncProducer(client)
}

func newAsyncProducer(client Client) (AsyncProducer, error) {
    p := &asyncProducer{
		client:          client,
		conf:            client.Config(),
		errors:          make(chan *ProducerError),
		input:           make(chan *ProducerMessage),
		successes:       make(chan *ProducerMessage),
		retries:         make(chan *ProducerMessage),
		brokers:         make(map[*Broker]*brokerProducer),
		brokerRefs:      make(map[*brokerProducer]int),
		txnmgr:          txnmgr,
		metricsRegistry: newCleanupRegistry(client.Config().MetricRegistry),
	}
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)
}

func (p *asyncProducer) dispatcher() {
    handlers := make(map[string]chan<- *ProducerMessage)
    
    for msg := range p.input {
        handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
    }
}

topicProducer

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}

partitionProducer

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

func (pp *partitionProducer) dispatch() {
    pp.brokerProducer.input <- msg
}

alibaba-cloud-sdk

Reuqest

request相比response要复杂一些,除了基本的HTTP协议字段被拆分开,还加入了一些ali内部常用的协议字段,为每种产品的request提供了基础能力。

type baseRequest struct {
	Scheme         string
	Method         string
	Domain         string
	Port           string
	RegionId       string
	ReadTimeout    time.Duration
	ConnectTimeout time.Duration
	isInsecure     *bool

	userAgent map[string]string
	product   string
	version   string

	actionName string

	AcceptFormat string

	QueryParams map[string]string
	Headers     map[string]string
	FormParams  map[string]string
	Content     []byte

	locationServiceCode  string
	locationEndpointType string

	queries string

	stringToSign string

	span opentracing.Span
}

Response

HttpResponse的几大要素齐全了:状态码、头部、Body内容、底层Response

type BaseResponse struct {
	httpStatus         int
	httpHeaders        map[string][]string
	httpContentString  string
	httpContentBytes   []byte
	originHttpResponse *http.Response
}

client

HttpClient当然是原生httpClient的封装,当然它增加了很多必要的能力,例如proxy、元数据、opentracing等。

// Client the type Client
type Client struct {
	SourceIp        string
	SecureTransport string
	isInsecure      bool
	regionId        string
	config          *Config
	httpProxy       string
	httpsProxy      string
	noProxy         string
	logger          *Logger
	userAgent       map[string]string
	signer          auth.Signer
	httpClient      *http.Client
	asyncTaskQueue  chan func()
	readTimeout     time.Duration
	connectTimeout  time.Duration
	EndpointMap     map[string]string
	EndpointType    string
	Network         string
	Domain          string
	isOpenAsync     bool
	isCloseTrace    bool
	rootSpan        opentracing.Span
}

service

真正在sdk中提供接口的是各个service,比如常用的ecs就在service中。每个service中自定义的Client还是在于封装好OpenAPI,方便用户使用。

// Client is the sdk client struct, each func corresponds to an OpenAPI
type Client struct {
	sdk.Client
}

func (client *Client) CancelImagePipelineExecution(request *CancelImagePipelineExecutionRequest) (response *CancelImagePipelineExecutionResponse, err error) {
	response = CreateCancelImagePipelineExecutionResponse()
	err = client.DoAction(request, response)
	return
}