likes
comments
collection
share

6. 从单体到微服务:选课系统的架构演进之路

作者站长头像
站长
· 阅读数 56

回顾上期

在上期我们通过一系列的手段终于完成了一个选课的demo,其中我们从并发的角度进行优化,通过引入不同的技术来进行加强项目的并发度,但是在此过程中遇到了许许多多的问题例如:在文1的单体mysql中,为了避免并发安全造成的问题和一致性问题,我们不得不使用mysql提供事务和锁(这里不一定使用mysql的事务和锁机制也可以通过服务串行化的手段来解决这类问题,只不过mysql锁的细粒度更小)来进行确保选课的一致,牺牲性能的情况下保证数据的一致性,显然这样并不是一个并发量高的系统。在文2中我们通过引入redis分布式缓存来进行优化,其中主要是用redis做一个预扣减的动作,避免计算操作在内存,规避跟数据库打交道。这样一来大大了加快了请求响应速度。当在缓存层扣减完成,我们通过后台线程进行入库的处理操作。但是问题来了后台操作无法确保数据的可靠性。为了确保数据可靠性,文3我们引入了消息队列进行异步化处理,且消息队列提供可靠的持久化机制。这样一来我们就能确保消息的可靠性, 但是在此之前我们如何确保消息不丢失呢?在RabbitMQ中提供了发布确认机制,我们尽可能的确保发布期间消息到达交换机进行持久化,若发布消息失败了,但在此之前已经进行预扣减操作了。如果不进行处理异常消息的话,就会造成数据不一致。随后我们引入了死信队列 当消息异常时,我们将消息进行丢入到死信队列进行人工补偿操作。这就完了吗?然而并没有,随着不断的并发测试,问题也是随着显露出来了。我们这里是一个选课的系统,我们必须确保用户的多次操作的结果是最后一次的也就是执行顺序问题。通过查略大量的文章,本文通过消息创建的时间戳来判断操作是否过期。如果过期了就不进行处理,因为此操作并非最后一次请求的操作,就这么我们就解决了消息顺序的问题。在文4我们通过一系列的手段进行优化,例如:本地缓存等等。

GPT总结

在前期,我们成功地完成了一个选课系统的Demo构建。针对并发性能,我们采取了一系列技术优化措施,旨在提升项目并发处理水平。其间,面临了诸多难题,比如在单一MySQL数据库环境中,为防止并发引发的数据不一致,我们采用了MySQL的事务管理和锁定机制来确保选课操作的原子性和一致性。然而,这种做法在保证数据安全的同时,无可避免地影响了系统性能,特别是面对高并发场景。

随后,我们融入了Redis分布式缓存,以实现快速的预减库存操作,这有效绕过了频繁的数据库交互,显著加速了响应速度。紧接着,后台进程负责将缓存中的变更同步至数据库,尽管这引出了数据可靠性的问题。

为解决此问题,我们在接下来的改进中加入了消息队列(比如RabbitMQ),利用其异步处理能力和内置的持久化机制,确保数据处理的可靠执行。我们还实施了消息发布确认策略,以最大程度降低消息丢失风险。对于未能正确发布的消息,我们设计了死信队列来收集这些异常情况,预留了人工干预的回旋余地,从而维护数据一致性。

在深入优化的过程中,我们发现还需处理消息的顺序问题,以确保用户操作按照最后发起的指令执行。通过分析各条消息的时间戳,我们能够识别并忽略非最新的重复操作,由此维护了操作的逻辑顺序。

最终,在文4部分,我们进一步探索了多种优化手段,包括局部缓存策略,不断细化和强化系统性能。

整体而言,这一系列的优化措施逐步克服了并发处理、数据一致性和性能瓶颈等关键挑战,显著提升了选课系统的稳定性和效率。

现存问题

当某个服务处理慢请求或大量并发请求时,会占用较多资源,导致其他服务因资源不足而阻塞或响应变慢。服务的脆弱性,比如一个服务或模块发生故障,整个应用都有可能受到影响,甚至导致服务的整体崩溃,造成这种连锁反应。为了解决这些问题在这期我们引入微服务进行条件和治理。提高项目的可扩展性、故障隔离等。

本文解决

