[K8S] Envoy XDS的使用 Golang 版本 (3)
0. 前言
-
本篇文章主要介绍 XDS 如何在 Envoy 中使用. XDS 是一组用于配置管理的协议,特别是在云原生环境中,XDS 中的 "X" 代表不同的配置类型,例如 LDS(监听器配置)、RDS(路由配置)、CDS(集群配置)、EDS(端点配置)等。这些协议被广泛用于管理服务间的通信、负载均衡、流量控制等配置。
-
这些协议的设计是为了解决在现代微服务架构中的一些挑战,比如动态配置的需求、服务发现、负载均衡和安全等问题。Envoy 是一个开源的高性能代理,可以作为边车代理(sidecar proxy)与应用程序部署在一起,协助管理这些配置。
以下是 xDS 协议中一些关键的配置类型:
- LDS(Listener Discovery Service) :该配置用于定义 Envoy 监听的网络地址和协议。监听器监听来自外部客户端的连接,并将连接请求转发给后端的服务。
- RDS(Route Discovery Service) :RDS 用于配置 Envoy 的路由规则。它定义了如何根据请求的内容将流量路由到不同的集群或服务实例。
- CDS(Cluster Discovery Service) :CDS 用于配置 Envoy 的集群。一个集群定义了一组具有相同服务的后端实例,Envoy 会将流量负载均衡到这些实例上。
- EDS(Endpoint Discovery Service) :EDS 配置用于告知 Envoy 关于集群中实际后端服务实例的信息,包括它们的网络地址和健康状态。
- SDS(Secret Discovery Service) :SDS 是一种用于管理 Envoy 所需的安全证书和密钥的配置。
这些配置类型可以根据实际需求动态地进行更新,从而使得服务的配置更具灵活性,适应快速变化的需求。
1. 预热
1.1 构建实验 APP
-
部署 nginx 作为我们需要代理的目标服务. 需要使用 docker 作为我们的基础
docker run --name ngxv1 -d nginx:1.18-alpine
-
获取 nginx ip 用作 envoy 最终反向代理的目标;
docker inspect ngxv1
-
将 nginx 的容器ip 写入到 envoy 配置中, envoy.yaml (不太理解 envoy 配置的请翻看之前的文章);
admin: address: socket_address: { address: 0.0.0.0, port_value: 9901 } static_resources: listeners: - name: listener_0 # 监听地址 0.0.0.0:8080 address: socket_address: { address: 0.0.0.0, port_value: 8080 } filter_chains: - filters: - name: envoy.filters.network.http_connection_manager typed_config: "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager stat_prefix: ingress_http codec_type: AUTO route_config: name: shadow_route virtual_hosts: - name: myhost domains: ["*"] routes: - match: {prefix: "/"} route: {cluster: shadow_yd_cluster} http_filters: - name: envoy.filters.http.router clusters: - name: shadow_yd_cluster connect_timeout: 1s type: Static dns_lookup_family: V4_ONLY lb_policy: ROUND_ROBIN load_assignment: cluster_name: shadow_yd_cluster endpoints: - lb_endpoints: - endpoint: address: socket_address: # 需要修改 lb address , 这便是 ngxv1 address: 172.17.0.5 # 代理端口 port_value: 80
-
启动 envoy 容器作为 ngxv1 的反向代理, 注意需要挂载envoy 配置;
docker run --name=envoy -d -p 9901:9901 -p 8080:8080 -v /opt/envoy/envoy.yaml:/etc/envoy/envoy.yaml envoyproxy/envoy-alpine:v1.21.0
-
测试 ngxv1 是否能通过 envoy 的 8080 端口访问, 由于我开启了docker 的 nat 所以我可以访问本机的 8080 端口就能访问 envoy 监听的 8080 端口. 9901 端口是 envoy 的 admin 端口.
# curl http://127.0.0.1:8080 # v1 访问成功 # curl http://127.0.0.1:9901 # ...
1.2 通过文件进行服务发现
-
本小节主要是演示如何通过文件进行 XDS 配置, 其实也是对 envoy 配置的一些拆分认知. 下面主要介绍 LDS 和 CDS. 留意一下配置主要有个印象即可;
-
/opt/envoy/envoy.yaml 初始化配置
admin: address: socket_address: { address: 0.0.0.0, port_value: 9901 } node: # envoy 节点名字 cluster: mynode # envoy 节点的唯一标识 id: node01 dynamic_resources: cds_config: path: /etc/envoy/cds.yaml lds_config: path: /etc/envoy/lds.yaml
-
/opt/envoy/lds.yaml LDS配置
resources: - name: listener_0 # 指定当前配置是 listener 对象 "@type": type.googleapis.com/envoy.config.listener.v3.Listener address: socket_address: { address: 0.0.0.0, port_value: 8080 } filter_chains: - filters: - name: envoy.filters.network.http_connection_manager typed_config: "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager stat_prefix: ingress_http codec_type: AUTO route_config: name: jtroute virtual_hosts: - name: myhost domains: [ "*" ] routes: - match: { prefix: "/" } route: { cluster: shadow_yd_cluster } http_filters: - name: envoy.filters.http.router
-
/opt/envoy/lds.yaml CDS配置
resources: - name: shadow_yd_cluster # 指定当前配置是 cluster 对象 "@type": type.googleapis.com/envoy.config.cluster.v3.Cluster connect_timeout: 1s type: Static dns_lookup_family: V4_ONLY lb_policy: ROUND_ROBIN load_assignment: cluster_name: shadow_yd_cluster endpoints: - lb_endpoints: - endpoint: address: socket_address: address: 172.17.0.5 port_value: 80
-
从上面配置可以看出, 它看起来就像做了配置的拆分. 其实 xds 主要就是在做这个事情. 上面的配置如何测试? 删除之前的 envoy 容器, 重新启动并挂载即可.
docker run --name=envoy -d -p 9901:9901 -p 8080:8080 -v /opt/envoy/:/etc/envoy envoyproxy/envoy-alpine:v1.21.0
2. XDS 使用
-
这次我们将使用 Golang 代码来演示如何进行配置版本的下发。我们会使用一个名为 github.com/envoyproxy/go-control-plane 的库,这个库是 Envoy Proxy 的官方控制面板库。
-
在后续示例中,我们将展示如何使用该库来管理配置版本的下发。这个库为我们提供了工具,帮助我们构建一个与 Envoy 控制面通信的接口,以便将配置更改传递给 Envoy 实例。通过这种方式,我们可以在代码级别管理配置的变更和下发过程,实现对服务网格的动态控制.
2.1 构建XDS ControlPanel
-
XDS 服务要定
- 内部维护了一个Cache, 用于存储配置
- 通过 一个 snapshot 对象保证每次变更的一致性;
- 所以如果我们通过变更snapshot, 并设置回 cache 就相当于我们修改了配置;
-
查看代码目录结构, 由于代码量并不多我就不传 github 了.
|____controlpanel | |____cmd | | |____control-panel | | |____client.go | | |____main.go | |____utils | | |____mylogger.go | | |____callback.go | | |____resources.go
-
首先看看 cmd/main.go, 这是control-panel 的核心部分, 留意代码注释的解析
package main import ( "context" "fmt" clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3" listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3" routeservice "github.com/envoyproxy/go-control-plane/envoy/service/route/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/gin-gonic/gin" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "log" "net" "os" "study-envoy/4_discoverywithXDS/4_3_updateCache/controlpanel/utils" "time" ) func main() { var grpcOptions []grpc.ServerOption grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(1000), //一条GRPC连接允许并发的发送和接收多个Stream grpc.KeepaliveParams(keepalive.ServerParameters{ Time: time.Second * 30, //连接超过多少时间 不活跃,则会去探测 是否依然alive Timeout: time.Second * 5, }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: time.Second * 30, //发送ping之前最少要等待 时间 PermitWithoutStream: true, //连接空闲时仍然发送PING帧监测 }), ) //创建grpc 服务 grpcServer := grpc.NewServer(grpcOptions...) //日志 llog := utils.MyLogger{} //创建缓存系统 c := cache.NewSnapshotCache(false, cache.IDHash{}, llog) // envoy 配置的缓存快照, 1 其实是版本号, 通过版本号的变更进行配置更新 // 该函数后续后续会有完整的文件内容 snapshot := utils.GenerateSnapshot("1") if err := snapshot.Consistent(); err != nil { llog.Errorf("snapshot inconsistency: %+v\n%+v", snapshot, err) os.Exit(1) } // Add the snapshot to the cache // nodeID 必须要设置 nodeID := "test1" if err := c.SetSnapshot(context.Background(), nodeID, snapshot); err != nil { os.Exit(1) } // 请求回调, 回调类似于中间件 cb := utils.Callbacks{Debug: llog.Debug} // 官方提供的控制面 server srv := server.NewServer(context.Background(), c, &cb) //注册 集群服务---只 写了一个 clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, srv) // 注册 listener listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, srv) // 由于在 listener 下的需要创建路由, 所以需要加入 routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, srv) errCh := make(chan error) // 启动 grpc 服务 go func() { // 我们的grpc 服务是9090 lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 9090)) if err != nil { errCh <- err return } //启动grpc 服务 if err = grpcServer.Serve(lis); err != nil { errCh <- err } }() // 启动动态测试服务, 你可以通过请求 /test 进行版本更替 go func() { r := gin.New() r.GET("/test", func(ctx *gin.Context) { // 如果你部署 2个 nginx 容器, 可以通过这个 IP 的调整测试出是否成功从代理 v1 nginx 转成代理 v2 nginx utils.UpstreamHost = "172.17.0.7" // 通过版本控制snapshot 的更新 ss := utils.GenerateSnapshot("2") if err := c.SetSnapshot(ctx, nodeID, ss); err != nil { ctx.String(400, err.Error()) return } ctx.String(200, "OK") }) if err := r.Run("0.0.0.0:18000"); err != nil { errCh <- err } }() err := <-errCh log.Fatal(err) }
-
接下来是 utils/callback.go, 类似于在 http 架构的中间件作用
package utils import ( "context" "fmt" "log" "sync" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" ) type Callbacks struct { Signal chan struct{} Debug bool Fetches int Requests int DeltaRequests int DeltaResponses int mu sync.Mutex } var _ server.Callbacks = &Callbacks{Debug: true} func (cb *Callbacks) Report() { cb.mu.Lock() defer cb.mu.Unlock() log.Printf("server callbacks fetches=%d requests=%d\n", cb.Fetches, cb.Requests) } func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error { if cb.Debug { log.Printf("stream %d open for %s\n", id, typ) } return nil } func (cb *Callbacks) OnStreamClosed(id int64, node *core.Node) { if cb.Debug { log.Printf("stream %d of node %s closed\n", id, node.Id) } } func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error { if cb.Debug { log.Printf("delta stream %d open for %s\n", id, typ) } return nil } func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *core.Node) { if cb.Debug { log.Printf("delta stream %d of node %s closed\n", id, node.Id) } } func (cb *Callbacks) OnStreamRequest(int64, *discovery.DiscoveryRequest) error { fmt.Println(1111) cb.mu.Lock() defer cb.mu.Unlock() cb.Requests++ if cb.Signal != nil { close(cb.Signal) cb.Signal = nil } return nil } func (cb *Callbacks) OnStreamResponse(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse) { fmt.Println(2222) } func (cb *Callbacks) OnStreamDeltaResponse(id int64, req *discovery.DeltaDiscoveryRequest, res *discovery.DeltaDiscoveryResponse) { fmt.Println(3) cb.mu.Lock() defer cb.mu.Unlock() cb.DeltaResponses++ } func (cb *Callbacks) OnStreamDeltaRequest(id int64, req *discovery.DeltaDiscoveryRequest) error { fmt.Println(4) cb.mu.Lock() defer cb.mu.Unlock() cb.DeltaRequests++ if cb.Signal != nil { close(cb.Signal) cb.Signal = nil } return nil } func (cb *Callbacks) OnFetchRequest(_ context.Context, req *discovery.DiscoveryRequest) error { fmt.Println(5) cb.mu.Lock() defer cb.mu.Unlock() cb.Fetches++ if cb.Signal != nil { close(cb.Signal) cb.Signal = nil } return nil } func (cb *Callbacks) OnFetchResponse(*discovery.DiscoveryRequest, *discovery.DiscoveryResponse) { fmt.Println(6) }
-
接下来是 utils/mylogger.go, 仅仅为了 control-panel 的需要所以构建一个 mylogger.go
package utils import "log" type MyLogger struct { Debug bool } func (logger MyLogger) Debugf(format string, args ...interface{}) { if logger.Debug { log.Printf(format+"\n", args...) } } // Log to stdout only if Debug is true. func (logger MyLogger) Infof(format string, args ...interface{}) { if logger.Debug { log.Printf(format+"\n", args...) } } // Log to stdout always. func (logger MyLogger) Warnf(format string, args ...interface{}) { log.Printf(format+"\n", args...) } // Log to stdout always. func (logger MyLogger) Errorf(format string, args ...interface{}) { log.Printf(format+"\n", args...) }
-
utils/resources.go, 这个便是真正存放不同 xds 配置的文件, 通过 snapshot 构建出不同资源类型的服务发现资源. (其实就是构造不同的配置结构体)
package utils import ( cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/wellknown" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/durationpb" "time" ) const ( ClusterName = "shadow_yd_cluster" RouteName = "local_route" ListenerName = "listener_0" ListenerPort = 8080 // 被代理的目标容器地址, 也就是之前提到 ngxv1 UpstreamHost = "172.17.0.5" UpstreamPort = 80 ) // 集群配置 func makeCluster(clusterName string) *cluster.Cluster { return &cluster.Cluster{ Name: clusterName, ConnectTimeout: durationpb.New(5 * time.Second), ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_LOGICAL_DNS}, LbPolicy: cluster.Cluster_ROUND_ROBIN, LoadAssignment: makeEndpoint(clusterName), DnsLookupFamily: cluster.Cluster_V4_ONLY, } } func makeEndpoint(clusterName string) *endpoint.ClusterLoadAssignment { return &endpoint.ClusterLoadAssignment{ ClusterName: clusterName, Endpoints: []*endpoint.LocalityLbEndpoints{{ LbEndpoints: []*endpoint.LbEndpoint{{ HostIdentifier: &endpoint.LbEndpoint_Endpoint{ Endpoint: &endpoint.Endpoint{ Address: &core.Address{ Address: &core.Address_SocketAddress{ SocketAddress: &core.SocketAddress{ Protocol: core.SocketAddress_TCP, Address: UpstreamHost, PortSpecifier: &core.SocketAddress_PortValue{ PortValue: UpstreamPort, }, }, }, }, }, }, }}, }}, } } func makeRoute(routeName string, clusterName string) *route.RouteConfiguration { return &route.RouteConfiguration{ Name: routeName, VirtualHosts: []*route.VirtualHost{{ Name: "myhost", Domains: []string{"*"}, Routes: []*route.Route{{ Match: &route.RouteMatch{ PathSpecifier: &route.RouteMatch_Prefix{ Prefix: "/", }, }, Action: &route.Route_Route{ Route: &route.RouteAction{ ClusterSpecifier: &route.RouteAction_Cluster{ Cluster: clusterName, }, HostRewriteSpecifier: &route.RouteAction_HostRewriteLiteral{ HostRewriteLiteral: UpstreamHost, }, }, }, }}, }}, } } func makeHTTPListener(listenerName string, route string) *listener.Listener { // HTTP filter configuration manager := &hcm.HttpConnectionManager{ CodecType: hcm.HttpConnectionManager_AUTO, StatPrefix: "http", RouteSpecifier: &hcm.HttpConnectionManager_Rds{ Rds: &hcm.Rds{ ConfigSource: makeConfigSource(), RouteConfigName: route, }, }, HttpFilters: []*hcm.HttpFilter{{ Name: wellknown.Router, }}, } pbst, err := anypb.New(manager) if err != nil { panic(err) } return &listener.Listener{ Name: listenerName, Address: &core.Address{ Address: &core.Address_SocketAddress{ SocketAddress: &core.SocketAddress{ Protocol: core.SocketAddress_TCP, Address: "0.0.0.0", PortSpecifier: &core.SocketAddress_PortValue{ PortValue: ListenerPort, }, }, }, }, FilterChains: []*listener.FilterChain{{ Filters: []*listener.Filter{{ Name: wellknown.HTTPConnectionManager, ConfigType: &listener.Filter_TypedConfig{ TypedConfig: pbst, }, }}, }}, } } func makeConfigSource() *core.ConfigSource { source := &core.ConfigSource{} source.ResourceApiVersion = resource.DefaultAPIVersion source.ConfigSourceSpecifier = &core.ConfigSource_ApiConfigSource{ ApiConfigSource: &core.ApiConfigSource{ TransportApiVersion: resource.DefaultAPIVersion, ApiType: core.ApiConfigSource_GRPC, SetNodeOnFirstMessageOnly: true, GrpcServices: []*core.GrpcService{{ TargetSpecifier: &core.GrpcService_EnvoyGrpc_{ EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"}, }, }}, }, } return source } //创建 缓存快照 func GenerateSnapshot() *cache.Snapshot { snap, _ := cache.NewSnapshot("1", map[resource.Type][]types.Resource{ resource.ClusterType: {makeCluster(ClusterName)}, resource.RouteType: {makeRoute(RouteName, ClusterName)}, resource.ListenerType: {makeHTTPListener(ListenerName, RouteName)}, }, ) return snap }
2.2 XDS Client
-
我们可以使用 xds 的 client 来测试上述服务是否正常工作, 同时我们也可以了解一下通过 golang 代码实现通过 GRPC 获取 XDS 配置
package main import ( "context" "fmt" envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "log" "time" ) func main() { gopts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), } // XDS 服务地址 addr := "localhost:9090" ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() conn, err := grpc.DialContext(ctx, addr, gopts...) if err != nil { log.Fatalln(err) } client := clusterservice.NewClusterDiscoveryServiceClient(conn) req := &discovery.DiscoveryRequest{ Node: &envoy_config_core_v3.Node{ Id: "test1", }, } rsp, err := client.FetchClusters(context.Background(), req) if err != nil { log.Fatalln(err) } getResource := rsp.GetResources()[0] cluster := &envoy_config_cluster_v3.Cluster{} err = getResource.UnmarshalTo(cluster) if err != nil { log.Fatalln(err) } fmt.Println(cluster) }
-
代码测试, 请自行复制代码测试, 按着结构复制进行 go mod init 及 go mod tidy 即可.
* 启动 xds server # go run cmd/main.go * 启动 xds client # go run cmd/client.go
3. 结合 Envoy 代理使用 XDS 进行服务发现
- 我们需要编译一下 main.go 为 control-panel
# go build -o control-panel cmd/main.go
- 最好部署 2 个 nginx 容器, 方便我们请求 control-panel 的 /test URI 进行代理 IP 的切换;
- 我们需要启动 一个 envoy 代理, 这里我给出 envoy 正确的 XDS 配置, 启动方式参照上方的docker 启动方式;
admin: address: socket_address: { address: 0.0.0.0, port_value: 9901 } node: # 节点名称 cluster: mynode # 唯一标识, 服务发现时通过唯一标识去识别节点 id: test1 dynamic_resources: cds_config: # 使用的资源版本号 resource_api_version: V3 api_config_source: # 连接类型 api_type: GRPC transport_api_version: V3 refresh_delay: 5s # grpc 地址 grpc_services: - envoy_grpc: cluster_name: xds_cluster lds_config: resource_api_version: V3 api_config_source: api_type: GRPC transport_api_version: V3 grpc_services: - envoy_grpc: cluster_name: xds_cluster # 通过静态配置一个 xds_cluster 服务用于做控制面 static_resources: clusters: - connect_timeout: 1s load_assignment: cluster_name: xds_cluster endpoints: - lb_endpoints: - endpoint: address: socket_address: # control-plane 服务地址 address: 10.0.12.2 port_value: 9090 http2_protocol_options: {} name: xds_cluster
4. 写在最后
- 最近大家都在争做最后的打工人, 被公司的政治斗争缠绕, 抢活都抢到基础架构部门了. 确实心累, 上个月工作和生活上的变故脱更了一个月, 坚持更新又快一年了.
- 下篇更新 用 golang 编写 wasm.
- 下个季度应该会连续, 如何通过编写 ApiServer 代理进行多集群代理, 多集群权限管理及如何通过代理一次性查询所有集群的资源和多集群资源分发;
- 需要的朋友请 点赞👍 关注➕
转载自:https://juejin.cn/post/7270028440036065335