likes
comments
collection
share

从KOOM看线程泄漏监控

作者站长头像
站长
· 阅读数 17

前面提到KOOM包含三个主要模块,其中koom-thread-leak 模块用于 Thread 泄漏监控:它会 hook 线程的生命周期函数,周期性地上报泄漏线程信息。接下来我们看下koom-thread-leak模块的实现:

与koom-java-leak,koom-native-leak类似,koom-thread-leak入口类为ThreadMonitor,我们可以通过ThreadMonitor.startTrackAsync()启动监控,通过MonitorManager.addConfig为其添加统一配置。

ThreadMonitor.startTrackAsync

 fun startTrackAsync() {
   getLoopHandler().postAtFrontOfQueue {
     startTrack()
   }
 }
 ​
 fun startTrack() {
     // Native初始化
     if (handleNativeInit()) {
       mIsRunning = true
       startLoop(clearQueue = true, postAtFront = false, delayMillis = monitorConfig.startDelay)
     }
 }
 ​
 override fun call(): LoopState {
   // 检查Thread泄漏
   handleThreadLeak()
   return LoopState.Continue
 }
 ​
 private fun handleThreadLeak() {
   NativeHandler.refresh()
 }
 ​
 private fun handleNativeInit(): Boolean {
   // 只支持android P以上,android R以下
   if (Build.VERSION.SDK_INT <= Build.VERSION_CODES.O || Build.VERSION.SDK_INT > Build
           .VERSION_CODES.R) {
     monitorConfig.listener?.onError("not support P below or R above now!")
     return false
   }
   // 只支持64位
   if (!isArm64()) {
     monitorConfig.listener?.onError("support arm64 only!")
     return false
   }
   // 加载koom-thread.so
   if (loadSoQuietly("koom-thread")) {
     MonitorLog.i(TAG, "loadLibrary success")
   } else {
     monitorConfig.listener?.onError("loadLibrary fail")
     return false
   }
   if (monitorConfig.disableNativeStack) {
     NativeHandler.disableNativeStack()
   }
   if (monitorConfig.disableJavaStack) {
     NativeHandler.disableJavaStack()
   }
   if (monitorConfig.enableNativeLog) {
     NativeHandler.enableNativeLog()
   }
   NativeHandler.setThreadLeakDelay(monitorConfig.threadLeakDelay)
   // 启动泄漏检测
   NativeHandler.start()
   MonitorLog.i(TAG, "init finish")
   return true
 }

NativeHandler.start

 JNIEXPORT void JNICALL
 Java_com_kwai_performance_overhead_thread_monitor_NativeHandler_start(
     JNIEnv *env, jclass obj) {
   koom::Log::info("koom-thread", "start");
   koom::Start();
 }
 void Start() {
   if (isRunning) {
     return;
   }
   // 初始化数据
   delete sHookLooper;
   sHookLooper = new HookLooper();
   koom::ThreadHooker::Start();
   isRunning = true;
 }

初始化HookLooper

 namespace koom {
 const char *looper_tag = "koom-hook-looper";
 HookLooper::HookLooper() : looper() { this->holder = new koom::ThreadHolder(); }
 HookLooper::~HookLooper() { delete this->holder; }
 void HookLooper::handle(int what, void *data) {
   looper::handle(what, data);
   switch (what) {
     case ACTION_ADD_THREAD: {
       koom::Log::info(looper_tag, "AddThread");
       auto info = static_cast<HookAddInfo *>(data);
       holder->AddThread(info->tid, info->pthread, info->is_thread_detached,
                         info->time, info->create_arg);
       delete info;
       break;
     }
     case ACTION_JOIN_THREAD: {
       koom::Log::info(looper_tag, "JoinThread");
       auto info = static_cast<HookInfo *>(data);
       holder->JoinThread(info->thread_id);
       delete info;
       break;
     }
     case ACTION_DETACH_THREAD: {
       koom::Log::info(looper_tag, "DetachThread");
       auto info = static_cast<HookInfo *>(data);
       holder->DetachThread(info->thread_id);
       delete info;
       break;
     }
     case ACTION_EXIT_THREAD: {
       koom::Log::info(looper_tag, "ExitThread");
       auto info = static_cast<HookExitInfo *>(data);
       holder->ExitThread(info->thread_id, info->threadName, info->time);
       delete info;
       break;
     }
     case ACTION_REFRESH: {
       koom::Log::info(looper_tag, "Refresh");
       auto info = static_cast<SimpleHookInfo *>(data);
       holder->ReportThreadLeak(info->time);
       delete info;
       break;
     }
     default: {
     }
   }
 }
 void HookLooper::post(int what, void *data) { looper::post(what, data); }
 }  // namespace koom

