servcraft/p7 - Stackful协程库


BSD
Linux
C/C++

软件简介

servcraft/p7是使用C实现的stackful协程库,提供“半”透明的CSP并发。

目前,servcraft只支持Linux/GCC/pthread. 由于阻塞IO封装使用了epoll, 推荐使用Linux 2.6以上。

若要利用servcraft提供的GCC闭包功能,请使用主版本号不低于5的GCC.

servcraft/p7所使用的协程(coroutines, coro)提供类似goroutine的功能。

  • 编写并发程序的过程较一般事件驱动框架更简单。每个协程假定自己和其他协程一样,被并发地调度且独立地执行,因而可以顺序编写逻辑。

  • 为使用文件描述符的系统调用提供了IO阻塞-重调度接口。协程阻塞在IO操作上时自动暂停执行、引发重调度,IO就绪时重新获得调度权。

  • 提供Actor风格的协程间通信。协程可以注册自身名字,并按注册的协程名向其他协程的内建邮箱中(零拷贝地)投递信息。

  • 所有IO和协程间通信的阻塞接口都自带超时定时器。

  • 提供协程间瘦自旋锁和读写自旋锁。读写自旋锁使用尽量公平对待读者、写者的机制。

简介和简单demo可以在这里获得。一个粗暴的echo
server如下:

#include    <stdio.h>
#include    "./net_common.h"        // common header for necessary network APIs
#include    <sys/stat.h>
#include    "../servcraft/p7/libp7.h"
#include    "../servcraft/p7/p7_root_alloc.h"
#include    "../servcraft/include/model_alloc.h"

void test_echo(void *arg) {
    intptr_t fd_conn_i64crap = (long long) arg;
    int fd_conn = (int) (fd_conn_i64crap & 0xFFFFFFFF);
    char msg[32];
    memset(msg, 0, sizeof(char) * 32);
    int ret = p7_iowrap_timed(recv, P7_IOMODE_READ, 30000, fd_conn, msg, sizeof(msg), 0);
    switch (ret) {
        case -1:
            printf("internal error\n");
            break;
        case -2:
            printf("p7 timed out\n");
            break;
        default:
            p7_iowrap(send, P7_IOMODE_WRITE, fd_conn, msg, strlen(msg), 0);
    }
    close(fd_conn);
}

void test_timeout(void *arg) {
    long long fd_conn_i64crap = (long long) arg;
    int fd_conn = (int) (fd_conn_i64crap & 0xFFFFFFFF);
    printf("Timed out: %d\n", fd_conn);
}

void test_echo_wrapper(void *arg) {
    p7_coro_concat(test_echo, arg, 2048);
}

void test_startup(void *unused) {
    printf("startup\n");
}

int main(int argc, char *argv[]) {
    __auto_type allocator = p7_root_alloc_get_allocator();
    allocator->allocator_.closure_ = malloc;
    allocator->deallocator_.closure_ = free;
    allocator->reallocator_.closure_ = realloc;

    struct p7_init_config config;
    config.namespace_config.namespace_size = 1024;
    config.namespace_config.rwlock_granularity = 10;
    config.namespace_config.spintime = 400;
    config.pthread_config.nthreads = atoi(argv[1]);
    config.pthread_config.at_startup = test_startup;
    config.pthread_config.arg_startup = NULL;
    p7_init(config);

    int fd_listen;

    if ((fd_listen = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
        perror("socket");
        exit(1);
    }

    struct sockaddr_in lserv;
    lserv.sin_family = AF_INET;
    lserv.sin_port = htons(8080);
    lserv.sin_addr.s_addr = inet_addr(argv[2]);
    bzero(&(lserv.sin_zero), 8);

    if (bind(fd_listen, (struct sockaddr *) &lserv, sizeof(lserv)) == -1) {
        perror("bind");
        exit(1);
    }
    
    if (listen(fd_listen, 5) == -1) {
        perror("listen");
        exit(1);
    }

    while (1) {
        struct sockaddr_in rcli;
        socklen_t addrlen = sizeof(rcli);
        int fd_conn = p7_iowrap(accept, P7_IOMODE_READ, fd_listen, (struct sockaddr *) &rcli, &addrlen);
        intptr_t fd_conn_i64crap = fd_conn;
        p7_coro_create(test_echo_wrapper, (void *) fd_conn_i64crap, 512);
    }

    return 0;
}