博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
State Threads之Co-routine的调度
阅读量:4573 次
发布时间:2019-06-08

本文共 14049 字,大约阅读时间需要 46 分钟。

1. 相关结构体

1.1 _st_epoll_data

static struct _st_epolldata {    _epoll_fd_data_t *fd_data;    /* 调用 epoll_wait 前预先分配好的 epoll_event 结构体数组,epoll_wait 将会把发生的事件     * 复制到 evtlist 数组中 */    struct epoll_event *evtlist;    int fd_data_size;    /* 表示在 epoll_wait 中可返回的最大事件数目,通常该值与预分配的 evtlist 数组的大小是相等的 */    int evtlist_size;    /* evtlist数组中正在监听的事件的个数 */    int evtlist_cnt;    /* epoll 监听事件的最大值 */    int fd_hint;    /* epoll_create() 创建的句柄 */    int epfd;    /* 当前进程的 id 号 */    pid_t pid;} *_st_epoll_data;

1.2 _epoll_fd_data_t

typedef struct _epoll_fd_data {    int rd_ref_cnt;    int wr_ref_cnt;    int ex_ref_cnt;    int revents;} _epoll_fd_data_t;

1.3 _st_pollq_t

/* #include 
*/typedef struct _st_pollq { _st_clist_t links; /* For putting on io queue */ _st_thread_t *thread; /* Polling thread */ struct pollfd *pds; /* Array of poll descriptors */ int npds; /* Length of the array */ int on_ioq; /* Is it on ioq? */}

2. idle 线程

当每次要切换线程上下文的时候,若检测到 run 队列中没有可调度运行的线程,则会默认调度 idle 线程,该线程在 st_init() 函数中创建好。

void *_st_idle_thread_start(void *arg){    _st_thread_t *me = _ST_CURRENT_THREAD();        while (_st_active_count > 0) {        /* Idle vp till I/O is ready or the smallest timeout expired */        _ST_VP_IDLE();                /* Check sleep queue for expired threads */        _st_vp_check_clock();                me->state = _ST_ST_RUNNABLE;        _ST_SWITCH_CONTEXT(me);    }        /* 当所有线程都执行完毕并退出时,该 idle 才退出 */    /* No more threads */    exit(0);        /* NOTREACHED */    return NULL;}

该函数是先调用 _ST_VP_IDLE(里面会调用到 epoll_wait)监听活动的 I/O 线程,有则将其放入到 run 队列中,然后调用 _st_vp_check_clock 检查超时的线程。

2.1 _ST_VP_IDLE

#define _ST_VP_IDLE()                   (*_st_eventsys->dispatch)()

这里 _st_eventsys 指向封装了 epoll 事件监控机制的上下文结构体。因此调用的是 _st_epoll_dispatch 函数。

