likes
comments
collection
share

我发现了同事写的retry函数真的好优雅

作者站长头像
站长
· 阅读数 17

写在前面:偶然发现了一段关于retry函数的优雅实现方法的代码,读下来觉得有许多值得学习的地方,特此记录下来。

retry函数简介

retry函数在日常的研发中用到的频率特别高,特别是连接kube-apiserver进行k8s资源的增删改查时,常常会因为网络等各种各样的问题而操作失败,因此我们一般都会在限定时间内进行限定次数的重试操作(比如15s内重试3次,每5s重试一次)。

通常我们的做法是考虑封装一个retry函数,用来执行重试操作。

如下所示:

  • 我们定义一个函数类型ExecutionFunc,用来作为我们retry函数的参数。

  • 在retry函数内部,我们使用"k8s.io/apimachinery/pkg/util/wait"提供的ConditionFunc类型来包装execFunc,并交给wait.PollImmediate按固定间隔轮询执行。ConditionFunc的官方解释如下:"ConditionFunc returns true if the condition is satisfied, or an error if the loop should be aborted."我们设定的退出条件为execFunc执行成功或者重试次数达到上限(例如3次),这样可以避免一直处于重试状态。

// ExecutionFunc is the unit of work handed to ExecuteFuncWithRetry: a
// function that takes no arguments and reports failure through its error
// result.
type ExecutionFunc func() error

// ExecuteFuncWithRetry runs execFunc until it succeeds, fails with a
// non-retryable API error, or the retry budget is used up.
//
// execFunc is driven by wait.PollImmediate using constant.DefaultInterval and
// constant.DefaultTotalTimeout, and is attempted at most
// constant.DefaultMaxRetries times. Errors classified by apierrors as
// NotFound, AlreadyExists or Conflict stop the loop immediately.
//
// It returns nil when execFunc succeeds. On failure it returns the last error
// produced by execFunc, so callers can still classify it with apierrors; if
// execFunc was never invoked (for example because the retry budget is not
// positive) the polling error is returned instead, so a failure is never
// reported as success. A nil execFunc is rejected with an error.
func ExecuteFuncWithRetry(execFunc ExecutionFunc) error {
	if execFunc == nil {
		return errors.New("execution func can't be nil")
	}

	var lastErr error
	retryCount := 0
	// Resolved once and reused for every log line and error message.
	execFuncName := runtime.FuncForPC(reflect.ValueOf(execFunc).Pointer()).Name()
	exhaustedErr := func() error {
		return fmt.Errorf("failed to execFunc func %s after %d retries", execFuncName, constant.DefaultMaxRetries)
	}

	wrapConditionFunc := wait.ConditionFunc(func() (done bool, err error) {
		// Guards against a non-positive retry budget.
		if retryCount >= constant.DefaultMaxRetries {
			return true, exhaustedErr()
		}

		execErr := execFunc()
		if execErr == nil {
			return true, nil
		}

		log.WithError(execErr).Error(fmt.Sprintf("failed to execFunc func %s", execFuncName))
		lastErr = execErr
		if apierrors.IsNotFound(execErr) || apierrors.IsAlreadyExists(execErr) || apierrors.IsConflict(execErr) {
			return true, execErr
		}

		retryCount++
		// Abort right after the final attempt instead of sleeping one more
		// interval only to report that the budget is exhausted.
		if retryCount >= constant.DefaultMaxRetries {
			return true, exhaustedErr()
		}
		return false, nil
	})

	if err := wait.PollImmediate(constant.DefaultInterval, constant.DefaultTotalTimeout, wrapConditionFunc); err != nil {
		log.WithError(err).Error(fmt.Sprintf("failed to execFunc func %s", execFuncName))
		if lastErr == nil {
			// execFunc never ran; surface the polling error rather than nil.
			return err
		}
		return lastErr
	}

	return nil
}

在封装完重试函数之后,我们可以封装针对k8s资源增删改查的相关函数,如下所示:

Create k8s资源对象

// CreateObjectWithRetry creates object through kubeClient, delegating the
// retry handling to retry.ExecuteFuncWithRetry.
//
// resourceName is only used, together with the object's namespace and name,
// to label the log entry written on failure. The error returned by the retry
// helper is passed back to the caller; nil means the create succeeded.
func CreateObjectWithRetry(ctx context.Context, kubeClient client.Client, object client.Object, resourceName string) error {
	createErr := retry.ExecuteFuncWithRetry(func() error {
		return kubeClient.Create(ctx, object)
	})
	if createErr == nil {
		return nil
	}

	log.WithContext(ctx).WithError(createErr).Error(fmt.Sprintf("failed to create resource(%s/%s/%s)", resourceName, object.GetNamespace(), object.GetName()))
	return createErr
}

Delete k8s资源对象

// DeleteObjectWithRetry deletes object through kubeclient, delegating the
// retry handling to retry.ExecuteFuncWithRetry.
//
// opts are forwarded unchanged to kubeclient.Delete. resourceName is only
// used, together with the object's namespace and name, to label the log entry
// written on failure. The error returned by the retry helper is passed back
// to the caller; nil means the delete succeeded.
func DeleteObjectWithRetry(ctx context.Context, kubeclient client.Client, object client.Object, resourceName string, opts ...client.DeleteOption) error {
	err := retry.ExecuteFuncWithRetry(func() (err error) {
		return kubeclient.Delete(ctx, object, opts...)
	})

	if err != nil {
		// Attach ctx and err to the log entry rather than passing ctx as a
		// message argument, which would format the context into the text.
		log.WithContext(ctx).WithError(err).Error(fmt.Sprintf("failed to delete resource(%s/%s/%s)", resourceName, object.GetNamespace(), object.GetName()))
		return err
	}

	return nil
}

