// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/public/cpp/system/wait.h"

#include <string>
#include <vector>

#include "base/bind.h"
#include "base/callback.h"
#include "base/strings/string_piece.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "base/time/time.h"
#include "mojo/public/c/system/types.h"
#include "mojo/public/cpp/system/handle_signals_state.h"
#include "mojo/public/cpp/system/message_pipe.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace mojo {
namespace {

// Both fixtures are plain aliases of testing::Test; they add no state and only
// give the Wait() and WaitMany() tests separate suite names.
using WaitTest = testing::Test;
using WaitManyTest = testing::Test;

// Writes the bytes of |message| to |handle| as one message that carries no
// handles. CHECK-fails unless the write returns MOJO_RESULT_OK.
void WriteMessage(const ScopedMessagePipeHandle& handle,
                  const base::StringPiece& message) {
  const uint32_t payload_size = static_cast<uint32_t>(message.size());
  const MojoResult write_result =
      WriteMessageRaw(handle.get(), message.data(), payload_size, nullptr, 0,
                      MOJO_WRITE_MESSAGE_FLAG_NONE);
  CHECK_EQ(MOJO_RESULT_OK, write_result);
}

std::string ReadMessage(const ScopedMessagePipeHandle& handle) {
  std::vector<uint8_t> bytes;
  MojoResult rv = ReadMessageRaw(handle.get(), &bytes, nullptr,
                                 MOJO_READ_MESSAGE_FLAG_NONE);
  CHECK_EQ(MOJO_RESULT_OK, rv);
  return std::string(bytes.begin(), bytes.end());
}

// A SimpleThread named "ThreadedRunner" whose body is a single invocation of
// the closure supplied at construction. Destroying the object joins the
// thread, so a ThreadedRunner declared on the stack blocks at end of scope
// until its closure has finished.
//
// NOTE(review): the destructor calls Join() unconditionally; presumably
// callers are expected to have called Start() first -- confirm against
// base::SimpleThread.
class ThreadedRunner : public base::SimpleThread {
 public:
  explicit ThreadedRunner(const base::Closure& callback)
      : SimpleThread("ThreadedRunner"), callback_(callback) {}

  ~ThreadedRunner() override {
    Join();
  }

  // base::SimpleThread:
  void Run() override {
    callback_.Run();
  }

 private:
  // The work executed by Run().
  const base::Closure callback_;

  DISALLOW_COPY_AND_ASSIGN(ThreadedRunner);
};

// Wait() and WaitMany() must report MOJO_RESULT_INVALID_ARGUMENT for
// default-constructed handles and for null handle/signal arrays.
TEST_F(WaitTest, InvalidArguments) {
  // Single-handle form with a default-constructed Handle.
  Handle default_handle;
  const MojoResult single_wait_result =
      Wait(default_handle, MOJO_HANDLE_SIGNAL_READABLE);
  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, single_wait_result);

  MessagePipe pipe;
  Handle pipe_ends[2] = {pipe.handle0.get(), pipe.handle1.get()};
  Handle default_handles[2];
  MojoHandleSignals no_signals[2] = {MOJO_HANDLE_SIGNAL_NONE,
                                     MOJO_HANDLE_SIGNAL_NONE};
  size_t ready_index = 0;

  // An array made up of default-constructed handles.
  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
            WaitMany(default_handles, no_signals, 2, &ready_index));

  // A null handle array.
  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
            WaitMany(nullptr, no_signals, 2, &ready_index));

  // Real handles, but a null signals array.
  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
            WaitMany(pipe_ends, nullptr, 2, &ready_index));
}