单体服务并发量突发造成的请求阻塞问题

能学到什么

  • 单体架构如何过渡到微服务架构
  • 如何进行拆分服务
  • grpc的使用
  • Jaeger进行链路追踪
  • Consul服务中心

涉及技术

  • Grpc
  • Consul
  • Jaeger

架构设计

暂时无法在飞书文档外展示此内容

6. 从单体到微服务:选课系统的架构演进之路

服务拆分

聊聊如何进行服务拆分

其实最常见的是基于业务的服务拆分,将具有相似业务功能的服务组合在一起,如用户服务、订单服务等。对于本项目也就选课服务和用户服务。我们可以将用户服务和选课服务拆分为两个独立的服务,可能得话选课服务会依赖于用户服务,但是这里不能用户服务依赖选课服务,这样会造成服务循环依赖,任意造成整体死亡,就如同了单体架构。

着手

从何开始呢?

针对于一些初始写微服务项目的小白来说(比如我),从何入手是非常关键的,这么多组件该从何入手呢?其实我这里是把基础设施搭建完成再去搭建上层服务,先写被赖服务。由于上期服务我们把基础服务都已经写好了的,这里我们就从网关层开始,到后期如果需要增加其他服务在进行考虑。

网关层

这里网关我们就只单单就做单独的参数校验、服务的调用。那么我们就着手网关层开始。

我们就只进行单独的进行参数校验和用户合法性校验,在网关能做的是尽量在网关层完成。其余的网关服务也类似这里就不一一列出来了。

func GetUserHandler(ctx *gin.Context) {
   

    // valid param
    var req request.UserReq
    if err := ctx.ShouldBind(&req); err != nil {
       logger.Info("GetUserHandler", zap.Error(err))
       resp.ParamErr(ctx)
       return
    }
    logField := []zap.Field{zap.Int64("user_id", req.UserID)}
    // bloom filter
    if !bloom.UserBloom.TestString(fmt.Sprintf("%d", req.UserID)) {
       logger.Info("user not found", logField...)
       resp.Fail(ctx, code.UserNotFound, code.UserNotFoundMsg)
       return
    }
    // call rpc
  
}

服务层

服务拆分

user.proto

syntax = "proto3";
option go_package = "./src/rpc/user;user";
message UserRequest {
  int64 user_id = 1;
}
message UserResponse {
  int32 status_code = 1; // 状态码,0-成功,其他值-失败
  string status_msg = 2; // 返回状态描述
  int64 user_id = 3;
  string user_name = 4;
  string password = 5;
}
service UserService {
  rpc GetUserInfo(UserRequest) returns (UserResponse);
}

// protoc -I .\src\idl\ .\src\idl*.proto --go_out=./ --go-grpc_out=./

course.proto

这里没牵扯到很复杂的服务调用关系,其实这里和上期的没什么区别只是单个服务作为一个服务体。

syntax = "proto3";
option go_package = "./src/rpc/course;course";

// 课程信息消息类型
message Course{
  int64 id = 1;
  string name = 2;
  int64 capacity = 3;

}

// 课程列表响应消息类型
message CourseListRespond{
  int32 status_code = 1; // 状态码,0-成功,其他值-失败
  string status_msg = 2; // 返回状态描述
  repeated Course courses = 3;
}

// 选课请求消息类型
message CourseOptRequest{
  int64 user_id = 1;
  int64 course_id = 2;
}
message CourseOptResponse{
  int32 status_code = 1; // 状态码,0-成功,其他值-失败
  string status_msg = 2; // 返回状态描述
}

// 获取所有课程的服务请求与响应定义
message GetAllCoursesRequest{}

message GetAllCoursesResponse{
  int32 status_code = 1; // 状态码,0-成功,其他值-失败
  string status_msg = 2; // 返回状态描述
  repeated Course courses = 3;
}

// 获取我的课程的服务请求与响应定义
message GetMyCoursesRequest{
  int64 user_id = 1;
}

message GetMyCoursesResponse{
  int32 status_code = 1; // 状态码,0-成功,其他值-失败
  string status_msg = 2; // 返回状态描述
  repeated Course courses = 3;
}

// 扣减课程容量及创建消息的请求与响应可以后续根据实际业务逻辑设计

