/*-------------------------------------------------------------------------
 * drawElements Quality Program Execution Server
 * ---------------------------------------------
 *
 * Copyright 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 *//*!
 * \file
 * \brief Test Execution Server.
 *//*--------------------------------------------------------------------*/

#include "xsExecutionServer.hpp"
#include "deClock.h"

#include <cstdio>

using std::vector;
using std::string;

// Debug tracing: DBG_PRINT(("fmt", args...)) expands to a printf call when
// enabled. Note the double parentheses at call sites.
#if 1
#	define DBG_PRINT(X) printf X
#else
#	define DBG_PRINT(X)
#endif

namespace xs
{

/*!
 * \brief Check whether a whole message has been accumulated.
 * \return false while fewer than MESSAGE_HEADER_SIZE bytes are buffered;
 *         otherwise true iff the buffered byte count equals the message
 *         size returned by getMessageSize().
 */
inline bool MessageBuilder::isComplete (void) const
{
	if (m_buffer.size() < MESSAGE_HEADER_SIZE)
		return false;
	else
		return m_buffer.size() == getMessageSize();
}

/*!
 * \brief Get pointer to the message payload.
 * \return Pointer to the first byte after the header, or DE_NULL if no
 *         bytes beyond the header have been buffered.
 */
const deUint8* MessageBuilder::getMessageData (void) const
{
	return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL;
}

/*!
 * \brief Get payload size in bytes (buffered size minus header size).
 *
 * Must only be called on a complete message (asserted).
 */
size_t MessageBuilder::getMessageDataSize (void) const
{
	DE_ASSERT(isComplete());
	return m_buffer.size() - MESSAGE_HEADER_SIZE;
}

/*!
 * \brief Consume bytes from src into the message under construction.
 *
 * First fills the header one byte at a time; once MESSAGE_HEADER_SIZE bytes
 * are available the header is parsed into m_messageType / m_messageSize.
 * After that, at most as many bytes as are still missing from the message
 * are moved from src. Bytes belonging to a following message stay in src.
 *
 * \param src Byte buffer to pop data from.
 * \throws ProtocolError if the header announces a message size smaller
 *         than the number of bytes already buffered.
 */
void MessageBuilder::read (ByteBuffer& src)
{
	// Try to get header.
	if (m_buffer.size() < MESSAGE_HEADER_SIZE)
	{
		while (m_buffer.size() < MESSAGE_HEADER_SIZE && src.getNumElements() > 0)
			m_buffer.push_back(src.popBack());

		DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE);

		if (m_buffer.size() == MESSAGE_HEADER_SIZE)
		{
			// Got whole header, parse it.
			Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize);
		}
	}

	if (m_buffer.size() >= MESSAGE_HEADER_SIZE)
	{
		// We have header.
		const size_t msgSize = getMessageSize();

		// The size field is supplied by the peer. If it is smaller than what
		// has already been buffered, the subtraction below would wrap around
		// and produce a bogus read count, so reject the message instead.
		if (msgSize < m_buffer.size())
			throw ProtocolError("Invalid message size");

		// Compute in size_t so large announced sizes are not truncated
		// through int. The result never exceeds src.getNumElements().
		const size_t numBytesLeft	= msgSize - m_buffer.size();
		const size_t numAvailable	= (size_t)src.getNumElements();
		const size_t numToRead		= de::min(numAvailable, numBytesLeft);

		if (numToRead > 0)
		{
			const size_t curBufPos = m_buffer.size();
			m_buffer.resize(curBufPos+numToRead);
			src.popBack(&m_buffer[curBufPos], (int)numToRead);
		}
	}
}

/*!
 * \brief Discard buffered bytes and reset parsed header fields.
 */
void MessageBuilder::clear (void)
{
	m_buffer.clear();
	m_messageType	= MESSAGETYPE_NONE;
	m_messageSize	= 0;
}

/*!
 * \brief Construct execution server.
 * \param testProcess Test process passed on to the internal test driver.
 * \param family      Socket family forwarded to TcpServer.
 * \param port        Port forwarded to TcpServer.
 * \param runMode     Run mode; see connectionDone() for RUNMODE_SINGLE_EXEC.
 */
ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode)
	: TcpServer		(family, port)
	, m_testDriver	(testProcess)
	, m_runMode		(runMode)
{
}

ExecutionServer::~ExecutionServer (void)
{
}

/*!
 * \brief Take exclusive ownership of the server's single test driver.
 *
 * Uses a non-blocking tryLock(); the driver must later be returned with
 * releaseTestDriver().
 *
 * \return Pointer to the test driver.
 * \throws Error if the driver lock is already held.
 */
TestDriver* ExecutionServer::acquireTestDriver (void)
{
	if (!m_testDriverLock.tryLock())
		throw Error("Failed to acquire test driver");

	return &m_testDriver;
}

/*!
 * \brief Give back the test driver obtained from acquireTestDriver().
 * \param driver Must be the server's own test driver (asserted).
 */
