内核事件通知机制


当进程持有充分大数量的资源时,如何跟踪资源的状态变成了问题,与其挨个轮询,不妨换一种思路,让内核通知我们。 比如在网络编程的条件下,我们可能会同时处理多种连接,可以使用这种接口让内核通知我们哪些套接字可以读写,传入连接等。

在 FreeBSD 中,使用 kqueue() 相关设施提供事件通知机制。 在 Linux 中,使用 epoll() 相关设施提供事件通知机制。

kqueue

kqueue() 需要用到的头文件是:

#include <sys/event.h>

下面先假设定义如下对象:

/* kevent 结构体,用来和 kqueue 相关设施交互 */

/* 下面要复用的临时对象 */
struct kevent event;

/* 用来存放 kevent 返回值,其中是触发的事件和资源的描述 */
struct kevent tevents[5];

/* kqueue 资源的描述符 */
int kq;

使用 kqueue() 创建一个 kqueue 资源描述符:

kq = kqueue()

将需要监控的资源添加进 kqueue,并且设置监控的事件类型:

EV_SET(&event, sock_server, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(kq, &event, 1, NULL, 0, NULL);

其中,sock_server 是服务器端套接字,EVFILT_READ 用在服务端套接字上可以处理传入连接,EV_ADD 表示将这个事件加入 kqueue。

事件循环中,通用使用 kevent() 等待内核通知事件:

int numevent;
while(1)
{
    /* tevents 用来存放通知我们的事件,随后的参数是 tevents 所表示数组的长度 */
    /* 返回值是通知我们的事件的个数 */
    numevent = kevent(kq, NULL, 0, tevents, 5, NULL);

    for(int i = 0; i < numevent; ++i)
    {
        struct kevent tevent = tevents[i];

        /* tevent.ident 就是加入监听的文件描述符 */
        if ((int)tevent.ident == sock_server)
        {
            sock_client = accept(sock_server, (struct sockaddr *)&addr_client, (socklen_t *)&addr_client_len);

            /* 加入传入连接 */
            EV_SET(&event, sock_client, EVFILT_READ, EV_ADD, 0, 0, NULL);
            kevent(kq, &event, 1, NULL, 0, NULL);
        }
        /* 在这个例子中,就是其他普通连接的文件描述符 */
        /* 这里使用 EVFILT_READ 表示可以读出请求 */
        else if (tevent.filter & EVFILT_READ)
        {
            int client = tevent.ident;
            read(client, buff, 256);
            write(client, message, strlen(message));
            close(client); /* 文档说关闭的文件描述符自动移除 kqueue */
        }
        else if (tevent.flags & EV_ERROR)
        {
            /* 说明出现问题,此时 tevent.data 存放 errno */
            fprintf(stderr, "kevent(): error: %s", strerror(tevent.data));
        }
        else
        {
            /* 未知事件 */
            fprintf(stderr, "kevent(): unknown event\n");
        }
    }
}

不难看出,kevent() 既可以添加 kqueue,也可以用来取出触发的事件。 其用法是 kevent(kq, changelist, nchanges, eventlist, nevents, timeout),其中 kq 是 kqueue 资源描述符,changlist 是修改 kqueue 监听事件的输入数组,nchanges 指定输入个数,eventlist 是 kqueue 输出触发事件的数组,nevents 指定 eventlist 最大容量,timeout 是该调用的超时事件限制,kevent 返回值表示触发事件的个数。 我们使用 kevent(kq, &event, 1, NULL, 0, NULL) 就是只使用修改监听列表的那一部分,使用 kevent(kq, NULL, 0, tevent, 5, NULL) 就是只使用等待事件的部分。

epoll

epoll() 设施需要用到的头文件是:

#include <sys/epoll.h>

假设下面要用到的对象是:

/* epoll_event 是用来和 epoll 设施交互的结构体 */

/* 用来存放输出触发的事件 */
struct epoll_event events[64];

/* epfd 用来存放 epoll 监听列表的文件描述符,使用 epoll_create 创建监听列表 */
int epfd = epoll_create(1);

将需要监听的资源加入 epoll 中:

struct epoll_event event;
event.events = EPOLLIN | EPOLLOUT | EPOLLET;
event.data.fd = server_sock; // 让 epoll 设施返回文件描述符
epoll_ctl(
    epfd,          // epoll 描述符
    EPOLL_CTL_ADD, // 添加
    server_sock,   // 监听的资源
    &event         // 返回的信息
);

事件循环中,使用 epoll_wait() 等待通知事件:

int ready_fd_num = 0;
while(1)
{
    ready_fd_num = epoll_wait(epfd, events, 64, -1);

    for(int i = 0; i < ready_fd_num; ++i)
    {
        // 服务端套接字,传入连接
        if(events[i].data.fd == server_sock)
        {
            client_sock = accept(server_sock, (struct sockaddr *)&client_addr, &client_addr_len);
            fcntl(client_sock, F_SETFL, fcntl(client_sock, F_GETFL) | O_NONBLOCK);

            // 加入连接套接字
            event.data.fd = client_sock;
            event.events = EPOLLIN;
            epoll_ctl(epfd, EPOLL_CTL_ADD, client_sock, &event);
        }
        // 回复响应
        else if(events[i].events & EPOLLIN)
        {
            write(events[i].data.fd, message, sizeof(message));
            close(events[i].data.fd);
        }
    }
}

完整代码

kqueue in FreeBSD

/* C language */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* POSIX */
#include <fcntl.h>
#include <unistd.h>
/* socket */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
/* kqueue */
#include <sys/event.h>
/* FreeBSD */
#include <err.h>

const char *message = "HTTP/1.1 200 OK\r\n"
"Content-Type: text/html\r\n"
"Connection: close\r\n"
"Content-Length: 20\r\n"
"\r\n"
"<p>hello, world!</p>"
;

int main()
{
    struct kevent event;      // monitor for
    struct kevent tevents[5]; // triggered event
    int kq;  // for kqueue()

    // socket
    int sock_server = -1;
    int sock_client = -1;
    struct sockaddr_in addr_server;
    struct sockaddr_in addr_client;
    int addr_client_len = sizeof(addr_client);
    char buff[256];
    char addr_str[INET_ADDRSTRLEN];

    // server socket
    sock_server = socket(AF_INET, SOCK_STREAM, 0);
    addr_server.sin_family = AF_INET;
    addr_server.sin_addr.s_addr = htonl(INADDR_ANY);
    addr_server.sin_port = htons(8080);

    (void)bind(sock_server, (struct sockaddr *)&addr_server, sizeof(addr_server));
    listen(sock_server, 5);

    // kqueue
    kq = kqueue();

    EV_SET(&event, sock_server, EVFILT_READ, EV_ADD, 0, 0, NULL);
    kevent(kq, &event, 1, NULL, 0, NULL);

    // event loop
    int numevent;
    while(1)
    {
        numevent = kevent(kq, NULL, 0, tevents, 5, NULL);

        if (numevent == -1)
            err(EXIT_FAILURE, "kevent() wait");

        fprintf(stderr, "kevent(): %d event comes\n", numevent);
        for(int i = 0; i < numevent; ++i)
        {
            struct kevent tevent = tevents[i];

            if ((int)tevent.ident == sock_server)
            {
                sock_client = accept(sock_server, (struct sockaddr *)&addr_client, (socklen_t *)&addr_client_len);
                if (sock_client == -1)
                {
                    perror("accept() error");
                    continue;
                }
                printf("request <- %s:%d\n", inet_ntop(addr_client.sin_family, &addr_client.sin_addr, addr_str, sizeof(addr_str)), ntohs(addr_client.sin_port));
                EV_SET(&event, sock_client, EVFILT_READ, EV_ADD, 0, 0, NULL);
                kevent(kq, &event, 1, NULL, 0, NULL);
            }
            else if (tevent.filter & EVFILT_READ)
            {
                int client = tevent.ident;
                memset(buff, 0, 256);
                read(client, buff, 256);
                write(client, message, strlen(message));
                close(client);
            }
            else if (tevent.flags & EV_ERROR)
            {
                fprintf(stderr, "kevent(): error: %s", strerror(tevent.data));
            }
            else
            {
                fprintf(stderr, "kevent(): unknown event\n");
            }
        }
    }

    return 0;
}

epoll in Linux

/* C language */
#include <stdio.h>
#include <stdlib.h>
/* POSIX */
#include <unistd.h>
#include <fcntl.h>
/* socket */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
/* epoll */
#include <sys/epoll.h>

const char *message = "HTTP/1.1 200 OK\r\n"
"Content-Type: text/html\r\n"
"Connection: close\r\n"
"Content-Length: 20\r\n"
"\r\n"
"<p>hello, world!</p>"
;

int main()
{
    int server_sock = -1;
    server_sock = socket(AF_INET, SOCK_STREAM, 0);

    int sockopt_value = 1;
    setsockopt(server_sock, SOL_SOCKET, SO_REUSEADDR, &sockopt_value, sizeof(sockopt_value))
    setsockopt(server_sock, SOL_SOCKET, SO_REUSEPORT, &sockopt_value, sizeof(sockopt_value))

    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8080);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int client_sock = -1;
    struct sockaddr_in client_addr;
    size_t client_addr_len = sizeof(client_addr);
    char addr_buff[INET_ADDRSTRLEN];

    bind(server_sock, (struct sockaddr *)&server_addr, sizeof(server_addr))
    listen(server_sock, 5)
 
    struct epoll_event events[64];
    int epfd = epoll_create(1);
    struct epoll_event event;
    event.events = EPOLLIN | EPOLLOUT | EPOLLET;
    event.data.fd = server_sock;
    epoll_ctl(epfd, EPOLL_CTL_ADD, server_sock, &event);

    int ready_fd_num = 0;
    while(1)
    {
        ready_fd_num = epoll_wait(epfd, events, 64, -1);

        if (ready_fd_num < 0) {
            perror("epoll_wait()");
            continue;
        }
        
        for(int i = 0; i < ready_fd_num; ++i)
        {
            // accept connect
            if (events[i].data.fd == server_sock)
            {
                client_sock = accept(server_sock, (struct sockaddr *)&client_addr, &client_addr_len);
                printf("request <- %s:%d\n", inet_ntop(client_addr.sin_family, &client_addr.sin_addr, addr_buff, sizeof(addr_buff)), ntohs(client_addr.sin_port));
                fcntl(client_sock, F_SETFL, fcntl(client_sock, F_GETFL) | O_NONBLOCK);

                event.data.fd = client_sock;
                event.events = EPOLLIN;
                epoll_ctl(epfd, EPOLL_CTL_ADD, client_sock, &event);
            }
            // send response
            else if(events[i].events & EPOLLIN)
            {
                write(events[i].data.fd, message, sizeof(message));
                //epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
                close(events[i].data.fd);
            }
        }
    }

    close(server_sock);

    return 0;
}