message EnQueueCourseRequest{
  int64 course_id = 1;
  int64 user_id = 2;
  // 消息创建时间
  int64 create_at = 3;
  // 选课状态
  bool is_select = 4;
}

service CourseService{
  // 获取所有课程列表
  rpc GetAllCourses(GetAllCoursesRequest) returns (GetAllCoursesResponse) {}

  // 获取指定用户的课程列表(我的课程)
  rpc GetMyCourses(GetMyCoursesRequest) returns (GetMyCoursesResponse) {}

  // 选课操作
  rpc SelectCourse(CourseOptRequest) returns (CourseOptResponse) {}
  // 退课操作
  rpc BackCourse(CourseOptRequest) returns (CourseOptResponse) {}
  // 其中选课操作和退课操作都存在在redis预创建,如何丢入到消息队列进行处理操作,消息队列处理操作为:扣减课程,修改用户课程表,添加用户选课记录操作。
  // 丢入消息队列操作
  rpc EnQueueCourse(EnQueueCourseRequest) returns (CourseOptResponse) {}
}

服务注册

这里是基于consul做服务中心

Docker-compose

version: "3.9"
services:
  consul:
    image: consul
    command: [
      "agent",
      "-dev",
      "-client=0.0.0.0"
    ]
    ports:
      - "8500:8500"
    restart: unless-stopped
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:8500/v1/status/leader" ]
      interval: 10s
      timeout: 5s
      retries: 3
    networks:
      - Course

Register


type ConsulDiscovery struct {
    prefix    string
    Address   string
    client    *capi.Client
    serviceID string
}

func (c *ConsulDiscovery) Register(ctx context.Context, service Service) error {
    parsePort, err := strconv.Atoi(service.Port[1:])
    if err != nil {
       return err
    }
    serviceID := fmt.Sprintf("%s-%s-%s:%d", c.prefix, service.Name, config2.EnvCfg.BaseHost, parsePort)
    c.serviceID = serviceID
    reg := &capi.AgentServiceRegistration{
       ID:      serviceID,
       Name:    service.Name,
       Address: config2.EnvCfg.BaseHost,
       Port:    parsePort,
       // Http检查

    }
    if config2.EnvCfg.ProjectMode == "prod" {
       reg.Check = &capi.AgentServiceCheck{
          Interval:                       "5s",
          Timeout:                        "5s",
          GRPC:                           fmt.Sprintf("%s:%d", config2.EnvCfg.BaseHost, parsePort),
          GRPCUseTLS:                     false,
          DeregisterCriticalServiceAfter: "30s", // 30s
       }

    }
    if err := c.client.Agent().ServiceRegister(reg); err != nil {
       return err
    }
    logger.LogService(service.Name).Debug("register service success",
       zap.String("address", fmt.Sprintf("%s:%s", config2.EnvCfg.BaseHost, service.Port)),
    )
    return nil
}

DeRegister

func (c *ConsulDiscovery) Deregister(ctx context.Context, name string) error {
    if c.serviceID == "" {
       return nil
    }
    if err := c.client.Agent().ServiceDeregister(c.serviceID); err != nil {
       return err
    }
    logger.LogService(name).Debug("deregister service success", zap.String("id", c.serviceID))
    return nil
}

GetService

func (c *ConsulDiscovery) GetService(ctx context.Context, name string) (string, error) {

    return fmt.Sprintf("consul://%s/%s?wait=15s", c.Address, name), nil
}

调用服务

这里我们进行封装Connect方法进行创建ClientConn对象,设置一些可选项负载均衡、keepalive等。通过传入服务名称在consul服务中心获取可用服务连接。

package grpc


func Dial(addr string) (*grpc.ClientConn, error) {
    kacp := keepalive.ClientParameters{
       Time:                10 * time.Second, // send pings every 10 seconds if there is no activity
       Timeout:             time.Second,      // wait 1 second for ping ack before considering the connection dead
       PermitWithoutStream: false,            // send pings even without active streams
    }
    return grpc.Dial(
       addr,
       grpc.WithInsecure(),
       grpc.WithTransportCredentials(insecure.NewCredentials()),
       grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
       grpc.WithKeepaliveParams(kacp)
    )
}

