/*-------------------------------------------------------------------------
 * drawElements Quality Program Execution Server
 * ---------------------------------------------
 *
 * Copyright 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 *//*!
 * \file
 * \brief Test Execution Server.
 *//*--------------------------------------------------------------------*/

#include "xsExecutionServer.hpp"
#include "deClock.h"

#include <cstdio>

using std::vector;
using std::string;

#if 1
#	define DBG_PRINT(X) printf X
#else
#	define DBG_PRINT(X)
#endif

namespace xs
{

inline bool MessageBuilder::isComplete (void) const
{
	if (m_buffer.size() < MESSAGE_HEADER_SIZE)
		return false;
	else
		return m_buffer.size() == getMessageSize();
}

const deUint8* MessageBuilder::getMessageData (void) const
{
	return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL;
}

size_t MessageBuilder::getMessageDataSize (void) const
{
	DE_ASSERT(isComplete());
	return m_buffer.size() - MESSAGE_HEADER_SIZE;
}

void MessageBuilder::read (ByteBuffer& src)
{
	// Try to get header.
	if (m_buffer.size() < MESSAGE_HEADER_SIZE)
	{
		while (m_buffer.size() < MESSAGE_HEADER_SIZE &&
			   src.getNumElements() > 0)
			m_buffer.push_back(src.popBack());

		DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE);

		if (m_buffer.size() == MESSAGE_HEADER_SIZE)
		{
			// Got whole header, parse it.
			Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize);
		}
	}

	if (m_buffer.size() >= MESSAGE_HEADER_SIZE)
	{
		// We have header.
		size_t msgSize			= getMessageSize();
		size_t numBytesLeft		= msgSize - m_buffer.size();
		size_t numToRead		= (size_t)de::min(src.getNumElements(), (int)numBytesLeft);

		if (numToRead > 0)
		{
			int curBufPos = (int)m_buffer.size();
			m_buffer.resize(curBufPos+numToRead);
			src.popBack(&m_buffer[curBufPos], (int)numToRead);
		}
	}
}

void MessageBuilder::clear (void)
{
	m_buffer.clear();
	m_messageType	= MESSAGETYPE_NONE;
	m_messageSize	= 0;
}

ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode)
	: TcpServer		(family, port)
	, m_testDriver	(testProcess)
	, m_runMode		(runMode)
{
}

ExecutionServer::~ExecutionServer (void)
{
}

TestDriver* ExecutionServer::acquireTestDriver (void)
{
	if (!m_testDriverLock.tryLock())
		throw Error("Failed to acquire test driver");

	return &m_testDriver;
}

void ExecutionServer::releaseTestDriver (TestDriver* driver)
{
	DE_ASSERT(&m_testDriver == driver);
	DE_UNREF(driver);
	m_testDriverLock.unlock();
}

ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress)
{
	printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort());
	return new ExecutionRequestHandler(this, socket);
}

void ExecutionServer::connectionDone (ConnectionHandler* handler)
{
	if (m_runMode == RUNMODE_SINGLE_EXEC)
		m_socket.close();

	TcpServer::connectionDone(handler);
}

ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket)
	: ConnectionHandler	(server, socket)
	, m_execServer		(server)
	, m_testDriver		(DE_NULL)
	, m_bufferIn		(RECV_BUFFER_SIZE)
	, m_bufferOut		(SEND_BUFFER_SIZE)
	, m_run				(false)
	, m_sendRecvTmpBuf	(SEND_RECV_TMP_BUFFER_SIZE)
{
	// Set flags.
	m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC);

	// Init protocol keepalives.
	initKeepAlives();
}

ExecutionRequestHandler::~ExecutionRequestHandler (void)
{
	if (m_testDriver)
		m_execServer->releaseTestDriver(m_testDriver);
}

void ExecutionRequestHandler::handle (void)
{
	DBG_PRINT(("ExecutionRequestHandler::handle()\n"));

	try
	{
		// Process execution session.
		processSession();
	}
	catch (const std::exception& e)
	{
		printf("ExecutionRequestHandler::run(): %s\n", e.what());
	}

	DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n"));

	// Release test driver.
	if (m_testDriver)
	{
		try
		{
			m_testDriver->reset();
		}
		catch (...)
		{
		}
		m_execServer->releaseTestDriver(m_testDriver);
		m_testDriver = DE_NULL;
	}

	// Close connection.
	if (m_socket->isConnected())
		m_socket->shutdown();
}

void ExecutionRequestHandler::acquireTestDriver (void)
{
	DE_ASSERT(!m_testDriver);

	// Try to acquire test driver - may fail.
	m_testDriver = m_execServer->acquireTestDriver();
	DE_ASSERT(m_testDriver);
	m_testDriver->reset();

}

