epoll是Linux引以为荣的技术,因为相对于select和poll有很大的效能改进。本文主要介绍epoll的实现原理,了解epoll高效背后的魔法。
epoll的使用简介
1. epoll_create使用epoll时需要使用epoll_create()建立一个epoll的档案控制代码,epoll_create()函式的原型如下:
intepoll_create(int size);
此界面用于建立一个epoll的控制代码,size用来告诉核心这个监听的数目一共有多大。
2. epoll_ctl
使用epoll_ctl()可以向epoll控制代码新增或者删除要监听的档案控制代码。epoll_ctl()函式原型如下:
intepoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
要监听档案是否可读写时先要向epoll控制代码注册要监听的事件型别。第一个引数是epoll_create()返回的epoll控制代码,第二个引数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中。EPOLL_CTL_MOD:修改已经注册的fd的监听事件。EPOLL_CTL_DEL:从epfd中删除一个fd。第三个引数是需要监听的档案控制代码,第四个引数是告诉核心需要监听什么事。
3. epoll_wait
万事俱备,现在只需要呼叫epoll_wait()函式就可以开始等待事件发生了。epoll_wailt()函式的原型如下:
intepoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
引数events用来从核心得到事件的集合,maxevents告之核心这个events有多大,这个 maxevents的值不能大于建立epoll_create()时的size,引数timeout是超时时间(毫秒,0会立即返回,-1是永久阻塞)。该函式返回需要处理的事件数目,如返回0表示已超时。
epoll实现原理
前面介绍了epoll的使用,接下来主要介绍epoll在核心的实现原理。当我们在使用者态呼叫epoll_create()时,会触发呼叫核心的sys_epoll_create()。我们先来看看sys_epoll_create()这个函式:
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
structeventpoll *ep = NULL;
structfile *file;
...
error = ep_alloc(&ep);
...
fd = get_unused_fd_flags(O_RDWR| (flags & O_CLOEXEC));
...
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags &O_CLOEXEC));
...
fd_install(fd,file);
ep->file = file;
return fd;
}
这个函式主要呼叫ep_alloc()申请一个eventpoll结构,eventpoll结构是epoll的核心资料结构,我们来看看这个结构的定义:
structeventpoll {
spinlock_t lock;
structmutexmtx;
wait_queue_head_t wq;
wait_queue_head_t poll_wait;
structlist_headrdllist;
structrb_rootrbr;
structepitem *ovflist;
structuser_struct *user;
structfile *file;
intvisited;
structlist_headvisited_list_link;
};
eventpoll结构有三个字段是比较重要的,分别是:wq、rdllist和rbr。
wq用于储存有哪些程序在等待这个epoll返回。rdllist用于储存可读写的档案。rbr用于建立一个快速查询档案控制代码的红黑树。 建立完eventpoll结构后,sys_epoll_create()会呼叫get_unused_fd_flags()获取一个空闲的档案控制代码fd,接着调anon_inode_getfile()获取一个空闲的file结构,并且把eventpoll结构与file结构系结。最后呼叫fd_install()把档案控制代码fd与file结构系结,返回档案控制代码fd。通过一系列的操作后,核心就可以通过档案控制代码fd与eventpoll结构进行关联。
根据epoll的使用流程,使用epoll_create()建立epoll控制代码后,可以通过epoll_ctl()函式向epoll控制代码新增和删除要监视的档案控制代码。呼叫epoll_ctl()会触发核心呼叫sys_epoll_ctl()函式,我们来看看sys_epoll_ctl()函式的最重要部分:
SYSCALL_DEFINE4(epoll_ctl,int, epfd, int, op, int, fd,
struct epoll_event __user *,event)
{
...
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds,tfile, fd);
} else
error = -EEXIST;
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi,&epds);
} else
error = -ENOENT;
break;
}
...
return error;
}
sys_epoll_ctl()会根据我们传递的op引数来进行不同的操作,我们主要看看op为EPOLL_CTL_ADD的操作,也就是新增操作。当进行新增操作时,sys_epoll_ctl()最终会呼叫ep_insert()把档案控制代码fd新增到eventpoll结构维护的红黑树中,ep_insert()程式码如下:
static int ep_insert(struct eventpoll *ep, struct epoll_event*event,
struct file *tfile, int fd)
{
int error, revents, pwake =0;
unsignedlong flags;
structepitem *epi;
structep_pqueue epq;
...
if (!(epi =kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
...
init_poll_funcptr(&epq.pt,ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile,&epq.pt);
...
ep_rbtree_insert(ep, epi);
...
return 0;
}
ep_insert()函式首先建立一个epitem结构用于管理事件,然后呼叫档案控制代码的poll()界面,根据init_poll_funcptr(&epq.pt,ep_ptable_queue_proc)这行程式码的设定,poll()界面最终会呼叫ep_ptable_queue_proc()函式。ep_ptable_queue_proc()函式程式码如下:
static void ep_ptable_queue_proc(struct file *file,wait_queue_head_t *whead, poll_table*pt)
{
structepitem *epi = ep_item_from_epqueue(pt);
structeppoll_entry *pwq;
if (epi->nwait >= 0
&& (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL)))
{
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
epi->nwait = -1;
}
}
ep_ptable_queue_proc()函式的作用就是把epitem结构新增到档案的等待伫列中,根据上面的程式码可知,当等待伫列被唤醒的时候,将会呼叫ep_poll_callback()函式。而ep_poll_callback()函式的作用就是把epitem结构放置到eventpoll结构的rdllist伫列中。我们前面分析过,rdllist就是就绪的档案伫列。ep_poll_callback()函式最终会呼叫wake_up_locked(&ep->wq)唤醒程序。简化后的程式码如下:
static int ep_poll_callback(wait_queue_t *wait,
unsigned mode, int sync, void *key)
{
...
// 把epitem新增到rdllist伫列中
if(!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink,&ep->rdllist);
// 唤醒程序
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
...
return 1;
}
ep_insert()函式最后一个操作就是呼叫ep_rbtree_insert()把epitem结构新增的eventpoll结构的红黑树中。如下图:

使用红黑树管理epitem结构的目的是可以根据档案控制代码fd快速查询到对应的epitem结构。红黑树是一棵平衡二叉树,时间复杂度为O(logN)。
新增档案控制代码到epoll之后,就可以呼叫epoll_wait()函式开始监听档案。epoll_wait()会呼叫核心的sys_epoll_wait()函式,而sys_epoll_wait()最终会呼叫ep_poll()函式,程式码如下:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user*events,
int maxevents, long timeout)
{
...
if (list_empty(&ep->rdllist)) {
init_waitqueue_entry(&wait,current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
__add_wait_queue(&ep->wq,&wait);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (!list_empty(&ep->rdllist)|| !jtimeout)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock,flags);
}
__remove_wait_queue(&ep->wq,&wait);
set_current_state(TASK_RUNNING);
}
...
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents))&& jtimeout)
goto retry;
return res;
}
ep_poll()函式所做的事情很简单,就是把当前程序设定为可中断睡眠状态,然后新增eventpoll结构的等待伫列中,最后呼叫schedule_timeout()让出CPU。这样当前程序就会进入睡眠状态,当程序醒来的时候会判断eventpoll结构的rdllist伫列是否为空,然后不为空就呼叫ep_send_events()函式把可读写的档案拷贝到使用者态的events阵列中。
那么什么时候当前程序会被唤醒呢?在分析ep_insert()函式的时候,我们提及过当档案状态发生改变时会呼叫ep_poll_callback()函式,而ep_poll_callback()函式会把就绪的档案新增到rdllist伫列,并且就会把当前程序唤醒。
总结
本文主要分析了epoll的实现原理,可以知道,epoll并不会对所有档案进行扫描(而select和poll会对所以档案进行扫描),而是使用事件的方式把就绪的档案收集起来,所以epoll的效率非常高。当然本文还有一些epoll的细节并没有介绍到,例如水平触发和边缘触发等,有兴趣可以自己研究程式码。





























