sylar-from-scratch----协程模块Fiber
Fiber类
类成员变量:
/// 协程id
uint64_t m_id = 0;
/// 协程栈大小
uint32_t m_stacksize = 0;
/// 协程状态
State m_state = READY;
/// 协程上下文
ucontext_t m_ctx;
/// 协程栈地址
void *m_stack = nullptr;
/// 协程入口函数
std::function<void()> m_cb;
/// 本协程是否参与调度器调度
bool m_runInScheduler;
1、枚举协程状态
将协程状态转化,只定义了三种状态的转换。 不区分协程的初始状态,初始即READY,不区分协程的异常结束状态,只要结束就是TERM状态;也不区分HOLD状态,协程只要未结束也非运行态就是READY态。
enum State {
/// 就绪态,刚创建或者yield之后的状态
READY,
/// 运行态,resume之后的状态
RUNNING,
/// 结束态,协程的回调函数执行完之后为TERM状态
TERM
};
2、构造函数Fiber()
创建线程的第一个协程,也就是线程主函数对应的协程。首先设置当前运行协程,将其状态设置运行态,保存上下文信息,更新协程数与协程id,最后打印相应的日志信息。
Fiber::Fiber() {
SetThis(this);//设置当前运行协程(局部变量t_fiber)的值
m_state = RUNNING;//协程运行状态设为运行态
if (getcontext(&m_ctx)) {//将当前上下文信息保存在ucontext_t结构体m_ctx中
SYLAR_ASSERT2(false, "getcontext");//返回非零值,获取上下文失败,则调用断言宏,断言失败输出错误信息
}
++s_fiber_count;//协程数加1
m_id = s_fiber_id++; // 协程id从0开始,用完加1
SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber() main id = " << m_id;
}
3、创建用户协程
Fiber(std::function<void()> cb, size_t stacksize = 0, bool run_in_scheduler = true);
用于创建主协程之外的其它协程,需要分配协程栈来保存协程的上下文信息,设置协程入口函数、栈大小、本协程是否参与调度器调度,默认为true ,为栈开辟内存,确定栈指针,更新协程数与协程id。
Fiber::Fiber(std::function<void()> cb, size_t stacksize, bool run_in_scheduler)
: m_id(s_fiber_id++)
, m_cb(cb)
, m_runInScheduler(run_in_scheduler) {//参数列表初始化参数
++s_fiber_count;
m_stacksize = stacksize ? stacksize : g_fiber_stack_size->getValue();
m_stack = StackAllocator::Alloc(m_stacksize);//为协程栈分配内存,返回内存指针
if (getcontext(&m_ctx)) {//保存上下文信息
SYLAR_ASSERT2(false, "getcontext");
}
m_ctx.uc_link = nullptr;//不定义下一个运行的协程
m_ctx.uc_stack.ss_sp = m_stack;//协程栈指针
m_ctx.uc_stack.ss_size = m_stacksize;//栈大小
makecontext(&m_ctx, &Fiber::MainFunc, 0);//修改m_ctx的上下文信息,函数入口,参数为0
SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber() id = " << m_id;//打印日志信息
}
4、析构函数
完成了对协程对象的清理工作,包括释放子协程的栈内存和对主协程的特殊处理。同时,通过日志输出和断言等手段确保了协程对象的状态和操作正确性。
Fiber::~Fiber() {
SYLAR_LOG_DEBUG(g_logger) << "Fiber::~Fiber() id = " << m_id;
--s_fiber_count;
if (m_stack) {
// 有栈,说明是子协程,需要确保子协程一定是结束状态
SYLAR_ASSERT(m_state == TERM);
StackAllocator::Dealloc(m_stack, m_stacksize);//释放协程栈内存
SYLAR_LOG_DEBUG(g_logger) << "dealloc stack, id = " << m_id;
} else {
// 没有栈,说明是线程的主协程
SYLAR_ASSERT(!m_cb); // 主协程没有cb
SYLAR_ASSERT(m_state == RUNNING); // 主协程一定是执行状态
Fiber *cur = t_fiber; // 当前协程就是自己
if (cur == this) {
SetThis(nullptr);
}
}
}
5、重置协程状态和入口函数reset
为简化状态管理,强制只有TERM状态的协程才可以重置,但其实刚创建好但没执行过的协程也应该被重置。重置时不用再重新分配内存,而是复用被重置的栈,将协程状态由TERM态转换为READY态。
void Fiber::reset(std::function<void()> cb) {
SYLAR_ASSERT(m_stack);
SYLAR_ASSERT(m_state == TERM);
m_cb = cb;
if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
m_ctx.uc_link = nullptr;//复用栈
m_ctx.uc_stack.ss_sp = m_stack;
m_ctx.uc_stack.ss_size = m_stacksize;
makecontext(&m_ctx, &Fiber::MainFunc, 0);
m_state = READY;//改变协程状态
}
6、切换当前协程为运行状态resume
将当前协程与正在运行的协程进行交换,前者状态变为RUNNING,后者状态变为READY。 当前协程非运行态变为运行态,与它交换的是正在运行的协程,而不是主协程。
void Fiber::resume() {
SYLAR_ASSERT(m_state != TERM && m_state != RUNNING);//不为结束态也不为运行态
SetThis(this);//当前协程指针
m_state = RUNNING;
// 如果协程参与调度器调度,那么应该和调度器的主协程进行swap,而不是线程主协程
if (m_runInScheduler) {
if (swapcontext(&(Scheduler::GetMainFiber()->m_ctx), &m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
} else {
if (swapcontext(&(t_thread_fiber->m_ctx), &m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");//交换上下文信息,将m_ctx上下文信息保存在主协程上下文信息,将m_ctx上下文信息恢复到cpu中
}
}
}
7、协程让出执行权函数yield
当前协程与resume时退到后台的协程进行交换,前者状态变为READY,后者状态变为RUNNING 。能yield的协程要么是运行态,要么是结束态,若为非结束态,则状态变为READY;与它交换的协程一定是主协程,让当前协程切换到主协程,然后进行上下文信息的交换。
void Fiber::yield() {
/// 协程运行完之后会自动yield一次,用于回到主协程,此时状态已为结束状态
SYLAR_ASSERT(m_state == RUNNING || m_state == TERM);//当前协程
SetThis(t_thread_fiber.get());//获取主协程
if (m_state != TERM) {
m_state = READY;
}
// 如果协程参与调度器调度,那么应该和调度器的主协程进行swap,而不是线程主协程
if (m_runInScheduler) {
if (swapcontext(&m_ctx, &(Scheduler::GetMainFiber()->m_ctx))) {
SYLAR_ASSERT2(false, "swapcontext");
}
} else {
if (swapcontext(&m_ctx, &(t_thread_fiber->m_ctx))) {
SYLAR_ASSERT2(false, "swapcontext");
}
}
}
当前正在运行协程相关函数----局部变量
借助线程局部变量实现协程模块,线程局部变量与全局变量类似,声明的线程局部变量在每个线程都独有一份,而全局变量是全部线程共享一份 。
/// 全局静态变量,用于生成协程id
static std::atomic<uint64_t> s_fiber_id{0};
/// 全局静态变量,用于统计当前的协程数
static std::atomic<uint64_t> s_fiber_count{0};
/// 线程局部变量,当前线程正在运行的协程
static thread_local Fiber *t_fiber = nullptr;
/// 线程局部变量,当前线程的主协程,切换到这个协程,就相当于切换到了主线程中运行,智能指针形式
static thread_local Fiber::ptr t_thread_fiber = nullptr;
//协程栈大小,可通过配置文件获取,默认128k
static ConfigVar<uint32_t>::ptr g_fiber_stack_size =
Config::Lookup<uint32_t>("fiber.stack_size", 128 * 1024, "fiber stack size");
1、设置当前正在运行协程SetThis
即设置线程局部变量t_fiber的值
2、返回当前正在运行协程GetThis()
如果当前线程还未创建协程,则创建线程的第一个协程,且将其设为当前线程的主协程,其它协程都通过这个协程来调度,即其它协程结束时切回该主协程,由主协程重新选择新的协程进行resume。
3、协程入口函数MainFunc
void Fiber::MainFunc() {
Fiber::ptr cur = GetThis(); // GetThis()的shared_from_this()方法让引用计数加1
SYLAR_ASSERT(cur);
cur->m_cb();
cur->m_cb = nullptr;
cur->m_state = TERM;
auto raw_ptr = cur.get(); // 手动让t_fiber的引用计数减1
cur.reset();
raw_ptr->yield();
}
基础知识
协程背景
1、提出原因
线程是进程中的执行体,是程序执行的基本单位。拥有一个执行入口以及从进程虚拟地址空间中分配的栈,包括用户栈和内核栈。操作系统记录线程控制信息,而线程获得CPU时间片以后才可以执行,在执行时CPU栈基、栈指针、指令指针等寄存器都要切换到对应的线程。 如果线程自己又创建几个执行体,给他们各自指定执行入口,申请一些内存分给它们用做执行栈,那么线程就可以按需调度这几个执行体了。为了实现这些执行体的切换,线程也需要记录它们的控制信息,包括ID、栈的位置、执行入口地址、执行现场等等。 线程可以选择一个执行体来执行,此时CPU中指令指针就会指向这个执行体的执行入口,栈基和栈指针寄存器也会指向线程给它分配的执行栈;要切换执行体时需要保存当前执行体的执行现场,然后切换到另一个执行体,通过同样的方式可以恢复到之前的执行体,这样就从上次中断的地方继续执行,这些由线程创建的执行体就是“协程”(coroutine)。 因为用户程序不能操作内核空间,所以只能给协程分配用户栈,而操作系统对协程一无所知,所以协程称为“用户态线程”。 在创建协程时都要指定执行入口,底层都要分配协程执行栈和控制信息,否则无法实现用户态,让出执行权时也要保存执行现场以从中断处恢复执行,协程的思想关键在于控制流的“主动让出”和“恢复”,每个协程用于自己的执行栈,可以保存自己的执行现场,所以可以由用户程序按需创建协程。协程主动让出执行权时会保存执行现场,然后切换到其它协程,恢复执行时会根据保存到现场恢复到之前的状态继续执行,这样通过协程实现既轻量又灵活的由用户态进行调度的多任务模型。
2、协程与IO多路复用
操作系统记录的进程控制信息可以找到打开文件描述符表,进程打开的文件、创建的socket等都会记录到这张表中,socket的所有操作都由操作系统来提供,也就是通过系统调用来完成,每创建一个socket,都要在打开文件描述表中对应增加一条记录,而返回给应用程序的只有一个socket描述符,用于识别不同的socket,而且每个TCP socket在创建时操作系统都会为它创建一个读缓冲区和一个写缓冲区,要获得响应数据就要从读缓冲区拷贝过来,同样的要通过socket发送数据也要先把数据拷贝到写缓冲区才行。但是用户想要读数据时,读缓冲区未必有数据,想发送数据时,写缓冲区也未必有空间。对此的解决办法有:
- 阻塞式IO:让出CPU,进入等待队列,等socket就绪后再次获得时间片就可以执行了,为阻塞式IO;阻塞式IO处理一个socket就要占用一个线程,等当前socket处理完才能接受下一个,这在高并发场景下会加剧调度开销。
- 非阻塞式IO:不让CPU,但是需要频繁检测socket是否就绪,即“忙等待”,很难把握轮询的间隔时间,易造成空耗CPU,加剧响应延迟。
- IO多路复用:由操作系统提供支持,把需要等待的socket加入到操作集合,这样就可以通过一次系统调用同时监听多个socket,有socket就绪了就可以逐个处理了,既不用为等待某个socket被阻塞,也不会陷入忙等待之中。
3、linux的IO多路复用
- select:select支持监听可读、可写、异常三类事件;受限于监听的个数为1024个;且每次调用select都要传递所有监听集合,需要频繁地从用户态到内核态拷贝数据;即便有fd就绪了,也需要遍历整个监听集合,来判断哪个fd是可操作的,影响性能。
int main() {
fd_set readfds;
int max_fd, ret;
FD_ZERO(&readfds);
FD_SET(STDIN_FILENO, &readfds); // 设置要等待的描述符
max_fd = STDIN_FILENO + 1;
struct timeval timeout;
timeout.tv_sec = 5; // 设置等待超时的时间
timeout.tv_usec = 0;
ret = select(max_fd, &readfds, NULL, NULL, &timeout);//等待有事件就绪或超时,select函数就会返回
if (ret == -1) {
perror("select");
exit(EXIT_FAILURE);
} else if (ret == 0) {
printf("Timeout\n");
} else {
if (FD_ISSET(STDIN_FILENO, &readfds)) {
printf("Stdin is ready for reading\n");
// 读取标准输入数据
char buffer[100];
fgets(buffer, sizeof(buffer), stdin);
printf("Received: %s", buffer);
}
}
return 0;
}
- poll:支持的fd数目等于最多可打开的文件描述符个数,但仍存在另外两个问题;
- epoll:提供三个接口epoll_create创建epoll并获得一个句柄,epoll_ctl用于添加或删除fd与对应的事件信息,除了指定fd和要监听事件的类型,还要指定一个evt.data,通常会按需定义一个数据结构用于处理对应的fd。每次都只需要传入要操作的一个fd,无需传入所有监听集合且只需注册这一次。通过epoll_wait得到的fd集合都是已经就绪的,逐个处理即可,无需遍历所有监听集合,通过IO多路复用,线程再也不用为了等待某一个socket而阻塞或空耗CPU,并发和处理能力大幅提升。 存在问题:如一个socket可读,但只读了半条请求,也就是需要再次等待这个socket可读,再继续处理下一个socket之前,需要记录这个socket的处理状态,当这个socket可读时也需要恢复上次保存的现场才能继续处理。在IO多路复用中实现业务逻辑时需要随着事件的等待和就绪频繁地保存和恢复先现场,对于较为复杂的业务逻辑并不适用。 要等待事件时需要保存现场并切换下一个fd,当事件就绪时又需要恢复现场继续处理,非常适用于协程。
- 用协程:即在IO复用位置的事件循环中依然需要逐个处理fd,但处理的过程面向协程调度,若用于监听端口的fd就绪了,就建立连接创建一个新的fd,交给一个协程来负责。协程执行入口就指向业务处理函数入口,业务处理过程中需要等待时就注册IO事件让出,这样执行权就会回到切换该协程的地方继续执行;若其它等待IO事件的fd就绪了,只需要恢复关联的协程即可,协程拥有自己的栈,要保存和恢复现场都很容易实现,这样IO多路复用这一层的事件循环就和具体业务逻辑解耦了。 可以把read、write、connect等可能会发生等待的函数包装一下,在其中实现IO事件注册与主动让出,这样在业务逻辑层面就可以使用这些包装函数,按照常规的顺序编程方式来实现业务逻辑;这些包装函数在需要等待时就会注册IO事件,然后让出协程,这样在实现业务逻辑时就不用关心保存和恢复现场的问题了。 协程与IO多路复用之间的合作,保留了IO 多路复用的高并发性能,解放了业务逻辑的实现。
int main(void)
{
int epfd,nfds;
struct epoll_event ev,events[5]; //ev用于注册事件,数组用于返回要处理的事件
epfd = epoll_create(1); //只需要监听一个描述符——标准输入 创建epoll并获取句柄
ev.data.fd = STDIN_FILENO;
ev.events = EPOLLIN|EPOLLET; //监听读状态同时设置ET模式
epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev); //注册epoll事件
for(;;)
{
nfds = epoll_wait(epfd, events, 5, -1);
//协程==========
for(int i = 0; i < nfds; i++)
{
if(events[i].data.fd==STDIN_FILENO)
printf("welcome to epoll's word!\n");
}
}
}
4、对称协程与非对称协程
协程类似于函数,但是函数一旦被调用就会从头开始执行,直到执行结束才退出;而协程可能会执行一部分就退出,称为yield,此时协程并未结束,会保留当前的执行现场,暂时让出CPU使用权,在之后适当的时机会恢复保存的现场重新从上次退出的位置继续运行,称为resume。在此期间,其它协程也可以获得CPU运行,因此,协程称为轻量级线程。
- 对称协程:协程之间的操作只有一个yield,用于将程序的控制流转移给另外的协程,对称协程机制一般需要一个调用器的支持,按照一定的调度算法去选择yield的目标协程。调度器调用协程,协程进入之后只能有一个操作yield将CPU让回给调度器。字协程可以与字协程切换,每个协程不仅要运行在即的入口函数代码,还需要选出下一个合适的协程进行切换,管理起来较为麻烦。
- 非对称协程:与一个特定的调用者绑定,调用者负责选出下一个运行的协程,协程让出CPU时只能让回给原调用者。非对称表现在程序控制流转移待被调用协程中使用call/resume,而被调协程让出CPU时使用的却是return/yield操作;此外协程间地位也不对等,即调用者与被调用者关系是确定的,非对称协程只能返回最初调用它的协程。
5、协程与线程的区别:
(1)在单线程内,协程不能并发执行,只有当一个协程结束或者yield后,才会执行另一个协程;而线程是可以并发执行的。 (2)线程的并发执行使得它可以使用线程级别的锁来做协程同步,但是协程不能并发执行,就不能使用锁,否则当一个协程在持有锁后让出执行,同线程的其它协程一旦尝试再次持有这个锁,整个线程就会被锁死。 (3)协程的运行和调度都由应用程序完成,单线程下协程的yield和resume一定是同步进行,一个协程的yield必然对应另一个协程的resume,因为线程不能没有执行体;而线程的调度是由操作系统自动完成的。故协程被称为“用户态线程”。
6、协程的上下文切换
寄存器是CPU中用来存放数据的一些小型存储区域,暂存参数运算的数据和运算结果。
ucontext_t
ucontext_t是一个结构体变量,通过定义一个uncontext来保存上下文信息。定义如下:
typedef struct ucontext
{
unsigned long int uc_flags;
struct ucontext *uc_link;//后序上下文,当前上下文结束后,下一个激活的上下文指针对象,只有当当前上下文是由makecontext创建时才有效
__sigset_t uc_sigmask;// 信号屏蔽字掩码
stack_t uc_stack;// 上下文所使用的栈
mcontext_t uc_mcontext;// 保存的上下文的寄存器信息
long int uc_filler[5];
} ucontext_t;
//其中mcontext_t 定义如下
typedef struct
{
gregset_t __ctx(gregs);//所装载寄存器
fpregset_t __ctx(fpregs);//寄存器的类型
} mcontext_t;
//其中stack_t的上下文信息
typedef struct {
void *ss_sp;//栈空间指针,指向当前栈所在位置
int ss_flags;//栈空间的flags
size_t ss_size;//整个栈的大小
} stack_t;
//其中gregset_t 定义如下
typedef greg_t gregset_t[NGREG];//包括了所有的寄存器的信息
getcontext()
函数:int getcontext(ucontext_t* ucp)
初始化并保存当前的上下文到ucp中,如movq %rbx, oRBX(%rdi)
就是将%rbx内存中的信息先备份然后再将内存中的信息传递保存到%rdi(指的就是ucp)中;在多个寄存器中进行类似操作,就将上下文信息和栈顶指针都保存在gregset_t[NGREG]中,而gregset_t就是结构体uc_mcount的成员。ucontext结构体定义了一个mcontext_t表示保存的上写文信息。
setcontext()
函数:int setcontext(const ucontext_t *ucp)
功能:将ucontext_t结构体变量ucp中的上下文信息重新恢复到cpu中运行。
其代码与getcontext中汇编代码类似,但是setcontext是将参数变量中的上下文信息保存在cpu中,即跳到ucp上下文对应的函数中执行,相当于变相调用了函数。如:
movq oRSP(%rdi), %rsp
movq oRBX(%rdi), %rbx
movq oRBP(%rdi), %rbp
movq oR12(%rdi), %r12
movq oR13(%rdi), %r13
movq oR14(%rdi), %r14
movq oR15(%rdi), %r15
根据ucp的创建方式,setcontext的返回方式不同:
- 若有setcontext创建,程序表现为从getcontext开始执行;
- 若由makecontext创建,程序将执行makecontext的传入函数,当函数执行结束后,程序表现为执行setcontext其ucp参数为makecontext的ucp参数。
- 若uc_link指向为0,即空指针,那么当前上下文为主上下文,当返回时,线程直接退出。
makecontext()
函数:void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...)
功能:修改上写文信息,参数ucp是要修改的上下文信息结构体,func是上下文的入口函数,argc是入口函数的参数个数,后面...是具体的函数参数,该参数必须为整型。
在调用makecontext之前必须手动给ucp分配一段内存空间,存储在ucp->uc_stack中,这段内存空间将作func函数运行时的栈空间。
同时也可指定uc_link表示函数运行结束后恢复uc_link指向的上下文。若不赋值,func函数结束时必须调用setcontext或swapcontext以重新指定一个有效的上下文。
makecontent执行完后,ucp就与函数func绑定了,调用setcontext或swapcontext激活ucp时,fun就会被运行。
swapcontext()
函数:int swapcontext(ucontext_t *oucp, ucontext_t *ucp)
功能:将当前ucp中的上下文信息保存到oucp结构体变量%rdi中,然后将ucp的结构体的上下文信息%rsi恢复到cpu中;可以理解为调用两个函数(先调getcontext(oucp)后调用setcontext(ucp));这里第一个参数为%rdi,第二个参数为%rsi。函数不会返回,而是会跳到ucp上下文对应的函数中执行,相当于调用看函数。
blog.csdn.net/qq_44443986…
日志断言ASSERT
assert() 是一个调试程序时经常使用的宏,在程序运行时它计算括号内的表达式,如果表达式为 false (0), 程序将报告错误,并终止执行。如果表达式不为0,则继续执行后面的语句。这个宏通常用来判断程序中是否出现了明显非法的数据,如果出现了终止程序以免导致严重后果,同时也便于查找错误。
- 语法: 包含头文件 <assert.h> 或 ; assert(condition); condition:要判断的条件 返回值:true继续运行,false停止运行。
- 使用原则: (1)使用断言捕捉不应该发生的非法情况。不要混淆非法情况与错误情况之间的区别,后者是必然存在的并且是一定要作出处理的。 (2)使用断言对函数的参数进行确认。 (3)在编写函数时,要进行反复的考查,并且自问:"我打算做哪些假定?"一旦确定了的假定,就要使用断言对假定进行检查。 (4)一般教科书都鼓励程序员们进行防错性的程序设计,但要记住这种编程风格会隐瞒错误。当进行防错性编程时,如果"不可能发生"的事情的确发生了,则要使用断言进行报警。 blog.csdn.net/weixin_4503…
转载自:https://juejin.cn/post/7377567987119521843