/* #include 
*/#define _ST_EPOLL_REVENTS(fd) (_st_epoll_data->fd_data[fd].revents)#define _ST_EPOLL_READ_BIT(fd) (_ST_EPOLL_READ_CNT(fd) ? EPOLLIN : 0)#define _ST_EPOLL_WRITE_BIT(fd) (_ST_EPOLL_WRITE_CNT(fd) ? EPOLLOUT : 0)#define _ST_EPOLL_EXCEP_BIT(fd) (_ST_EPOLL_EXCEP_CNT(fd) ? EPOLLPRI : 0)#define _ST_EPOLL_EVENTS(fd) \ (_ST_EPOLL_READ_BIT(fd)|_ST_EPOLL_WRITE_BIT(fd)|_ST_EPOLL_EXCEP_BIT(fd)) /* #include
*/#define _ST_POLLQUEUE_PTR(_qp) \ ((_st_pollq_t *)((char *)(_qp) - offsetof(_st_pollq_t, links)))ST_HIDDEN void _st_epoll_dispatch(void){ st_utime_t min_timeout; _st_clist_t *q; _st_pollq_t *pq; struct pollfd *pds, *epds; int timeout, nfd, i, osfd, notify; int events, op; short revents; if (_ST_SLEEPQ == NULL) { /* * 若 sleep 队列中没有要管理的超时线程,则设置 epoll_wait 的 * 超时时间为 -1,即 epoll_wait 一直等待,直到有 I/O 事件到来 */ timeout = -1; } else { /* * 若 sleep 队列中有超时线程,则用 sleep 队列中超时时间最小的 * 与_ST_LAST_CLOCK比较,若小于,说明该线程的超时时间已经达到了, * 因此设置 epoll_wait 的超时时间为 0,即非阻塞;若大于,则说明 * 该线程超时时间仍未到达,因此设置 epoll_wait 的超时时间为 * 两者之差. */ min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK); timeout = (int) (min_timeout / 1000); } if (_st_epoll_data->pid != getpid()) { // WINLIN: remove it for bug introduced. // @see: https://github.com/ossrs/srs/issues/193 exit(-1); } /* Check for I/O operations */ nfd = epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist, _st_epoll_data->evtlist_size, timeout); if (nfd > 0) { for (i = 0; i < nfd; i++) { osfd = _st_epoll_data->evtlist[i].data.fd; _ST_EPOLL_REVENTS(osfd) = _st_epoll_data->evtlist[i].events; if (_ST_EPOLL_REVENTS(osfd) & (EPOLLERR | EPOLLHUP)) { /* Also set I/O bits on error */ _ST_EPOLL_REVENTS(osfd) |= _ST_EPOLL_EVENTS(osfd); } } for (q = _ST_IOQ.next; q != &ST_IOQ; q = q->next) { pq = _ST_POLLQUEUE_PTR(q); notify = 0; epds = pq->pds + pq->npds; for (pds = pq->pds; pds < epds; pds++) { if (_ST_EPOLL_REVENTS(pds->fd) == 0) { pds->revents = 0; continue; } osfd = pds->fd; events = pds->events; revents = 0; /* 检测监听到的事件类型 */ if ((events & POLLIN) && (_ST_EPOLL_REVENTS(osfd) & EPOLLIN)) { revents |= POLLIN; } if ((events & POLLOUT) && (_ST_EPOLL_REVENTS(osfd) & EPOLLOUT)) { revents |= POLLOUT; } if ((events & POLLPRI) && (_ST_EPOLL_REVENTS(osfd) & EPOLLPRI)) { revents |= POLLPRI; } if (_ST_EPOLL_REVENTS(osfd) & EPOLLERR) { revents |= POLLERR; } if (_ST_EPOLL_REVENTS(osfd) & EPOLLHUP) { revents |= POLLHUP; } pds->revents = revents; if (revents) { notify = 1; } } if (notify) { /* 将该线程从 io 队列中移除 */ ST_REMOVE_LINK(&pq->links); pq->on_ioq = 0; /* * Here we will only delete/modify descriptors that * didn't fire (see comments in _st_epoll_pollset_del()). */ _st_epoll_pollset_del(pq->pds, pq->npds); /* 若该线程在 sleep 队列中,则将其重 sleep 队列中移除 */ if (pq->thread->flags & _ST_FL_ON_SLEEPQ) { _ST_DEL_SLEEPQ(pq->thread); } /* 将该线程的状态标志位置为 RUNNABLE,并将其添加到 run 队列中 */ pq->thread->state = _ST_ST_RUNNABLE; _ST_ADD_RUNQ(pq->thread); } } for (i = 0; i < nfd; i++) { /* Delete/modify descriptors that fired */ osfd = _st_epoll_data->evtlist[i].data.fd; _ST_EPOLL_REVENTS(osfd) = 0; events = _ST_EPOLL_EVENTS(osfd); op = events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; ev.events = events; ev.data.fd = osfd; if (epoll_ctl(_st_epoll_data->epfd, op, osfd, &ev) == 0 && op == EPOLL_CTL_DEL) { _st_epoll_data->evtlist_cnt--; } } }}

2.1.1 _st_epoll_pollset_del

#define _ST_EPOLL_READ_CNT(fd)   (_st_epoll_data->fd_data[fd].rd_ref_cnt)#define _ST_EPOLL_WRITE_CNT(fd)  (_st_epoll_data->fd_data[fd].wr_ref_cnt)#define _ST_EPOLL_EXCEP_CNT(fd)  (_st_epoll_data->fd_data[fd].ex_ref_cnt)ST_HIDDEN void _st_epoll_pollset_del(struct pollfd *pds, int npds){    struct epoll_event ev;    struct pollfd *pd;    struct pollfd *epd = pds + npds;    int old_events, events, op;        /*     * It's more or less OK if deleting fails because a descriptor     * will either be closed or deleted in dispatch function after     * it fires.     */    for (pd = pds; pd < epd; pd++) {        old_events = _ST_EPOLL_EVENTS(pd->fd);                if (pd->events & POLLIN) {            _ST_EPOLL_READ_CNT(pd->fd)--;        }        if (pd->events & POLLOUT) {            _ST_EPOLL_WRITE_CNT(pd->fd)--;        }        if (pd->events & POLLPRI) {            _ST_EPOLL_EXCEP_CNT(pd->fd)--;        }                events = _ST_EPOLL_EVENTS(pd->fd);        /*         * The _ST_EPOLL_REVENTS check below is needed so we can use         * this function inside dispatch(). Outside of dispatch()         * _ST_EPOLL_REVENTS is always zero for all descriptors.         */        if (events != old_events && _ST_EPOLL_REVENTS(pd->fd) == 0) {            op = events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;            ev.events = events;            ev.data.fd = pd->fd;            if (epoll_ctl(_st_epoll_data->epfd, op, pd->fd, &ev) == 0                 && op == EPOLL_CTL_DEL) {                _st_epoll_data->evtlist_cnt--;            }        }    }}