// Walks one pipe end through three stages -- message queued, peer closed with
// the message still queued, and drained with the peer closed -- checking the
// Wait() result and the reported signal state at each stage.
TEST_F(WaitTest, Basic) {
  MessagePipe pipe;

  // Stage 1: queue a message from handle0; handle1 should be readable.
  const char kQueuedMessage[] = "how about a nice game of chess?";
  WriteMessage(pipe.handle0, kQueuedMessage);
  EXPECT_EQ(MOJO_RESULT_OK,
            Wait(pipe.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE));

  // The signal state can be retrieved through the C struct...
  MojoHandleSignalsState raw_state = {0, 0};
  EXPECT_EQ(MOJO_RESULT_OK,
            Wait(pipe.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE, &raw_state));
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            raw_state.satisfied_signals);

  // ...and through the C++ wrapper.
  HandleSignalsState state;
  EXPECT_EQ(MOJO_RESULT_OK,
            Wait(pipe.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE, &state));
  EXPECT_TRUE(state.readable());
  EXPECT_TRUE(state.writable());
  EXPECT_FALSE(state.peer_closed());
  EXPECT_FALSE(state.never_readable());
  EXPECT_FALSE(state.never_writable());
  EXPECT_FALSE(state.never_peer_closed());

  // Stage 2: drop the writing end and wait until the closure is observed.
  pipe.handle0.reset();
  EXPECT_EQ(MOJO_RESULT_OK,
            Wait(pipe.handle1.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state));

  // The queued message keeps the handle readable, but with the peer gone it
  // is no longer writable and never will be again.
  EXPECT_TRUE(state.readable());
  EXPECT_TRUE(state.peer_closed());
  EXPECT_FALSE(state.writable());
  EXPECT_TRUE(state.never_writable());
  EXPECT_FALSE(state.never_readable());
  EXPECT_FALSE(state.never_peer_closed());

  // Stage 3: drain the message. With nothing queued and no peer, waiting for
  // readability can never succeed, so Wait() fails.
  EXPECT_EQ(kQueuedMessage, ReadMessage(pipe.handle1));
  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
            Wait(pipe.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE, &state));

  // The failed wait still reports the final signal state.
  EXPECT_TRUE(state.peer_closed());
  EXPECT_FALSE(state.readable());
  EXPECT_FALSE(state.writable());
  EXPECT_TRUE(state.never_readable());
  EXPECT_TRUE(state.never_writable());
  EXPECT_FALSE(state.never_peer_closed());
}

// Wait() on one end of a pipe must return OK once another thread writes to
// the opposite end after a delay.
TEST_F(WaitTest, DelayedWrite) {
  MessagePipe pipe;

  // Background thread: sleep for 200 ms, then send a message on handle0.
  ThreadedRunner delayed_writer(base::Bind(
      [](ScopedMessagePipeHandle* sender) {
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        WriteMessage(*sender, "wakey wakey");
      },
      &pipe.handle0));
  delayed_writer.Start();

  HandleSignalsState state;
  EXPECT_EQ(MOJO_RESULT_OK,
            Wait(pipe.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE, &state));

  // The receiving end is readable and writable, and its peer is still open.
  EXPECT_TRUE(state.readable());
  EXPECT_TRUE(state.writable());
  EXPECT_FALSE(state.peer_closed());

  // No signal has become permanently unsatisfiable.
  EXPECT_FALSE(state.never_readable());
  EXPECT_FALSE(state.never_writable());
  EXPECT_FALSE(state.never_peer_closed());
}

// Waiting for readability on one end must fail with FAILED_PRECONDITION when
// another thread closes the opposite end without ever writing to it.
TEST_F(WaitTest, DelayedPeerClosure) {
  MessagePipe pipe;

  // Background thread: sleep for 200 ms, then close handle0.
  ThreadedRunner delayed_closer(base::Bind(
      [](ScopedMessagePipeHandle* peer) {
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        peer->reset();
      },
      &pipe.handle0));
  delayed_closer.Start();

  HandleSignalsState state;
  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
            Wait(pipe.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE, &state));

  // Only peer closure is signaled on the surviving end.
  EXPECT_FALSE(state.readable());
  EXPECT_FALSE(state.writable());
  EXPECT_TRUE(state.peer_closed());

  // Reading and writing can never be satisfied again.
  EXPECT_TRUE(state.never_readable());
  EXPECT_TRUE(state.never_writable());
  EXPECT_FALSE(state.never_peer_closed());
}

