likes
comments
collection
share

Codis-proxy实现原理(Redis分布式解决方案1)

作者站长头像
站长
· 阅读数 13

1.概述

前段时间分析了redis高可用的解决方案,本文主要是想介绍下redis的分布式解决方案。掌握了高可用和分布式,能更好的支撑业务的长久发展。redis的分布式方案有很多,本文主要介绍codis的实现。redis官方的cluster暂时没有兴趣观摩。

2.Redis分布式的问题

  • 如何做数据分片
  • 如何保证高可用
  • 可扩展性(扩缩容问题)
  • 数据分片后,对于一些多key操作的命令,如何做兼容

1.其实数据分片主要有两个方案,一个是一致性hash,另一个则是虚拟桶预分片。Redis采用的是后者,具体原因后文有描述。当然本文介绍的也是虚拟桶预分片技术。

2.高可用无非就是主从复制,自动故障转移。这些redis集群本身完全可以保证,我们要做的就是如何去做集成。还有就是如果引入proxy,怎么保证proxy的一致性和高可用。

3.扩缩容和分片方案有着密切关系。

2.一致性hash和虚拟预分片解决方案

一致性hash算法

一致性hash是一个比较不错的负载均衡算法,但也有缺点,比如增加和减少节点需要重新计算节点,之前的数据分配会有影响(原本属于该节点的数据被分配到其他节点),因此扩缩容数据迁移复杂。

Codis-proxy实现原理(Redis分布式解决方案1)

扩容操作:比如在A、B、C、D集群中加入E节点。原本落在C节点的x数据需要迁移到E节点。

虚拟桶预分片算法

引入虚拟桶。桶(slot)和节点进行映射。对key进行hash得到slot,然后查映射表,我们就可以找到这个key对应的数据在哪个节点。只要做到slot和节点之间的映射是均匀即可。该方法可以动态分配slot和节点的映射关系,但是会存在数据分配不均匀的问题。比如大多数数据会聚集在一个slot。典型就是redis分布式场景通过hashtag实现lua脚本。

优点: Key对应的slot不会因为扩缩容而变化。 扩容过程中,给新增节点分配对应范围的slot即可。

因此扩所容也就是slot的迁移而已。至于同步迁移还是异步迁移,这与业务相关

sofa-registy

虽然之前看到sofa-registy采用的也是一致性hash的实现数据分片,据说历史原因。而且对于服务注册发现的场景,是可以通过服务提供方定时注册补偿数据,并且短暂不可用对于系统影响很小。

CRedis

携程的CRedis采用了一致性hash。当然也遇到了水平扩容问题。CRedis的解决方案:不改变原有的hash环。而是进行二次hash,构造出一个树形结构。举个例子:假设data1会hash到A节点。当A节点数据量大,需要扩容的时候,他们会将A节点拆成3个节点。扩容期间三个新节点会拥有全量数据。然后根据key的命中规则从新节点清理掉不属于自己的数据。这样就可以实现无缝扩容。

扩容后我们访问data1数据,先进行一次hash,路由到原有A,然后再进行hash,路由到对应节点。

Codis-proxy实现原理(Redis分布式解决方案1) 缩容过程就不说了,具体参考dbaplus.cn/news-158-36…

Codis为什么采用虚拟桶预分片?

主要就是历史原因,因为redis原生实现就是虚拟桶预分片技术。其次就要谈这两个算法的优缺点了。

对于一致性hash,通过虚拟节点可以保证数据负载均衡,但是对于新增和删除节点,需要重新结算节点,并且原有节点的数据也需要做迁移,这个过程比较复杂。但是对于redis,水平扩缩容是很频繁的。 并且一致性hash范围查询性能很低,因为多个key,数据可能会被分配在不同的分片。

分布式缓存场景,使用一致性hash更合理。

虚拟桶预分片是取模和一致性hash的折中。可认为动态调整均匀分配slot,对于slot的迁移,redis也是有同步迁移和异步迁移支持。

3.Codis架构实现

codis是一个分布式的redis服务。

codis实现非常简单,中间加了一层无状态的代理,分布式逻辑都是写在这层代理上。代理根据key以及数据分布将请求发送到对应codis-server(redis)节点。这些数据分布(slot的划分)存储在zookeeper中。顺序一致的zookeeper能保证数据的一致性。

codis架构图

Codis-proxy实现原理(Redis分布式解决方案1) 虽然图很简陋,但是足够清晰明了。(山不再高,有仙则名) fe、dashboard主要用于对集群就行手动管理。每个codis-group就是一个redis主从集群。核心就是proxy,我们对proxy展开分析。

核心关注点

1.一次客户端请求的处理过程