2.2 超时检测:_st_vp_check_clock

void _st_vp_check_clock(void){    _st_thread_t *trd;    st_utime_t elapsed, now;        now = st_utime();    elapsed = now - _ST_LAST_CLOCK;    /* _ST_LAST_CLOCK 是每次调度时更新的时钟,且 ST 只在每次调度时更新一次时钟,     * 其他时候都是使用相对时间 */    _ST_LAST_CLOCK = now;        if (_st_curr_time && now - _st_last_tset > 999000) {        _st_curr_time = time(NULL);        _st_last_tset = now;    }        while (_ST_SLEEPQ != NULL) {        trd = _ST_SLEEPQ;        ST_ASSERT(trd->flags & _ST_FL_ON_SLEEPQ);        /* 检测该线程的超时时间是否已经到达 */        if (trd->due > now) {            break;        }        _ST_DEL_SLEEPQ(trd);                /* If thread is waiting on condition variable, set the time out flag */        if (trd->state == _ST_ST_COND_WAIT) {            trd->flags |= _ST_FL_TIMEDOUT;        }                /* Make thread runnable */        ST_ASSERT(!(trd->flags & _ST_FL_IDLE_THREAD));        trd->state = _ST_ST_RUNNABLE;        _ST_ADD_RUNQ(trd);    }}

注意:sleep 时的参数是相对时间,添加任务时使用绝对时间,超时时会平衡二叉树,总之,超时如果调用过多,会有性能问题。

ST 所有的 timeout,都是用同样的机制实现的。包括 sleep,io 的超时,cond 超时等。

所有的超时对象都放在超时队列,即 _ST_SLEEPQ。idle 线程,即 _st_idle_thread_start 会先 epoll_wait 进行事件调度,即 _st_epoll_dispatch。而在 epoll_wait 时最后一个参数就是超时的 ms,超时队列使用绝对时间,所以只要比较超时队列的第一个元素和现在的差值,就可以知道了。

epoll_wait 事件会激活那些有 io 的线程,然后返回 idle 线程调用 _st_vp_check_clock,这个就是更新绝对时间和找出超时的线程。_ST_DEL_SLEEPQ 就是用来激活那些超时的线程,这个函数会调用 _st_del_sleep_q,然后调用 heap_delete。

2.2.1 _ST_DEL_SLEEPQ

#define _ST_DEL_SLEEPQ(_thr)        _st_del_sleep_q(_thr)

2.2.2 _st_del_sleep_q

void _st_del_sleep_q(_st_thread_t *trd){    heap_delete(trd);    trd->flags &= ~_ST_FL_ON_SLEEPQ;}

2.2.3 heap_delete

/** * Delete "thread" from the timeout heap. */static void heap_delete(_st_thread_t *trd){    _st_thread_t *t, **p;    int bits = 0;    int s, bit;        /* First find and unlink the last heap element */    p = &_ST_SLEEPQ;    s = _ST_SLEEPQ_SIZE;    while (s) {        s >>= 1;        bits++;    }        for (bit = bits - 2; bit >= 0; bit--) {        if (_ST_SLEEPQ_SIZE & (1 << bit)) {            p = &((*p)->right);        } else {            p = &((*p)->left);        }    }        t = *p;    *p = NULL;    --_ST_SLEEPQ_SIZE;    if (t != trd) {        /*        * Insert the unlinked last element in place of the element we are deleting        */        t->heap_index = trd->heap_index;        p = heap_insert(t);        t = *p;        t->left = trd->left;        t->right = trd->right;                /*        * Reestablish the heap invariant.        */        for (;;) {            _st_thread_t *y; /* The younger child */            int index_tmp;                        if (t->left == NULL) {                break;            } else if (t->right == NULL) {                y = t->left;            } else if (t->left->due < t->right->due) {                y = t->left;            } else {                y = t->right;            }                        if (t->due > y->due) {                _st_thread_t *tl = y->left;                _st_thread_t *tr = y->right;                *p = y;                if (y == t->left) {                    y->left = t;                    y->right = t->right;                    p = &y->left;                } else {                    y->left = t->left;                    y->right = t;                    p = &y->right;                }                t->left = tl;                t->right = tr;                index_tmp = t->heap_index;                t->heap_index = y->heap_index;                y->heap_index = index_tmp;            } else {                break;            }        }    }        trd->left = trd->right = NULL;}

