/*
 * Copyright (C) 2016 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#define DEBUG false
#include "Log.h"

#include "FdBuffer.h"

#include <log/log.h>
#include <utils/SystemClock.h>

#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include <wait.h>

namespace android {
namespace os {
namespace incidentd {

const ssize_t BUFFER_SIZE = 16 * 1024;  // 16 KB
const ssize_t MAX_BUFFER_COUNT = 6144;   // 96 MB max

FdBuffer::FdBuffer()
        :mBuffer(new EncodedBuffer(BUFFER_SIZE)),
         mStartTime(-1),
         mFinishTime(-1),
         mTimedOut(false),
         mTruncated(false) {
}

FdBuffer::~FdBuffer() {
}

status_t FdBuffer::read(int fd, int64_t timeout) {
    struct pollfd pfds = {.fd = fd, .events = POLLIN};
    mStartTime = uptimeMillis();

    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);

    while (true) {
        if (mBuffer->size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
            mTruncated = true;
            VLOG("Truncating data");
            break;
        }
        if (mBuffer->writeBuffer() == NULL) {
            VLOG("No memory");
            return NO_MEMORY;
        }

        int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
        if (remainingTime <= 0) {
            VLOG("timed out due to long read");
            mTimedOut = true;
            break;
        }

        int count = TEMP_FAILURE_RETRY(poll(&pfds, 1, remainingTime));
        if (count == 0) {
            VLOG("timed out due to block calling poll");
            mTimedOut = true;
            break;
        } else if (count < 0) {
            VLOG("poll failed: %s", strerror(errno));
            return -errno;
        } else {
            if ((pfds.revents & POLLERR) != 0) {
                VLOG("return event has error %s", strerror(errno));
                return errno != 0 ? -errno : UNKNOWN_ERROR;
            } else {
                ssize_t amt = TEMP_FAILURE_RETRY(
                        ::read(fd, mBuffer->writeBuffer(), mBuffer->currentToWrite()));
                if (amt < 0) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        continue;
                    } else {
                        VLOG("Fail to read %d: %s", fd, strerror(errno));
                        return -errno;
                    }
                } else if (amt == 0) {
                    VLOG("Reached EOF of fd=%d", fd);
                    break;
                }
                mBuffer->wp()->move(amt);
            }
        }
    }
    mFinishTime = uptimeMillis();
    return NO_ERROR;
}

status_t FdBuffer::readFully(int fd) {
    mStartTime = uptimeMillis();

    while (true) {
        if (mBuffer->size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
            // Don't let it get too big.
            mTruncated = true;
            VLOG("Truncating data");
            break;
        }
        if (mBuffer->writeBuffer() == NULL) {
            VLOG("No memory");
            return NO_MEMORY;
        }

        ssize_t amt =
                TEMP_FAILURE_RETRY(::read(fd, mBuffer->writeBuffer(), mBuffer->currentToWrite()));
        if (amt < 0) {
            VLOG("Fail to read %d: %s", fd, strerror(errno));
            return -errno;
        } else if (amt == 0) {
            VLOG("Done reading %zu bytes", mBuffer->size());
            // We're done.
            break;
        }
        mBuffer->wp()->move(amt);
    }

    mFinishTime = uptimeMillis();
    return NO_ERROR;
}

status_t FdBuffer::readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd,
                                             int64_t timeoutMs, const bool isSysfs) {
    struct pollfd pfds[] = {
            {.fd = fd, .events = POLLIN},
            {.fd = toFd.get(), .events = POLLOUT},
            {.fd = fromFd.get(), .events = POLLIN},
    };

    mStartTime = uptimeMillis();

    // mark all fds non blocking
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
    fcntl(toFd.get(), F_SETFL, fcntl(toFd.get(), F_GETFL, 0) | O_NONBLOCK);
    fcntl(fromFd.get(), F_SETFL, fcntl(fromFd.get(), F_GETFL, 0) | O_NONBLOCK);

    // A circular buffer holds data read from fd and writes to parsing process
    uint8_t cirBuf[BUFFER_SIZE];
    size_t cirSize = 0;
    int rpos = 0, wpos = 0;

    // This is the buffer used to store processed data
    while (true) {
        if (mBuffer->size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
            VLOG("Truncating data");
            mTruncated = true;
            break;
        }
        if (mBuffer->writeBuffer() == NULL) {
            VLOG("No memory");
            return NO_MEMORY;
        }

        int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
        if (remainingTime <= 0) {
            VLOG("timed out due to long read");
            mTimedOut = true;
            break;
        }

        // wait for any pfds to be ready to perform IO
        int count = TEMP_FAILURE_RETRY(poll(pfds, 3, remainingTime));
        if (count == 0) {
            VLOG("timed out due to block calling poll");
            mTimedOut = true;
            break;
        } else if (count < 0) {
            VLOG("Fail to poll: %s", strerror(errno));
            return -errno;
        }

        // make sure no errors occur on any fds
        for (int i = 0; i < 3; ++i) {
            if ((pfds[i].revents & POLLERR) != 0) {
                if (i == 0 && isSysfs) {
                    VLOG("fd %d is sysfs, ignore its POLLERR return value", fd);
                    continue;
                }
                VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno));
                return errno != 0 ? -errno : UNKNOWN_ERROR;
            }
        }

        // read from fd
        if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
            ssize_t amt;
            if (rpos >= wpos) {
                amt = TEMP_FAILURE_RETRY(::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos));
            } else {
                amt = TEMP_FAILURE_RETRY(::read(fd, cirBuf + rpos, wpos - rpos));
            }
            if (amt < 0) {
                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
                    VLOG("Fail to read fd %d: %s", fd, strerror(errno));
                    return -errno;
                }  // otherwise just continue
            } else if (amt == 0) {
                VLOG("Reached EOF of input file %d", fd);
                pfds[0].fd = -1;  // reach EOF so don't have to poll pfds[0].
            } else {
                rpos += amt;
                cirSize += amt;
            }
        }

        // write to parsing process
        if (cirSize > 0 && pfds[1].fd != -1) {
            ssize_t amt;
            if (rpos > wpos) {
                amt = TEMP_FAILURE_RETRY(::write(toFd.get(), cirBuf + wpos, rpos - wpos));
            } else {
                amt = TEMP_FAILURE_RETRY(::write(toFd.get(), cirBuf + wpos, BUFFER_SIZE - wpos));
            }
            if (amt < 0) {
                if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
                    VLOG("Fail to write toFd %d: %s", toFd.get(), strerror(errno));
                    return -errno;
                }  // otherwise just continue
            } else {
                wpos += amt;
                cirSize -= amt;
            }
        }

        // if buffer is empty and fd is closed, close write fd.
        if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
            VLOG("Close write pipe %d", toFd.get());
            toFd.reset();
            pfds[1].fd = -1;
        }

        // circular buffer, reset rpos and wpos
        if (rpos >= BUFFER_SIZE) {
            rpos = 0;
        }
        if (wpos >= BUFFER_SIZE) {
            wpos = 0;
        }

        // read from parsing process
        ssize_t amt = TEMP_FAILURE_RETRY(
                ::read(fromFd.get(), mBuffer->writeBuffer(), mBuffer->currentToWrite()));
        if (amt < 0) {
            if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
                VLOG("Fail to read fromFd %d: %s", fromFd.get(), strerror(errno));
                return -errno;
            }  // otherwise just continue
        } else if (amt == 0) {
            VLOG("Reached EOF of fromFd %d", fromFd.get());
            break;
        } else {
            mBuffer->wp()->move(amt);
        }
    }

    mFinishTime = uptimeMillis();
    return NO_ERROR;
}

status_t FdBuffer::write(uint8_t const* buf, size_t size) {
    return mBuffer->writeRaw(buf, size);
}

status_t FdBuffer::write(const sp<ProtoReader>& reader) {
    return mBuffer->writeRaw(reader);
}

status_t FdBuffer::write(const sp<ProtoReader>& reader, size_t size) {
    return mBuffer->writeRaw(reader, size);
}

size_t FdBuffer::size() const {
    return mBuffer->size();
}

sp<EncodedBuffer> FdBuffer::data() const {
    return mBuffer;
}

}  // namespace incidentd
}  // namespace os
}  // namespace android