2.集群扩缩容处理

3.集群监控

Proxy启动

proxy数据结构
type Proxy struct {
    mu sync.Mutex
    xauth string
    // 这些信息会被存储到coordinator(zk、etcd),具体什么数据后面会说明
    model *models.Proxy
    // 检测退出的chan
    exit struct {
        C chan struct{}
    }
    //是否上线的标志
    online bool
    closed bool
    //整个proxy的配置信息
    config *Config
    //路由表 存储了集群的sharedBackendConnPool以及slot,转发redis请求时使用
    router *Router
    ignore []byte
    // 工作端口(和客户端)
    lproxy net.Listener
    //集群管理的服务端口 
    ladmin net.Listener
    // 所有redis节点
    ha struct {
        monitor *redis.Sentinel
        masters map[int]string
        servers []string
    }
    // 和coordinator(zk、etcd)通信的客户端
    jodis *Jodis
}
proxy.New(初始化proxy实例)

启动流程很简单:读取配置,然后根据配置信息调用proxy.New方法。 主要就是初始化proxy这个数据结构。

核心点:

1.初始化配置

2.启动lproxy、ladmin两个服务端口

go s.serveAdmin()
go s.serveProxy()

这两个方法其实就是初始化服务,serveAdmin借助martini实现http服务。serveProxy则是处理转发请求的长链接服务。

3.启动Jodis,创建zk客户端

if config.JodisAddr != "" {
    c, err := models.NewClient(config.JodisName, config.JodisAddr, config.JodisAuth, config.JodisTimeout.Duration())
    if err != nil {
        return err
    }
    if config.JodisCompatible {
        s.model.JodisPath = filepath.Join("/zk/codis", fmt.Sprintf("db_%s", config.ProductName), "proxy", s.model.Token)
    } else {
        s.model.JodisPath = models.JodisPath(config.ProductName, s.model.Token)
    }
    s.jodis = NewJodis(c, s.model)
}

整个启动过程中,基本就做了这么多事情。此时proxy只是启动成功,并没有加入集群,也就是并没有online。后续通过serveAdmin的api上线proxy。(如果启动的时候设置了—fillslots变量,则会解析该分片,然后自动加入集群开始工作)

Proxy处理客户端请求

serveProxy初始化的时候会启动proxy服务

go func(l net.Listener) (err error) {
    defer func() {
        eh <- err
    }()
    for {
        c, err := s.acceptConn(l)
        if err != nil {
            return err
        }
        NewSession(c, s.config).Start(s.router)
    }
}(s.lproxy)

对于每个连接都会创建一个session,然后调用Start方法去处理对应session。处理逻辑依赖路由表,所以入参为s.router。 session会设置tcp的keepalive,避免死连接。

start方法

1.判断session是否超过最大值,超过最大值返回错误。也就是做多同时存在1000个session。 通过原子变量计算session数量

2.判断路由表是否在线,不在线返回错误

s.Conn.Encode(redis.NewErrorf("ERR router is not online"), true)

3.创建RequestChan(tasks),启动协程执行s.loopWriter和s.loopReader方法。

RequestChan主要用来存储请求处理结果。

s.loopReader是用来读取请求,并转发到redis服务器处理。

s.loopWriter则是合并redis处理结果,返回给业务客户端。

s.loopReader->RequestChan->s.loopWriter

tasks := NewRequestChanBuffer(1024)

go func() {
    s.loopWriter(tasks)
    decrSessions()
}()

go func() {
    s.loopReader(tasks, d)
    tasks.Close()
}()   

接下来我们先看loopReader方法。也就是如何读取请求然后选择连接,发送到后端redis服务。

loopReader方法
for !s.quit {
    //从连接中读取命令,主要是命令解析工作
    multi, err := s.Conn.DecodeMultiBulk()
    if err != nil {
        return err
    }
    if len(multi) == 0 {
        continue
    }
    //统计数据++
    s.incrOpTotal()
    //pipeline长度太长 统计++,用于报警
    if tasks.Buffered() > maxPipelineLen {
        return s.incrOpFails(nil, ErrTooManyPipelinedRequests)
    }
    start := time.Now()
    s.LastOpUnix = start.Unix()
    s.Ops++
    //创建一个请求的request
    r := &Request{}
    //保存所有命令
    r.Multi = multi
    //处理命令的信号
    r.Batch = &sync.WaitGroup{}
    r.Database = s.database
    r.UnixNano = start.UnixNano()
    //处理命令
    if err := s.handleRequest(r, d); err != nil {
        r.Resp = redis.NewErrorf("ERR handle request, %s", err)
        //将结果丢进requestChan
        tasks.PushBack(r)
        if breakOnFailure {
            return err
        }
    } else {
        tasks.PushBack(r)
    }
}

