/*
* Copyright (C) 2011 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "common_time"
#include <utils/Log.h>
#include <fcntl.h>
#include <linux/in.h>
#include <linux/tcp.h>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <utils/Errors.h>
#include <utils/misc.h>
#include <common_time/local_clock.h>
#include "common_clock.h"
#include "diag_thread.h"
#define kMaxEvents 16
#define kListenPort 9876
static bool setNonblocking(int fd) {
int flags = fcntl(fd, F_GETFL);
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
ALOGE("Failed to set socket (%d) to non-blocking mode (errno %d)",
fd, errno);
return false;
}
return true;
}
static bool setNodelay(int fd) {
int tmp = 1;
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &tmp, sizeof(tmp)) < 0) {
ALOGE("Failed to set socket (%d) to no-delay mode (errno %d)",
fd, errno);
return false;
}
return true;
}
namespace android {
DiagThread::DiagThread(CommonClock* common_clock, LocalClock* local_clock) {
common_clock_ = common_clock;
local_clock_ = local_clock;
listen_fd_ = -1;
data_fd_ = -1;
kernel_logID_basis_known_ = false;
discipline_log_ID_ = 0;
}
DiagThread::~DiagThread() {
}
status_t DiagThread::startWorkThread() {
status_t res;
stopWorkThread();
res = run("Diag");
if (res != OK)
ALOGE("Failed to start work thread (res = %d)", res);
return res;
}
void DiagThread::stopWorkThread() {
status_t res;
res = requestExitAndWait(); // block until thread exit.
if (res != OK)
ALOGE("Failed to stop work thread (res = %d)", res);
}
bool DiagThread::openListenSocket() {
bool ret = false;
int flags;
cleanupListenSocket();
if ((listen_fd_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
ALOGE("Socket failed.");
goto bailout;
}
// Set non-blocking operation
if (!setNonblocking(listen_fd_))
goto bailout;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(kListenPort);
if (bind(listen_fd_, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
ALOGE("Bind failed.");
goto bailout;
}
if (listen(listen_fd_, 1) < 0) {
ALOGE("Listen failed.");
goto bailout;
}
ret = true;
bailout:
if (!ret)
cleanupListenSocket();
return ret;
}
void DiagThread::cleanupListenSocket() {
if (listen_fd_ >= 0) {
int res;
struct linger l;
l.l_onoff = 1;
l.l_linger = 0;
setsockopt(listen_fd_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
shutdown(listen_fd_, SHUT_RDWR);
close(listen_fd_);
listen_fd_ = -1;
}
}
void DiagThread::cleanupDataSocket() {
if (data_fd_ >= 0) {
int res;
struct linger l;
l.l_onoff = 1;
l.l_linger = 0;
setsockopt(data_fd_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
shutdown(data_fd_, SHUT_RDWR);
close(data_fd_);
data_fd_ = -1;
}
}
void DiagThread::resetLogIDs() {
// Drain and discard all of the events from the kernel
struct local_time_debug_event events[kMaxEvents];
while(local_clock_->getDebugLog(events, kMaxEvents) > 0)
;
{
Mutex::Autolock lock(&discipline_log_lock_);
discipline_log_.clear();
discipline_log_ID_ = 0;
}
kernel_logID_basis_known_ = false;
}
void DiagThread::pushDisciplineEvent(int64_t observed_local_time,
int64_t observed_common_time,
int64_t nominal_common_time,
int32_t total_correction,
int32_t rtt) {
Mutex::Autolock lock(&discipline_log_lock_);
DisciplineEventRecord evt;
evt.event_id = discipline_log_ID_++;
evt.action_local_time = local_clock_->getLocalTime();
common_clock_->localToCommon(evt.action_local_time,
&evt.action_common_time);
evt.observed_local_time = observed_local_time;
evt.observed_common_time = observed_common_time;
evt.nominal_common_time = nominal_common_time;
evt.total_correction = total_correction;
evt.rtt = rtt;
discipline_log_.push_back(evt);
while (discipline_log_.size() > kMaxDisciplineLogSize)
discipline_log_.erase(discipline_log_.begin());
}
bool DiagThread::threadLoop() {
struct pollfd poll_fds[1];
if (!openListenSocket()) {
ALOGE("Failed to open listen socket");
goto bailout;
}
while (!exitPending()) {
memset(&poll_fds, 0, sizeof(poll_fds));
if (data_fd_ < 0) {
poll_fds[0].fd = listen_fd_;
poll_fds[0].events = POLLIN;
} else {
poll_fds[0].fd = data_fd_;
poll_fds[0].events = POLLRDHUP | POLLIN;
}
int poll_res = poll(poll_fds, NELEM(poll_fds), 50);
if (poll_res < 0) {
ALOGE("Fatal error (%d,%d) while waiting on events",
poll_res, errno);
goto bailout;
}
if (exitPending())
break;
if (poll_fds[0].revents) {
if (poll_fds[0].fd == listen_fd_) {
data_fd_ = accept(listen_fd_, NULL, NULL);
if (data_fd_ < 0) {
ALOGW("Failed accept on socket %d with err %d",
listen_fd_, errno);
} else {
if (!setNonblocking(data_fd_))
cleanupDataSocket();
if (!setNodelay(data_fd_))
cleanupDataSocket();
}
} else
if (poll_fds[0].fd == data_fd_) {
if (poll_fds[0].revents & POLLRDHUP) {
// Connection hung up; time to clean up.
cleanupDataSocket();
} else
if (poll_fds[0].revents & POLLIN) {
uint8_t cmd;
if (read(data_fd_, &cmd, sizeof(cmd)) > 0) {
switch(cmd) {
case 'r':
case 'R':
resetLogIDs();
break;
}
}
}
}
}
struct local_time_debug_event events[kMaxEvents];
int amt = local_clock_->getDebugLog(events, kMaxEvents);
if (amt > 0) {
for (int i = 0; i < amt; i++) {
struct local_time_debug_event& e = events[i];
if (!kernel_logID_basis_known_) {
kernel_logID_basis_ = e.local_timesync_event_id;
kernel_logID_basis_known_ = true;
}
char buf[1024];
int64_t common_time;
status_t res = common_clock_->localToCommon(e.local_time,
&common_time);
snprintf(buf, sizeof(buf), "E,%lld,%lld,%lld,%d\n",
e.local_timesync_event_id - kernel_logID_basis_,
e.local_time,
common_time,
(OK == res) ? 1 : 0);
buf[sizeof(buf) - 1] = 0;
if (data_fd_ >= 0)
write(data_fd_, buf, strlen(buf));
}
}
{ // scope for autolock pattern
Mutex::Autolock lock(&discipline_log_lock_);
while (discipline_log_.size() > 0) {
char buf[1024];
DisciplineEventRecord& e = *discipline_log_.begin();
snprintf(buf, sizeof(buf),
"D,%lld,%lld,%lld,%lld,%lld,%lld,%d,%d\n",
e.event_id,
e.action_local_time,
e.action_common_time,
e.observed_local_time,
e.observed_common_time,
e.nominal_common_time,
e.total_correction,
e.rtt);
buf[sizeof(buf) - 1] = 0;
if (data_fd_ >= 0)
write(data_fd_, buf, strlen(buf));
discipline_log_.erase(discipline_log_.begin());
}
}
}
bailout:
cleanupDataSocket();
cleanupListenSocket();
return false;
}
} // namespace android