/* * libhdfs engine * * this engine helps perform read/write operations on hdfs cluster using * libhdfs. hdfs doesnot support modification of data once file is created. * * so to mimic that create many files of small size (e.g 256k), and this * engine select a file based on the offset generated by fio. * * thus, random reads and writes can also be achieved with this logic. * * NOTE: please set environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT * to appropriate value to work this engine properly * */ #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/uio.h> #include <errno.h> #include <assert.h> #include "../fio.h" #include "hdfs.h" struct hdfsio_data { char host[256]; int port; hdfsFS fs; hdfsFile fp; unsigned long fsbs; unsigned long fscount; unsigned long curr_file_id; unsigned int numjobs; unsigned int fid_correction; }; static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd) { /* make sure that hdfsConnect is invoked before executing this function */ hdfsSetWorkingDirectory(hd->fs, "/.perftest"); hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0); if (hd->fp) { hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount)); hdfsCloseFile(hd->fs, hd->fp); } hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0); if (hd->fp) { hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs)); hdfsCloseFile(hd->fs, hd->fp); } return 0; } static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u) { struct hdfsio_data *hd; hdfsFileInfo *fi; unsigned long f_id; char fname[80]; int open_flags = 0; hd = td->io_ops->data; if (hd->curr_file_id == -1) { /* see comment in fio_hdfsio_setup() function */ fio_hdfsio_setup_fs_params(hd); } /* find out file id based on the offset generated by fio */ f_id = (io_u->offset / hd->fsbs) + hd->fid_correction; if (f_id == hd->curr_file_id) { /* file is already open */ return 0; } if (hd->curr_file_id != -1) { hdfsCloseFile(hd->fs, hd->fp); } if (io_u->ddir == DDIR_READ) { open_flags = O_RDONLY; } else if (io_u->ddir == DDIR_WRITE) { open_flags = O_WRONLY; } else { log_err("hdfs: Invalid I/O Operation\n"); } hd->curr_file_id = f_id; do { sprintf(fname, ".f%lu", f_id); fi = hdfsGetPathInfo(hd->fs, fname); if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) { /* file has enough data to read OR file is opened in write mode */ hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0, hd->fsbs); if (hd->fp) { break; } } /* file is empty, so try next file for reading */ f_id = (f_id + 1) % hd->fscount; } while (1); return 0; } static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret) { if (ret != (int)io_u->xfer_buflen) { if (ret >= 0) { io_u->resid = io_u->xfer_buflen - ret; io_u->error = 0; return FIO_Q_COMPLETED; } else io_u->error = errno; } if (io_u->error) td_verror(td, io_u->error, "xfer"); return FIO_Q_COMPLETED; } static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u) { struct hdfsio_data *hd; int ret = 0; hd = td->io_ops->data; if (io_u->ddir == DDIR_READ) { ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); } else if (io_u->ddir == DDIR_WRITE) { ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); } else { log_err("hdfs: Invalid I/O Operation\n"); } return fio_io_end(td, io_u, ret); } int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f) { struct hdfsio_data *hd; hd = td->io_ops->data; hd->fs = hdfsConnect(hd->host, hd->port); hdfsSetWorkingDirectory(hd->fs, "/.perftest"); hd->fid_correction = (getpid() % hd->numjobs); return 0; } int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f) { struct hdfsio_data *hd; hd = td->io_ops->data; hdfsDisconnect(hd->fs); return 0; } static int fio_hdfsio_setup(struct thread_data *td) { struct hdfsio_data *hd; struct fio_file *f; static unsigned int numjobs = 1; /* atleast one job has to be there! */ numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs; if (!td->io_ops->data) { hd = malloc(sizeof(*hd));; memset(hd, 0, sizeof(*hd)); td->io_ops->data = hd; /* separate host and port from filename */ *(strchr(td->o.filename, ',')) = ' '; sscanf(td->o.filename, "%s%d", hd->host, &(hd->port)); /* read fbs and fcount and based on that set f->real_file_size */ f = td->files[0]; #if 0 /* IMHO, this should be done here instead of fio_hdfsio_prep() * but somehow calling it here doesn't seem to work, * some problem with libhdfs that needs to be debugged */ hd->fs = hdfsConnect(hd->host, hd->port); fio_hdfsio_setup_fs_params(hd); hdfsDisconnect(hd->fs); #else /* so, as an alternate, using environment variables */ if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) { hd->fscount = atol(getenv("FIO_HDFS_FCOUNT")); hd->fsbs = atol(getenv("FIO_HDFS_BS")); } else { log_err("FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n"); return 1; } #endif f->real_file_size = hd->fscount * hd->fsbs; td->o.nr_files = 1; hd->curr_file_id = -1; hd->numjobs = numjobs; fio_file_set_size_known(f); } return 0; } static struct ioengine_ops ioengine_hdfs = { .name = "libhdfs", .version = FIO_IOOPS_VERSION, .setup = fio_hdfsio_setup, .prep = fio_hdfsio_prep, .queue = fio_hdfsio_queue, .open_file = fio_hdfsio_open_file, .close_file = fio_hdfsio_close_file, .flags = FIO_SYNCIO, }; static void fio_init fio_hdfsio_register(void) { register_ioengine(&ioengine_hdfs); } static void fio_exit fio_hdfsio_unregister(void) { unregister_ioengine(&ioengine_hdfs); }