likes
comments
collection
share

使用Golang和GRPC实现TCC分布式事务(上)

作者站长头像
站长
· 阅读数 10

背景

最近在了解分布式事务,2PC、3PC、XA、TCC、SAGA等等很多概念实在有点难记,为了加深印象和理解,自己实现了一个分布式事务框架,TCC事务模式。现在总结出一个教程,分享给大家。

这是整个项目的代码 tcc-toy,如果你觉得这个项目对你有帮助,欢迎给项目点个star,你的支持是我前进的动力!👏

前言

DTM是go语言实现的一个分布式事务管理服务。具有极易接入、跨语言、使用简单、易部署、易扩展等特点。在学习分布式事务的时候,这个框架给了我很多帮助,里面有介绍各种分布式事务模式等等,大家有兴趣可以去看一下。

分布式事务的概念和基础知识网上有很多,这里就不做详细介绍了,这里着重介绍一下TCC。

概念

TCC有几个比较重要的概念,分别是三个阶段三个角色

三个阶段

这里借用一下DTM对于TCC的描述,用来解释三个阶段:

什么是TCC,TCC是Try、Confirm、Cancel三个词语的缩写,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。

TCC分为3个阶段

  • Try 阶段:尝试执行,完成所有业务检查(一致性), 预留必须业务资源(准隔离性)
  • Confirm 阶段:如果所有分支的Try都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源
  • Cancel 阶段:如果所有分支的Try有一个失败了,则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。

如果我们要进行一个类似于银行跨行转账的业务,转出(TransOut)和转入(TransIn)分别在不同的微服务里,一个成功完成的TCC事务典型的时序图如下:

使用Golang和GRPC实现TCC分布式事务(上)

三个角色

角色简称描述
ApplicationAP主程序,发起事务的进程
Resource ManagerRM资源管理器,在银行跨行转账的业务中,RM就是管理金额的进程
Transaction ManagerTM事务管理器,负责管理整个事务的流程,负责发起Commit或者Canel请求

架构

在DTM中,TM是一个服务独立于AP和RM,而AP和RM则是以工具集的形式嵌入业务代码里面:

使用Golang和GRPC实现TCC分布式事务(上)

我这里为了方便实现,设计成了Brokerless的网络架构,如下图

使用Golang和GRPC实现TCC分布式事务(上)

因为我是刚开始学习这个,所以不太了解其他其他语言的实现是什么样的,会不会也有这样的实现。目前这个实现比较大的问题有如下这些:

  • 业务代码的服务宕机,相当于TM和AP同时出现问题,如果是第二个阶段,RM会一直等待下去,直到业务系统上线。TM和业务系统合在一起会比单独一个服务时有更大的发生故障的概率
  • 宕机后不能做到重新选举TM的操作,只能等待原服务上线
  • 待补充

优点也是有的:方便实现,减少部署成本

但是目前这个肯定是只适合作为一个玩具来玩玩的,切勿线上使用,线上可以考虑DTM、Seata等已经充分经历过考验的框架。

实现

考虑到TCC是跨服务的事务模式,所以网络通讯这部分希望是开源的并且大部分语言都有实现的,例如HTTP+Json、GRPC等。考虑到性能方面,GRPC大部分语言都有实现,并且性能还可以,所以选择了GRPC。

协议

首先在项目根目录建立一个pb文件夹,存放我们的protobuf文档。

pb/tcc.proto:

syntax = "proto3";

option go_package = "github.com/if-nil/tcc-toy/pb";

package pb;

// 定义tcc的资源管理器
service ResourceManager {
  rpc Try (TryRequest) returns (TryReply) {}
  rpc Commit (CommitRequest) returns (CommitReply) {}
  rpc Cancel (CancelRequest) returns (CancelReply) {}
}

message TryRequest {
  string xid = 1;
  string param = 2;
}

message TryReply {
  string message = 1;
}

message CommitRequest {
  string xid = 1;
  string param = 2;
}

message CommitReply {
  string message = 1;
}

message CancelRequest {
  string xid = 1;
  string param = 2;
}

message CancelReply {
  string message = 1;
}

首先定义了三个接口,分别代表了TCC的三个阶段,这里面的每个接口都会在所属阶段的时间里,由TM/AP往RM发送。然后生成go的客户端和服务端代码。在生成代码之前需要先根据GRPC官网的quickstart安装所需的工具,包括proto编译和go代码生成工具。

生成代码:

cd pb
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative tcc.proto

这是生成后的文件

使用Golang和GRPC实现TCC分布式事务(上)

RM(资源管理器)

开始实现RM:

resource_manager/manager.go:

type Manager struct {
    port         int
    address      string       // 服务地址
    uuid         string       // 唯一id,用来分辨服务器
    pb.ResourceManagerServer  // RM的Interface
}

type Option func(*Manager)

func WithPort(port int) Option {
    return func(m *Manager) {
       m.port = port
    }
}

func WithAddress(address string) Option {
    return func(m *Manager) {
       m.address = address
    }
}

func New(server pb.ResourceManagerServer, opts ...Option) *Manager {
    m := &Manager{
       port:                  9020,
       address:               "localhost",
       uuid:                  uuid.New().String(),
       ResourceManagerServer: server,
    }
    for _, opt := range opts {
       opt(m)
    }
    return m
}

