likes
comments
collection
share

Android性能优化系列-腾讯matrix-流量监控之TrafficPlugin源码分析

作者站长头像
站长
· 阅读数 14

前言

本篇进行matrix框架的网络流量监控模块的代码分析。你可能想,为什么需要对流量进行监控呢?我们平常进行的网络接口请求都是一些必要的操作,监控它的意义何在?首先我们要明确流量监控的对象是什么,是上行(发请求消耗的流量)和下行(接收到服务器返回的数据流量)这两块消耗的用户流量。通过这个监控,我们可以清晰的看到每个接口在每次调用时所消耗的流量的具体值,有了这个数据之后,我们可以从两个维度来分析流量问题。第一,明确单次接口请求是否存在过多消耗流量的情况,从而促进网络数据包的体积优化;第二,从多次请求的维度来看,也能帮助我们定位是否存在单个接口请求数量异常的问题,从而定位代码存在的业务逻辑问题。笔者在万能钥匙的时候,就曾遇到过类似的情况,某接口在应用启动阶段频繁调用,导致用户流量消耗过多被用户投诉的问题。试想假如当时做了流量监控,那么对开发团队来说,就可以更高效的定位到问题所在。

言归正传,我们进入今天的代码分析,分析的对象时matrix中的TrafficPlugin,我们从它的几个关键方法入手。

  • 静态代码块
  • start
  • stop

静态代码块

static {
    System.loadLibrary("matrix-traffic");
}

根据加载的名称matrix-traffic找到MatrixTraffic.cc这个c++的class,loadLibrary方法执行的时候会进入到JNI_OnLoad方法,JNI_OnLoad方法通常用来做一些准备性的工作,用于后边c++层和Java层的一个互调,下面是一部分关键代码,可以看到,这里将Java层的TrafficPlugin保存为全局引用,并获取了它的setStackTrace方法备用,并动态注册了一些Java层到native层方法的映射关系。

JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *) {
    jclass trafficCollectorCls = env->FindClass("com/tencent/matrix/traffic/TrafficPlugin");
    if (!trafficCollectorCls)
        return -1;
    //保存TrafficPlugin的jclass对象为全局引用
    gJ.TrafficPlugin = static_cast<jclass>(env->NewGlobalRef(trafficCollectorCls));
    //保存TrafficPlugin的setStackTrace方法为全局引用
    gJ.TrafficPlugin_setFdStackTrace =
            env->GetStaticMethodID(trafficCollectorCls, "setStackTrace", "(Ljava/lang/String;Ljava/lang/String;)V");
    //动态注册一些Java层方法到native方法的映射关系
    if (env->RegisterNatives(
            trafficCollectorCls, TRAFFIC_METHODS, static_cast<jint>(NELEM(TRAFFIC_METHODS))) != 0)
        return -1;
    return JNI_VERSION_1_6;
} 

start

通过nativeInitMatrixTraffic方法调用进入native层,根据上边JNI_OnLoad动态注册的映射关系找到MatrixTraffic.cc中的nativeInitMatrixTraffic方法。

@Override
public void start() {
    //这里可以设置需要过滤的so
    String[] ignoreSoFiles = trafficConfig.getIgnoreSoFiles();
    //进入native层
    nativeInitMatrixTraffic(trafficConfig.isRxCollectorEnable(), trafficConfig.isTxCollectorEnable(), trafficConfig.willDumpStackTrace(), trafficConfig.willDumpNativeBackTrace(), trafficConfig.willLookupIpAddress(), ignoreSoFiles);
}

MatrixTraffic.cc中的nativeInitMatrixTraffic方法。

static void nativeInitMatrixTraffic(JNIEnv *env, jclass, jboolean rxEnable, jboolean txEnable, jboolean dumpStackTrace, jboolean dumpNativeBackTrace, jboolean lookupIpAddress, jobjectArray ignoreSoFiles) {
    //启动loop循环线程
    TrafficCollector::startLoop(dumpStackTrace == JNI_TRUE, lookupIpAddress == JNI_TRUE);
    //是否dump native堆栈
    sDumpNativeBackTrace = (dumpNativeBackTrace == JNI_TRUE);
    //需要过滤的so
    ignoreSo(env, ignoreSoFiles);
    //通过hook socket实现对网络请求的拦截
    hookSocket(rxEnable == JNI_TRUE, txEnable == JNI_TRUE);
}

startLoop

首先startLoop启动循环线程

void TrafficCollector::startLoop(bool dumpStackTrace, bool lookupIpAddress) {
    thread loopThread(loop);
    loopThread.detach();
}