1.循环读取命令并处理(直到读取到QUIT结束)

2.如果pipeline的长度太长,计算监控指标

3.创建sync.WaitGroup,命令处理结束后,发送信号。以便于另外一个写协程将命令写会客户端。后面细说。

4.调用s.handleRequest处理命令

handleRequest方法

1.解析命令,如果是quit,则设置quit标志位然后退出。如果是auth则进行认证。

2.如果没有需要认证,但没有认证成功返回失败

3.处理其他指令,对特殊命令需要特殊处理。比如多key操作

switch opstr {
case "SELECT":
    return s.handleSelect(r)
case "PING":
    return s.handleRequestPing(r, d)
case "INFO":
    return s.handleRequestInfo(r, d)
case "MGET":
    return s.handleRequestMGet(r, d)
case "MSET":
    return s.handleRequestMSet(r, d)
case "DEL":
    return s.handleRequestDel(r, d)
case "EXISTS":
    return s.handleRequestExists(r, d)
case "SLOTSINFO":
    return s.handleRequestSlotsInfo(r, d)
case "SLOTSSCAN":
    return s.handleRequestSlotsScan(r, d)
case "SLOTSMAPPING":
    return s.handleRequestSlotsMapping(r, d)
default:
    return d.dispatch(r)
}

其实多key操作就是分别进行处理。因为每个key所在的group可能不同。但最终都会调用d.dispatch方法进行处理。

该方法主要就是获取key的hash值。通过crc32算法计算并选取一个slot对请求进行处理。

当然这里对hashtag进行了特殊处理。(保证相同hashtag的key会被路由到同一节点,避免lua执行失败)

func (s *Router) dispatch(r *Request) error {
    hkey := getHashKey(r.Multi, r.OpStr)
    var id = Hash(hkey) % MaxSlotNum
    slot := &s.slots[id]
    return slot.forward(r, hkey)
}
slot.forward方法

具体实现在forward.go中,有forwardSync和forwardSemiAsync两种实现

func (d *forwardSync) Forward(s *Slot, r *Request, hkey []byte) error {
    s.lock.RLock()
    bc, err := d.process(s, r, hkey)
    s.lock.RUnlock()
    if err != nil {
        return err
    }
    bc.PushBack(r)
    return nil
}

该方法会调用d.process去执行,返回一个BackendConn(真正用来访问redis的连接)。

d.process方法
func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, error) {
    if s.backend.bc == nil {
        log.Debugf("slot-%04d is not ready: hash key = '%s'",
            s.id, hkey)
        return nil, ErrSlotIsNotReady
    }
    if s.migrate.bc != nil && len(hkey) != 0 {
        if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil {
            log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",
                s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err)
            return nil, err
        }
    }
    r.Group = &s.refs
    r.Group.Add(1)
    return d.forward2(s, r), nil
}

这里先说一下slot的结构

type Slot struct {
    id   int
    lock struct {
        hold bool
        sync.RWMutex
    }
    refs sync.WaitGroup

    switched bool

    backend, migrate struct {
        id int
        bc *sharedBackendConn
    }
    replicaGroups [][]*sharedBackendConn
    method forwardMethod
}

主要就是backend和migrate:

  • backend主要是处理redis读写操作的后端连接,
  • migrate是用来进行key迁移的连接。codis对redis进行了部分修改,支持了一些key迁移命令

replicaGroups则是对应从节点的连接,如果设置允许读取从的话,读取命令会被发送的从节点。

再回到forwardSync的process方法:

1.如果slot的backend为空,则代表没有节点,直接返回

2.如果migrate不为空,正在迁移。那么调用d.slotsmgrt方法强制迁移。

3.调用forward2获取最终的BackendConn

forward2(连接选取)
func (d *forwardHelper) forward2(s *Slot, r *Request) *BackendConn {
    var database, seed = r.Database, r.Seed16()
    if s.migrate.bc == nil && !r.IsMasterOnly() && len(s.replicaGroups) != 0 {
        for _, group := range s.replicaGroups {
            var i = seed
            for range group {
                i = (i + 1) % uint(len(group))
                if bc := group[i].BackendConn(database, seed, false); bc != nil {
                    return bc
                }
            }
        }
    }
    return s.backend.bc.BackendConn(database, seed, true)
}

1.选取一个节点。如果是读操作并且可读从,从s.replicaGroups随机选择一个,否则选择s.backend.bc。

2.调用BackendConn方法。该方法就是选取一个连接(BackendConn)。