从HookLooper代码中可以看出,HookLooper关联ThreadHolder对象,当接收到消息时调用ThreadHolder对象的相关能力响应消息,例如接收到AddThread消息则调用ThreadHolder的AddThread方法收集线程信息。

ThreadHooker::Start

 void ThreadHooker::Start() { ThreadHooker::InitHook(); }
 void ThreadHooker::InitHook() {
   koom::Log::info(thread_tag, "HookSo init hook");
   std::set<std::string> libs;
   DlopenCb::GetInstance().GetLoadedLibs(libs);
   HookLibs(libs, Constant::kDlopenSourceInit);
   DlopenCb::GetInstance().AddCallback(DlopenCallback);
 }
 void ThreadHooker::HookLibs(std::set<std::string> &libs, int source) {
   koom::Log::info(thread_tag, "HookSo lib size %d", libs.size());
   if (libs.empty()) {
     return;
   }
   bool hooked = false;
   pthread_mutex_lock(&DlopenCb::hook_mutex);
   xhook_clear();
   for (const auto &lib : libs) {
     hooked |= ThreadHooker::RegisterSo(lib, source);
   }
   if (hooked) {
     int result = xhook_refresh(0);
     koom::Log::info(thread_tag, "HookSo lib Refresh result %d", result);
   }
   pthread_mutex_unlock(&DlopenCb::hook_mutex);
 }
 ​
 bool ThreadHooker::RegisterSo(const std::string &lib, int source) {
   if (IsLibIgnored(lib)) {
     return false;
   }
   auto lib_ctr = lib.c_str();
   koom::Log::info(thread_tag, "HookSo %d %s", source, lib_ctr);
   xhook_register(lib_ctr, "pthread_create",
                  reinterpret_cast<void *>(HookThreadCreate), nullptr);
   xhook_register(lib_ctr, "pthread_detach",
                  reinterpret_cast<void *>(HookThreadDetach), nullptr);
   xhook_register(lib_ctr, "pthread_join",
                  reinterpret_cast<void *>(HookThreadJoin), nullptr);
   xhook_register(lib_ctr, "pthread_exit",
                  reinterpret_cast<void *>(HookThreadExit), nullptr);
 ​
   return true;
 }

通过代码可以看到,在ThreadHooker::Start方法中,最终是通过xhook hook pthread_create,pthread_detach,pthread_join,pthread_exit这四个线程操作的核心方法,而这里的四个方法也与HookLooper中的四种消息对应。

NativeHandler.refresh()

 JNIEXPORT void JNICALL
 Java_com_kwai_performance_overhead_thread_monitor_NativeHandler_refresh(
     JNIEnv *env, jclass obj) {
   koom::Refresh();
 }
// Posts an ACTION_REFRESH message stamped with the current time (ns) to the
// hook looper.
//
// Does nothing when no looper exists yet, so calling it before the looper has
// been created neither dereferences a null pointer nor leaks the payload.
void Refresh() {
  if (sHookLooper == nullptr) {
    return;
  }
  auto *info = new SimpleHookInfo(Util::CurrentTimeNs());
  sHookLooper->post(ACTION_REFRESH, info);
}
// Excerpt of the HookLooper::handle switch: the arm for ACTION_REFRESH.
case ACTION_REFRESH: {
  koom::Log::info(looper_tag, "Refresh");
  // The payload carries the timestamp captured when the refresh was posted.
  auto info = static_cast<SimpleHookInfo *>(data);
  holder->ReportThreadLeak(info->time);
  // The payload is released here once it has been handled.
  delete info;
  break;
}
// Reports every leaked thread whose exit happened more than threadLeakDelay
// before `time`.
//
// `time` - reference timestamp in nanoseconds.
//
// Matching entries of leakThreadMap are serialised into a JSON object of the
// form {"leakType": "detach_leak", "threads": [...]}. When at least one thread
// matched, the JSON string is handed to JavaCallback and the reported entries
// are removed from leakThreadMap.
void ThreadHolder::ReportThreadLeak(long long time) {
  int needReport{};
  const char *type = "detach_leak";
  auto delay = threadLeakDelay * 1000000LL;  // ms -> ns
  rapidjson::StringBuffer jsonBuf;
  rapidjson::Writer<rapidjson::StringBuffer> writer(jsonBuf);
  writer.StartObject();

  writer.Key("leakType");
  writer.String(type);

  writer.Key("threads");
  writer.StartArray();

  for (auto &item : leakThreadMap) {
    if (item.second.exitTime + delay < time && !item.second.thread_reported) {
      // The values are 64-bit; print them as long long so the format
      // specifiers match the argument types.
      koom::Log::info(holder_tag, "ReportThreadLeak %lld, %lld, %lld",
                      static_cast<long long>(item.second.exitTime), time,
                      static_cast<long long>(delay));
      needReport++;
      item.second.thread_reported = true;
      WriteThreadJson(writer, item.second);
    }
  }
  writer.EndArray();
  writer.EndObject();
  koom::Log::info(holder_tag, "ReportThreadLeak %d", needReport);
  if (needReport) {
    JavaCallback(jsonBuf.GetString());
    // Drop every entry that has been reported; erase() returns the iterator
    // following the removed element.
    for (auto it = leakThreadMap.begin(); it != leakThreadMap.end();) {
      if (it->second.thread_reported) {
        it = leakThreadMap.erase(it);
      } else {
        ++it;
      }
    }
  }
}

