likes
comments
collection
share

【Kubernetes】由sgp引申的controller-runtime 客户端分析

作者站长头像
站长
· 阅读数 13

1. 前言

接上篇sgp失效分析,由于amazon-vpc-resource-controller-k8s使用的客户端是由controller-run提供的,所以今天就对controller-runtime 客户端进行简单分析。

2. ControllerRuntimeClient

Operator SDK和Kuberbuilder都是以controller-runtime项目为基础的,这种客户端是单一实例,可用于处理任何在指定Scheme中注册成的Kind,这与Dynamic-client类似。它使用APIServer提供的服务发现信息来把不同的Kind映射到不同的HTTP路径。Golang类型转换为GVK、GVR和HTTP路径的关系如下图所示:

【Kubernetes】由sgp引申的controller-runtime 客户端分析

我们以一个demo把整个流程串一下

  • 16-26行代码主要是构造rest.Config类型的config,首先从命令行可选参数去读取配置文件,如果没有;会调用rest.InClusterConfig()获取,该方法适合InCluster方式,也就是跑在Kubernetes的pod中;
  • 28-29行初始化了一个Scheme,并把Kubernetes的核心资源(也就是/api路径下的资源)注册进scheme;
  • 33-41行构建Client接口类型的客户端,client类型实现了Client接口; 28-29的Scheme以及现在Mapper都用来填充client的Option,Mapper是如何进行优化的,后文就详细说明;
type Client interface {
    Reader
    Writer
    StatusClient
    SubResourceClientConstructor
    Scheme() *runtime.Scheme
    RESTMapper() meta.RESTMapper
    GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error)
    IsObjectNamespaced(obj runtime.Object) (bool, error)
}


type client struct {
typedClient        typedClient
unstructuredClient unstructuredClient
metadataClient     metadataClient
scheme             \*runtime.Scheme
mapper             meta.RESTMapper

cache             Reader
uncachedGVKs      map\[schema.GroupVersionKind]struct{}
cacheUnstructured bool
}

  • 43-58行利用上文的config和option New一个新的client,并list出这个集群所有的Pod。

err = k8sClient.List(context.Background(), podList)这行代码在内部实现中,它使用指定的Scheme把*corev1.PodList这个Go类型转映射到GVK上,然后List()方法使用服务发现信息来获取Pod的GVR,即scheme.GroupVersionResource{"", "v1", "pods"},然后就可以通过访问/api/v1/pods获取所有命名空间下的pod。

3. RestMapper源码浅酌

上文中controller-runtime的issuer主要是针对使用服务发现信息来获取Pod的GVR这部分的优化,接下来我们就走读下这部分的代码,代码位于sigs.k8s.io/controllerruntime@v0.15.0/pkg/client/apiutil/restmapper.go 这个文件中。

上面demo中调用了NewDynamicRESTMapper这个方法来生成实现RESTMapper接口类型的mapper, 获取GVR主要是通过调用ResourceFor或者ResourcesFor来实现的,而它们的核心都是通过 addKnownGroupAndReload这个方法去实现的

type RESTMapper interface {
    KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error)
    KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error)
    ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error)
    ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error)
    RESTMapping(gk schema.GroupKind, versions ...string) (*RESTMapping, error)
    RESTMappings(gk schema.GroupKind, versions ...string) ([]*RESTMapping, error)
    ResourceSingularizer(resource string) (singular string, err error)
}

type mapper struct {
   mapper      meta.RESTMapper
   client      *discovery.DiscoveryClient
   knownGroups map[string]*restmapper.APIGroupResources
   apiGroups   map[string]*metav1.APIGroup

   // mutex to provide thread-safe mapper reloading.
   mu sync.RWMutex
}

addKnownGroupAndReload代码如下


func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) error {
   // If no specific versions are set by user, we will scan all available ones for the API group.
   // This operation requires 2 requests: /api and /apis, but only once. For all subsequent calls
   // this data will be taken from cache.
   if len(versions) == 0 {
      apiGroup, err := m.findAPIGroupByName(groupName)
      if err != nil {
         return err
      }
      for _, version := range apiGroup.Versions {
         versions = append(versions, version.Version)
      }
   }

   m.mu.Lock()
   defer m.mu.Unlock()

   // Create or fetch group resources from cache.
   groupResources := &restmapper.APIGroupResources{
      Group:              metav1.APIGroup{Name: groupName},
      VersionedResources: make(map[string][]metav1.APIResource),
   }
   if _, ok := m.knownGroups[groupName]; ok {
      groupResources = m.knownGroups[groupName]
   }

   // Update information for group resources about versioned resources.
   // The number of API calls is equal to the number of versions: /apis/<group>/<version>.
   groupVersionResources, err := m.fetchGroupVersionResources(groupName, versions...)
   if err != nil {
      return fmt.Errorf("failed to get API group resources: %w", err)
   }
   for version, resources := range groupVersionResources {
      groupResources.VersionedResources[version.Version] = resources.APIResources
   }

   // Update information for group resources about the API group by adding new versions.
   // Ignore the versions that are already registered.
   for _, version := range versions {
      found := false
      for _, v := range groupResources.Group.Versions {
         if v.Version == version {
            found = true
            break
         }
      }

      if !found {
         groupResources.Group.Versions = append(groupResources.Group.Versions, metav1.GroupVersionForDiscovery{
            GroupVersion: metav1.GroupVersion{Group: groupName, Version: version}.String(),
            Version:      version,
         })
      }
   }

   // Update data in the cache.
   m.knownGroups[groupName] = groupResources

   // Finally, update the group with received information and regenerate the mapper.
   updatedGroupResources := make([]*restmapper.APIGroupResources, 0, len(m.knownGroups))
   for _, agr := range m.knownGroups {
      updatedGroupResources = append(updatedGroupResources, agr)
   }

   m.mapper = restmapper.NewDiscoveryRESTMapper(updatedGroupResources)
   return nil
}

  • 通过入参数我们知道可以指定version达到部分更新的目的,如果version没有指定则会遍历这个Group下所有的version;

  • 判断该group是否在m.knownGroups的map中,如果存在直接通过map获取groupResource;

  • 然后通过调用m.fetchGroupVersionResources(groupName, versions...)去填充 groupResources的VersionedResources字段,更新组资源有关版本控制资源的信息,它调用api的次数和version的数量相当;

  • 通过添加新版本更新有关API组的组资源信息,忽略已经注册的版本;

  • 更新缓存中的数据;

  • 最后利用接受的信息更新group并重新生成Mapper。