void ExecutionServer::releaseTestDriver (TestDriver* driver)
{
	DE_ASSERT(&m_testDriver == driver);
	DE_UNREF(driver); // Only referenced by the assert above.
	m_testDriverLock.unlock();
}

/*!
 * \brief Create request handler for a newly accepted connection.
 *
 * Logs the client address and returns a heap-allocated
 * ExecutionRequestHandler. NOTE(review): the returned handler is presumably
 * owned and deleted by the TcpServer machinery - confirm against TcpServer.
 */
ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress)
{
	printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort());
	return new ExecutionRequestHandler(this, socket);
}

/*!
 * \brief Connection-finished callback.
 *
 * In RUNMODE_SINGLE_EXEC the server socket is closed before the call is
 * forwarded to TcpServer::connectionDone().
 */
void ExecutionServer::connectionDone (ConnectionHandler* handler)
{
	if (m_runMode == RUNMODE_SINGLE_EXEC)
		m_socket.close();

	TcpServer::connectionDone(handler);
}

/*!
 * \brief Construct handler for a single client connection.
 *
 * Allocates receive/send/temporary buffers, configures the socket flags
 * and initializes keepalive timestamps. The test driver is not acquired
 * here.
 */
ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket)
	: ConnectionHandler	(server, socket)
	, m_execServer		(server)
	, m_testDriver		(DE_NULL)
	, m_bufferIn		(RECV_BUFFER_SIZE)
	, m_bufferOut		(SEND_BUFFER_SIZE)
	, m_run				(false)
	, m_sendRecvTmpBuf	(SEND_RECV_TMP_BUFFER_SIZE)
{
	// Set flags.
	m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC);

	// Init protocol keepalives.
	initKeepAlives();
}

/*!
 * \brief Destructor; releases the test driver if it is still held.
 */
ExecutionRequestHandler::~ExecutionRequestHandler (void)
{
	if (m_testDriver)
		m_execServer->releaseTestDriver(m_testDriver);
}

/*!
 * \brief Serve the connection until the session ends.
 *
 * Runs processSession() and logs any std::exception escaping from it.
 * Afterwards the test driver (if acquired) is reset and released, and the
 * socket is shut down if still connected.
 */
void ExecutionRequestHandler::handle (void)
{
	DBG_PRINT(("ExecutionRequestHandler::handle()\n"));

	try
	{
		// Process execution session.
		processSession();
	}
	catch (const std::exception& e)
	{
		printf("ExecutionRequestHandler::run(): %s\n", e.what());
	}

	DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n"));

	// Release test driver.
	if (m_testDriver)
	{
		try
		{
			m_testDriver->reset();
		}
		catch (...)
		{
			// Best effort: the driver must be released even if reset fails.
		}
		m_execServer->releaseTestDriver(m_testDriver);
		m_testDriver = DE_NULL;
	}

	// Close connection.
	if (m_socket->isConnected())
		m_socket->shutdown();
}

/*!
 * \brief Acquire the server's test driver for this handler and reset it.
 *
 * Must not be called while a driver is already held (asserted). The
 * exception thrown by ExecutionServer::acquireTestDriver() when the driver
 * is busy propagates to the caller.
 */
void ExecutionRequestHandler::acquireTestDriver (void)
{
	DE_ASSERT(!m_testDriver);

	// Try to acquire test driver - may fail.
	m_testDriver = m_execServer->acquireTestDriver();
	DE_ASSERT(m_testDriver);
	m_testDriver->reset();
}

/*!
 * \brief Main session loop.
 *
 * Each iteration receives from and sends to the socket, feeds received
 * bytes to the message builder, dispatches at most one complete message,
 * polls keepalives and polls the test driver (if acquired) for output.
 * The loop ends when m_run is cleared (connection closed) or an exception
 * is thrown.
 */
void ExecutionRequestHandler::processSession (void)
{
	m_run = true;

	deUint64 lastIoTime = deGetMicroseconds();

	while (m_run)
	{
		bool anyIO = false;

		// Read from socket to buffer.
		anyIO = receive() || anyIO;

		// Send bytes in buffer.
		anyIO = send() || anyIO;

		// Process incoming data.
		if (m_bufferIn.getNumElements() > 0)
		{
			DE_ASSERT(!m_msgBuilder.isComplete());
			m_msgBuilder.read(m_bufferIn);
		}

		if (m_msgBuilder.isComplete())
		{
			// Process message.
			processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize());

			m_msgBuilder.clear();
		}

		// Keepalives, anyone?
		pollKeepAlives();

		// Poll test driver for IO.
		if (m_testDriver)
			anyIO = getTestDriver()->poll(m_bufferOut) || anyIO;

		// If no IO happens in a reasonable amount of time, go to sleep.
		{
			deUint64 curTime = deGetMicroseconds();
			if (anyIO)
				lastIoTime = curTime;
			else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000)
				deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while.
			else
				deYield(); // Just give other threads chance to run.
		}
	}
}