可以看到,最终是将leakThreadMap中退出时间已超过threadLeakDelay且尚未上报的线程信息序列化为json字符串(并非写入文件),再通过JavaCallback将该字符串回调到java侧,随后把已上报的线程从leakThreadMap中移除。

线程泄漏判定(leakThreadMap生成)

// Handles a join on `threadId`.
//
// A thread that is still in threadMap gets its thread_detached flag set;
// otherwise any entry for it is removed from leakThreadMap.
void ThreadHolder::JoinThread(pthread_t threadId) {
  auto tracked = threadMap.find(threadId);
  koom::Log::info(holder_tag, "JoinThread tid:%p", threadId);
  if (tracked == threadMap.end()) {
    leakThreadMap.erase(threadId);
    return;
  }
  tracked->second.thread_detached = true;
}

// Handles the exit of `threadId`.
//
// `threadName` - name to store on the record.
// `time`       - exit timestamp to store on the record.
//
// Threads not present in threadMap are ignored. A thread that exits while its
// thread_detached flag is still unset is copied into leakThreadMap. In both
// cases the thread is then removed from threadMap.
void ThreadHolder::ExitThread(pthread_t threadId, std::string &threadName,
                              long long int time) {
  auto found = threadMap.find(threadId);
  if (found == threadMap.end()) {
    return;
  }
  auto &record = found->second;
  // Logged before the name is overwritten, so this prints the stored name.
  koom::Log::info(holder_tag, "ExitThread tid:%p name:%s", threadId,
                  record.name.c_str());

  record.exitTime = time;
  record.name.assign(threadName);

  const bool leaked = !record.thread_detached;
  if (leaked) {
    koom::Log::error(holder_tag,
                     "Exited thread Leak! Not joined or detached!\n tid:%p",
                     threadId);
    // Keep a copy for later reporting.
    leakThreadMap[threadId] = record;
  }
  threadMap.erase(found);
  koom::Log::info(holder_tag, "ExitThread finish");
}

// Handles a detach of `threadId`.
//
// A thread that is still in threadMap gets its thread_detached flag set;
// otherwise any entry for it is removed from leakThreadMap.
void ThreadHolder::DetachThread(pthread_t threadId) {
  auto tracked = threadMap.find(threadId);
  koom::Log::info(holder_tag, "DetachThread tid:%p", threadId);
  if (tracked == threadMap.end()) {
    leakThreadMap.erase(threadId);
    return;
  }
  tracked->second.thread_detached = true;
}

从代码可以看出,在线程detach和join时,会先判断该线程是否仍在threadMap中:如果在,则将其thread_detached标记为true;如果不在(例如线程已经exit并从threadMap中移除),则将其从leakThreadMap中移除。也就意味着针对一个线程而言,如果其没有执行detach或者join就直接exit,则会被判定为线程泄漏。

  • pthread有两种状态joinable状态(属性)和unjoinable状态,如果线程是joinable状态,当线程函数自己返回退出时或pthread_exit时都不会释放线程所占用堆栈和线程描述符。只有当你调用了pthread_join之后这些资源才会被释放。若是unjoinable状态的线程,这些资源在线程函数退出时或pthread_exit时自动会被释放。
  • unjoinable属性可以在pthread_create时指定,或在线程创建后在线程中pthread_detach自己, 如:pthread_detach(pthread_self()),将状态改为unjoinable状态,确保资源的释放。或者将线程置为 joinable,然后适时调用pthread_join.
  • 其实简单的说就是在线程函数头加上 pthread_detach(pthread_self())的话,线程状态改变,在函数尾部直接 pthread_exit线程就会自动退出。省去了给线程擦屁股的麻烦。
  • pthread_exit实际就类似于进程的exit,线程会直接退出;对于joinable状态的线程,其资源不会自动释放,需要等到pthread_join之后才会被回收(unjoinable状态的线程则会在退出时自动释放资源).