likes
comments
collection
share

使用Go从零实现一个Redis(二):创建TCP服务器

作者站长头像
站长
· 阅读数 37

此代码参考了大佬的godis。godis是一个非常值得学习的项目!!!

在Redis6中,网络模型从单线程优化成了多线程。

多线程的网络模型是指在处理客户端请求时,使用多个线程来并发地处理多个连接,从而提高系统的并发处理能力。相比于单线程模型,多线程模型可以同时处理多个请求,提高了并发处理能力。

在单线程模型中,所有的请求都必须在同一个线程中处理,一个连接上的请求必须等待前一个请求处理完成后才能处理,无法并行处理多个连接上的请求,因此在高并发的场景下容易出现性能瓶颈。而在多线程模型中,每个线程都可以独立处理一个连接上的请求,不会因为前一个请求的处理时间过长而影响其他连接的处理,因此可以有效提高系统的并发处理能力。但是多线程模型也存在一些问题,比如线程切换开销、线程安全问题等,需要通过一些技术手段来解决。

请注意,redis6的多线程是指网络多线程,也就是可以同时处理多个请求,但是命令在执行的时候仍然是单线程执行的。Redis6的网络模型主要在ae.h,ae.c,networking.c文件中

由于篇幅原因,这里就不贴源码了,有兴趣的小伙伴可以直接查看Redis源码中对应的文件。

在ae.c文件中,主要实现了多线程的事件循环和事件处理机制:

  1. aeCreateEventLoop:创建事件循环。
  2. aeCreateFileEvent:创建文件事件。
  3. aeDeleteFileEvent:删除文件事件。
  4. aeGetFileEvents:获取文件事件。
  5. aeCreateTimeEvent:创建时间事件。
  6. aeDeleteTimeEvent:删除时间事件。
  7. aeProcessEvents:处理事件。
  8. aeWait:等待事件。
  9. aeCreateThread:创建线程。
  10. aeGetApiName:获取事件处理器的名字。

在networking.c文件中主要实现了多线程的网络处理:

  1. createClient:创建客户端连接。
  2. freeClient:释放客户端连接。
  3. acceptTcpHandler:处理新的 TCP 连接。
  4. readQueryFromClient:从客户端读取请求。
  5. addReply:向客户端发送响应。
  6. sendReplyToClient:向客户端发送响应。

在MiniRedis中使用如下方式实现TCP服务器

Handler

Handler用于处理TCP服务,其中包括了Handle方法和Close方法。

(interface/tcp/handler.go)

// HandleFunc 代表处理方法,ctx表示请求携带的相关数据,conn表示一个网络连接,用于客户端和服务端之间传递数据
type HandleFunc func(ctx context.Context, conn net.Conn)

// Handler 用于处理tcp的服务
type Handler interface {
   Handle(ctx context.Context, conn net.Conn)
   Close() error
}

ListenAndServe

  • 在Go语言中,可以使用net.Listen创建一个监听器,并且使用监听器接收连接对象
  • 使用开销更小的协程处理客户端的连接,实现了Redis6中的多线程并发执行,但只是异步处理连接,在操作数据库的时候仍然是同步进行的,保证了Redis命令执行的原子性。
  • 如果在执行请求的时候出现了错误,需要对错误进行记录,并且执行关闭监听器、连接等收尾工作,这称之为优雅关闭,在Go中使用channel实现了主线程和协程之间的信息交互从而记录了错误信息并且可以进行处理

(tcp/server.go)

// Config 保存了创建TCP连接的配置信息
type Config struct {
   Address    string        `yaml:"address"`
   MaxConnect uint32        `yaml:"max-connect"`
   Timeout    time.Duration `yaml:"timeout"`
}

// ClientCounter 用于记录连接到miniRedis的客户端数量
var ClientCounter int
// ListenAndServeWithSignal 用于监听和处理请求,并且携带信号量用于处理异常,例如请求关闭等情况
func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
   // 创建一个管道,记录请求关闭信号
   closeChan := make(chan struct{})
   // 创建一个管道,接受操作系统发送的信号
   sigCh := make(chan os.Signal)
   signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
   // 开启一个新的协程等待操作系统的信号
   go func() {
      sig := <-sigCh
      switch sig {
      case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
         closeChan <- struct{}{}
      }
   }()

   //开始监听,返回一个TCP监听器
   listener, err := net.Listen("tcp", cfg.Address)
   if err != nil {
      return err
   }

   logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))
   ListenAndServe(listener, handler, closeChan)
   return nil
}
// ListenAndServe 绑定端口并处理请求,持续阻塞直到关闭
func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) {
   errCh := make(chan error, 1)
   defer close(errCh)
   // 开启一个协程处理关闭和错误信息
   go func() {
      select { // 阻塞接受
      case <-closeChan:
         logger.Info("get exit signal")
      case er := <-errCh:
         logger.Info(fmt.Sprintf("accept error: %s", er.Error()))
      }
      logger.Info("shutting down...")
      _ = listener.Close() // 关闭监听器
      _ = handler.Close()  // 关闭连接
   }()

   ctx := context.Background() // 获取请求上下文

   var waitDone sync.WaitGroup // 用于等待所有的协程执行结束后才执行 listener.Close() 和 handler.Close()
   for {

      // 如果接受错误则写入到管道中
      conn, err := listener.Accept()
      if err != nil {
         errCh <- err
         break
      }

      logger.Info("accept link")
      ClientCounter++
      waitDone.Add(1)
      //异步执行
      go func() {
         defer func() {
            waitDone.Done()
            ClientCounter--
         }()
         //handle是对整个连接的
         handler.Handle(ctx, conn)
      }()
   }
   waitDone.Wait()
}

传送门下一篇:

转载自:https://juejin.cn/post/7220715429243682876
评论
请登录