func Connect(ctx context.Context, serviceName string) *grpc.ClientConn {
    addr, err := discovery.Consul.GetService(ctx, serviceName)
    if err != nil {
       logger.LogService(serviceName).Error("get service error", zap.Error(err))
       panic(err)
    }
    conn, err := Dial(addr)
    if err != nil {
       logger.LogService(serviceName).Error("dial error", zap.Error(err))
       panic(err)
    }
    return conn
}

其实在此之前还使用了grpc-pool这个库,进行创建一定量的grpc连接。但是经过测试使用单独conn实例的对象和使用从pool里拿取的conn实例执行效率差不多,就决定不使用这个库了。(这里看有没有大佬有更好的库推荐呢?)

注册服务成功

6. 从单体到微服务:选课系统的架构演进之路

6. 从单体到微服务:选课系统的架构演进之路

链路追踪

为了完善的链路,我们引入otelgrpc进行链路追踪、监控、故障排查和性能优化问题,多个服务调用完整调用链路视图。我们这里使用Jaeger做分布式追踪系统。

package tracing



func GetConf(serverName string) config.Configuration {
    cfg := config.Configuration{
       Sampler: &config.SamplerConfig{
          Type:  jaeger.SamplerTypeConst,
          Param: 1,
       },
       Reporter: &config.ReporterConfig{
          LogSpans:           true,
          LocalAgentHostPort: fmt.Sprintf("%s:%d", config2.EnvCfg.JaegerHost, config2.EnvCfg.JaegerPort),
       },
       ServiceName: serverName,
    }
    return cfg
}

// Init  instance 
func Init(service string) (opentracing.Tracer, io.Closer) {
    cfg := GetConf(service)
    tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger))
    if err != nil {
       panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
    }

    return tracer, closer
}

// StartSpan 开启一个span,如果传入的span不为空,那么就是子span
func StartSpan(ctx context.Context, name string) opentracing.Span {
    span := opentracing.SpanFromContext(ctx)
    if span == nil {
       span = opentracing.StartSpan(name)
    } else {
       spContext := span.Context()
       span = opentracing.StartSpan(name, opentracing.ChildOf(spContext))
    }
    return span
}

func RecordWithIP(span opentracing.Span, ip string) {
    span.SetTag("ip", ip)
    span.LogFields(
       log.String("event", "ip"),
       log.String("ip", ip),
    )

}

// RecordError 记录错误
func RecordError(span opentracing.Span, err error) {
    span.SetTag("error", true)
    span.LogFields(
       log.String("event", "error"),
       log.String("message", err.Error()),
    )
}
Grpc注入
return grpc.Dial(
    addr,
    grpc.WithInsecure(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
    grpc.WithKeepaliveParams(kacp),
    grpc.WithChainUnaryInterceptor(
       func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
          ctx = opentracing.ContextWithSpan(ctx, opentracing.SpanFromContext(ctx))
          return invoker(ctx, method, req, reply, cc, opts...)
       },
       otelgrpc.UnaryClientInterceptor(),
       //注入trace
       grpc_opentracing.UnaryClientInterceptor(),
    ),
)
Gin中间件注入
// go get go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin@v0.42.0
v1.Use(otelgin.Middleware(services.WebServiceName))
查看链路

6. 从单体到微服务:选课系统的架构演进之路

6. 从单体到微服务:选课系统的架构演进之路

可以看到记录了每个链路请求时长和请求信息

编写服务

由于服务逻辑在上期中写过,这里就省略了。

启动

构建与打包

这里依赖于Docker容器进行打包与运行服务

FROM golang:1.20 as builder

WORKDIR /build

ENV CGO_ENABLED 0
ENV GOPROXY https://goproxy.cn/,direct

COPY . .
RUN go mod download
RUN go mod tidy

RUN bash ./scripts/build-all.sh
# 多阶构建。

FROM alpine:latest

ENV TZ Asia/Shanghai

WORKDIR /project
ENV PROJECT_MODE prod

COPY --from=builder /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
COPY --from=builder /build/output .
COPY --from=builder /build/.env .
#CMD ["./app"]

build-all.sh

 #!/usr/bin/env bash