如果每个db只有一个连接,那么根据db(0~16)选取对应连接返回。(具体可以看sharedBackendConn)。 成功后返回。

PushBack

一路返回最终回调用PushBack方法。

这个方法其实就是将Request丢进bc.input,等待处理。

func (bc *BackendConn) PushBack(r *Request) {
    if r.Batch != nil {
        r.Batch.Add(1)
    }
    bc.input <- r
}

总结:其实就是根据请求选择连接的流程,最终会丢到对应连接的input管道。然后返回。

上层request加入到RequestChan的data数组里面,为后面session的loopWriter使用,因为request的结果也会被丢进request结构的resp中。所以这个request从头到尾都是有价值的。

func (c *RequestChan) PushBack(r *Request) int {
    c.lock.Lock()
    n := c.lockedPushBack(r)
    c.lock.Unlock()
    return n
}

我们来看下面这张图,其实上面就说了1~4过程

Codis-proxy实现原理(Redis分布式解决方案1)

具体什么时候执行6~11,我们接下来再来研究。

fillslots的时候会创建newSharedBackendConn,也就是创建连接池。 创建连接池时候会创建连接BackendConn。

创建完成后会启动协程调用backendConn的run方法。刚方法会调用loopwrite方法。

上面细节会在集群配置中细说。

backendConn的loopwriter

1.loopwrite先调用backendConn的newBackendReader方法。

func (bc *BackendConn) newBackendReader(round int, config *Config) (*redis.Conn, chan<- *Request, error) {
    c, err := redis.DialTimeout(bc.addr, time.Second*5,
        config.BackendRecvBufsize.AsInt(),
        config.BackendSendBufsize.AsInt())
    if err != nil {
        return nil, nil, err
    }
    c.ReaderTimeout = config.BackendRecvTimeout.Duration()
    c.WriterTimeout = config.BackendSendTimeout.Duration()
    c.SetKeepAlivePeriod(config.BackendKeepAlivePeriod.Duration())


    if err := bc.verifyAuth(c, config.ProductAuth); err != nil {
        c.Close()
        return nil, nil, err
    }
    if err := bc.selectDatabase(c, bc.database); err != nil {
        c.Close()
        return nil, nil, err
    }

    tasks := make(chan *Request, config.BackendMaxPipeline)
    go bc.loopReader(tasks, c, round)

    return c, tasks, nil
}

(1)创建redis连接

(2)创建一个request chan去存放所有request。

(3)执行bc.loopReader,该方法主要就是遍历tasks。然后转发到redis执行。成功后调用setResponse方法

func (bc *BackendConn) setResponse(r *Request, resp *redis.Resp, err error) error {
    r.Resp, r.Err = resp, err
    if r.Group != nil {
        r.Group.Done()
    }
    if r.Batch != nil {
        r.Batch.Done()
    }
    return err
}

这里需要说一下 ,这是协程去执行的。此时tasks还没有填充任务。

2.接下来才是填充任务到tasks,上面的协程才会处理。 这里就是遍历之前丢进input管道的request。然后将其丢进tasks

for r := range bc.input {
    if r.IsReadOnly() && r.IsBroken() {
        bc.setResponse(r, nil, ErrRequestIsBroken)
        continue
    }
    if err := p.EncodeMultiBulk(r.Multi); err != nil {
        return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
    }
    if err := p.Flush(len(bc.input) == 0); err != nil {
        return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
    } else {
        tasks <- r
    }
}
return nil

上面的逻辑有点绕。其实最终就是调用setResponse方法。激活之前的waitGroup。这样session的loopwriter就会从阻塞中接触,然后返回给业务客户端。

session的loopWrite

该方法调用handleResponse处理结果。首先就是wait,等待上面的done,如果done,代表请求处理完成。 返回resp,最后写会给客户端。

func (s *Session) handleResponse(r *Request) (*redis.Resp, error) {
    r.Batch.Wait()
    if r.Coalesce != nil {
        if err := r.Coalesce(); err != nil {
            return nil, err
        }
    }
    if err := r.Err; err != nil {
        return nil, err
    } else if r.Resp == nil {
        return nil, ErrRespIsRequired
    }
    return r.Resp, nil
}

总结

本文先说了业界redis分布式解决方案,然后对codis-proxy启动,以及如何处理客户端请求源码进行追踪。启动其实就是加载配置启动服务。处理请求流程通过协程、waitGroup、chan实现。核心就是读取请求,创建request。然后完全围绕request去执行。根据request选择连接,然后访问redis。最后将结果写入request中,更新waitGroup。通知将结果返回给客户端。

后面重点关注集群管理(slot分配、迁移),server、proxy加入集群,sentinel的支持。