/*
* version of copy command using async i/o
* From: Stephen Hemminger <shemminger@osdl.org>
* Modified by Daniel McNeil <daniel@osdl.org> for testing aio.
* - added -a alignment
* - added -b blksize option
* - added -s size option
* - added -f open_flag option
* - added -w (no write) option (reads from source only)
* - added -n (num aio) option
* - added -z (zero dest) option (writes zeros to dest only)
* - added -D delay_ms option
*
* Copy a file by using an async I/O state machine.
* 1. Start read request
* 2. When read completes turn it into a write request
* 3. When write completes decrement counter and free resources
*
*
* Usage: aiocp [-b blksize] [-n num_aio] [-w] [-z] [-s filesize]
* [-f DIRECT|TRUNC|CREAT|SYNC|LARGEFILE] src dest
*/
#define _GNU_SOURCE
#include <unistd.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/param.h>
#include <fcntl.h>
#include <errno.h>
#include <stdlib.h>
#include <mntent.h>
#include <sys/select.h>
#include <sys/mount.h>
#include "config.h"
#include "tst_res_flags.h"
#ifdef HAVE_LIBAIO
#include <libaio.h>
/* Defaults used when -b / -n are not given on the command line. */
#define AIO_BLKSIZE (64*1024)
#define AIO_MAXIO 32
static int aio_blksize = AIO_BLKSIZE; /* bytes per I/O request (-b) */
static int aio_maxio = AIO_MAXIO; /* iocb pool size and AIO queue depth (-n) */
static int busy = 0; /* number of I/Os in flight */
static int tocopy = 0; /* number of blocks left to copy */
static int srcfd; /* source fd */
static int srcfd2; /* second source fd, opened without O_DIRECT for a non-sector-sized end of file */
static int dstfd = -1; /* destination file descriptor */
static int dstfd2 = -1; /* second destination fd, opened without O_DIRECT for a non-sector-sized end of file */
static const char *dstname = NULL; /* destination path, NULL until main() opens it */
static const char *srcname = NULL; /* source path, NULL until main() opens it */
static int source_open_flag = O_RDONLY; /* open flags on source file */
static int dest_open_flag = O_WRONLY; /* open flags on dest file */
static int no_write; /* -w: read the source only, do not write */
static int zero; /* -z: write zeros to the destination only */
static int debug; /* verbosity level, incremented by each -d */
static int count_io_q_waits; /* how many times main() called io_wait_run() */
struct iocb **iocb_free; /* array of pointers to free iocbs, used as a stack */
int iocb_free_count; /* current free count */
int alignment = 512; /* buffer alignment */
struct timeval delay; /* delay between i/o submissions (-D) */
/*
 * dev_block_size_by_path() - sector size of the device that backs a path.
 *
 * Scans /proc/mounts for the longest mount point that is a whole-component
 * prefix of @path, opens the device that mount point was mounted from and
 * queries its sector size with the BLKSSZGET ioctl.  Entries whose device
 * name does not start with '/' (pseudo filesystems) are ignored.
 *
 * Returns the sector size in bytes, or 0 when @path is NULL or the size
 * could not be determined (a diagnostic is printed to stderr in the latter
 * case).
 */
