// sttable.h // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // Copyright 2005-2010 Google, Inc. // Author: allauzen@google.com (Cyril Allauzen) // // \file // A generic string-to-type table file format // // This is not meant as a generalization of SSTable. This is more of // a simple replacement for SSTable in order to provide an open-source // implementation of the FAR format for the external version of the // FST Library. #ifndef FST_EXTENSIONS_FAR_STTABLE_H_ #define FST_EXTENSIONS_FAR_STTABLE_H_ #include <algorithm> #include <iostream> #include <fstream> #include <fst/util.h> namespace fst { static const int32 kSTTableMagicNumber = 2125656924; static const int32 kSTTableFileVersion = 1; // String-to-type table writing class for object of type 'T' using functor 'W' // to write an object of type 'T' from a stream. 'W' must conform to the // following interface: // // struct Writer { // void operator()(ostream &, const T &) const; // }; // template <class T, class W> class STTableWriter { public: typedef T EntryType; typedef W EntryWriter; explicit STTableWriter(const string &filename) : stream_(filename.c_str(), ofstream::out | ofstream::binary), error_(false) { WriteType(stream_, kSTTableMagicNumber); WriteType(stream_, kSTTableFileVersion); if (!stream_) { FSTERROR() << "STTableWriter::STTableWriter: error writing to file: " << filename; error_=true; } } static STTableWriter<T, W> *Create(const string &filename) { if (filename.empty()) { LOG(ERROR) << "STTableWriter: writing to standard out unsupported."; return 0; } return new STTableWriter<T, W>(filename); } void Add(const string &key, const T &t) { if (key == "") { FSTERROR() << "STTableWriter::Add: key empty: " << key; error_ = true; } else if (key < last_key_) { FSTERROR() << "STTableWriter::Add: key disorder: " << key; error_ = true; } if (error_) return; last_key_ = key; positions_.push_back(stream_.tellp()); WriteType(stream_, key); entry_writer_(stream_, t); } bool Error() const { return error_; } ~STTableWriter() { WriteType(stream_, positions_); WriteType(stream_, static_cast<int64>(positions_.size())); } private: EntryWriter entry_writer_; // Write functor for 'EntryType' ofstream stream_; // Output stream vector<int64> positions_; // Position in file of each key-entry pair string last_key_; // Last key bool error_; DISALLOW_COPY_AND_ASSIGN(STTableWriter); }; // String-to-type table reading class for object of type 'T' using functor 'R' // to read an object of type 'T' form a stream. 'R' must conform to the // following interface: // // struct Reader { // T *operator()(istream &) const; // }; // template <class T, class R> class STTableReader { public: typedef T EntryType; typedef R EntryReader; explicit STTableReader(const vector<string> &filenames) : sources_(filenames), entry_(0), error_(false) { compare_ = new Compare(&keys_); keys_.resize(filenames.size()); streams_.resize(filenames.size(), 0); positions_.resize(filenames.size()); for (size_t i = 0; i < filenames.size(); ++i) { streams_[i] = new ifstream( filenames[i].c_str(), ifstream::in | ifstream::binary); int32 magic_number = 0, file_version = 0; ReadType(*streams_[i], &magic_number); ReadType(*streams_[i], &file_version); if (magic_number != kSTTableMagicNumber) { FSTERROR() << "STTableReader::STTableReader: wrong file type: " << filenames[i]; error_ = true; return; } if (file_version != kSTTableFileVersion) { FSTERROR() << "STTableReader::STTableReader: wrong file version: " << filenames[i]; error_ = true; return; } int64 num_entries; streams_[i]->seekg(-static_cast<int>(sizeof(int64)), ios_base::end); ReadType(*streams_[i], &num_entries); streams_[i]->seekg(-static_cast<int>(sizeof(int64)) * (num_entries + 1), ios_base::end); positions_[i].resize(num_entries); for (size_t j = 0; (j < num_entries) && (*streams_[i]); ++j) ReadType(*streams_[i], &(positions_[i][j])); streams_[i]->seekg(positions_[i][0]); if (!*streams_[i]) { FSTERROR() << "STTableReader::STTableReader: error reading file: " << filenames[i]; error_ = true; return; } } MakeHeap(); } ~STTableReader() { for (size_t i = 0; i < streams_.size(); ++i) delete streams_[i]; delete compare_; if (entry_) delete entry_; } static STTableReader<T, R> *Open(const string &filename) { if (filename.empty()) { LOG(ERROR) << "STTableReader: reading from standard in not supported"; return 0; } vector<string> filenames; filenames.push_back(filename); return new STTableReader<T, R>(filenames); } static STTableReader<T, R> *Open(const vector<string> &filenames) { return new STTableReader<T, R>(filenames); } void Reset() { if (error_) return; for (size_t i = 0; i < streams_.size(); ++i) streams_[i]->seekg(positions_[i].front()); MakeHeap(); } bool Find(const string &key) { if (error_) return false; for (size_t i = 0; i < streams_.size(); ++i) LowerBound(i, key); MakeHeap(); return keys_[current_] == key; } bool Done() const { return error_ || heap_.empty(); } void Next() { if (error_) return; if (streams_[current_]->tellg() <= positions_[current_].back()) { ReadType(*(streams_[current_]), &(keys_[current_])); if (!*streams_[current_]) { FSTERROR() << "STTableReader: error reading file: " << sources_[current_]; error_ = true; return; } push_heap(heap_.begin(), heap_.end(), *compare_); } else { heap_.pop_back(); } if (!heap_.empty()) PopHeap(); } const string &GetKey() const { return keys_[current_]; } const EntryType &GetEntry() const { return *entry_; } bool Error() const { return error_; } private: // Comparison functor used to compare stream IDs in the heap struct Compare { Compare(const vector<string> *keys) : keys_(keys) {} bool operator()(size_t i, size_t j) const { return (*keys_)[i] > (*keys_)[j]; }; private: const vector<string> *keys_; }; // Position the stream with ID 'id' at the position corresponding // to the lower bound for key 'find_key' void LowerBound(size_t id, const string &find_key) { ifstream *strm = streams_[id]; const vector<int64> &positions = positions_[id]; size_t low = 0, high = positions.size() - 1; while (low < high) { size_t mid = (low + high)/2; strm->seekg(positions[mid]); string key; ReadType(*strm, &key); if (key > find_key) { high = mid; } else if (key < find_key) { low = mid + 1; } else { for (size_t i = mid; i > low; --i) { strm->seekg(positions[i - 1]); ReadType(*strm, &key); if (key != find_key) { strm->seekg(positions[i]); return; } } strm->seekg(positions[low]); return; } } strm->seekg(positions[low]); } // Add all streams to the heap void MakeHeap() { heap_.clear(); for (size_t i = 0; i < streams_.size(); ++i) { ReadType(*streams_[i], &(keys_[i])); if (!*streams_[i]) { FSTERROR() << "STTableReader: error reading file: " << sources_[i]; error_ = true; return; } heap_.push_back(i); } make_heap(heap_.begin(), heap_.end(), *compare_); PopHeap(); } // Position the stream with the lowest key at the top // of the heap, set 'current_' to the ID of that stream // and read the current entry from that stream void PopHeap() { pop_heap(heap_.begin(), heap_.end(), *compare_); current_ = heap_.back(); if (entry_) delete entry_; entry_ = entry_reader_(*streams_[current_]); if (!entry_) error_ = true; if (!*streams_[current_]) { FSTERROR() << "STTableReader: error reading entry for key: " << keys_[current_] << ", file: " << sources_[current_]; error_ = true; } } EntryReader entry_reader_; // Read functor for 'EntryType' vector<ifstream*> streams_; // Input streams vector<string> sources_; // and corresponding file names vector<vector<int64> > positions_; // Index of positions for each stream vector<string> keys_; // Lowest unread key for each stream vector<int64> heap_; // Heap containing ID of streams with unread keys int64 current_; // Id of current stream to be read Compare *compare_; // Functor comparing stream IDs for the heap mutable EntryType *entry_; // Pointer to the currently read entry bool error_; DISALLOW_COPY_AND_ASSIGN(STTableReader); }; // String-to-type table header reading function template on the entry header // type 'H' having a member function: // Read(istream &strm, const string &filename); // Checks that 'filename' is an STTable and call the H::Read() on the last // entry in the STTable. template <class H> bool ReadSTTableHeader(const string &filename, H *header) { ifstream strm(filename.c_str(), ifstream::in | ifstream::binary); int32 magic_number = 0, file_version = 0; ReadType(strm, &magic_number); ReadType(strm, &file_version); if (magic_number != kSTTableMagicNumber) { LOG(ERROR) << "ReadSTTableHeader: wrong file type: " << filename; return false; } if (file_version != kSTTableFileVersion) { LOG(ERROR) << "ReadSTTableHeader: wrong file version: " << filename; return false; } int64 i = -1; strm.seekg(-static_cast<int>(sizeof(int64)), ios_base::end); ReadType(strm, &i); // Read number of entries if (!strm) { LOG(ERROR) << "ReadSTTableHeader: error reading file: " << filename; return false; } if (i == 0) return true; // No entry header to read strm.seekg(-2 * static_cast<int>(sizeof(int64)), ios_base::end); ReadType(strm, &i); // Read position for last entry in file strm.seekg(i); string key; ReadType(strm, &key); header->Read(strm, filename + ":" + key); if (!strm) { LOG(ERROR) << "ReadSTTableHeader: error reading file: " << filename; return false; } return true; } bool IsSTTable(const string &filename); } // namespace fst #endif // FST_EXTENSIONS_FAR_STTABLE_H_