/*!
 * \brief Parse and act on one complete protocol message.
 * \param type     Message type from the header.
 * \param data     Payload pointer (may be DE_NULL when there is no payload).
 * \param dataSize Payload size in bytes.
 * \throws ProtocolError on protocol version mismatch or unsupported type.
 */
void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, size_t dataSize)
{
	switch (type)
	{
		case MESSAGETYPE_HELLO:
		{
			HelloMessage msg(data, dataSize);
			DBG_PRINT(("HelloMessage: version = %d\n", msg.version));
			if (msg.version != PROTOCOL_VERSION)
				throw ProtocolError("Unsupported protocol version");
			break;
		}

		case MESSAGETYPE_TEST:
		{
			TestMessage msg(data, dataSize);
			DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str()));
			break;
		}

		case MESSAGETYPE_KEEPALIVE:
		{
			KeepAliveMessage msg(data, dataSize);
			DBG_PRINT(("KeepAliveMessage\n"));
			keepAliveReceived();
			break;
		}

		case MESSAGETYPE_EXECUTE_BINARY:
		{
			ExecuteBinaryMessage msg(data, dataSize);
			// Only the first 10 characters of the case list are traced.
			DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str()));
			getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str());
			keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed.
			break;
		}

		case MESSAGETYPE_STOP_EXECUTION:
		{
			StopExecutionMessage msg(data, dataSize);
			DBG_PRINT(("StopExecutionMessage\n"));
			getTestDriver()->stopProcess();
			break;
		}

		default:
			throw ProtocolError("Unsupported message");
	}
}

/*!
 * \brief Set both keepalive timestamps (sent and received) to current time.
 */
void ExecutionRequestHandler::initKeepAlives (void)
{
	deUint64 curTime = deGetMicroseconds();
	m_lastKeepAliveSent		= curTime;
	m_lastKeepAliveReceived	= curTime;
}

/*!
 * \brief Record the current time as the moment of the last received keepalive.
 */
void ExecutionRequestHandler::keepAliveReceived (void)
{
	m_lastKeepAliveReceived = deGetMicroseconds();
}

/*!
 * \brief Enforce keepalive timeout and queue an outgoing keepalive if due.
 *
 * Timestamps are in microseconds; KEEPALIVE_TIMEOUT and
 * KEEPALIVE_SEND_INTERVAL are multiplied by 1000 before comparison
 * (presumably they are given in milliseconds - confirm in the header).
 *
 * \throws ProtocolError if no keepalive has been received within the timeout.
 */
void ExecutionRequestHandler::pollKeepAlives (void)
{
	deUint64 curTime = deGetMicroseconds();

	// Check that we've got keepalives in timely fashion.
	if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000)
		throw ProtocolError("Keepalive timeout occurred");

	// Send some? Only if the output buffer has room for at least a header.
	if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 &&
		m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE)
	{
		vector<deUint8> buf;
		KeepAliveMessage().write(buf);
		m_bufferOut.pushFront(&buf[0], (int)buf.size());

		m_lastKeepAliveSent = deGetMicroseconds();
	}
}

/*!
 * \brief Receive bytes from the socket into the input buffer.
 *
 * Reads at most as many bytes as fit in both the temporary buffer and the
 * free space of m_bufferIn. Clears m_run when the peer closed the
 * connection.
 *
 * \return true if data was received or the connection was closed; false
 *         if the call would block or there is no room in the input buffer.
 * \throws ConnectionError if the connection was terminated or receive failed.
 */
bool ExecutionRequestHandler::receive (void)
{
	size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferIn.getNumFree());

	if (maxLen > 0)
	{
		size_t			numRecv;
		deSocketResult	result	= m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv);

		if (result == DE_SOCKETRESULT_SUCCESS)
		{
			DE_ASSERT(numRecv > 0);
			m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], (int)numRecv);
			return true;
		}
		else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
		{
			m_run = false;
			return true;
		}
		else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
			return false;
		else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
			throw ConnectionError("Connection terminated");
		else
			throw ConnectionError("receive() failed");
	}
	else
		return false;
}

/*!
 * \brief Send pending bytes from the output buffer to the socket.
 *
 * Bytes are first peeked into the temporary buffer and removed from
 * m_bufferOut only for the amount the socket actually accepted. Clears
 * m_run when the peer closed the connection.
 *
 * \return true if data was sent or the connection was closed; false if
 *         the call would block or there is nothing to send.
 * \throws ConnectionError if the connection was terminated or send failed.
 */
bool ExecutionRequestHandler::send (void)
{
	size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferOut.getNumElements());

	if (maxLen > 0)
	{
		m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], (int)maxLen);

		size_t			numSent;
		deSocketResult	result	= m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent);

		if (result == DE_SOCKETRESULT_SUCCESS)
		{
			DE_ASSERT(numSent > 0);
			m_bufferOut.popBack((int)numSent);
			return true;
		}
		else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
		{
			m_run = false;
			return true;
		}
		else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
			return false;
		else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
			throw ConnectionError("Connection terminated");
		else
			throw ConnectionError("send() failed");
	}
	else
		return false;
}

} // xs