func (m *Manager) Run() error {
    lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", m.address, m.port))
    if err != nil {
       return fmt.Errorf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterResourceManagerServer(s, m)
    if err := s.Serve(lis); err != nil {
       return fmt.Errorf("failed to serve: %v", err)
    }
    return nil
}

代码里定义了一个Manager结构体,包含了上面proto文件里面定义的服务接口,然后利用选项模式来修改配置,还包括了一个Run方法,用来启动GRPC服务。

由于RM的具体实现还跟我们的实际业务有关,例如金融相关的就是管理金额,活动相关的就是活动名额之类的。所以接口需要在实际的业务编写的时候去实现,我们的框架提供了以上的功能的话,其实可以说已经完成了一大半了。

但是假如接口实现的过程中,不做任何的处理的话,会出现以下的问题:

一般情况下,一个TCC回滚时的执行顺序是,先执行完Try,再执行Cancel,但是由于N,则有可能Try的网络延迟大,导致先执行Cancel,再执行Try。

这种情况就引入了分布式事务中的两个难题:

  • 空补偿:  Cancel执行时,Try未执行,事务分支的Cancel操作需要判断出Try未执行,这时需要忽略Cancel中的业务数据更新,直接返回
  • 悬挂:  Try执行时,Cancel已执行完成,事务分支的Try操作需要判断出Cancel已执行,这时需要忽略Try中的业务数据更新,直接返回

分布式事务还有一类需要处理的常见问题,就是重复请求

  • 幂等:  由于任何一个请求都可能出现网络异常,出现重复请求,所有的分布式事务分支操作,都需要保证幂等性

具体的可以查看DTM文档:异常与子事务屏障

这个如果留给业务方处理的话难免会出现纰漏,所以这个还是由框架来做,然后提供mod给业务方调用。

这里参考DTM的子事务屏障,利用redis lua脚本的串行执行的特性来实现幂等、防止空补偿和悬挂:

-- commit 脚本
local key = KEYS[1]
local oldValue = ARGV[1]
local newValue = ARGV[2]

if redis.call('exists', key) == 1 and redis.call('get', key) == oldValue then
  redis.call('set', key, newValue)
  return 0
else
  return 1
end
-- cancel 脚本
local key = KEYS[1]
local oldValue = ARGV[1]
local newValue = ARGV[2]

if redis.call('exists', key) == 0 then
  return 0
elseif redis.call('get', key) == oldValue then
  redis.call('set', key, newValue)
  return 1
else
  return 2
end
func (m *Manager) Try(ctx context.Context, req *pb.TryRequest) (*pb.TryReply, error) {
    key := fmt.Sprintf("xid:%s:%s", m.uuid, req.Xid)
    isSet, err := m.rdb.SetNX(ctx, key, tried, redis.KeepTTL).Result() // 幂等防悬挂,判断事务是否已经处理了
    if err != nil {
       return nil, status.Errorf(codes.Internal, "failed to setnx: %v", err)
    }
    if !isSet {
       return nil, status.Errorf(codes.AlreadyExists, "xid %s already exists", req.Xid)
    }
    return m.ResourceManagerServer.Try(ctx, req)
}

func (m *Manager) Commit(ctx context.Context, req *pb.CommitRequest) (*pb.CommitReply, error) {
    key := fmt.Sprintf("xid:%s:%s", m.uuid, req.Xid)
    res, err := m.commitScript.Run(ctx, m.rdb, []string{key}, tried, committed).Result()
    if err != nil {
       return nil, status.Errorf(codes.Internal, "failed to run commit script: %v", err)
    }
    if res.(int64) != 0 { 
       return &pb.CommitReply{
          Message: fmt.Sprintf("xid %s already committed", req.Xid),
       }, nil
    }
    return m.ResourceManagerServer.Commit(ctx, req)
}

func (m *Manager) Cancel(ctx context.Context, req *pb.CancelRequest) (*pb.CancelReply, error) {
    key := fmt.Sprintf("xid:%s:%s", m.uuid, req.Xid)
    res, err := m.cancelScript.Run(ctx, m.rdb, []string{key}, tried, cancelled).Result() // redis lua脚本,用来判断是否发生了空补偿
    if err != nil {
       return nil, status.Errorf(codes.Internal, "failed to run commit script: %v", err)
    }
    switch res.(int64) { // 空补偿
    case 0:
       return &pb.CancelReply{Message: "tx is not exist"}, nil
    case 1:
       return m.ResourceManagerServer.Cancel(ctx, req)
    case 2:
       return &pb.CancelReply{
          Message: fmt.Sprintf("xid %s already committed", req.Xid),
       }, nil
    }
    return nil, nil
}

我们给Maneger增加了三个方法,分别是Try、Commit、Cancel,当我们代码运行的时候,是先运行我们自己实现的这三个方法,再执行业务方的代码。当我们的代码发现问题时,直接做处理并返回给AP/TM,就不会再往业务方代码传递了。

总结

我们到这里就实现了通讯协议和RM的实现了。本来想着一篇文章就搞定了整个教程了,没想到写着写着就这么晚了,于是这篇就变成了上集。下次一定补上下一集

未完待续。。。