loop循环线程是作为一个消费者线程出现的,我们先跳过这个方法的具体实现,先把生产者生产数据的过程看一下。

void loop() {
    while (loopRunning) {
        if (msgQueue.empty()) {
            queueMutex.lock();
        } else {
            ...
        }
    }
}

hookSocket

网络请求最终都是通过底层socket进行发起的,所以通过hook socket的方式可以拦截到所有的网络请求,这里是用了plt hook的方式,什么是plt hook可以参考爱奇艺的xhook框架介绍。为了使代码更简洁,用...省略了部分代码。

static void hookSocket(bool rxHook, bool txHook) {
    //连接和关闭
    xhook_grouped_register(..., ".*\.so$", "connect",(void *) my_connect, (void **) (&original_connect));
    xhook_grouped_register(..., ".*\.so$", "close",(void *) my_close, (void **) (&original_close));
    //接收的数据监控
    if (rxHook) {
        xhook_grouped_register(..., ".*\.so$", "read",(void *) my_read, (void **) (&original_read));
        xhook_grouped_register(..., ".*\.so$", "recv",(void *) my_recv, (void **) (&original_recv));
        xhook_grouped_register(..., ".*\.so$", "recvfrom",(void *) my_recvfrom, (void **) (&original_recvfrom));
        xhook_grouped_register(..., ".*\.so$", "recvmsg",(void *) my_recvmsg, (void **) (&original_recvmsg));
    }
    //上传的数据监控
    if (txHook) {
        xhook_grouped_register(..., ".*\.so$", "write",(void *) my_write, (void **) (&original_write));
        xhook_grouped_register(..., ".*\.so$", "send",(void *) my_send, (void **) (&original_send));
        xhook_grouped_register(.., ".*\.so$", "sendto",(void *) my_sendto, (void **) (&original_sendto));
        xhook_grouped_register(.., ".*\.so$", "sendmsg",(void *) my_sendmsg, (void **) (&original_sendmsg));
    }
}

可以看到这里hook了socket的一些关键方法,这些方法被hook之后,当方法再次被调用的时候,我们就可以拦截到它的执行,从而做一些额外的处理。

  • connect
  • close
  • read
  • recv
  • recvfrom
  • recvmsg
  • write
  • send
  • sendto
  • sendmsg

connect

int my_connect(int fd, sockaddr *addr, socklen_t addr_length) {
    TrafficCollector::enQueueConnect(fd, addr, addr_length);
    return original_connect(fd, addr, addr_length);
}

通过调用TrafficCollector的enQueueConnect方法记录本次socket连接的信息,将MSG_TYPE_CONNECT类型,文件描述符,socket地址,调用栈等信息封装成TrafficMsg存入msgQueue队列。

void TrafficCollector::enQueueConnect(int fd, sockaddr *addr, socklen_t addr_length) {
    //将MSG_TYPE_CONNECT类型,文件描述符,socket地址,调用栈等信息封装成TrafficMsg存入msgQueue队列
    shared_ptr<TrafficMsg> msg = make_shared<TrafficMsg>(MSG_TYPE_CONNECT, fd, addr->sa_family, getKeyAndSaveStack(fd), 0);
    msgQueue.push(msg);
    queueMutex.unlock();
}

close

int my_close(int fd) {
    TrafficCollector::enQueueClose(fd);
    return original_close(fd);
}

通过调用TrafficCollector的enQueueClose方法记录本次socket关闭的信息,将MSG_TYPE_CLOSE类型,文件描述符封装成TrafficMsg存入msgQueue队列。

void TrafficCollector::enQueueClose(int fd) {
    shared_ptr<TrafficMsg> msg = make_shared<TrafficMsg>(MSG_TYPE_CLOSE, fd, 0, "", 0);
    msgQueue.push(msg);
    queueMutex.unlock();
}

read

ssize_t my_read(int fd, void *buf, size_t count) {
    ssize_t ret = original_read(fd, buf, count);
    TrafficCollector::enQueueRx(MSG_TYPE_READ, fd, ret);
    return ret;
}

type为MSG_TYPE_READ,调用TrafficCollector的enQueueRx方法。

void enQueueMsg(int type, int fd, size_t len) {
    shared_ptr<TrafficMsg> msg = make_shared<TrafficMsg>(type, fd, 0, getKeyAndSaveStack(fd), len);
    msgQueue.push(msg);
    queueMutex.unlock();
}

recv

ssize_t my_recv(int sockfd, void *buf, size_t len, int flags) {
    ssize_t ret = original_recv(sockfd, buf, len, flags);
    TrafficCollector::enQueueRx(MSG_TYPE_RECV, sockfd, ret);
    return ret;
}