echo "Building..."

if [ -d "output" ]; then
    echo "Cleaning output..."
    rm -rf output
fi

mkdir -p /build/output/services
cd /build/demo5/src || exit
# build gateway

go build -o /build/output/app
echo "build gateway success"

#build service
pushd services || exit # enter service dir
for i in *; do
  name="$i"
  capName="${name^}"
  cd "$i" || exit
  go build -o "/build/output/services/$i/${capName}Service"
  echo "build $i success"
  cd ..
done
echo "OK!"

docker-compose 编排服务

version: "3.9"
services:
  redis:
    image: redis:6-alpine
    volumes:
      - ./redis.conf:/etc/redis/redis.conf
    ports:
      - "6379:6379"
    restart: unless-stopped
    networks:
      - Course
    env_file:
      - .env
    command: [ "redis-server", "/etc/redis/redis.conf" ]
  mysql:
    image: mysql:8.0-debian
    restart: unless-stopped
    ports:
      - "3306:3306"
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    env_file:
      - .env
    networks:
      - Course
  rabbitmq:
    image: rabbitmq:3.8-management-alpine
    ports:
      - "5672:5672"
      - "15672:15672"
    networks:
      - Course
    env_file:
      - .env
    restart: unless-stopped
    volumes:
      - Course-Rabbitmq:/var/lib/rabbitmq
  nginx:
    image: nginx
    restart: unless-stopped
    ports:
      - "80:80"
    volumes:
      #      - ./nginx.balancer.conf:/etc/nginx/nginx.conf
- ./nginx.conf:/etc/nginx/nginx.conf
      - ./logs/nginx:/var/log/nginx
    networks:
      - Course
  jaeger:
    image: jaegertracing/all-in-one
    environment:
      - COLLECTOR_OTLP_ENABLED=true
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14250:14250"
      - "14268:14268"
      - "14269:14269"
      - "9411:9411"
    healthcheck:
      test: [ "CMD-SHELL", "wget --spider -q http://localhost:16686/search || exit 1" ]
      interval: 10s
      timeout: 5s
      retries: 3
    networks:
      - Course
  consul:
    image: consul
    command: [
      "agent",
      "-dev",
      "-client=0.0.0.0"
    ]
    ports:
      - "8500:8500"
    restart: unless-stopped
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:8500/v1/status/leader" ]
      interval: 10s
      timeout: 5s
      retries: 3
    networks:
      - Course
  user:
    image: select-course
    restart: unless-stopped
    ports:
      - "10000:10000"
    command: [ "sh", "-c", "export BASE_HOST=`hostname -i` && ./services/user/UserService" ]
    depends_on:
      - jaeger
      - consul
      - redis
      - mysql
      - rabbitmq
    env_file:
      - .env
    networks:
      - Course
    volumes:
      - ./logs:/project/logs
  course:
    image: select-course
    restart: unless-stopped
    ports:
      - "10001:10001"
    command: [ "sh", "-c", "export BASE_HOST=`hostname -i` && ./services/course/CourseService" ]
    depends_on:
      - jaeger
      - consul
      - redis
      - mysql
      - rabbitmq
    env_file:
      - .env
    networks:
      - Course
    volumes:
      - ./logs:/project/logs
  app:
    image: select-course
    restart: unless-stopped
    ports:
      - "8888:8888"
    command: [ "sh", "-c", "export BASE_HOST=`hostname -i` && ./app" ]
    env_file:
      - .env
    networks:
      - Course
    volumes:
      - ./logs:/project/logs
    depends_on:
      - user
      - course
      - jaeger
      - consul
      - redis
      - mysql
      - rabbitmq

networks:
  Course:
    driver: bridge
volumes:
  Course-RDB:
  Course-Mysql:
  Course-Rabbitmq:

到这里的话就可以基于docker-compose启动项目了。

总结

微服务这一块的话我也没有什么好说的,只能是现学现卖。如果有错的地方还请大佬指导指导,受教了。

本期代码请见:github.com/bbz1024/sel…

若阁下感兴趣可以clone下来玩一玩。

git clone --branch demo5 https://github.com/bbz1024/select-course.git
转载自:https://juejin.cn/post/7380645714932645951
评论
请登录