【Kubernetes】由sgp引申的controller-runtime 客户端分析
1. 前言
接上篇sgp失效分析,由于amazon-vpc-resource-controller-k8s使用的客户端是由controller-run提供的,所以今天就对controller-runtime 客户端进行简单分析。
2. ControllerRuntimeClient
Operator SDK和Kuberbuilder都是以controller-runtime项目为基础的,这种客户端是单一实例,可用于处理任何在指定Scheme中注册成的Kind,这与Dynamic-client类似。它使用APIServer提供的服务发现信息来把不同的Kind映射到不同的HTTP路径。Golang类型转换为GVK、GVR和HTTP路径的关系如下图所示:
我们以一个demo把整个流程串一下
- 16-26行代码主要是构造rest.Config类型的config,首先从命令行可选参数去读取配置文件,如果没有;会调用rest.InClusterConfig()获取,该方法适合InCluster方式,也就是跑在Kubernetes的pod中;
- 28-29行初始化了一个Scheme,并把Kubernetes的核心资源(也就是/api路径下的资源)注册进scheme;
- 33-41行构建Client接口类型的客户端,client类型实现了Client接口; 28-29的Scheme以及现在Mapper都用来填充client的Option,Mapper是如何进行优化的,后文就详细说明;
type Client interface {
Reader
Writer
StatusClient
SubResourceClientConstructor
Scheme() *runtime.Scheme
RESTMapper() meta.RESTMapper
GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error)
IsObjectNamespaced(obj runtime.Object) (bool, error)
}
type client struct {
typedClient typedClient
unstructuredClient unstructuredClient
metadataClient metadataClient
scheme \*runtime.Scheme
mapper meta.RESTMapper
cache Reader
uncachedGVKs map\[schema.GroupVersionKind]struct{}
cacheUnstructured bool
}
- 43-58行利用上文的config和option New一个新的client,并list出这个集群所有的Pod。
err = k8sClient.List(context.Background(), podList)这行代码在内部实现中,它使用指定的Scheme把*corev1.PodList这个Go类型转映射到GVK上,然后List()方法使用服务发现信息来获取Pod的GVR,即scheme.GroupVersionResource{"", "v1", "pods"},然后就可以通过访问/api/v1/pods获取所有命名空间下的pod。
3. RestMapper源码浅酌
上文中controller-runtime的issuer主要是针对使用服务发现信息来获取Pod的GVR这部分的优化,接下来我们就走读下这部分的代码,代码位于sigs.k8s.io/controllerruntime@v0.15.0/pkg/client/apiutil/restmapper.go 这个文件中。
上面demo中调用了NewDynamicRESTMapper这个方法来生成实现RESTMapper接口类型的mapper, 获取GVR主要是通过调用ResourceFor或者ResourcesFor来实现的,而它们的核心都是通过 addKnownGroupAndReload这个方法去实现的
type RESTMapper interface {
KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error)
KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error)
ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error)
ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error)
RESTMapping(gk schema.GroupKind, versions ...string) (*RESTMapping, error)
RESTMappings(gk schema.GroupKind, versions ...string) ([]*RESTMapping, error)
ResourceSingularizer(resource string) (singular string, err error)
}
type mapper struct {
mapper meta.RESTMapper
client *discovery.DiscoveryClient
knownGroups map[string]*restmapper.APIGroupResources
apiGroups map[string]*metav1.APIGroup
// mutex to provide thread-safe mapper reloading.
mu sync.RWMutex
}
addKnownGroupAndReload代码如下
func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) error {
// If no specific versions are set by user, we will scan all available ones for the API group.
// This operation requires 2 requests: /api and /apis, but only once. For all subsequent calls
// this data will be taken from cache.
if len(versions) == 0 {
apiGroup, err := m.findAPIGroupByName(groupName)
if err != nil {
return err
}
for _, version := range apiGroup.Versions {
versions = append(versions, version.Version)
}
}
m.mu.Lock()
defer m.mu.Unlock()
// Create or fetch group resources from cache.
groupResources := &restmapper.APIGroupResources{
Group: metav1.APIGroup{Name: groupName},
VersionedResources: make(map[string][]metav1.APIResource),
}
if _, ok := m.knownGroups[groupName]; ok {
groupResources = m.knownGroups[groupName]
}
// Update information for group resources about versioned resources.
// The number of API calls is equal to the number of versions: /apis/<group>/<version>.
groupVersionResources, err := m.fetchGroupVersionResources(groupName, versions...)
if err != nil {
return fmt.Errorf("failed to get API group resources: %w", err)
}
for version, resources := range groupVersionResources {
groupResources.VersionedResources[version.Version] = resources.APIResources
}
// Update information for group resources about the API group by adding new versions.
// Ignore the versions that are already registered.
for _, version := range versions {
found := false
for _, v := range groupResources.Group.Versions {
if v.Version == version {
found = true
break
}
}
if !found {
groupResources.Group.Versions = append(groupResources.Group.Versions, metav1.GroupVersionForDiscovery{
GroupVersion: metav1.GroupVersion{Group: groupName, Version: version}.String(),
Version: version,
})
}
}
// Update data in the cache.
m.knownGroups[groupName] = groupResources
// Finally, update the group with received information and regenerate the mapper.
updatedGroupResources := make([]*restmapper.APIGroupResources, 0, len(m.knownGroups))
for _, agr := range m.knownGroups {
updatedGroupResources = append(updatedGroupResources, agr)
}
m.mapper = restmapper.NewDiscoveryRESTMapper(updatedGroupResources)
return nil
}
-
通过入参数我们知道可以指定version达到部分更新的目的,如果version没有指定则会遍历这个Group下所有的version;
-
判断该group是否在m.knownGroups的map中,如果存在直接通过map获取groupResource;
-
然后通过调用m.fetchGroupVersionResources(groupName, versions...)去填充 groupResources的VersionedResources字段,更新组资源有关版本控制资源的信息,它调用api的次数和version的数量相当;
-
通过添加新版本更新有关API组的组资源信息,忽略已经注册的版本;
-
更新缓存中的数据;
-
最后利用接受的信息更新group并重新生成Mapper。
转载自:https://juejin.cn/post/7238431321869254711