Update k8s资源对象

// UpdateObjectWithRetry updates object through kubeClient, delegating the
// retry handling to retry.ExecuteFuncWithRetry.
//
// resourceName is only used, together with the object's namespace and name,
// to label the log entry written on failure. The error returned by the retry
// helper is passed back to the caller; nil means the update succeeded.
func UpdateObjectWithRetry(ctx context.Context, kubeClient client.Client, object client.Object, resourceName string) error {
	err := retry.ExecuteFuncWithRetry(func() (err error) {
		return kubeClient.Update(ctx, object)
	})

	if err != nil {
		// Attach ctx and err to the log entry so request-scoped data is not
		// lost and the error stays a structured field.
		log.WithContext(ctx).WithError(err).Error(fmt.Sprintf("failed to update resource(%s/%s/%s)", resourceName, object.GetNamespace(), object.GetName()))
		return err
	}

	return nil
}

Get k8s资源对象

// GetObjectWithRetry fetches the object identified by key into object through
// kubeclient, delegating the retry handling to retry.ExecuteFuncWithRetry.
//
// resourceName is only used, together with key.Namespace and key.Name, to
// label the log entry written on failure. The error returned by the retry
// helper is passed back to the caller; nil means the get succeeded.
func GetObjectWithRetry(ctx context.Context, kubeclient client.Client, key client.ObjectKey, object client.Object, resourceName string) error {
	err := retry.ExecuteFuncWithRetry(func() (err error) {
		return kubeclient.Get(ctx, key, object)
	})

	if err != nil {
		// Attach ctx and err to the log entry rather than passing ctx as a
		// message argument, which would format the context into the text.
		log.WithContext(ctx).WithError(err).Error(fmt.Sprintf("failed to get resource(%s/%s/%s)", resourceName, key.Namespace, key.Name))
		return err
	}

	return nil
}

List k8s资源对象

// ListObjectsWithRetry lists objects into objectList through kubeclient,
// delegating the retry handling to retry.ExecuteFuncWithRetry.
//
// opts are forwarded unchanged to kubeclient.List. resourceName is only used
// to label the log entry written on failure. The error returned by the retry
// helper is passed back to the caller; nil means the list succeeded.
func ListObjectsWithRetry(ctx context.Context, kubeclient client.Client, objectList client.ObjectList, resourceName string, opts ...client.ListOption) error {
	err := retry.ExecuteFuncWithRetry(func() (err error) {
		return kubeclient.List(ctx, objectList, opts...)
	})

	if err != nil {
		// Attach ctx and err to the log entry rather than passing ctx as a
		// message argument, which would format the context into the text.
		log.WithContext(ctx).WithError(err).Error(fmt.Sprintf("failed to list resource(%s)", resourceName))
		return err
	}

	return nil
}

Merge Patch k8s资源对象

// PatchObjectWithRetry applies a merge patch through kubeClient, delegating
// the retry handling to retry.ExecuteFuncWithRetry.
//
// The patch is built with client.MergeFrom(currentObject) and sent for
// patchObject. resourceName is only used, together with currentObject's
// namespace and name, to label the log entry written on failure. The error
// returned by the retry helper is passed back to the caller; nil means the
// patch succeeded.
func PatchObjectWithRetry(ctx context.Context, kubeClient client.Client, patchObject client.Object, currentObject client.Object, resourceName string) error {
	err := retry.ExecuteFuncWithRetry(func() (err error) {
		return kubeClient.Patch(ctx, patchObject, client.MergeFrom(currentObject))
	})

	if err != nil {
		// Attach ctx and err to the log entry so request-scoped data is not
		// lost and the error stays a structured field.
		log.WithContext(ctx).WithError(err).Error(fmt.Sprintf("failed to patch resource(%s/%s/%s)", resourceName, currentObject.GetNamespace(), currentObject.GetName()))
		return err
	}

	return nil
}

Json Patch k8s资源对象

// JsonPatchObjectWithRetry applies a raw JSON patch to patchObject through
// kubeClient, delegating the retry handling to retry.ExecuteFuncWithRetry.
//
// patchData is sent as-is via client.RawPatch with types.JSONPatchType.
// resourceName is only used, together with patchObject's namespace and name,
// to label the log entry written on failure. The error returned by the retry
// helper is passed back to the caller; nil means the patch succeeded.
func JsonPatchObjectWithRetry(ctx context.Context, kubeClient client.Client, patchObject client.Object, patchData []byte, resourceName string) error {
	err := retry.ExecuteFuncWithRetry(func() (err error) {
		return kubeClient.Patch(ctx, patchObject, client.RawPatch(types.JSONPatchType, patchData))
	})

	if err != nil {
		// Attach ctx and err to the log entry so request-scoped data is not
		// lost and the error stays a structured field.
		log.WithContext(ctx).WithError(err).Error(fmt.Sprintf("failed to patch resource(%s/%s/%s)", resourceName, patchObject.GetNamespace(), patchObject.GetName()))
		return err
	}

	return nil
}