// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "jingle/glue/channel_socket_adapter.h" #include <limits> #include "base/callback.h" #include "base/logging.h" #include "base/message_loop/message_loop.h" #include "net/base/io_buffer.h" #include "net/base/net_errors.h" #include "third_party/libjingle/source/talk/p2p/base/transportchannel.h" namespace jingle_glue { TransportChannelSocketAdapter::TransportChannelSocketAdapter( cricket::TransportChannel* channel) : message_loop_(base::MessageLoop::current()), channel_(channel), closed_error_code_(net::OK) { DCHECK(channel_); channel_->SignalReadPacket.connect( this, &TransportChannelSocketAdapter::OnNewPacket); channel_->SignalWritableState.connect( this, &TransportChannelSocketAdapter::OnWritableState); channel_->SignalDestroyed.connect( this, &TransportChannelSocketAdapter::OnChannelDestroyed); } TransportChannelSocketAdapter::~TransportChannelSocketAdapter() { if (!destruction_callback_.is_null()) destruction_callback_.Run(); } void TransportChannelSocketAdapter::SetOnDestroyedCallback( const base::Closure& callback) { destruction_callback_ = callback; } int TransportChannelSocketAdapter::Read( net::IOBuffer* buf, int buffer_size, const net::CompletionCallback& callback) { DCHECK_EQ(base::MessageLoop::current(), message_loop_); DCHECK(buf); DCHECK(!callback.is_null()); CHECK(read_callback_.is_null()); if (!channel_) { DCHECK(closed_error_code_ != net::OK); return closed_error_code_; } read_callback_ = callback; read_buffer_ = buf; read_buffer_size_ = buffer_size; return net::ERR_IO_PENDING; } int TransportChannelSocketAdapter::Write( net::IOBuffer* buffer, int buffer_size, const net::CompletionCallback& callback) { DCHECK_EQ(base::MessageLoop::current(), message_loop_); DCHECK(buffer); DCHECK(!callback.is_null()); CHECK(write_callback_.is_null()); if (!channel_) { DCHECK(closed_error_code_ != net::OK); return closed_error_code_; } int result; if (channel_->writable()) { result = channel_->SendPacket(buffer->data(), buffer_size, talk_base::DSCP_NO_CHANGE); if (result < 0) { result = net::MapSystemError(channel_->GetError()); // If the underlying socket returns IO pending where it shouldn't we // pretend the packet is dropped and return as succeeded because no // writeable callback will happen. if (result == net::ERR_IO_PENDING) result = net::OK; } } else { // Channel is not writable yet. result = net::ERR_IO_PENDING; write_callback_ = callback; write_buffer_ = buffer; write_buffer_size_ = buffer_size; } return result; } bool TransportChannelSocketAdapter::SetReceiveBufferSize(int32 size) { DCHECK_EQ(base::MessageLoop::current(), message_loop_); return channel_->SetOption(talk_base::Socket::OPT_RCVBUF, size) == 0; } bool TransportChannelSocketAdapter::SetSendBufferSize(int32 size) { DCHECK_EQ(base::MessageLoop::current(), message_loop_); return channel_->SetOption(talk_base::Socket::OPT_SNDBUF, size) == 0; } void TransportChannelSocketAdapter::Close(int error_code) { DCHECK_EQ(base::MessageLoop::current(), message_loop_); if (!channel_) // Already closed. return; DCHECK(error_code != net::OK); closed_error_code_ = error_code; channel_->SignalReadPacket.disconnect(this); channel_->SignalDestroyed.disconnect(this); channel_ = NULL; if (!read_callback_.is_null()) { net::CompletionCallback callback = read_callback_; read_callback_.Reset(); read_buffer_ = NULL; callback.Run(error_code); } if (!write_callback_.is_null()) { net::CompletionCallback callback = write_callback_; write_callback_.Reset(); write_buffer_ = NULL; callback.Run(error_code); } } void TransportChannelSocketAdapter::OnNewPacket( cricket::TransportChannel* channel, const char* data, size_t data_size, const talk_base::PacketTime& packet_time, int flags) { DCHECK_EQ(base::MessageLoop::current(), message_loop_); DCHECK_EQ(channel, channel_); if (!read_callback_.is_null()) { DCHECK(read_buffer_.get()); CHECK_LT(data_size, static_cast<size_t>(std::numeric_limits<int>::max())); if (read_buffer_size_ < static_cast<int>(data_size)) { LOG(WARNING) << "Data buffer is smaller than the received packet. " << "Dropping the data that doesn't fit."; data_size = read_buffer_size_; } memcpy(read_buffer_->data(), data, data_size); net::CompletionCallback callback = read_callback_; read_callback_.Reset(); read_buffer_ = NULL; callback.Run(data_size); } else { LOG(WARNING) << "Data was received without a callback. Dropping the packet."; } } void TransportChannelSocketAdapter::OnWritableState( cricket::TransportChannel* channel) { DCHECK_EQ(base::MessageLoop::current(), message_loop_); // Try to send the packet if there is a pending write. if (!write_callback_.is_null()) { int result = channel_->SendPacket(write_buffer_->data(), write_buffer_size_, talk_base::DSCP_NO_CHANGE); if (result < 0) result = net::MapSystemError(channel_->GetError()); if (result != net::ERR_IO_PENDING) { net::CompletionCallback callback = write_callback_; write_callback_.Reset(); write_buffer_ = NULL; callback.Run(result); } } } void TransportChannelSocketAdapter::OnChannelDestroyed( cricket::TransportChannel* channel) { DCHECK_EQ(base::MessageLoop::current(), message_loop_); DCHECK_EQ(channel, channel_); Close(net::ERR_CONNECTION_ABORTED); } } // namespace jingle_glue