/*
* Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "data_log.h"
#include <assert.h>
#include <algorithm>
#include <list>
#include "critical_section_wrapper.h"
#include "event_wrapper.h"
#include "file_wrapper.h"
#include "rw_lock_wrapper.h"
#include "thread_wrapper.h"
namespace webrtc {
DataLogImpl::CritSectScopedPtr DataLogImpl::crit_sect_(
CriticalSectionWrapper::CreateCriticalSection());
DataLogImpl* DataLogImpl::instance_ = NULL;
// A Row contains cells, which are indexed by the column names as std::string.
// The string index is treated in a case sensitive way.
class Row {
public:
Row();
~Row();
// Inserts a Container into the cell of the column specified with
// column_name.
// column_name is treated in a case sensitive way.
int InsertCell(const std::string& column_name,
const Container* value_container);
// Converts the value at the column specified by column_name to a string
// stored in value_string.
// column_name is treated in a case sensitive way.
void ToString(const std::string& column_name, std::string* value_string);
private:
// Collection of containers indexed by column name as std::string
typedef std::map<std::string, const Container*> CellMap;
CellMap cells_;
CriticalSectionWrapper* cells_lock_;
};
// A LogTable contains multiple rows, where only the latest row is active for
// editing. The rows are defined by the ColumnMap, which contains the name of
// each column and the length of the column (1 for one-value-columns and greater
// than 1 for multi-value-columns).
class LogTable {
public:
LogTable();
~LogTable();
// Adds the column with name column_name to the table. The column will be a
// multi-value-column if multi_value_length is greater than 1.
// column_name is treated in a case sensitive way.
int AddColumn(const std::string& column_name, int multi_value_length);
// Buffers the current row while it is waiting to be written to file,
// which is done by a call to Flush(). A new row is available when the
// function returns
void NextRow();
// Inserts a Container into the cell of the column specified with
// column_name.
// column_name is treated in a case sensitive way.
int InsertCell(const std::string& column_name,
const Container* value_container);
// Creates a log file, named as specified in the string file_name, to
// where the table will be written when calling Flush().
int CreateLogFile(const std::string& file_name);
// Write all complete rows to file.
// May not be called by two threads simultaneously (doing so may result in
// a race condition). Will be called by the file_writer_thread_ when that
// thread is running.
void Flush();
private:
// Collection of multi_value_lengths indexed by column name as std::string
typedef std::map<std::string, int> ColumnMap;
typedef std::list<Row*> RowList;
ColumnMap columns_;
RowList rows_[2];
RowList* rows_history_;
RowList* rows_flush_;
Row* current_row_;
FileWrapper* file_;
bool write_header_;
CriticalSectionWrapper* table_lock_;
};
Row::Row()
: cells_(),
cells_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
}
Row::~Row() {
for (CellMap::iterator it = cells_.begin(); it != cells_.end();) {
delete it->second;
// For maps all iterators (except the erased) are valid after an erase
cells_.erase(it++);
}
delete cells_lock_;
}
int Row::InsertCell(const std::string& column_name,
const Container* value_container) {
CriticalSectionScoped synchronize(cells_lock_);
assert(cells_.count(column_name) == 0);
if (cells_.count(column_name) > 0)
return -1;
cells_[column_name] = value_container;
return 0;
}
void Row::ToString(const std::string& column_name,
std::string* value_string) {
CriticalSectionScoped synchronize(cells_lock_);
const Container* container = cells_[column_name];
if (container == NULL) {
*value_string = "NaN,";
return;
}
container->ToString(value_string);
}
LogTable::LogTable()
: columns_(),
rows_(),
rows_history_(&rows_[0]),
rows_flush_(&rows_[1]),
current_row_(new Row),
file_(FileWrapper::Create()),
write_header_(true),
table_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
}
LogTable::~LogTable() {
for (RowList::iterator row_it = rows_history_->begin();
row_it != rows_history_->end();) {
delete *row_it;
row_it = rows_history_->erase(row_it);
}
for (ColumnMap::iterator col_it = columns_.begin();
col_it != columns_.end();) {
// For maps all iterators (except the erased) are valid after an erase
columns_.erase(col_it++);
}
if (file_ != NULL) {
file_->Flush();
file_->CloseFile();
delete file_;
}
delete current_row_;
delete table_lock_;
}
int LogTable::AddColumn(const std::string& column_name,
int multi_value_length) {
assert(multi_value_length > 0);
if (!write_header_) {
// It's not allowed to add new columns after the header
// has been written.
assert(false);
return -1;
} else {
CriticalSectionScoped synchronize(table_lock_);
if (write_header_)
columns_[column_name] = multi_value_length;
else
return -1;
}
return 0;
}
void LogTable::NextRow() {
CriticalSectionScoped sync_rows(table_lock_);
rows_history_->push_back(current_row_);
current_row_ = new Row;
}
int LogTable::InsertCell(const std::string& column_name,
const Container* value_container) {
CriticalSectionScoped synchronize(table_lock_);
assert(columns_.count(column_name) > 0);
if (columns_.count(column_name) == 0)
return -1;
return current_row_->InsertCell(column_name, value_container);
}
int LogTable::CreateLogFile(const std::string& file_name) {
if (file_name.length() == 0)
return -1;
if (file_->Open())
return -1;
file_->OpenFile(file_name.c_str(),
false, // Open with read/write permissions
false, // Don't wraparound and write at the beginning when
// the file is full
true); // Open as a text file
if (file_ == NULL)
return -1;
return 0;
}
void LogTable::Flush() {
ColumnMap::iterator column_it;
bool commit_header = false;
if (write_header_) {
CriticalSectionScoped synchronize(table_lock_);
if (write_header_) {
commit_header = true;
write_header_ = false;
}
}
if (commit_header) {
for (column_it = columns_.begin();
column_it != columns_.end(); ++column_it) {
if (column_it->second > 1) {
file_->WriteText("%s[%u],", column_it->first.c_str(),
column_it->second);
for (int i = 1; i < column_it->second; ++i)
file_->WriteText(",");
} else {
file_->WriteText("%s,", column_it->first.c_str());
}
}
if (columns_.size() > 0)
file_->WriteText("\n");
}
// Swap the list used for flushing with the list containing the row history
// and clear the history. We also create a local pointer to the new
// list used for flushing to avoid race conditions if another thread
// calls this function while we are writing.
// We don't want to block the list while we're writing to file.
{
CriticalSectionScoped synchronize(table_lock_);
RowList* tmp = rows_flush_;
rows_flush_ = rows_history_;
rows_history_ = tmp;
rows_history_->clear();
}
// Write all complete rows to file and delete them
for (RowList::iterator row_it = rows_flush_->begin();
row_it != rows_flush_->end();) {
for (column_it = columns_.begin();
column_it != columns_.end(); ++column_it) {
std::string row_string;
(*row_it)->ToString(column_it->first, &row_string);
file_->WriteText("%s", row_string.c_str());
}
if (columns_.size() > 0)
file_->WriteText("\n");
delete *row_it;
row_it = rows_flush_->erase(row_it);
}
}
int DataLog::CreateLog() {
return DataLogImpl::CreateLog();
}
void DataLog::ReturnLog() {
return DataLogImpl::ReturnLog();
}
std::string DataLog::Combine(const std::string& table_name, int table_id) {
std::stringstream ss;
std::string combined_id = table_name;
std::string number_suffix;
ss << "_" << table_id;
ss >> number_suffix;
combined_id += number_suffix;
std::transform(combined_id.begin(), combined_id.end(), combined_id.begin(),
::tolower);
return combined_id;
}
int DataLog::AddTable(const std::string& table_name) {
DataLogImpl* data_log = DataLogImpl::StaticInstance();
if (data_log == NULL)
return -1;
return data_log->AddTable(table_name);
}
int DataLog::AddColumn(const std::string& table_name,
const std::string& column_name,
int multi_value_length) {
DataLogImpl* data_log = DataLogImpl::StaticInstance();
if (data_log == NULL)
return -1;
return data_log->DataLogImpl::StaticInstance()->AddColumn(table_name,
column_name,
multi_value_length);
}
int DataLog::NextRow(const std::string& table_name) {
DataLogImpl* data_log = DataLogImpl::StaticInstance();
if (data_log == NULL)
return -1;
return data_log->DataLogImpl::StaticInstance()->NextRow(table_name);
}
DataLogImpl::DataLogImpl()
: counter_(1),
tables_(),
flush_event_(EventWrapper::Create()),
file_writer_thread_(NULL),
tables_lock_(RWLockWrapper::CreateRWLock()) {
}
DataLogImpl::~DataLogImpl() {
StopThread();
Flush(); // Write any remaining rows
delete file_writer_thread_;
delete flush_event_;
for (TableMap::iterator it = tables_.begin(); it != tables_.end();) {
delete static_cast<LogTable*>(it->second);
// For maps all iterators (except the erased) are valid after an erase
tables_.erase(it++);
}
delete tables_lock_;
}
int DataLogImpl::CreateLog() {
CriticalSectionScoped synchronize(crit_sect_.get());
if (instance_ == NULL) {
instance_ = new DataLogImpl();
return instance_->Init();
} else {
++instance_->counter_;
}
return 0;
}
int DataLogImpl::Init() {
file_writer_thread_ = ThreadWrapper::CreateThread(
DataLogImpl::Run,
instance_,
kHighestPriority,
"DataLog");
if (file_writer_thread_ == NULL)
return -1;
unsigned int thread_id = 0;
bool success = file_writer_thread_->Start(thread_id);
if (!success)
return -1;
return 0;
}
DataLogImpl* DataLogImpl::StaticInstance() {
return instance_;
}
void DataLogImpl::ReturnLog() {
CriticalSectionScoped synchronize(crit_sect_.get());
if (instance_ && instance_->counter_ > 1) {
--instance_->counter_;
return;
}
delete instance_;
instance_ = NULL;
}
int DataLogImpl::AddTable(const std::string& table_name) {
WriteLockScoped synchronize(*tables_lock_);
// Make sure we don't add a table which already exists
if (tables_.count(table_name) > 0)
return -1;
tables_[table_name] = new LogTable();
if (tables_[table_name]->CreateLogFile(table_name + ".txt") == -1)
return -1;
return 0;
}
int DataLogImpl::AddColumn(const std::string& table_name,
const std::string& column_name,
int multi_value_length) {
ReadLockScoped synchronize(*tables_lock_);
if (tables_.count(table_name) == 0)
return -1;
return tables_[table_name]->AddColumn(column_name, multi_value_length);
}
int DataLogImpl::InsertCell(const std::string& table_name,
const std::string& column_name,
const Container* value_container) {
ReadLockScoped synchronize(*tables_lock_);
assert(tables_.count(table_name) > 0);
if (tables_.count(table_name) == 0)
return -1;
return tables_[table_name]->InsertCell(column_name, value_container);
}
int DataLogImpl::NextRow(const std::string& table_name) {
ReadLockScoped synchronize(*tables_lock_);
if (tables_.count(table_name) == 0)
return -1;
tables_[table_name]->NextRow();
if (file_writer_thread_ == NULL) {
// Write every row to file as they get complete.
tables_[table_name]->Flush();
} else {
// Signal a complete row
flush_event_->Set();
}
return 0;
}
void DataLogImpl::Flush() {
ReadLockScoped synchronize(*tables_lock_);
for (TableMap::iterator it = tables_.begin(); it != tables_.end(); ++it) {
it->second->Flush();
}
}
bool DataLogImpl::Run(void* obj) {
static_cast<DataLogImpl*>(obj)->Process();
return true;
}
void DataLogImpl::Process() {
// Wait for a row to be complete
flush_event_->Wait(WEBRTC_EVENT_INFINITE);
Flush();
}
void DataLogImpl::StopThread() {
if (file_writer_thread_ != NULL) {
file_writer_thread_->SetNotAlive();
flush_event_->Set();
// Call Stop() repeatedly, waiting for the Flush() call in Process() to
// finish.
while (!file_writer_thread_->Stop()) continue;
}
}
} // namespace webrtc