static int dev_block_size_by_path(const char *path)
{
	FILE *f;
	struct mntent *mnt;
	size_t prefix_len, prefix_max = 0;
	char dev_name[1024];
	int fd, size;

	if (!path)
		return 0;

	f = setmntent("/proc/mounts", "r");
	if (!f) {
		fprintf(stderr, "Failed to open /proc/mounts\n");
		return 0;
	}

	while ((mnt = getmntent(f))) {
		/* Skip pseudo fs */
		if (mnt->mnt_fsname[0] != '/')
			continue;

		/* Only a longer match than the best so far is interesting. */
		prefix_len = strlen(mnt->mnt_dir);
		if (prefix_len <= prefix_max ||
		    strncmp(path, mnt->mnt_dir, prefix_len))
			continue;

		/*
		 * The match must end on a path component boundary, otherwise
		 * a mount at "/mnt/foo" would be picked for "/mnt/foobar/x".
		 * prefix_len is at least 1 here, so the index is valid.
		 */
		if (mnt->mnt_dir[prefix_len - 1] != '/' &&
		    path[prefix_len] != '/' && path[prefix_len] != '\0')
			continue;

		prefix_max = prefix_len;
		strncpy(dev_name, mnt->mnt_fsname, sizeof(dev_name));
		dev_name[sizeof(dev_name) - 1] = '\0';
	}
	endmntent(f);

	if (!prefix_max) {
		fprintf(stderr, "Path '%s' not found in /proc/mounts\n", path);
		return 0;
	}

	printf("Path '%s' is on device '%s'\n", path, dev_name);

	/* open() reports failure with -1; 0 is a valid descriptor. */
	fd = open(dev_name, O_RDONLY);
	if (fd < 0) {
		fprintf(stderr, "open('%s'): %s\n", dev_name, strerror(errno));
		return 0;
	}

	if (ioctl(fd, BLKSSZGET, &size)) {
		fprintf(stderr, "ioctl(BLKSSZGET): %s\n", strerror(errno));
		close(fd);
		return 0;
	}
	close(fd);

	printf("'%s' has block size %i\n", dev_name, size);
	return size;
}
/*
 * init_iocb() - allocate the pool of free iocbs and their data buffers.
 *
 * @n:      number of iocbs to create
 * @iosize: size in bytes of the buffer attached to each iocb
 *
 * Every buffer is aligned to the global `alignment` and, when the global
 * `zero` is set, cleared so that it can be written out as-is.  Each iocb is
 * prepared as a read on fd -1 at offset 0, which records the buffer and its
 * size in the iocb until the caller prepares the real request.
 *
 * Returns 0 on success with iocb_free / iocb_free_count describing the pool,
 * or -1 on failure, in which case everything allocated so far is released
 * and the pool is left empty.
 */
int init_iocb(int n, int iosize)
{
	void *buf;
	int i;

	iocb_free_count = 0;
	if (n <= 0 || iosize <= 0)
		return -1;

	iocb_free = malloc(n * sizeof(*iocb_free));
	if (!iocb_free)
		return -1;

	for (i = 0; i < n; i++) {
		iocb_free[i] = malloc(sizeof(*iocb_free[i]));
		if (!iocb_free[i])
			goto fail;

		if (posix_memalign(&buf, alignment, iosize)) {
			/* This iocb has no buffer yet, release it here. */
			free(iocb_free[i]);
			goto fail;
		}

		if (debug > 1) {
			/* %p already prints its own prefix where applicable. */
			printf("buf allocated at %p, align:%d\n",
			       buf, alignment);
		}

		if (zero) {
			/*
			 * We are writing zero's to dstfd
			 */
			memset(buf, 0, iosize);
		}

		io_prep_pread(iocb_free[i], -1, buf, iosize, 0);
	}

	iocb_free_count = i;
	return 0;

fail:
	/* Entries below i are fully set up: iocb plus attached buffer. */
	while (i-- > 0) {
		free(iocb_free[i]->u.c.buf);
		free(iocb_free[i]);
	}
	free(iocb_free);
	iocb_free = NULL;
	return -1;
}
/*
 * alloc_iocb() - take one iocb from the top of the free stack.
 *
 * Returns the iocb, or NULL when iocb_free_count is zero.
 */
static struct iocb *alloc_iocb(void)
{
	struct iocb *taken = NULL;

	if (iocb_free_count != 0) {
		iocb_free_count -= 1;
		taken = iocb_free[iocb_free_count];
	}
	return taken;
}
/*
 * free_iocb() - push an iocb back onto the top of the free stack.
 *
 * @io: the iocb to return to the pool
 */
void free_iocb(struct iocb *io)
{
	int slot = iocb_free_count;

	iocb_free[slot] = io;
	iocb_free_count = slot + 1;
}
/*
 * io_wait_run() - reap completed I/Os and dispatch their callbacks.
 *
 * @ctx: AIO context to collect events from
 * @to:  timeout handed straight to io_getevents()
 *
 * Asks io_getevents() for between 1 and aio_maxio events, then for each one
 * calls the callback stored in the event's data field with the event's iocb
 * and both result values.
 *
 * Returns whatever io_getevents() returned; no callback runs when that
 * value is zero or negative.
 */