// Closing a handle from another thread while Wait() is blocked on that same
// handle must make Wait() return MOJO_RESULT_CANCELLED.
TEST_F(WaitTest, CloseWhileWaiting) {
  MessagePipe p;

  // Snapshot the raw handle before the closing thread is started. That thread
  // mutates |p.handle0| through reset(), so reading |p.handle0| on this thread
  // after Start() would be an unsynchronized access to the same object. This
  // mirrors WaitManyTest.CloseWhileWaiting, which also captures its raw
  // handles before starting its thread.
  const Handle waited_handle = p.handle0.get();

  // Background thread: sleep for 200 ms, then close the handle being waited
  // on.
  ThreadedRunner close_after_delay(base::Bind(
      [](ScopedMessagePipeHandle* handle) {
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        handle->reset();
      },
      &p.handle0));
  close_after_delay.Start();

  EXPECT_EQ(MOJO_RESULT_CANCELLED,
            Wait(waited_handle, MOJO_HANDLE_SIGNAL_READABLE));
}

// Exercises WaitMany() over both ends of one pipe: first with a message
// queued, then after the message has been read and the sending end closed.
TEST_F(WaitManyTest, Basic) {
  MessagePipe pipe;

  const char kQueuedMessage[] = "hello";
  WriteMessage(pipe.handle0, kQueuedMessage);

  // Both ends are watched for readability; only handle1 (index 1) has a
  // message queued. The wait is performed twice so that the per-handle state
  // output is exercised with the C struct and with the C++ wrapper.
  Handle watched[2] = {pipe.handle0.get(), pipe.handle1.get()};
  MojoHandleSignals wanted[2] = {MOJO_HANDLE_SIGNAL_READABLE,
                                 MOJO_HANDLE_SIGNAL_READABLE};
  size_t ready_index = 0;
  MojoHandleSignalsState raw_states[2];
  HandleSignalsState states[2];

  // C struct output.
  EXPECT_EQ(MOJO_RESULT_OK,
            WaitMany(watched, wanted, 2, &ready_index, raw_states));
  EXPECT_EQ(1u, ready_index);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, raw_states[0].satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            raw_states[1].satisfied_signals);

  // C++ wrapper output.
  EXPECT_EQ(MOJO_RESULT_OK,
            WaitMany(watched, wanted, 2, &ready_index, states));
  EXPECT_EQ(1u, ready_index);

  // The sending end is writable only.
  EXPECT_FALSE(states[0].readable());
  EXPECT_TRUE(states[0].writable());
  EXPECT_FALSE(states[0].peer_closed());
  EXPECT_FALSE(states[0].never_readable());
  EXPECT_FALSE(states[0].never_writable());
  EXPECT_FALSE(states[0].never_peer_closed());

  // The receiving end is readable and writable.
  EXPECT_TRUE(states[1].readable());
  EXPECT_TRUE(states[1].writable());
  EXPECT_FALSE(states[1].peer_closed());
  EXPECT_FALSE(states[1].never_readable());
  EXPECT_FALSE(states[1].never_writable());
  EXPECT_FALSE(states[1].never_peer_closed());

  // Drain the message, then close the sending end.
  EXPECT_EQ(kQueuedMessage, ReadMessage(pipe.handle1));
  pipe.handle0.reset();

  // watched[0] still holds the value of the handle that was just closed, so
  // the two-handle wait is rejected outright.
  EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
            WaitMany(watched, wanted, 2, &ready_index, states));

  // Wait on the surviving end alone. It can never become readable again, so
  // the wait fails and reports index 0.
  watched[0] = watched[1];
  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
            WaitMany(watched, wanted, 1, &ready_index, states));
  EXPECT_EQ(0u, ready_index);
  EXPECT_FALSE(states[0].readable());
  EXPECT_FALSE(states[0].writable());
  EXPECT_TRUE(states[0].peer_closed());
  EXPECT_TRUE(states[0].never_readable());
  EXPECT_TRUE(states[0].never_writable());
  EXPECT_FALSE(states[0].never_peer_closed());
}