type为MSG_TYPE_RECV,调用TrafficCollector的enQueueRx方法。

void enQueueMsg(int type, int fd, size_t len) {
    shared_ptr<TrafficMsg> msg = make_shared<TrafficMsg>(type, fd, 0, getKeyAndSaveStack(fd), len);
    msgQueue.push(msg);
    queueMutex.unlock();
}

recvfrom

ssize_t my_recvfrom(int sockfd, void *buf, size_t len, int flags,
                    struct sockaddr *src_addr, socklen_t *addrlen) {
    ssize_t ret = original_recvfrom(sockfd, buf, len, flags, src_addr, addrlen);
    TrafficCollector::enQueueRx(MSG_TYPE_RECVFROM, sockfd, ret);
    return ret;
}

type为MSG_TYPE_RECVFROM,调用TrafficCollector的enQueueRx方法。

recvmsg

ssize_t my_recvmsg(int sockfd, struct msghdr *msg, int flags) {
    ssize_t ret = original_recvmsg(sockfd, msg, flags);
    TrafficCollector::enQueueRx(MSG_TYPE_RECVMSG, sockfd, ret);
    return ret;
}

type为MSG_TYPE_RECVMSG,调用TrafficCollector的enQueueRx方法。

write

ssize_t my_write(int fd, const void *buf, size_t count) {
    ssize_t ret = original_write(fd, buf, count);
    TrafficCollector::enQueueTx(MSG_TYPE_WRITE, fd, ret);
    return ret;
}

type为MSG_TYPE_WRITE,调用TrafficCollector的enQueueRx方法。

send

ssize_t my_send(int sockfd, const void *buf, size_t len, int flags) {
    ssize_t ret = original_send(sockfd, buf, len, flags);
    TrafficCollector::enQueueTx(MSG_TYPE_SEND, sockfd, ret);
    return ret;
}

type为MSG_TYPE_SEND,调用TrafficCollector的enQueueRx方法。

sendto

ssize_t my_sendto(int sockfd, const void *buf, size_t len, int flags,
                  const struct sockaddr *dest_addr, socklen_t addrlen) {
    ssize_t ret = original_sendto(sockfd, buf, len, flags, dest_addr, addrlen);
    TrafficCollector::enQueueTx(MSG_TYPE_SENDTO, sockfd, ret);
    return ret;
}

type为MSG_TYPE_SENDTO,调用TrafficCollector的enQueueRx方法。

sendmsg

ssize_t my_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
    ssize_t ret = original_sendmsg(sockfd, msg, flags);
    TrafficCollector::enQueueTx(MSG_TYPE_SENDMSG, sockfd, ret);
    return ret;
}

type为MSG_TYPE_SENDMSG,调用TrafficCollector的enQueueRx方法。

enQueueRx

可以看到上边除了connect和close方法外,其他都调用到了TrafficCollector的enQueueRx方法,我们看看这个方法做了什么。

void TrafficCollector::enQueueTx(int type, int fd, size_t len) {
    enQueueMsg(type, fd, len);
}
void enQueueMsg(int type, int fd, size_t len) {
    shared_ptr<TrafficMsg> msg = make_shared<TrafficMsg>(type, fd, 0, getKeyAndSaveStack(fd), len);
    msgQueue.push(msg);
    queueMutex.unlock();
}

enQueueRx方法在不断的封装TrafficMsg对象并存入队列msgQueue,每个方法间区别在于type的定义不同。所以到这里我们可以有一个简单的结论了: 被hook的这些方法,执行时会不断的获取socket此时的信息,封装成TrafficMsg对象存入队列供消费者线程进行消费,所以这里扮演的角色就是生产者线程。此时我们回头再去看loop线程。

loop

loop线程就是上边提到的消费者线程,消费线程不断的循环,当msgQueue中有数据时,就开始做进一步的处理。