int io_wait_run(io_context_t ctx, struct timespec *to)
{
	struct io_event events[aio_maxio];
	int nr_events;
	int idx;

	nr_events = io_getevents(ctx, 1, aio_maxio, events, to);

	for (idx = 0; idx < nr_events; idx++) {
		struct io_event *ev = &events[idx];
		struct iocb *done = ev->obj;
		io_callback_t handler = (io_callback_t) ev->data;

		if (debug > 1) {
			fprintf(stderr, "ev:%p iocb:%p res:%ld res2:%ld\n",
				ev, done, ev->res, ev->res2);
		}
		handler(ctx, done, ev->res, ev->res2);
	}

	return nr_events;
}
/*
 * io_error() - fatal error handler: report, clean up, exit(1).
 *
 * @func: name of the failing operation, used as the message prefix
 * @rc:   its result; a negative value is treated as a negated errno
 *
 * Closes dstfd when it is greater than 0 and removes the destination file
 * when O_CREAT is set in dest_open_flag.  Never returns.
 */
static void io_error(const char *func, int rc)
{
	if (rc == -ENOSYS) {
		fprintf(stderr, "AIO not in this kernel\n");
	} else if (rc >= 0) {
		fprintf(stderr, "%s: error %d\n", func, rc);
	} else {
		fprintf(stderr, "%s: %s\n", func, strerror(-rc));
	}

	if (dstfd > 0)
		close(dstfd);

	if (dstname != NULL && (dest_open_flag & O_CREAT) != 0)
		unlink(dstname);

	exit(1);
}
/*
 * wr_done() - completion callback for a write request.
 *
 * @ctx:  AIO context (unused here)
 * @iocb: the request that completed
 * @res:  byte count reported for the request
 * @res2: secondary status; non-zero is fatal
 *
 * A short write terminates the program.  Otherwise one block is accounted
 * as copied, one fewer I/O is in flight and the iocb returns to the pool.
 */
static void wr_done(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
	if (res2 != 0)
		io_error("aio write", res2);

	if (res != iocb->u.c.nbytes) {
		fprintf(stderr, "write missed bytes expect %lu got %ld\n",
			iocb->u.c.nbytes, res);
		exit(1);
	}

	tocopy -= 1;
	busy -= 1;
	free_iocb(iocb);

	/* One progress character per completed write when -d is given. */
	if (debug)
		write(2, "w", 1);
}
/*
 * rd_done() - completion callback for a read request.
 *
 * @ctx:  AIO context, reused to submit the follow-up write
 * @iocb: the request that completed
 * @res:  byte count reported for the request
 * @res2: secondary status; non-zero is fatal
 *
 * A short read terminates the program.  With no_write set the block is
 * simply accounted for and the iocb freed.  Otherwise the same iocb, buffer,
 * size and offset are re-used for a write: to dstfd when the read came from
 * srcfd, to dstfd2 otherwise.
 */
static void rd_done(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
	/*
	 * Capture the request parameters up front: the iocb is re-prepared
	 * below and these fields are needed for the write.
	 */
	int iosize = iocb->u.c.nbytes;
	char *buf = iocb->u.c.buf;
	off_t offset = iocb->u.c.offset;

	if (res2 != 0)
		io_error("aio read", res2);

	if (res != iosize) {
		fprintf(stderr, "read missing bytes expect %lu got %ld\n",
			iocb->u.c.nbytes, res);
		exit(1);
	}

	if (!no_write) {
		/* turn read into write */
		int out_fd = (iocb->aio_fildes == srcfd) ? dstfd : dstfd2;

		io_prep_pwrite(iocb, out_fd, buf, iosize, offset);
		io_set_callback(iocb, wr_done);
		res = io_submit(ctx, 1, &iocb);
		if (res != 1)
			io_error("io_submit write", res);
	} else {
		/* read-only mode: this block is finished */
		tocopy -= 1;
		busy -= 1;
		free_iocb(iocb);
	}

	if (debug)
		write(2, "r", 1);
	if (debug > 1)
		printf("%d", iosize);
}
/*
 * usage() - print the command line synopsis to stderr and exit(1).
 */