// Closing one of the watched handles from another thread while WaitMany() is
// blocked must cancel the wait and report the index of the closed handle.
TEST_F(WaitManyTest, CloseWhileWaiting) {
  MessagePipe p, q;

  // Both ends of |q| stay idle; the last entry is the handle that gets
  // closed.
  Handle watched[3] = {q.handle0.get(), q.handle1.get(), p.handle1.get()};
  MojoHandleSignals wanted[3] = {MOJO_HANDLE_SIGNAL_READABLE,
                                 MOJO_HANDLE_SIGNAL_READABLE,
                                 MOJO_HANDLE_SIGNAL_READABLE};

  // Background thread: sleep for 200 ms, then close p.handle1.
  ThreadedRunner delayed_closer(base::Bind(
      [](ScopedMessagePipeHandle* victim) {
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        victim->reset();
      },
      &p.handle1));
  delayed_closer.Start();

  size_t cancelled_index = 0;
  const MojoResult wait_result = WaitMany(watched, wanted, 3, &cancelled_index);
  EXPECT_EQ(MOJO_RESULT_CANCELLED, wait_result);
  EXPECT_EQ(2u, cancelled_index);
}

// WaitMany() over both ends of a pipe must return OK, pointing at the
// receiving end, once another thread writes to the sending end after a delay.
TEST_F(WaitManyTest, DelayedWrite) {
  MessagePipe pipe;

  // Background thread: sleep for 200 ms, then send a message on handle0.
  ThreadedRunner delayed_writer(base::Bind(
      [](ScopedMessagePipeHandle* sender) {
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        WriteMessage(*sender, "wakey wakey");
      },
      &pipe.handle0));
  delayed_writer.Start();

  Handle watched[2] = {pipe.handle0.get(), pipe.handle1.get()};
  MojoHandleSignals wanted[2] = {MOJO_HANDLE_SIGNAL_READABLE,
                                 MOJO_HANDLE_SIGNAL_READABLE};
  size_t ready_index = 0;
  HandleSignalsState states[2];
  EXPECT_EQ(MOJO_RESULT_OK,
            WaitMany(watched, wanted, 2, &ready_index, states));
  EXPECT_EQ(1u, ready_index);

  // The sending end is writable only.
  EXPECT_FALSE(states[0].readable());
  EXPECT_TRUE(states[0].writable());
  EXPECT_FALSE(states[0].peer_closed());
  EXPECT_FALSE(states[0].never_readable());
  EXPECT_FALSE(states[0].never_writable());
  EXPECT_FALSE(states[0].never_peer_closed());

  // The receiving end is readable and writable.
  EXPECT_TRUE(states[1].readable());
  EXPECT_TRUE(states[1].writable());
  EXPECT_FALSE(states[1].peer_closed());
  EXPECT_FALSE(states[1].never_readable());
  EXPECT_FALSE(states[1].never_writable());
  EXPECT_FALSE(states[1].never_peer_closed());
}

// When another thread closes the peer of one watched handle, WaitMany() must
// fail with FAILED_PRECONDITION and report that handle's index, since it can
// no longer become readable.
TEST_F(WaitManyTest, DelayedPeerClosure) {
  MessagePipe p, q;

  // Background thread: sleep for 200 ms, then close p.handle0, the peer of
  // the third watched handle.
  ThreadedRunner delayed_closer(base::Bind(
      [](ScopedMessagePipeHandle* peer) {
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        peer->reset();
      },
      &p.handle0));
  delayed_closer.Start();

  // Both ends of |q| stay idle; only the last entry is affected.
  Handle watched[3] = {q.handle0.get(), q.handle1.get(), p.handle1.get()};
  MojoHandleSignals wanted[3] = {MOJO_HANDLE_SIGNAL_READABLE,
                                 MOJO_HANDLE_SIGNAL_READABLE,
                                 MOJO_HANDLE_SIGNAL_READABLE};
  size_t failed_index = 0;
  HandleSignalsState states[3];
  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
            WaitMany(watched, wanted, 3, &failed_index, states));
  EXPECT_EQ(2u, failed_index);

  // Only peer closure is signaled on the affected handle.
  EXPECT_FALSE(states[2].readable());
  EXPECT_FALSE(states[2].writable());
  EXPECT_TRUE(states[2].peer_closed());

  // Reading and writing can never be satisfied again.
  EXPECT_TRUE(states[2].never_readable());
  EXPECT_TRUE(states[2].never_writable());
  EXPECT_FALSE(states[2].never_peer_closed());
}

}  // namespace
}  // namespace mojo