void loop() {
    while (loopRunning) {
        if (msgQueue.empty()) {
            queueMutex.lock();
        } else {
            shared_ptr<TrafficMsg> msg = msgQueue.front();
            if (msg->type == MSG_TYPE_CONNECT) {
                //socket开始连接,以文件描述符为key, 地址为value存入fdFamilyMap中
                fdFamilyMap[msg->fd] = msg->sa_family;
            } else if (msg->type == MSG_TYPE_READ) {
                //接收数据,开始read,假如fdFamilyMap存在,这个fd,说明已连接过
                if (fdFamilyMap.count(msg->fd) > 0) {
                    appendRxTraffic(msg->threadName, msg->len);
                }
            } else if (msg->type >= MSG_TYPE_RECV && msg->type <= MSG_TYPE_RECVMSG) {
                //接收数据
                if (fdFamilyMap[msg->fd] != AF_LOCAL) {
                    appendRxTraffic(msg->threadName, msg->len);
                }
            } else if (msg->type == MSG_TYPE_WRITE) {
                //写入数据
                if (fdFamilyMap.count(msg->fd) > 0) {
                    appendTxTraffic(msg->threadName, msg->len);
                }
            } else if (msg->type >= MSG_TYPE_SEND && msg->type <= MSG_TYPE_SENDMSG) {
                //写入数据
                if (fdFamilyMap[msg->fd] != AF_LOCAL) {
                    appendTxTraffic(msg->threadName, msg->len);
                }
            } else if (msg->type == MSG_TYPE_CLOSE) {
                //关闭
                fdThreadNameMapLock.lock();
                fdThreadNameMap.erase(msg->fd);
                fdThreadNameMapLock.unlock();
                fdFamilyMap.erase(msg->fd);
            }
            msgQueue.pop();
        }
    }
}

从上边代码的注释可以看到,关键的两个方法是上传数据时调用的appendTxTraffic用来记录上行的数据流量,接收数据时调用的appendRxTraffic方法用来记录下行的数据流量,所以读或写的过程也就是不断的实时记录流量的过程。

appendTxTraffic

以线程名为key, 流量值为value存入txTrafficInfoMap中。

void appendTxTraffic(const string& threadName, long len) {
    txTrafficInfoMapLock.lock();
    txTrafficInfoMap[threadName] += len;
    txTrafficInfoMapLock.unlock();
}

appendRxTraffic

以线程名为key, 流量值为value存入rxTrafficInfoMap中。

void appendRxTraffic(const string& threadName, long len) {
    rxTrafficInfoMapLock.lock();
    rxTrafficInfoMap[threadName] += len;
    rxTrafficInfoMapLock.unlock();
}

看到这里感觉有点奇怪了,怎么只是将数据记录到对应的map中,什么时候取的数据?在TrafficPlugin.java中我们可以找到这个方法getTrafficInfoMap,它的返回值是HashMap,其实就是上边提到的存储起来的流量信息。

getTrafficInfoMap

public HashMap<String, String> getTrafficInfoMap(int type) {
    //进入native层的方法
    return nativeGetTrafficInfoMap(type);
}

来到TrafficCollector的getTrafficInfoMap方法。

static jobject nativeGetTrafficInfoMap(JNIEnv *env, jclass, jint type) {
    return TrafficCollector::getTrafficInfoMap(type);
}

可以看到,下面的逻辑很清晰,通过构造一个Java层的HashMap对象,并将指定类型的信息从c++层的map对象中转移到HashMap中,这样一来,就实时的拿到了当前流量消耗的数据,数据包含线程名和流量值,拿到线程名后可以通过getStackTraceMap拿到线程名和堆栈信息的映射关系,从而获取到实时的调用堆栈信息。

jobject TrafficCollector::getTrafficInfoMap(int type) {
    ...
    if (type == TYPE_GET_TRAFFIC_RX) {
        //接收的数据
        for (auto & it : rxTrafficInfoMap) {
            //线程名
            jstring threadName = env->NewStringUTF(it.first.c_str());
            //流量长度,是一个数值型
            jstring traffic = env->NewStringUTF(to_string(it.second).c_str());
            env->CallObjectMethod(jHashMap, mapPut, threadName, traffic);
        }
    } else if (type == TYPE_GET_TRAFFIC_TX) {
        //上传的数据
        for (auto & it : txTrafficInfoMap) {
            jstring threadName = env->NewStringUTF(it.first.c_str());
            jstring traffic = env->NewStringUTF(to_string(it.second).c_str());
            env->CallObjectMethod(jHashMap, mapPut, threadName, traffic);
        }
    }
    return jHashMap;
}

看一个效果图

Android性能优化系列-腾讯matrix-流量监控之TrafficPlugin源码分析

stop

stop方法做的就是清理资源的工作了,因为核心功能都在native层,所以清理的工作还是会进入native层做处理,第一跳出循环线程,第二清理内存中的map映射表,至此,流量监控的代码分析完成。

@Override
public void stop() {
    nativeReleaseMatrixTraffic();
}
static void nativeReleaseMatrixTraffic(JNIEnv *env, jclass) {
    //停止循环线程
    TrafficCollector::stopLoop();
    //清理所有的映射表
    TrafficCollector::clearTrafficInfo();
}

总结