我正在测试内核异步io函数(不是posix aio),并试图弄清楚它是如何工作的。下面的代码是一个完整的程序,其中我简单地将一个数组重复写入使用O_DIRECT打开的文件中。我在回调函数中收到一个错误“写错的字节期望1024得到0”(请参阅work_done()中的fprintf语句)。
对于不熟悉内核aio的用户,以下代码将执行以下操作:
我在步骤5遇到错误。如果不使用O_DIRECT打开文件,一切正常,但它超出了进行异步写入的目的。有人可以告诉我我在做什么错吗?这是内核aio的正确用法吗,例如,我对回调的使用正确吗?O_DIRECT的使用是否受到限制?
我使用’gcc -Wall test.c -laio’进行编译
提前致谢。
/* * File: myaiocp.c * Author: kmehta * * Created on July 11, 2011, 12:50 PM * * * Testing kernel aio. * Program creates a 2D matrix and writes it multiple times to create a file of desired size. * Writes are performed using kernel aio functions (io_prep_pwrite, io_submit, etc.) */ #define _GNU_SOURCE #define _XOPEN_SOURCE 600 #include <stdio.h> #include <stdlib.h> #include <getopt.h> #include <pthread.h> #include <fcntl.h> #include <string.h> #include <sys/uio.h> #include <sys/time.h> #include <omp.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <errno.h> #include <libaio.h> char ** buf; long seg_size; int seg_rows; double total_size; char * filename; static int wait_count = 0; void io_task(); void cleanup(); void allocate_2D_matrix(int[]); int file_open(char *); void wr_done(io_context_t ctx, struct iocb* iocb, long res, long res2); int main(int argc, char **argv) { total_size = 1048576; //1MB seg_size = 1024; //1kB seg_rows = 1024; filename = "aio.out"; int dims[] = {seg_rows, seg_size}; allocate_2D_matrix(dims); //Creates 2D matrix io_task(); cleanup(); return 0; } /* * Create a 2D matrix */ void allocate_2D_matrix(int dims[2]) { int i; char *data; //create the matrix data = (char *) calloc(1, dims[0] * dims[1] * sizeof (char)); if (data == NULL) { printf("\nCould not allocate memory for matrix.\n"); exit(1); } buf = (char **) malloc(dims[0] * sizeof (char *)); if (buf == NULL) { printf("\nCould not allocate memory for matrix.\n"); exit(1); } for (i = 0; i < dims[0]; i++) { buf[i] = &(data[i * dims[1]]); } } static void io_error(const char *func, int rc) { if (rc == -ENOSYS) fprintf(stderr, "AIO not in this kernel\n"); else if (rc < 0) fprintf(stderr, "%s: %s\n", func, strerror(-rc)); else fprintf(stderr, "%s: error %d\n", func, rc); exit(1); } /* * Callback function */ static void work_done(io_context_t ctx, struct iocb *iocb, long res, long res2) { if (res2 != 0) { io_error("aio write", res2); } if (res != iocb->u.c.nbytes) { fprintf(stderr, "write missed bytes expect %lu got %ld\n", iocb->u.c.nbytes, res2); exit(1); } wait_count --; printf("%d ", wait_count); } /* * Wait routine. Get events and call the callback function work_done() */ int io_wait_run(io_context_t ctx, long iter) { struct io_event events[iter]; struct io_event *ep; int ret, n; /* * get up to aio_maxio events at a time. */ ret = n = io_getevents(ctx, iter, iter, events, NULL); printf("got %d events\n", n); /* * Call the callback functions for each event. */ for (ep = events ; n-- > 0 ; ep++) { io_callback_t cb = (io_callback_t)ep->data ; struct iocb *iocb = ep->obj ; cb(ctx, iocb, ep->res, ep->res2); } return ret; } void io_task() { long offset = 0; int bufIndex = 0; //Open file int fd = file_open(filename); //Initialize structures long i; long iter = total_size / seg_size; //No. of iterations to reach desired file size (total_size) io_context_t myctx; if(0 != io_queue_init(iter, &myctx)) { perror("Could not initialize io queue"); exit(EXIT_FAILURE); } struct iocb * ioq[iter]; //loop through iter times to reach desired file size for (i = 0; i < iter; i++) { struct iocb *io = (struct iocb*) malloc(sizeof (struct iocb)); io_prep_pwrite(io, fd, buf[bufIndex], seg_size, offset); io_set_callback(io, work_done); ioq[i] = io; offset += seg_size; bufIndex ++; if (bufIndex > seg_rows - 1) //If entire matrix written, start again from index 0 bufIndex = 0; } printf("done preparing. Now submitting..\n"); if(iter != io_submit(myctx, iter, ioq)) { perror("Failure on submit"); exit(EXIT_FAILURE); } printf("now awaiting completion..\n"); wait_count = iter; int res; while (wait_count) { res = io_wait_run(myctx, iter); if (res < 0) io_error("io_wait_run", res); } close(fd); } void cleanup() { free(buf[0]); free(buf); } int file_open(char *filename) { int fd; if (-1 == (fd = open(filename, O_DIRECT | O_CREAT | O_WRONLY | O_TRUNC, 0666))) { printf("\nError opening file. \n"); exit(-1); } return fd; }
首先,使用libaioPOSIX代替POSIX 做好工作aio。
libaio
aio
O_DIRECT的使用是否受到限制?
我不是100%确定这是真正的问题,但是 O_DIRECT 有一些要求(主要引用TLPI):
O_DIRECT
posix_memalign
乍一看,我可以看到您没有采取进一步的措施来对齐中的内存allocate_2D_matrix。
allocate_2D_matrix
如果我不使用O_DIRECT打开文件,则一切正常,但它超出了进行异步写入的目的。
碰巧并非如此。在没有异步I / O的情况下,它可以很好地工作O_DIRECT(例如,考虑大幅减少的系统调用数量)。