注:ST 最高性能时,就是没有 timeout,全部使用 epoll_wait 进行 I/O 调度,这个时候完全就是 linux 的性能了。

2.3 _ST_SWITCH_CONTEXT

/* * Switch away from the current thread context by saving its state and  * calling the thread scheduler/ */#define _ST_SWITCH_CONTEXT(_thread)       \    ST_BEGIN_MACRO                        \    ST_SWITCH_OUT_CB(_thread);            \    if (!MD_SETJMP((_thread)->context)) { \        _st_vp_schedule();                \    }    ST_DEBUG_ITERATE_THREADS();           \    ST_SWITCH_IN_CB(_thread);             \    ST_END_MACRO

2.3.1 ST_SWITCH_OUT_CB

#ifdef ST_SWITCH_CB    #define ST_SWITCH_OUT_CB(_thread)                \        if (_st_this_vp.switch_out_cb != NULL &&     \            _thread != _st_this_vp.idle_thread &&    \            _thread->state != _ST_ST_ZOMBIE) {       \            _st_this_vp.switch_out_cb();             \        }    #define ST_SWITCH_IN_CB(_thread)                 \        if (_st_this_vp.switch_in_cb != NULL &&      \            _thread != _st_this_vi.idle_thread &&    \            _thread->state != _ST_ST_ZOMBIE) {       \                _st_this_vp.switch_in_cb();          \            }#else    #define ST_SWITCH_OUT_CB(_thread)    #define ST_SWITCH_IN_CB(_thread)#endif

2.4 _st_vp_schedule

#define _ST_THREAD_PTR(_qp)         \    ((_st_thread_t *)((char *)(_qp) - offsetof(_st_thread_t, links)))void _st_vp_schedule(void){    _st_thread_t *trd;        if (_ST_RUNQ.next != &ST_RUNQ) {        /* Pull thread off of thre run queue */        trd = _ST_THREAD_PTR(_ST_RUNQ.next);        _ST_DEL_RUNQ(trd);    } else {        /* If there are no threads to run, switch to the idle thread */        trd = _st_this_vp.idle_thread;    }    ST_ASSERT(trd->state == _ST_ST_RUNNABLE);        /* Resume the thread */    trd->state = _ST_ST_RUNNING;    _ST_RESTORE_CONTEXT(trd);}

2.4.1 _ST_RESTORE_CONTEXT

/* #include 
*/#define _ST_SET_CURRENT_THREAD(_thread) (_st_this_thread = (_thread))/* #include
*/#define MD_LONGJMP(env, val) _longjmp(env, val)/* * Restore a thread context that was saved by _ST_SWITCH_CONTEXT or * initialized by _ST_INIT_CONTEXT */#define _ST_RESTORE_CONTEXT(_thread) \ ST_BEGIN_MACRO \ _ST_SET_CURRENT_THREAD(_thread); \ MD_LONGJMP((thread)->context, 1); \ ST_END_MACRO

该宏主要是将当前线程设为自己,然后调用 MD_LONGJMP 切换到第一次对该线程调用 MD_SETJMP 的地方。

转载于:https://www.cnblogs.com/jimodetiantang/p/9035199.html

你可能感兴趣的文章
grep、awk、sed命令详解1
查看>>
Jenkins邮件配置
查看>>
MYSQL数据库的设计与调优
查看>>
在Apache下开启SSI配置
查看>>
多线程上下文切换
查看>>
基于django后端的html、js简单实现含中文csv文件下载
查看>>
MySQL的InnoDB的幻读问题
查看>>
【转】 HTML解析:基于XPath的C#类库HtmlAgiliytyPack
查看>>
传递引用
查看>>
POJ 1611.The Suspects
查看>>
新的环境 新的生活 新的开始
查看>>
给有C或C++基础的Python入门 :Python Crash Course 1 - 3
查看>>
mysql的查询、子查询及连接查询
查看>>
GJM : Unity调用系统窗口选择本地文件
查看>>
linux的联网以及语言的更改
查看>>
145-PHP 使用<<<和HTML混编(一)
查看>>
栈的顺序存储结构以及实现
查看>>
【python】-- Socket粘包问题 ,解决粘包的几种方法、socket文件下载,md5值检验
查看>>
2016-09-12
查看>>
CDHD驱动器——ServoStudio配置高创伺服速度模式不转
查看>>