/*
 * libhdfs engine
 *
 * this engine helps perform read/write operations on hdfs cluster using
 * libhdfs. hdfs doesnot support modification of data once file is created.
 *
 * so to mimic that create many files of small size (e.g 256k), and this
 * engine select a file based on the offset generated by fio.
 *
 * thus, random reads and writes can also be achieved with this logic.
 *
 * NOTE: please set environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT
 * to appropriate value to work this engine properly
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/uio.h>
#include <errno.h>
#include <assert.h>

#include "../fio.h"

#include "hdfs.h"

struct hdfsio_data {
	char host[256];
	int port;
	hdfsFS fs;
	hdfsFile fp;
	unsigned long fsbs;
	unsigned long fscount;
	unsigned long curr_file_id;
	unsigned int numjobs;
	unsigned int fid_correction;
};

static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd)
{
	/* make sure that hdfsConnect is invoked before executing this function */
	hdfsSetWorkingDirectory(hd->fs, "/.perftest");
	hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0);
	if (hd->fp) {
		hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount));
		hdfsCloseFile(hd->fs, hd->fp);
	}
	hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0);
	if (hd->fp) {
		hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs));
		hdfsCloseFile(hd->fs, hd->fp);
	}

	return 0;
}

static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
{
	struct hdfsio_data *hd;
	hdfsFileInfo *fi;
	unsigned long f_id;
	char fname[80];
	int open_flags = 0;

	hd = td->io_ops->data;

	if (hd->curr_file_id == -1) {
		/* see comment in fio_hdfsio_setup() function */
		fio_hdfsio_setup_fs_params(hd);
	}

	/* find out file id based on the offset generated by fio */
	f_id = (io_u->offset / hd->fsbs) + hd->fid_correction;

	if (f_id == hd->curr_file_id) {
		/* file is already open */
		return 0;
	}

	if (hd->curr_file_id != -1) {
		hdfsCloseFile(hd->fs, hd->fp);
	}

	if (io_u->ddir == DDIR_READ) {
		open_flags = O_RDONLY;
	} else if (io_u->ddir == DDIR_WRITE) {
		open_flags = O_WRONLY;
	} else {
		log_err("hdfs: Invalid I/O Operation\n");
	}

	hd->curr_file_id = f_id;
	do {
		sprintf(fname, ".f%lu", f_id);
		fi = hdfsGetPathInfo(hd->fs, fname);
		if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) {
			/* file has enough data to read OR file is opened in write mode */
			hd->fp =
			    hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
					 hd->fsbs);
			if (hd->fp) {
				break;
			}
		}
		/* file is empty, so try next file for reading */
		f_id = (f_id + 1) % hd->fscount;
	} while (1);

	return 0;
}

static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
{
	if (ret != (int)io_u->xfer_buflen) {
		if (ret >= 0) {
			io_u->resid = io_u->xfer_buflen - ret;
			io_u->error = 0;
			return FIO_Q_COMPLETED;
		} else
			io_u->error = errno;
	}

	if (io_u->error)
		td_verror(td, io_u->error, "xfer");

	return FIO_Q_COMPLETED;
}

static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
{
	struct hdfsio_data *hd;
	int ret = 0;

	hd = td->io_ops->data;

	if (io_u->ddir == DDIR_READ) {
		ret =
		    hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
	} else if (io_u->ddir == DDIR_WRITE) {
		ret =
		    hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
			      io_u->xfer_buflen);
	} else {
		log_err("hdfs: Invalid I/O Operation\n");
	}

	return fio_io_end(td, io_u, ret);
}

int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
{
	struct hdfsio_data *hd;

	hd = td->io_ops->data;
	hd->fs = hdfsConnect(hd->host, hd->port);
	hdfsSetWorkingDirectory(hd->fs, "/.perftest");
	hd->fid_correction = (getpid() % hd->numjobs);

	return 0;
}

int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
{
	struct hdfsio_data *hd;

	hd = td->io_ops->data;
	hdfsDisconnect(hd->fs);

	return 0;
}

static int fio_hdfsio_setup(struct thread_data *td)
{
	struct hdfsio_data *hd;
	struct fio_file *f;
	static unsigned int numjobs = 1;	/* atleast one job has to be there! */
	numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs;

	if (!td->io_ops->data) {
		hd = malloc(sizeof(*hd));;

		memset(hd, 0, sizeof(*hd));
		td->io_ops->data = hd;

		/* separate host and port from filename */
		*(strchr(td->o.filename, ',')) = ' ';
		sscanf(td->o.filename, "%s%d", hd->host, &(hd->port));

		/* read fbs and fcount and based on that set f->real_file_size */
		f = td->files[0];
#if 0
		/* IMHO, this should be done here instead of fio_hdfsio_prep()
		 * but somehow calling it here doesn't seem to work,
		 * some problem with libhdfs that needs to be debugged */
		hd->fs = hdfsConnect(hd->host, hd->port);
		fio_hdfsio_setup_fs_params(hd);
		hdfsDisconnect(hd->fs);
#else
		/* so, as an alternate, using environment variables */
		if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) {
			hd->fscount = atol(getenv("FIO_HDFS_FCOUNT"));
			hd->fsbs = atol(getenv("FIO_HDFS_BS"));
		} else {
			log_err("FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n");
			return 1;
		}
#endif
		f->real_file_size = hd->fscount * hd->fsbs;

		td->o.nr_files = 1;
		hd->curr_file_id = -1;
		hd->numjobs = numjobs;
		fio_file_set_size_known(f);
	}

	return 0;
}

static struct ioengine_ops ioengine_hdfs = {
	.name = "libhdfs",
	.version = FIO_IOOPS_VERSION,
	.setup = fio_hdfsio_setup,
	.prep = fio_hdfsio_prep,
	.queue = fio_hdfsio_queue,
	.open_file = fio_hdfsio_open_file,
	.close_file = fio_hdfsio_close_file,
	.flags = FIO_SYNCIO,
};

static void fio_init fio_hdfsio_register(void)
{
	register_ioengine(&ioengine_hdfs);
}

static void fio_exit fio_hdfsio_unregister(void)
{
	unregister_ioengine(&ioengine_hdfs);
}