Kqueue和其他的多路復(fù)用IO的核心是,單消費(fèi)者同時監(jiān)聽不同種類的生產(chǎn)者,從而提供高性能的單線程IO,減少調(diào)度開銷。而Kqueue通過在內(nèi)核態(tài)維持狀態(tài)提供了更高的性能。
生產(chǎn)者消費(fèi)者模型
單Producer和單Consumer
生產(chǎn)者/消費(fèi)者模型是常見的通信模型,通過共享內(nèi)核緩沖區(qū)環(huán)形隊列,實(shí)現(xiàn)異步的事件通知。雙方只關(guān)注緩沖區(qū)內(nèi)的數(shù)據(jù),而不關(guān)注彼此,因此常常被用于網(wǎng)絡(luò)通信。
信號量
為了避免消費(fèi)者在緩存區(qū)未滿時無意義的輪詢,消費(fèi)者block直到生產(chǎn)者通知。wait時線程設(shè)置信號量并且block,notify時內(nèi)核通知所有等待信號的線程狀態(tài)改為RUNNABLE。
事實(shí)上就是linux的pthread_cond_wait和phread_cond_signal原語。consumer之所以要帶鎖wait,是因為在內(nèi)部進(jìn)行調(diào)度yield_wait前要放掉鎖,否則其他線程無法進(jìn)入臨界區(qū);喚醒之后重新獲得鎖。(這里指的鎖是外部事務(wù)的鎖)
wait和notify需要增加鎖,防止notify先于wait進(jìn)行。(這里的鎖指的是內(nèi)部事務(wù)的鎖)
wait調(diào)用的yield_wait在調(diào)度時需要臨時釋放并隨后獲取內(nèi)部事務(wù)鎖,否則會阻塞其他的notify造成全員block。
send(bb, msg):
acquire(bb.lock)
while True:
if bb.in - bb.out < N:
bb.buf[bb.in mod N] <- msg
bb.in <- bb.in + 1
release(bb.lock)
notify(bb.not_empty)
return
wait(bb.not_full, bb.block)
receive(bb):
acquire(bb.lock)
while True:
if bb.in > bb.out:
msg <- bb.buf[bb.out mod N]
bb.out <- bb.out + 1
release(bb.lock)
wait(bb.not_full)
return
wait(bb.not_empty, bb.block)
Eventcount & Sequencer
這是1979年提出的算法,作為信號量的可替換實(shí)現(xiàn)。Sequencer的目的是處理多producer。
semaphores
send(Buffer& buffer,Message msg) {
t=TICKET(T);
AWAIT(buffer.in, t);
AWAIT(buffer.out, READ(buffer.in)-N);
buffer[READ(buffer.in)%N]=msg;
ADVANCE(in);
}
receive(Buffer& buffer) {
AWAIT(buffer.in, READ(buffer.out));
msg = buffer[READ(buffer.out)%N];
ADVANCE(buffer.out);
return msg;
}
- AWAIT(event*,val) - 比較event.count和val,如果大于則返回,否則存入線程TCB并yield
- ADVANCE(event*) - 自增event.count并將所有同event且event.count>val的線程喚醒
- TICKET(sequencer*) - 原子性自增序號,目的是處理并發(fā)的sender
- READ(event*) - 原子性讀event.count,因為可能讀操作涉及多memory cell
send等待in超過ticket,相當(dāng)于拿排隊鎖輪到自己。然后等待緩存區(qū)未滿時寫入數(shù)據(jù)。
receive等待緩沖區(qū)存在數(shù)據(jù)時讀取數(shù)據(jù)。
Kqueue
https://people.freebsd.org/~jlemon/papers/kqueue.pdf
問題在于,上面提到的做法本質(zhì)上都是監(jiān)聽著一個事件,如果我們想要處理多個監(jiān)聽事件,操作系統(tǒng)必須提供新的原語,例如每個socket都對應(yīng)著一個file descriptor,需要同時監(jiān)聽所有socket的事件。BSD的Kqueue和Linux的epoll都是解決這種問題的方式,本質(zhì)上它們就是IPC,但是單純從IO的角度看叫做多路復(fù)用IO。目前epoll用于netty的底層,是單線程實(shí)現(xiàn)高性能網(wǎng)絡(luò)的關(guān)鍵。
傳統(tǒng)的select和poll僅僅適用于file descriptor,但是無法關(guān)注其他IPC機(jī)制,例如信號、文件系統(tǒng)變化、異步IO完成、進(jìn)程存在;并且也不具備scalability。
第一個問題在于參數(shù)傳遞,每次都必須傳遞整個事件組,并且動態(tài)在內(nèi)核中創(chuàng)建和銷毀內(nèi)存。第二個問題在于內(nèi)核必須遍歷整個fd列表去找活躍的fd。初始遍歷一次確定沒有active的fd才能沉睡,如果沒有active還要再遍歷一次設(shè)定回調(diào)來喚醒,最后喚醒時還要再遍歷一次來看是哪個fd喚醒了。
問題出在這個syscall無狀態(tài)上,無法利用之前的信息,每次都得重新計算。因此Kqueue的機(jī)制就在于內(nèi)核中維持一個隊列儲存狀態(tài)。
int
kqueue(void);
int
kevent(int kq,const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout);
struct kevent{
uintpt t ident; // 事件關(guān)注對象的ID,kq,ident,filter確定唯一的event
// 事件類型,ident,fflags,data應(yīng)該如何被解釋?
u short flags; // 輸入: 增加/減少,使能/禁止, 執(zhí)行后重置/刪除;輸出: 發(fā)生EOF或者ERROR
u int fflags; // 活躍時應(yīng)該怎么做,是否返回event?
intptr t data; // filter和fflags規(guī)定的數(shù)據(jù)傳輸方式
void *udata; // 自定義的數(shù)據(jù)傳輸方式
__uint64_t ext[4]; //在末尾增加的額外信息Hint
}
EV_SET(&kev, ident, filter, flags, fflags, data, udata);
kevent()用于創(chuàng)建kqueue并且返回對應(yīng)的capability(權(quán)限控制的抽象)。
kevent()用于注冊event,并設(shè)定超時,changelist是指kqueue注冊的event如何變化,eventlist則是返回的event。當(dāng)event觸發(fā)時,會調(diào)用內(nèi)核的回調(diào)函數(shù),通知進(jìn)程。
filter
- EVFILT READ :poll近似的實(shí)現(xiàn),當(dāng)socket_buffer大于SO_LOWAT時觸發(fā)將size寫入data或者斷連時觸發(fā)EOF,幫助應(yīng)用處理數(shù)據(jù)。
- EVFILT WRITE: 類似READ
- EVFILT AIO: aio_read/write請求后通過事件進(jìn)行aio_error輪詢,事件返回后aio_return
- EVFILT SIGNAL: id為信號值,返回data為信號計數(shù),通知后clear
- EVFILT VNODE: 監(jiān)聽文件系統(tǒng)vnode,id為fd, fflags監(jiān)聽下列事件并返回所有發(fā)生事件
NOTE DELETE
NOTE WRITE
NOTE EXTEND
NOTE ATTRIB
NOTE LINK
NOTE RENAME
- EVFILT PROC:監(jiān)聽進(jìn)程狀態(tài),id為PID,fflags監(jiān)聽下列事件
NOTE EXIT/FORK/EXEC 監(jiān)聽exit,fork,execve等原語
NOTE TRACK 若父進(jìn)程設(shè)定為Track則fork后子進(jìn)程為CHILD
輸出:
NOTE CHILD 子進(jìn)程fork后設(shè)定child,并且父進(jìn)程id存入data
NOTE TRACKERR 無法添加子進(jìn)程事件,通常因為資源限制
sample
handle_events()
{
int i, n;
struct timespec timeout =
{ TMOUT_SEC, TMOUT_NSEC };
n = kevent(kq, ch, nchanges,
evi, nevents, &timeout);
if (n <= 0)
goto error_or_timeout;
for (i = 0; i < n; i++) {
if (evi.flags & EV_ERROR)
/* error */
if (evi.filter == EVFILT_READ)
readable_fd(evi.ident);
if (evi.filter == EVFILT_WRITE)
writeable_fd(evi.ident);
}
...
}
update_fd(int fd, int action,int filter)
{
EV_SET(&chnchanges, fd, filter,action == ADD ?
EV_ADD : EV_DELETE,
0, 0, 0);
nchanges++;
}
Kqueue實(shí)現(xiàn)
Knote
- 計算當(dāng)前節(jié)點(diǎn)的活躍度
- 鏈接其他knote
- 存儲自己所在的Kqueue的指針
struct knote {
SLIST_ENTRY(knote) kn_link; /* for kq */
SLIST_ENTRY(knote) kn_selnext; /* for struct selinfo */
struct knlist *kn_knlist; /* f_attach populated */
TAILQ_ENTRY(knote) kn_tqe;
struct kqueue *kn_kq; /* which queue we are on */
struct kevent kn_kevent;
void *kn_hook;
int kn_hookid;
int kn_status; /* protected by kq lock */
#define KN_ACTIVE 0x01 /* event has been triggered */
#define KN_QUEUED 0x02 /* event is on queue */
#define KN_DISABLED 0x04 /* event is disabled */
#define KN_DETACHED 0x08 /* knote is detached */
#define KN_MARKER 0x20 /* ignore this knote */
#define KN_KQUEUE 0x40 /* this knote belongs to a kq */
#define KN_SCAN 0x100 /* flux set in kqueue_scan() */
int kn_influx;
int kn_sfflags; /* saved filter flags */
int64_t kn_sdata; /* saved data field */
union {
struct file *p_fp; /* file data pointer */
struct proc *p_proc; /* proc pointer */
struct kaiocb *p_aio; /* AIO job pointer */
struct aioliojob *p_lio; /* LIO job pointer */
void *p_v; /* generic other pointer */
} kn_ptr;
struct filterops *kn_fop;
#define kn_id kn_kevent.ident
#define kn_filter kn_kevent.filter
#define kn_flags kn_kevent.flags
#define kn_fflags kn_kevent.fflags
#define kn_data kn_kevent.data
#define kn_fp kn_ptr.p_fp
};
Kqueue
- kp_knlist存所有knode用于GC
- kp_head存存儲所有標(biāo)記為active的knode
- kq_knhash存儲iden->descriptor的映射
- kq_fdp fd索引的數(shù)組(同open file table)用于關(guān)閉fd時刪除對應(yīng)的knode
struct kqueue {
struct mtx kq_lock;
int kq_refcnt;
TAILQ_ENTRY(kqueue) kq_list;
TAILQ_HEAD(, knote) kq_head; /* list of pending event */
int kq_count; /* number of pending events */
struct selinfo kq_sel;
struct sigio *kq_sigio;
struct filedesc *kq_fdp;
int kq_state;
#define KQ_SEL 0x01
#define KQ_SLEEP 0x02
#define KQ_FLUXWAIT 0x04 /* waiting for a in flux kn */
#define KQ_ASYNC 0x08
#define KQ_CLOSING 0x10
#define KQ_TASKSCHED 0x20 /* task scheduled */
#define KQ_TASKDRAIN 0x40 /* waiting for task to drain */
int kq_knlistsize; /* size of knlist */
struct klist *kq_knlist; /* list of knotes */
u_long kq_knhashmask; /* size of knhash */
struct klist *kq_knhash; /* hash table for knotes */
struct task kq_task;
struct ucred *kq_cred;
};
Registration
kqueue
kqueue本身作為文件抽象看待,在OFT里注冊entry創(chuàng)建內(nèi)核對象并賦予descriptor索引。hash和內(nèi)部的array并不分配。
kevent
int
kevent(int kq, const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents, const struct timespec *timeout)
{
return (((int (*)(int, const struct kevent *, int,
struct kevent *, int, const struct timespec *))
__libc_interposing[INTERPOS_kevent])(kq, changelist, nchanges,
eventlist, nevents, timeout));
}
這里調(diào)用了kqueue_register來對changeList進(jìn)行注冊。首先根據(jù)線程和fd獲取文件的FCB,kq對于fp引用計數(shù)++,然后調(diào)用實(shí)際的注冊函數(shù)。注冊的代碼太長了,大體就是先根據(jù)<Iden,filter>尋找knote節(jié)點(diǎn),找不到如果是EV_ADD則增加knote,否則把事件增加到knote上去。
int
kqfd_register(int fd, struct kevent *kev, struct thread *td, int mflag)
{
struct kqueue *kq;
struct file *fp;
cap_rights_t rights;
int error;
error = fget(td, fd, cap_rights_init(&rights, CAP_KQUEUE_CHANGE), &fp);
if (error != 0)
return (error);
if ((error = kqueue_acquire(fp, &kq)) != 0)
goto noacquire;
error = kqueue_register(kq, kev, td, mflag);
kqueue_release(kq, 0);
noacquire:
fdrop(fp, td);
return (error);
}
Filter
filter的作用就是對于事件源進(jìn)行過濾,事件源所有的活動都會調(diào)用filter,但是只有符合filter規(guī)則的事件才會報告給應(yīng)用,也就是返回布爾值,同時他也會修改fflags和data產(chǎn)生副作用(上面提到的輸出參數(shù))。filter封裝了事件,kqueue只能詢問他是否活躍,而對事件的細(xì)節(jié)一無所知。因此只需要增加filter,就能拓展事件的內(nèi)容。
Activity
在所有觸發(fā)這些活動的地方插入hook函數(shù),調(diào)用knote()函數(shù)遍歷自己維護(hù)的klist(注冊的時候維護(hù)的),調(diào)用filter。
如果事件觸發(fā)則激活,通過knote找到其所屬的kqueue,并且將knote加入kqueue的active鏈末尾。如果已經(jīng)在了,那么不用增加knote,但是filter還是會記錄activity(e.g.上文提到的副作用)。
這里有些special case,例如fork需要看是不是TRACK,來判斷是否報告子節(jié)點(diǎn)的PID
Additionally, for each knote attached to the parent, check whether user wants to track the new process. If so, attach a new knote to it, and immediately report an event with the child's pid.
首先,激活父進(jìn)程的knote,然后創(chuàng)建新的knote分配給子節(jié)點(diǎn),并且設(shè)置CHILD flag和對應(yīng)的父進(jìn)程PID。同時這里還提到了可能存在事件可能改變data,因此為EXIT額外分配一個節(jié)點(diǎn)。
/*
* Activate existing knote and register tracking knotes with
* new process.
*
* First register a knote to get just the child notice. This
* must be a separate note from a potential NOTE_EXIT
* notification since both NOTE_CHILD and NOTE_EXIT are defined
* to use the data field (in conflicting ways).
*/
kev.ident = pid;
kev.filter = kn->kn_filter;
kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_ONESHOT |
EV_FLAG2;
kev.fflags = kn->kn_sfflags;
kev.data = kn->kn_id; /* parent */
kev.udata = kn->kn_kevent.udata;/* preserve udata */
error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
if (error)
kn->kn_fflags |= NOTE_TRACKERR;
/*
* Then register another knote to track other potential events
* from the new process.
*/
kev.ident = pid;
kev.filter = kn->kn_filter;
kev.flags = kn->kn_flags | EV_ADD | EV_ENABLE | EV_FLAG1;
kev.fflags = kn->kn_sfflags;
kev.data = kn->kn_id; /* parent */
kev.udata = kn->kn_kevent.udata;/* preserve udata */
error = kqueue_register(kq, &kev, NULL, M_NOWAIT);
if (error)
kn->kn_fflags |= NOTE_TRACKERR;
if (kn->kn_fop->f_event(kn, NOTE_FORK))
KNOTE_ACTIVATE(kn, 0);
list->kl_lock(list->kl_lockarg);
KQ_LOCK(kq);
kn_leave_flux(kn);
KQ_UNLOCK_FLUX(kq);
Delivery
kqueue_scan在active鏈末尾加入哨兵,如果scan時扔出了哨兵,那么遍歷結(jié)束。
每次都從active移除一個節(jié)點(diǎn)(注意檢查timeout,過期也要移除,DISABLE也是在這里移除),如果不是ONESHOP,那么filter帶著query hint重新檢查一遍是否激活,防止途中又被修改。
The rationale for this is the case where data arrives for a socket, which causes the knote to be queued, but the Application happens to call read() and empty the socket buffer before calling kevent. If the knote was still queued, then an event would be returned telling the application to read an empty buffer.
確認(rèn)激活的knote的信息將會拷貝到kevnet通過eventlist返回給應(yīng)用進(jìn)行通知。如果ONESHOP則直接從kqueue中移除,否則如果filter看它仍然active,就把它重新放到active鏈末尾(上次掃描的哨兵之后)。直到哨兵被出列,scan完成。
Miscellaneous Notes
1.論文的版本fork的時候不復(fù)制kqueue的df除非vfork。如果復(fù)制的話需要在fork時進(jìn)行整個kqueue復(fù)制或者標(biāo)記為COW。(現(xiàn)在不知道是不是這么做的)
2.kqueue是通過維護(hù)klist來對整條鏈涉及的所有進(jìn)程進(jìn)行通知的,而不是像poll或者select那樣在sellInfo持有pid。下面這段話看不懂了,沒看過poll不知道啥叫collision。
While this may be a natural outcome from the way knotes are implemented, it also means that the kqueue system is not susceptible to select collisions. As each knote is queued in the active list, only processes sleeping on that kqueue are woken up
3.考慮同一個klist有不同類型的filter,調(diào)用knode時應(yīng)該給予額外信息通知他到底是什么事件觸發(fā)的(例如PROC和SIGNAL容易混淆),因此利用hint確定activity和哪個相關(guān)
4. kevent要經(jīng)歷兩次拷貝,增加了overhead。因此如果采用AIO更好,kernel直接修改user狀態(tài)下的control block。那么為什么不這么做呢?根本原因在于如果內(nèi)核不允許直接寫用戶態(tài)數(shù)據(jù)的話,bug會更好定位,同時應(yīng)用也不需要考慮狀態(tài)。
總結(jié)
精妙之處在于kqueue維持在內(nèi)核中,因此socket如果滿了可以直接將knote加入進(jìn)程kqueue的活躍鏈,而不需要等到下次syscall的時候再檢查。例如,即使我長期不kevent,knote()依然會將他們的activity存儲在knote上并且插入active list,下次只需要遍歷active list而不需要重頭遍歷整個queue。
同時因為kqueue有狀態(tài),進(jìn)行修改也開銷很小,只需要改變變化的那部分就行了。
看的時候還是有些地方比較難理解,加上源代碼也很復(fù)雜,如果有糾錯請指正。
附錄
filechange
struct kevent ev;
struct timespec nullts = { 0, 0 };
EV_SET(&ev, fd, EVFILT_VNODE,
EV_ADD | EV_ENABLE | EV_CLEAR,
NOTE_RENAME | NOTE_WRITE |
NOTE_DELETE | NOTE_ATTRIB, 0, 0);
kevent(kq, &ev, 1, NULL, 0, &nullts);
for (;;) {
n = kevent(kq, NULL, 0, &ev, 1, NULL);
if (n > 0) {
printf("The file was");
if (ev.fflags & NOTE_RENAME)
printf(" renamed");
if (ev.fflags & NOTE_WRITE)
printf(" written");
if (ev.fflags & NOTE_DELETE)
printf(" deleted");
if (ev.fflags & NOTE_ATTRIB)
printf(" chmod/chowned");
printf("n");
}
signal
struct kevent ev;
struct timespec nullts = { 0, 0 };
EV_SET(&ev, SIGHUP, EVFILT_SIGNAL,
EV_ADD | EV_ENABLE, 0, 0, 0);
kevent(kq, &ev, 1, NULL, 0, &nullts);
signal(SIGHUP, SIG_IGN);
for (;;) {
n = kevent(kq, NULL, 0, &ev, 1, NULL);
if (n > 0)
printf("signal %d delivered"
" %d timesn",
ev.ident, ev.data);
}
udata
int i, n;
struct timespec timeout =
{ TMOUT_SEC, TMOUT_NSEC };
void (* fcn)(struct kevent *);
n = kevent(kq, ch, nchanges,
ev, nevents, &timeout);
if (n <= 0)
goto error_or_timeout;
for (i = 0; i < n; i++) {
if (evi.flags & EV_ERROR)
/* error */
fcn = evi.udata;
fcn(&evi);
}