static void usage(void)
{
	static const char help[] =
		"Usage: aiocp [-a align] [-s size] [-b blksize] [-n num_io]"
		" [-f open_flag] SOURCE DEST\n"
		"This copies from SOURCE to DEST using AIO.\n\n"
		"Usage: aiocp [options] -w SOURCE\n"
		"This does sequential AIO reads (no writes).\n\n"
		"Usage: aiocp [options] -z DEST\n"
		"This does sequential AIO writes of zeros.\n";

	fputs(help, stderr);
	exit(1);
}
/*
 * scale_by_kmg() - apply a kilo/mega/giga suffix to a number.
 *
 * @value: the number parsed so far
 * @scale: the character following the number
 *
 * 'k'/'K' multiplies by 1024 once, 'm'/'M' twice and 'g'/'G' three times.
 * A NUL character (no suffix) leaves @value unchanged.  Any other
 * character is a usage error and ends up in usage().
 *
 * Returns the scaled value.
 */
long long scale_by_kmg(long long value, char scale)
{
	int steps;

	if (scale == '\0')
		return value;

	if (scale == 'k' || scale == 'K') {
		steps = 1;
	} else if (scale == 'm' || scale == 'M') {
		steps = 2;
	} else if (scale == 'g' || scale == 'G') {
		steps = 3;
	} else {
		usage();
		return value;
	}

	while (steps-- > 0)
		value *= 1024;

	return value;
}
/*
 * main() - parse the command line, open the files and run the AIO copy.
 *
 * Modes:
 *   default  copy SOURCE to DEST (each completed read is turned into a write)
 *   -w       read SOURCE only
 *   -z       write zeros to DEST only
 *
 * With O_DIRECT the bulk of the data is transferred in multiples of the
 * alignment; a trailing partial sector ("leftover") is transferred last
 * through a second pair of descriptors opened without O_DIRECT.
 *
 * Exits with 0 on success (or when the block size does not suit O_DIRECT
 * and the test is skipped) and with 1 on any error.
 */