void ExecutionRequestHandler::processSession (void)
{
	m_run = true;

	deUint64 lastIoTime = deGetMicroseconds();

	while (m_run)
	{
		bool anyIO = false;

		// Read from socket to buffer.
		anyIO = receive() || anyIO;

		// Send bytes in buffer.
		anyIO = send() || anyIO;

		// Process incoming data.
		if (m_bufferIn.getNumElements() > 0)
		{
			DE_ASSERT(!m_msgBuilder.isComplete());
			m_msgBuilder.read(m_bufferIn);
		}

		if (m_msgBuilder.isComplete())
		{
			// Process message.
			processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize());

			m_msgBuilder.clear();
		}

		// Keepalives, anyone?
		pollKeepAlives();

		// Poll test driver for IO.
		if (m_testDriver)
			anyIO = getTestDriver()->poll(m_bufferOut) || anyIO;

		// If no IO happens in a reasonable amount of time, go to sleep.
		{
			deUint64 curTime = deGetMicroseconds();
			if (anyIO)
				lastIoTime = curTime;
			else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000)
				deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while.
			else
				deYield(); // Just give other threads chance to run.
		}
	}
}

void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, size_t dataSize)
{
	switch (type)
	{
		case MESSAGETYPE_HELLO:
		{
			HelloMessage msg(data, dataSize);
			DBG_PRINT(("HelloMessage: version = %d\n", msg.version));
			if (msg.version != PROTOCOL_VERSION)
				throw ProtocolError("Unsupported protocol version");
			break;
		}

		case MESSAGETYPE_TEST:
		{
			TestMessage msg(data, dataSize);
			DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str()));
			break;
		}

		case MESSAGETYPE_KEEPALIVE:
		{
			KeepAliveMessage msg(data, dataSize);
			DBG_PRINT(("KeepAliveMessage\n"));
			keepAliveReceived();
			break;
		}

		case MESSAGETYPE_EXECUTE_BINARY:
		{
			ExecuteBinaryMessage msg(data, dataSize);
			DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str()));
			getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str());
			keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed.
			break;
		}

		case MESSAGETYPE_STOP_EXECUTION:
		{
			StopExecutionMessage msg(data, dataSize);
			DBG_PRINT(("StopExecutionMessage\n"));
			getTestDriver()->stopProcess();
			break;
		}

		default:
			throw ProtocolError("Unsupported message");
	}
}

void ExecutionRequestHandler::initKeepAlives (void)
{
	deUint64 curTime = deGetMicroseconds();
	m_lastKeepAliveSent		= curTime;
	m_lastKeepAliveReceived	= curTime;
}

void ExecutionRequestHandler::keepAliveReceived (void)
{
	m_lastKeepAliveReceived = deGetMicroseconds();
}

void ExecutionRequestHandler::pollKeepAlives (void)
{
	deUint64 curTime = deGetMicroseconds();

	// Check that we've got keepalives in timely fashion.
	if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000)
		throw ProtocolError("Keepalive timeout occurred");

	// Send some?
	if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 &&
		m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE)
	{
		vector<deUint8> buf;
		KeepAliveMessage().write(buf);
		m_bufferOut.pushFront(&buf[0], (int)buf.size());

		m_lastKeepAliveSent = deGetMicroseconds();
	}
}

bool ExecutionRequestHandler::receive (void)
{
	size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferIn.getNumFree());

	if (maxLen > 0)
	{
		size_t			numRecv;
		deSocketResult	result	= m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv);

		if (result == DE_SOCKETRESULT_SUCCESS)
		{
			DE_ASSERT(numRecv > 0);
			m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], (int)numRecv);
			return true;
		}
		else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
		{
			m_run = false;
			return true;
		}
		else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
			return false;
		else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
			throw ConnectionError("Connection terminated");
		else
			throw ConnectionError("receive() failed");
	}
	else
		return false;
}

bool ExecutionRequestHandler::send (void)
{
	size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferOut.getNumElements());

	if (maxLen > 0)
	{
		m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], (int)maxLen);

		size_t			numSent;
		deSocketResult	result	= m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent);

		if (result == DE_SOCKETRESULT_SUCCESS)
		{
			DE_ASSERT(numSent > 0);
			m_bufferOut.popBack((int)numSent);
			return true;
		}
		else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
		{
			m_run = false;
			return true;
		}
		else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
			return false;
		else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
			throw ConnectionError("Connection terminated");
		else
			throw ConnectionError("send() failed");
	}
	else
		return false;
}

} // xs