/*
* Copyright (C) 2017 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "chre_host/socket_client.h"
#include <inttypes.h>
#include <string.h>
#include <chrono>
#include <cutils/sockets.h>
#include <utils/RefBase.h>
#include <utils/StrongPointer.h>
#include "chre_host/log.h"
namespace android {
namespace chre {
SocketClient::SocketClient() {
std::atomic_init(&mSockFd, INVALID_SOCKET);
}
SocketClient::~SocketClient() {
disconnect();
}
bool SocketClient::connect(const char *socketName,
const sp<ICallbacks>& callbacks) {
return doConnect(socketName, callbacks, false /* connectInBackground */);
}
bool SocketClient::connectInBackground(const char *socketName,
const sp<ICallbacks>& callbacks) {
return doConnect(socketName, callbacks, true /* connectInBackground */);
}
void SocketClient::disconnect() {
if (inReceiveThread()) {
LOGE("disconnect() can't be called from a receive thread callback");
} else if (receiveThreadRunning()) {
// Inform the RX thread that we're requesting a shutdown, breaking it out of
// the retry wait if it's currently blocked there
{
std::lock_guard<std::mutex> lock(mShutdownMutex);
mGracefulShutdown = true;
}
mShutdownCond.notify_all();
// Invalidate the socket (will kick the RX thread out of recv if it's
// currently blocked there)
if (mSockFd != INVALID_SOCKET && shutdown(mSockFd, SHUT_RDWR) != 0) {
LOG_ERROR("Couldn't shut down socket", errno);
}
if (mRxThread.joinable()) {
LOGD("Waiting for RX thread to exit");
mRxThread.join();
}
}
}
bool SocketClient::isConnected() const {
return (mSockFd != INVALID_SOCKET);
}
bool SocketClient::sendMessage(const void *data, size_t length) {
bool success = false;
if (mSockFd == INVALID_SOCKET) {
LOGW("Tried sending a message, but don't have a valid socket handle");
} else {
ssize_t bytesSent = send(mSockFd, data, length, 0);
if (bytesSent < 0) {
LOGE("Failed to send %zu bytes of data: %s", length, strerror(errno));
} else if (bytesSent == 0) {
LOGW("Failed to send data; remote side disconnected");
} else if (static_cast<size_t>(bytesSent) != length) {
LOGW("Truncated packet, tried sending %zu bytes, only %zd went through",
length, bytesSent);
} else {
success = true;
}
}
return success;
}
bool SocketClient::doConnect(const char *socketName,
const sp<ICallbacks>& callbacks,
bool connectInBackground) {
bool success = false;
if (inReceiveThread()) {
LOGE("Can't attempt to connect from a receive thread callback");
} else {
if (receiveThreadRunning()) {
LOGW("Re-connecting socket with implicit disconnect");
disconnect();
}
size_t socketNameLen = strlcpy(mSocketName, socketName,
sizeof(mSocketName));
if (socketNameLen >= sizeof(mSocketName)) {
LOGE("Socket name length parameter is too long (%zu, max %zu)",
socketNameLen, sizeof(mSocketName));
} else if (callbacks == nullptr) {
LOGE("Callbacks parameter must be provided");
} else if (connectInBackground || tryConnect()) {
mGracefulShutdown = false;
mCallbacks = callbacks;
mRxThread = std::thread([this]() {
receiveThread();
});
success = true;
}
}
return success;
}
bool SocketClient::inReceiveThread() const {
return (std::this_thread::get_id() == mRxThread.get_id());
}
void SocketClient::receiveThread() {
constexpr size_t kReceiveBufferSize = 4096;
uint8_t buffer[kReceiveBufferSize];
LOGV("Receive thread started");
while (!mGracefulShutdown && (mSockFd != INVALID_SOCKET || reconnect())) {
while (!mGracefulShutdown) {
ssize_t bytesReceived = recv(mSockFd, buffer, sizeof(buffer), 0);
if (bytesReceived < 0) {
LOG_ERROR("Exiting RX thread", errno);
break;
} else if (bytesReceived == 0) {
if (!mGracefulShutdown) {
LOGI("Socket disconnected on remote end");
mCallbacks->onDisconnected();
}
break;
}
mCallbacks->onMessageReceived(buffer, bytesReceived);
}
if (close(mSockFd) != 0) {
LOG_ERROR("Couldn't close socket", errno);
}
mSockFd = INVALID_SOCKET;
}
if (!mGracefulShutdown) {
mCallbacks->onConnectionAborted();
}
mCallbacks.clear();
LOGV("Exiting receive thread");
}
bool SocketClient::receiveThreadRunning() const {
return mRxThread.joinable();
}
bool SocketClient::reconnect() {
auto delay = std::chrono::duration<int32_t, std::milli>(500);
constexpr auto kMaxDelay = std::chrono::minutes(5);
int retryLimit = 40; // ~2.5 hours total
while (--retryLimit > 0) {
{
std::unique_lock<std::mutex> lock(mShutdownMutex);
mShutdownCond.wait_for(lock, delay,
[this]() { return mGracefulShutdown.load(); });
if (mGracefulShutdown) {
break;
}
}
if (!tryConnect()) {
LOGW("Failed to (re)connect, next try in %" PRId32 " ms", delay.count());
delay *= 2;
if (delay > kMaxDelay) {
delay = kMaxDelay;
}
} else {
LOGD("Successfully (re)connected");
mCallbacks->onConnected();
return true;
}
}
return false;
}
bool SocketClient::tryConnect() {
errno = 0;
mSockFd = socket_local_client(mSocketName,
ANDROID_SOCKET_NAMESPACE_RESERVED,
SOCK_SEQPACKET);
if (mSockFd == INVALID_SOCKET) {
LOGE("Couldn't create/connect client socket to '%s': %s",
mSocketName, strerror(errno));
}
return (mSockFd != INVALID_SOCKET);
}
} // namespace chre
} // namespace android