int main(int argc, char *const *argv)
{
	struct stat st;
	off_t length = 0, offset = 0;
	off_t leftover = 0;
	io_context_t myctx;
	int c, rc;
	extern char *optarg;
	extern int optind, opterr, optopt;

	while ((c = getopt(argc, argv, "a:b:df:n:s:wzD:")) != -1) {
		char *endp;

		switch (c) {
		case 'a':	/* alignment of data buffer */
			alignment = strtol(optarg, &endp, 0);
			alignment = (long)scale_by_kmg((long long)alignment,
						       *endp);
			break;
		case 'f':	/* use these open flags */
			/* Names that match nothing below are ignored. */
			if (strcmp(optarg, "LARGEFILE") == 0 ||
			    strcmp(optarg, "O_LARGEFILE") == 0) {
				source_open_flag |= O_LARGEFILE;
				dest_open_flag |= O_LARGEFILE;
			} else if (strcmp(optarg, "TRUNC") == 0 ||
				   strcmp(optarg, "O_TRUNC") == 0) {
				dest_open_flag |= O_TRUNC;
			} else if (strcmp(optarg, "SYNC") == 0 ||
				   strcmp(optarg, "O_SYNC") == 0) {
				dest_open_flag |= O_SYNC;
			} else if (strcmp(optarg, "DIRECT") == 0 ||
				   strcmp(optarg, "O_DIRECT") == 0) {
				source_open_flag |= O_DIRECT;
				dest_open_flag |= O_DIRECT;
			} else if (strncmp(optarg, "CREAT", 5) == 0 ||
				   strncmp(optarg, "O_CREAT", 7) == 0) {
				/* compare all 7 characters of "O_CREAT" */
				dest_open_flag |= O_CREAT;
			}
			break;
		case 'd':
			debug++;
			break;
		case 'D':
			/*
			 * The value lands in tv_usec, i.e. it is applied as
			 * microseconds.
			 */
			delay.tv_usec = atoi(optarg);
			break;
		case 'b':	/* block size */
			aio_blksize = strtol(optarg, &endp, 0);
			aio_blksize =
			    (long)scale_by_kmg((long long)aio_blksize, *endp);
			break;
		case 'n':	/* num io */
			aio_maxio = strtol(optarg, &endp, 0);
			break;
		case 's':	/* size to transfer */
			length = strtoll(optarg, &endp, 0);
			length = scale_by_kmg(length, *endp);
			break;
		case 'w':	/* no write */
			no_write = 1;
			break;
		case 'z':	/* write zero's */
			zero = 1;
			break;
		default:
			usage();
		}
	}

	/*
	 * These three are used as divisors, array sizes and loop bounds
	 * below; a zero or negative value would divide by zero or never
	 * make progress.
	 */
	if (alignment <= 0 || aio_blksize <= 0 || aio_maxio <= 0) {
		fprintf(stderr,
			"aiocp: alignment, block size and number of I/Os must be > 0\n");
		usage();
	}

	argc -= optind;
	argv += optind;

	if (argc < 1) {
		usage();
	}

	if (!zero) {
		srcname = *argv;
		srcfd = open(srcname, source_open_flag);
		if (srcfd < 0) {
			perror(srcname);
			exit(1);
		}
		argv++;
		argc--;
		if (fstat(srcfd, &st) < 0) {
			perror("fstat");
			exit(1);
		}
		/* Without -s the whole source file is transferred. */
		if (length == 0)
			length = st.st_size;
	}

	if (!no_write) {
		/*
		 * We are either copying or writing zeros to dstname
		 */
		if (argc < 1) {
			usage();
		}
		dstname = *argv;
		dstfd = open(dstname, dest_open_flag, 0666);
		if (dstfd < 0) {
			perror(dstname);
			exit(1);
		}
		if (zero) {
			/*
			 * get size of dest, if we are zeroing it.
			 * TODO: handle devices.
			 */
			if (fstat(dstfd, &st) < 0) {
				perror("fstat");
				exit(1);
			}
			if (length == 0)
				length = st.st_size;
		}
	}

	/*
	 * O_DIRECT cannot handle non-sector sizes
	 */
	if (dest_open_flag & O_DIRECT) {
		int src_alignment = dev_block_size_by_path(srcname);
		int dst_alignment = dev_block_size_by_path(dstname);

		/*
		 * Given we expect the block sizes to be multiple of 2 the
		 * larger is always divisible by the smaller, so we only need
		 * to care about maximum.
		 */
		if (src_alignment > dst_alignment)
			dst_alignment = src_alignment;

		if (alignment < dst_alignment) {
			alignment = dst_alignment;
			printf("Forcing aligment to %i\n", alignment);
		}

		if (aio_blksize % alignment) {
			printf("Block size is not multiple of drive block size\n");
			printf("Skipping the test!\n");
			exit(0);
		}

		leftover = length % alignment;
		if (leftover) {
			int flag;

			/* The tail goes through the non-O_DIRECT fds. */
			length -= leftover;
			if (!zero) {
				flag = source_open_flag & ~O_DIRECT;
				srcfd2 = open(srcname, flag);
				if (srcfd2 < 0) {
					perror(srcname);
					exit(1);
				}
			}
			if (!no_write) {
				flag = (O_SYNC | dest_open_flag) &
				    ~(O_DIRECT | O_CREAT);
				dstfd2 = open(dstname, flag);
				if (dstfd2 < 0) {
					perror(dstname);
					exit(1);
				}
			}
		}
	}

	/* initialize state machine */
	memset(&myctx, 0, sizeof(myctx));
	rc = io_queue_init(aio_maxio, &myctx);
	if (rc < 0)
		io_error("io_queue_init", rc);
	tocopy = howmany(length, aio_blksize);

	if (init_iocb(aio_maxio, aio_blksize) < 0) {
		fprintf(stderr, "Error allocating the i/o buffers\n");
		exit(1);
	}

	while (tocopy > 0) {
		int i;
		/* Submit as many reads at once as possible, up to aio_maxio */
		int n = MIN(MIN(aio_maxio - busy, aio_maxio),
			    howmany(length - offset, aio_blksize));

		if (n > 0) {
			struct iocb *ioq[n];

			for (i = 0; i < n; i++) {
				struct iocb *io = alloc_iocb();
				int iosize = MIN(length - offset, aio_blksize);

				if (zero) {
					/*
					 * We are writing zero's to dstfd
					 */
					io_prep_pwrite(io, dstfd, io->u.c.buf,
						       iosize, offset);
					io_set_callback(io, wr_done);
				} else {
					io_prep_pread(io, srcfd, io->u.c.buf,
						      iosize, offset);
					io_set_callback(io, rd_done);
				}
				ioq[i] = io;
				offset += iosize;
			}

			/*
			 * A short count means some requests were not queued;
			 * counting them as busy would make the loop wait for
			 * completions that never arrive, so treat it as
			 * fatal just like the write submission in rd_done().
			 */
			rc = io_submit(myctx, n, ioq);
			if (rc != n)
				io_error("io_submit", rc);

			busy += n;
			if (debug > 1)
				printf("io_submit(%d) busy:%d\n", n, busy);
			if (delay.tv_usec) {
				struct timeval t = delay;
				(void)select(0, 0, 0, 0, &t);
			}
		}

		/*
		 * We have submitted all the i/o requests. Wait for at least
		 * one to complete and call the callbacks.
		 */
		count_io_q_waits++;
		rc = io_wait_run(myctx, 0);
		if (rc < 0)
			io_error("io_wait_run", rc);

		if (debug > 1) {
			printf("io_wait_run: rc == %d\n", rc);
			printf("busy:%d aio_maxio:%d tocopy:%d\n",
			       busy, aio_maxio, tocopy);
		}
	}

	if (leftover) {
		/* non-sector size end of file */
		struct iocb *io = alloc_iocb();

		if (zero) {
			/*
			 * We are writing zero's to dstfd2
			 */
			io_prep_pwrite(io, dstfd2, io->u.c.buf,
				       leftover, offset);
			io_set_callback(io, wr_done);
		} else {
			io_prep_pread(io, srcfd2, io->u.c.buf,
				      leftover, offset);
			io_set_callback(io, rd_done);
		}

		/*
		 * Account for the tail like any other block so that the
		 * callbacks leave the counters balanced and the wait below
		 * also covers the write that rd_done() chains to the read.
		 */
		tocopy = 1;
		busy = 1;

		rc = io_submit(myctx, 1, &io);
		if (rc != 1)
			io_error("io_submit", rc);

		while (tocopy > 0) {
			count_io_q_waits++;
			rc = io_wait_run(myctx, 0);
			if (rc < 0)
				io_error("io_wait_run", rc);
		}

		if (!zero)
			close(srcfd2);
		if (!no_write)
			close(dstfd2);
	}

	/* srcfd is only a descriptor we own when a source was opened. */
	if (!zero)
		close(srcfd);
	if (dstfd != -1)
		close(dstfd);
	exit(0);
}
/*
* Results look like:
* [alanm@toolbox ~/MOT3]$ ../taio -d kernel-source-2.4.8-0.4g.ppc.rpm abc
* rrrrrrrrrrrrrrrwwwrwrrwwrrwrwwrrwrwrwwrrwrwrrrrwwrwwwrrwrrrwwwwwwwwwwwwwwwww
* rrrrrrrrrrrrrrwwwrrwrwrwrwrrwwwwwwwwwwwwwwrrrrrrrrrrrrrrrrrrwwwwrwrwwrwrwrwr
* wrrrrrrrwwwwwwwwwwwwwrrrwrrrwrrwrwwwwwwwwwwrrrrwwrwrrrrrrrrrrrwwwwwwwwwwwrww
* wwwrrrrrrrrwwrrrwwrwrwrwwwrrrrrrrwwwrrwwwrrwrwwwwwwwwrrrrrrrwwwrrrrrrrwwwwww
* wwwwwwwrwrrrrrrrrwrrwrrwrrwrwrrrwrrrwrrrwrwwwwwwwwwwwwwwwwwwrrrwwwrrrrrrrrrr
* rrwrrrrrrwrrwwwwwwwwwwwwwwwwrwwwrrwrwwrrrrrrrrrrrrrrrrrrrwwwwwwwwwwwwwwwwwww
* rrrrrwrrwrwrwrrwrrrwwwwwwwwrrrrwrrrwrwwrwrrrwrrwrrrrwwwwwwwrwrwwwwrwwrrrwrrr
* rrrwwwwwwwrrrrwwrrrrrrrrrrrrwrwrrrrwwwwwwwwwwwwwwrwrrrrwwwwrwrrrrwrwwwrrrwww
* rwwrrrrrrrwrrrrrrrrrrrrwwwwrrrwwwrwrrwwwwwwwwwwwwwwwwwwwwwrrrrrrrwwwwwwwrw
*/
#else
/*
 * Fallback entry point used when the build has no libaio: report that the
 * test cannot run and return TCONF.
 */
int main(void)
{
	fputs("test requires libaio and it's development packages\n", stderr);

	return TCONF;
}
#endif