// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "remoting/host/linux/audio_pipe_reader.h" #include <fcntl.h> #include <sys/stat.h> #include <sys/types.h> #include <unistd.h> #include "base/files/file_path.h" #include "base/logging.h" #include "base/posix/eintr_wrapper.h" #include "base/stl_util.h" namespace remoting { namespace { // PulseAudio's module-pipe-sink must be configured to use the following // parameters for the sink we read from. const int kSamplesPerSecond = 48000; const int kChannels = 2; const int kBytesPerSample = 2; const int kSampleBytesPerSecond = kSamplesPerSecond * kChannels * kBytesPerSample; // Read data from the pipe every 40ms. const int kCapturingPeriodMs = 40; // Size of the pipe buffer in milliseconds. const int kPipeBufferSizeMs = kCapturingPeriodMs * 2; // Size of the pipe buffer in bytes. const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond; #if !defined(F_SETPIPE_SZ) // F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able // to compile this code on machines with older kernel. #define F_SETPIPE_SZ 1031 #endif // defined(F_SETPIPE_SZ) } // namespace // static scoped_refptr<AudioPipeReader> AudioPipeReader::Create( scoped_refptr<base::SingleThreadTaskRunner> task_runner, const base::FilePath& pipe_name) { // Create a reference to the new AudioPipeReader before posting the // StartOnAudioThread task, otherwise it may be deleted on the audio // thread before we return. scoped_refptr<AudioPipeReader> pipe_reader = new AudioPipeReader(task_runner); task_runner->PostTask(FROM_HERE, base::Bind( &AudioPipeReader::StartOnAudioThread, pipe_reader, pipe_name)); return pipe_reader; } void AudioPipeReader::StartOnAudioThread(const base::FilePath& pipe_name) { DCHECK(task_runner_->BelongsToCurrentThread()); pipe_fd_ = HANDLE_EINTR(open( pipe_name.value().c_str(), O_RDONLY | O_NONBLOCK)); if (pipe_fd_ < 0) { LOG(ERROR) << "Failed to open " << pipe_name.value(); return; } // Set buffer size for the pipe. int result = HANDLE_EINTR( fcntl(pipe_fd_, F_SETPIPE_SZ, kPipeBufferSizeBytes)); if (result < 0) { PLOG(ERROR) << "fcntl"; } WaitForPipeReadable(); } AudioPipeReader::AudioPipeReader( scoped_refptr<base::SingleThreadTaskRunner> task_runner) : task_runner_(task_runner), observers_(new ObserverListThreadSafe<StreamObserver>()) { } AudioPipeReader::~AudioPipeReader() { } void AudioPipeReader::AddObserver(StreamObserver* observer) { observers_->AddObserver(observer); } void AudioPipeReader::RemoveObserver(StreamObserver* observer) { observers_->RemoveObserver(observer); } void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) { DCHECK_EQ(fd, pipe_fd_); StartTimer(); } void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) { NOTREACHED(); } void AudioPipeReader::StartTimer() { DCHECK(task_runner_->BelongsToCurrentThread()); started_time_ = base::TimeTicks::Now(); last_capture_position_ = 0; timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs), this, &AudioPipeReader::DoCapture); } void AudioPipeReader::DoCapture() { DCHECK(task_runner_->BelongsToCurrentThread()); DCHECK_GT(pipe_fd_, 0); // Calculate how much we need read from the pipe. Pulseaudio doesn't control // how much data it writes to the pipe, so we need to pace the stream, so // that we read the exact number of the samples per second we need. base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_; int64 stream_position_bytes = stream_position.InMilliseconds() * kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond; int64 bytes_to_read = stream_position_bytes - last_capture_position_; std::string data = left_over_bytes_; size_t pos = data.size(); left_over_bytes_.clear(); data.resize(pos + bytes_to_read); while (pos < data.size()) { int read_result = HANDLE_EINTR( read(pipe_fd_, string_as_array(&data) + pos, data.size() - pos)); if (read_result > 0) { pos += read_result; } else { if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN) PLOG(ERROR) << "read"; break; } } // Stop reading from the pipe if PulseAudio isn't writing anything. if (pos == 0) { WaitForPipeReadable(); return; } // Save any incomplete samples we've read for later. Each packet should // contain integer number of samples. int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample); left_over_bytes_.assign(data, pos - incomplete_samples_bytes, incomplete_samples_bytes); data.resize(pos - incomplete_samples_bytes); last_capture_position_ += data.size(); // Normally PulseAudio will keep pipe buffer full, so we should always be able // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make // sure that |stream_position_bytes| doesn't go out of sync with the current // stream position. if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes) last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes; DCHECK_LE(last_capture_position_, stream_position_bytes); // Dispatch asynchronous notification to the stream observers. scoped_refptr<base::RefCountedString> data_ref = base::RefCountedString::TakeString(&data); observers_->Notify(&StreamObserver::OnDataRead, data_ref); } void AudioPipeReader::WaitForPipeReadable() { timer_.Stop(); base::MessageLoopForIO::current()->WatchFileDescriptor( pipe_fd_, false, base::MessageLoopForIO::WATCH_READ, &file_descriptor_watcher_, this); } // static void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) { audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader); } } // namespace remoting