# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'spec_helper'

# Reads the certificate fixtures used by TLS-related specs.
#
# The files are looked up in the `testdata` directory located in the parent
# of the directory that contains this file.
#
# @return [Array<String>] the contents of ca.pem, server1.key and
#   server1.pem, in that order
# @raise [SystemCallError] (e.g. Errno::ENOENT) if a fixture cannot be read
def load_test_certs
  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
  files = ['ca.pem', 'server1.key', 'server1.pem']
  # File.read opens, reads and closes the file in a single call, so no file
  # handle is left open waiting for the garbage collector.
  files.map { |f| File.read(File.join(test_root, f)) }
end

# Asserts that each wanted metadata hash is contained in the received hash at
# the same position.
#
# Only the keys present in the wanted hash are compared; additional keys in
# the received hash are not looked at.
#
# @param wanted_md [Array<Hash>] expected key/value pairs, one hash per call
# @param received_md [Array<Hash>] metadata hashes that were actually seen
def check_md(wanted_md, received_md)
  pairs = wanted_md.zip(received_md)
  pairs.each do |expected, actual|
    expected.each_pair { |name, val| expect(actual[name]).to eq(val) }
  end
end

# A test service with no methods.
#
# It mixes in GRPC::GenericService but declares no rpcs; the '#handle' specs
# in this file expect registering it with a server to raise.
class EmptyService
  include GRPC::GenericService
end

# A test service without an implementation.
#
# It declares the :an_rpc rpc (EchoMsg in, EchoMsg out) but defines no
# an_rpc method; the '#run' specs in this file expect calls to it to fail
# with an UNIMPLEMENTED status.
class NoRpcImplementation
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
end

# A test service whose only rpc always fails with a GRPC::BadStatus.
#
# The code, details and metadata used to build that status are exposed via
# readers so specs can compare them with what the client observed.
class FailingService
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
  attr_reader :code, :details, :md

  # The single optional argument is accepted and ignored.
  def initialize(_default_var = 'ignored')
    @code = 101
    @details = 'app error'
    @md = { 'failed_method' => 'an_rpc' }
  end

  # Never produces a response: raises a BadStatus built from the canned
  # code, details and metadata.
  def an_rpc(_req, _call)
    raise GRPC::BadStatus.new(@code, @details, @md)
  end
end

# Client stub class obtained from FailingService.rpc_stub_class.
FailingStub = FailingService.rpc_stub_class

# A test service that sleeps before echoing the request back.
#
# Non-nil call metadata is appended to received_md so that specs can inspect
# what the server side saw; delay is the sleep duration in seconds.
class SlowService
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
  attr_reader :received_md, :delay

  # The single optional argument is accepted and ignored.
  def initialize(_default_var = 'ignored')
    @received_md = []
    @delay = 0.25
  end

  # Sleeps for the configured delay, records the call metadata (when there
  # is any) and returns the request unchanged as the response.
  def an_rpc(req, call)
    GRPC.logger.info("starting a slow #{@delay} rpc")
    sleep(@delay)
    @received_md.push(call.metadata) unless call.metadata.nil?
    req
  end
end

# Client stub class obtained from SlowService.rpc_stub_class.
SlowStub = SlowService.rpc_stub_class

# A test service that allows a synchronized RPC cancellation
#
# NOTE(review): the received_md and delay readers are declared but nothing in
# this class assigns the matching instance variables, so they return nil.
class SynchronizedCancellationService
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
  attr_reader :received_md, :delay

  # notify_request_received and wait_until_rpc_cancelled are
  # callbacks to synchronously allow the client to proceed with
  # cancellation (after the unary request has been received),
  # and to synchronously wait until the client has cancelled the
  # current RPC.
  #
  # Both arguments must respond to #call: the first is invoked with the
  # request, the second with no arguments.
  def initialize(notify_request_received, wait_until_rpc_cancelled)
    @notify_request_received = notify_request_received
    @wait_until_rpc_cancelled = wait_until_rpc_cancelled
  end

  # Hands the request to the first callback, runs the second callback, and
  # only then returns the request as the response.
  def an_rpc(req, _call)
    GRPC.logger.info('starting a synchronusly cancelled rpc')
    @notify_request_received.call(req)
    @wait_until_rpc_cancelled.call
    req  # send back the req as the response
  end
end

# Client stub class obtained from the service's rpc_stub_class.
SynchronizedCancellationStub = SynchronizedCancellationService.rpc_stub_class

# A test service that keeps a reference to the server-side call object of the
# one rpc it handles, so that specs can poke at that object after the call
# has finished.
#
# An instance may serve a single rpc only; a second one fails with
# 'shouldnt reuse service'.
class CheckCallAfterFinishedService
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
  rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
  rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
  rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
  attr_reader :server_side_call

  # Unary: echoes the request.
  def an_rpc(req, call)
    remember_call(call)
    req
  end

  # Client streaming: answers with an empty EchoMsg.
  def a_client_streaming_rpc(call)
    remember_call(call)
    # drain the incoming requests so the call can complete
    call.each_remote_read.each { |msg| GRPC.logger.info(msg) }
    EchoMsg.new
  end

  # Server streaming: answers with two empty EchoMsgs.
  def a_server_streaming_rpc(_, call)
    remember_call(call)
    Array.new(2) { EchoMsg.new }
  end

  # Bidi streaming: logs every request, then answers with two empty EchoMsgs.
  def a_bidi_rpc(requests, call)
    remember_call(call)
    requests.each { |msg| GRPC.logger.info(msg) }
    Array.new(2) { EchoMsg.new }
  end

  private

  # Stores the call for later inspection, refusing to overwrite one that was
  # stored by an earlier rpc.
  def remember_call(call)
    raise 'shouldnt reuse service' unless @server_side_call.nil?
    @server_side_call = call
  end
end

# Client stub class obtained from the service's rpc_stub_class.
CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class

# A service with a bidi streaming method.
#
# The handler deliberately returns a String where the rpc is declared to
# return a stream of EchoMsg, so specs can check how that is reported to the
# client.
class BidiService
  include GRPC::GenericService
  rpc :server_sends_bad_input, stream(EchoMsg), stream(EchoMsg)

  # Ignores both arguments and returns a plain String.
  def server_sends_bad_input(_, _)
    'bad response. (not an enumerable, client sees an error)'
  end
end

# Client stub class obtained from BidiService.rpc_stub_class.
BidiStub = BidiService.rpc_stub_class

describe GRPC::RpcServer do
  # Short aliases for GRPC classes.
  # NOTE(review): constant assignment inside a block is not scoped to the
  # block, so these presumably end up as top-level constants; neither alias
  # is referenced in the visible part of this file — confirm before removing.
  RpcServer = GRPC::RpcServer
  StatusCodes = GRPC::Core::StatusCodes

  # Fixtures assigned before every example.
  # NOTE(review): none of these instance variables is read in the visible
  # part of this file.
  before(:each) do
    @method = 'an_rpc_method'
    @pass = 0
    @fail = 1
    @noop = proc { |x| x }
  end

  describe '#new' do
    # Building a server from nothing but channel args must not raise.
    it 'can be created with just some args' do
      server_opts = { server_args: { a_channel_arg: 'an_arg' } }
      expect { new_rpc_server_for_testing(**server_opts) }.not_to raise_error
    end

    # A plain Object is passed where credentials are expected.
    it 'cannot be created with invalid ServerCredentials' do
      bad_opts = {
        server_args: { a_channel_arg: 'an_arg' },
        creds: Object.new
      }
      expect { new_rpc_server_for_testing(**bad_opts) }.to raise_error
    end
  end

  describe '#stopped?' do
    # Each example gets a fresh server listening on an insecure port chosen
    # via port 0.
    before(:each) do
      @srv = new_rpc_server_for_testing(
        server_args: { a_channel_arg: 'an_arg' },
        poll_period: 1.5
      )
      @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
    end

    it 'starts out false' do
      expect(@srv.stopped?).to be(false)
    end

    it 'stays false after the server starts running', server: true do
      @srv.handle(EchoService)
      run_thread = Thread.new { @srv.run }
      @srv.wait_till_running
      expect(@srv.stopped?).to be(false)
      # shut down so the thread blocked in #run can finish
      @srv.stop
      run_thread.join
    end

    it 'is true after a running server is stopped', server: true do
      @srv.handle(EchoService)
      run_thread = Thread.new { @srv.run }
      @srv.wait_till_running
      @srv.stop
      run_thread.join
      expect(@srv.stopped?).to be(true)
    end
  end

  describe '#running?' do
    it 'starts out false' do
      server = new_rpc_server_for_testing(
        server_args: { a_channel_arg: 'an_arg' }
      )
      expect(server.running?).to be(false)
    end

    # NOTE(review): despite its description, this example asserts that #run
    # raises a RuntimeError; it does not look at #running?.
    it 'is false if run is called with no services registered', server: true do
      server = new_rpc_server_for_testing(
        server_args: { a_channel_arg: 'an_arg' },
        poll_period: 2
      )
      server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
      expect { server.run }.to raise_error(RuntimeError)
    end

    it 'is true after run is called with a registered service' do
      server = new_rpc_server_for_testing(
        server_args: { a_channel_arg: 'an_arg' },
        poll_period: 2.5
      )
      server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
      server.handle(EchoService)
      run_thread = Thread.new { server.run }
      server.wait_till_running
      expect(server.running?).to be(true)
      server.stop
      run_thread.join
    end
  end

  describe '#handle' do
    # Every example starts with a fresh, not-yet-run server that has an
    # insecure port (picked via port 0) added to it.
    before(:each) do
      @opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1 }
      @srv = new_rpc_server_for_testing(**@opts)
      @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
    end

    it 'raises if #run has already been called' do
      @srv.handle(EchoService)
      t = Thread.new { @srv.run }
      @srv.wait_till_running
      expect { @srv.handle(EchoService) }.to raise_error
      # shut down so the thread blocked in #run can finish
      @srv.stop
      t.join
    end

    it 'raises if the server has been run and stopped' do
      @srv.handle(EchoService)
      t = Thread.new { @srv.run }
      @srv.wait_till_running
      @srv.stop
      t.join
      expect { @srv.handle(EchoService) }.to raise_error
    end

    it 'raises if the service does not include GenericService ' do
      expect { @srv.handle(Object) }.to raise_error
    end

    it 'raises if the service does not declare any rpc methods' do
      expect { @srv.handle(EmptyService) }.to raise_error
    end

    it 'raises if a handler method is already registered' do
      # The first registration is expected to succeed. The duplicate has to
      # be made on the same server (@srv): calling it on an undefined local
      # would only raise NameError, which a bare raise_error also accepts,
      # and the duplicate registration would never be exercised.
      @srv.handle(EchoService)
      expect { @srv.handle(EchoService) }.to raise_error
    end
  end

  describe '#run' do
    # Stub options that pass @ch as the channel_override. @ch is assigned in
    # the before hook of the 'with no connect_metadata' context.
    let(:client_opts) { { channel_override: @ch } }
    # Marshal / unmarshal procs taken from EchoService's :an_rpc descriptor;
    # the examples pass them to GRPC::ClientStub#request_response.
    let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc }
    let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) }

    context 'with no connect_metadata' do
      # Creates the server plus an insecure channel to it; the channel is
      # what client_opts hands to the stubs.
      before(:each) do
        @srv = new_rpc_server_for_testing(poll_period: 1)
        port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
        @host = "localhost:#{port}"
        @ch = GRPC::Core::Channel.new(@host, nil, :this_channel_is_insecure)
      end

      # Calls a route the server has no handler for. Only the exception class
      # is asserted here, not the status code.
      it 'should return NOT_FOUND status on unknown methods', server: true do
        @srv.handle(EchoService)
        server_thread = Thread.new { @srv.run }
        @srv.wait_till_running
        request = EchoMsg.new
        expect do
          client = GRPC::ClientStub.new(@host, :this_channel_is_insecure,
                                        **client_opts)
          client.request_response('/unknown', request, marshal, unmarshal)
        end.to raise_error GRPC::BadStatus
        @srv.stop
        server_thread.join
      end

      # NoRpcImplementation declares :an_rpc without defining the method.
      it 'should return UNIMPLEMENTED on unimplemented methods', server: true do
        @srv.handle(NoRpcImplementation)
        server_thread = Thread.new { @srv.run }
        @srv.wait_till_running
        request = EchoMsg.new
        expect do
          client = GRPC::ClientStub.new(@host, :this_channel_is_insecure,
                                        **client_opts)
          client.request_response('/an_rpc', request, marshal, unmarshal)
        end.to raise_error do |err|
          expect(err).to be_a(GRPC::BadStatus)
          expect(err.code).to be(GRPC::Core::StatusCodes::UNIMPLEMENTED)
        end
        @srv.stop
        server_thread.join
      end

      # Issues several unary calls one after another through the same stub.
      it 'should handle multiple sequential requests', server: true do
        @srv.handle(EchoService)
        server_thread = Thread.new { @srv.run }
        @srv.wait_till_running
        request = EchoMsg.new
        stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
        call_count = 5 # arbitrary
        call_count.times { expect(stub.an_rpc(request)).to be_a(EchoMsg) }
        @srv.stop
        server_thread.join
      end

      # The metadata given to the client call must show up in the metadata
      # recorded by the service instance.
      it 'should receive metadata sent as rpc keyword args', server: true do
        service = EchoService.new
        @srv.handle(service)
        server_thread = Thread.new { @srv.run }
        @srv.wait_till_running
        request = EchoMsg.new
        stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
        reply = stub.an_rpc(request, metadata: { k1: 'v1', k2: 'v2' })
        expect(reply).to be_a(EchoMsg)
        check_md([{ 'k1' => 'v1', 'k2' => 'v2' }], service.received_md)
        @srv.stop
        server_thread.join
      end

      # Same as the keyword-args example, but with an explicit deadline that
      # is one second longer than the time the slow handler sleeps.
      it 'should receive metadata if a deadline is specified', server: true do
        service = SlowService.new
        @srv.handle(service)
        server_thread = Thread.new { @srv.run }
        @srv.wait_till_running
        request = EchoMsg.new
        stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
        budget = service.delay + 1.0
        reply = stub.an_rpc(
          request,
          deadline: GRPC::Core::TimeConsts.from_relative_time(budget),
          metadata: { k1: 'v1', k2: 'v2' }
        )
        expect(reply).to be_a(EchoMsg)
        check_md([{ 'k1' => 'v1', 'k2' => 'v2' }], service.received_md)
        @srv.stop
        server_thread.join
      end

      # Cancels a unary call while its handler is known to be running.
      #
      # Two mutex/condition-variable pairs order the steps:
      #   1. the handler reports that it received the request,
      #   2. the example cancels the client operation,
      #   3. the example releases the handler, which was waiting for step 2.
      it 'should handle cancellation correctly', server: true do
        # Step 1 signal: set by the handler (via the callback) once it has
        # the request.
        request_received = false
        request_received_mu = Mutex.new
        request_received_cv = ConditionVariable.new
        notify_request_received = proc do |req|
          request_received_mu.synchronize do
            fail 'req is nil' if req.nil?
            expect(req.is_a?(EchoMsg)).to be true
            fail 'test bug - already set' if request_received
            request_received = true
            request_received_cv.signal
          end
        end

        # Step 3 signal: the handler blocks in this callback until the
        # example sets rpc_cancelled. The loop re-checks the flag after every
        # wake-up.
        rpc_cancelled = false
        rpc_cancelled_mu = Mutex.new
        rpc_cancelled_cv = ConditionVariable.new
        wait_until_rpc_cancelled = proc do
          rpc_cancelled_mu.synchronize do
            loop do
              break if rpc_cancelled
              rpc_cancelled_cv.wait(rpc_cancelled_mu)
            end
          end
        end

        service = SynchronizedCancellationService.new(notify_request_received,
                                                      wait_until_rpc_cancelled)
        @srv.handle(service)
        srv_thd = Thread.new { @srv.run }
        @srv.wait_till_running
        req = EchoMsg.new
        stub = SynchronizedCancellationStub.new(@host,
                                                :this_channel_is_insecure,
                                                **client_opts)
        # return_op: true yields an operation object instead of running the
        # call right away; it is executed on client_thd below.
        op = stub.an_rpc(req, return_op: true)

        # The call runs on its own thread so this thread stays free to cancel
        # it; a failed expectation surfaces when the thread is joined.
        client_thd = Thread.new do
          expect { op.execute }.to raise_error GRPC::Cancelled
        end

        # Wait for step 1 before cancelling.
        request_received_mu.synchronize do
          loop do
            break if request_received
            request_received_cv.wait(request_received_mu)
          end
        end

        op.cancel

        # Step 3: let the handler return now that the call has been cancelled.
        rpc_cancelled_mu.synchronize do
          fail 'test bug - already set' if rpc_cancelled
          rpc_cancelled = true
          rpc_cancelled_cv.signal
        end

        client_thd.join
        @srv.stop
        srv_thd.join
      end

      # Fires several unary calls from separate threads, each with its own
      # stub, and collects the replies through a queue.
      it 'should handle multiple parallel requests', server: true do
        @srv.handle(EchoService)
        server_thread = Thread.new { @srv.run }
        @srv.wait_till_running
        req = EchoMsg.new
        replies = Queue.new
        client_count = 5 # arbitrary
        client_threads = Array.new(client_count) do
          Thread.new do
            stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
            replies << stub.an_rpc(req)
          end
        end
        client_count.times { expect(replies.pop).to be_a(EchoMsg) }
        @srv.stop
        [server_thread, *client_threads].each(&:join)
      end

      # Floods a deliberately small server (pool_size 2, one waiting request)
      # with slow calls and expects at least one of them to be rejected with
      # GRPC::ResourceExhausted.
      it 'should return RESOURCE_EXHAUSTED on too many jobs', server: true do
        alt_srv = new_rpc_server_for_testing(
          server_args: { a_channel_arg: 'an_arg' },
          pool_size: 2,
          poll_period: 1,
          max_waiting_requests: 1
        )
        alt_srv.handle(SlowService)
        alt_port = alt_srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
        alt_host = "0.0.0.0:#{alt_port}"
        server_thread = Thread.new { alt_srv.run }
        alt_srv.wait_till_running
        req = EchoMsg.new
        saw_resource_exhausted = false
        # arbitrary, use as many to ensure the server pool is exceeded
        client_threads = Array.new(20) do
          Thread.new do
            stub = SlowStub.new(alt_host, :this_channel_is_insecure)
            begin
              stub.an_rpc(req)
            rescue GRPC::ResourceExhausted
              saw_resource_exhausted = true
            end
          end
        end
        client_threads.each(&:join)
        alt_srv.stop
        server_thread.join
        expect(saw_resource_exhausted).to be(true)
      end

      # BidiService's handler returns a String instead of an enumerable of
      # responses; the client is expected to see GRPC::Unknown carrying the
      # server-side error text.
      it 'should send a status UNKNOWN with a relevant message when the ' \
        'servers response stream is not an enumerable' do
        @srv.handle(BidiService)
        t = Thread.new { @srv.run }
        @srv.wait_till_running
        stub = BidiStub.new(@host, :this_channel_is_insecure, **client_opts)
        responses = stub.server_sends_bad_input([])
        exception = nil
        begin
          # the error only surfaces once the response stream is consumed
          responses.each { |r| r }
        rescue GRPC::Unknown => e
          exception = e
        end
        # Erroneous responses sent from the server handler should cause an
        # exception on the client with relevant info.
        # NOTE(review): this text follows one particular wording of Ruby's
        # NoMethodError message; newer Ruby versions word it differently —
        # confirm against the Ruby versions this spec runs on.
        expected_details = 'NoMethodError: undefined method `each\' for '\
          '"bad response. (not an enumerable, client sees an error)"'

        # If no exception was captured, exception.inspect is "nil" and the
        # matcher fails with a message that shows both strings.
        expect(exception.inspect).to include(expected_details)
        @srv.stop
        t.join
      end
    end

    context 'with connect metadata' do
      # Proc handed to the server as connect_md_proc: returns a copy of the
      # given metadata with the method name and a fixed marker entry added.
      let(:test_md_proc) do
        proc do |mth, md|
          md.clone.tap do |connect_md|
            connect_md['method'] = mth
            connect_md['connect_k1'] = 'connect_v1'
          end
        end
      end
      # Server configured with the connect-metadata proc, listening on an
      # insecure port picked via port 0.
      before(:each) do
        @srv = new_rpc_server_for_testing(poll_period: 1,
                                          connect_md_proc: test_md_proc)
        port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
        @alt_host = "0.0.0.0:#{port}"
      end

      # After the call has executed, the operation's metadata must contain
      # the client's own entries plus the ones added by test_md_proc.
      it 'should send connect metadata to the client', server: true do
        service = EchoService.new
        @srv.handle(service)
        server_thread = Thread.new { @srv.run }
        @srv.wait_till_running
        request = EchoMsg.new
        stub = EchoStub.new(@alt_host, :this_channel_is_insecure)
        op = stub.an_rpc(request,
                         metadata: { k1: 'v1', k2: 'v2' },
                         return_op: true)
        # nothing has been received before the operation is executed
        expect(op.metadata).to be nil
        expect(op.execute).to be_a(EchoMsg)
        expected_md = {
          'k1' => 'v1',
          'k2' => 'v2',
          'method' => '/EchoService/an_rpc',
          'connect_k1' => 'connect_v1'
        }
        expected_md.each_pair do |name, expected|
          GRPC.logger.info("key: #{name}")
          expect(op.metadata[name]).to eq(expected)
        end
        @srv.stop
        server_thread.join
      end
    end

    context 'with trailing metadata' do
      # Plain server listening on an insecure port picked via port 0.
      before(:each) do
        @srv = new_rpc_server_for_testing(poll_period: 1)
        port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
        @alt_host = "0.0.0.0:#{port}"
      end

      # FailingService always raises BadStatus with canned code, details and
      # metadata; the client-side exception must carry all three.
      it 'should be added to BadStatus when requests fail', server: true do
        service = FailingService.new
        @srv.handle(service)
        t = Thread.new { @srv.run }
        @srv.wait_till_running
        req = EchoMsg.new
        stub = FailingStub.new(@alt_host, :this_channel_is_insecure)
        blk = proc { stub.an_rpc(req) }

        # confirm it raises the expected error
        expect(&blk).to raise_error GRPC::BadStatus

        # call again and confirm the exception contains the trailing
        # metadata. Going through the matcher (instead of begin/rescue) makes
        # the example fail if this second call unexpectedly succeeds, rather
        # than silently skipping the checks below.
        expect(&blk).to raise_error(GRPC::BadStatus) do |e|
          expect(e.code).to eq(service.code)
          expect(e.details).to eq(service.details)
          expect(e.metadata).to eq(service.md)
        end
        @srv.stop
        t.join
      end

      # The keyword arguments given to EchoService.new are expected back on
      # the client as the operation's trailing metadata.
      it 'should be received by the client', server: true do
        service = EchoService.new(k1: 'out_v1', k2: 'out_v2')
        @srv.handle(service)
        server_thread = Thread.new { @srv.run }
        @srv.wait_till_running
        request = EchoMsg.new
        stub = EchoStub.new(@alt_host, :this_channel_is_insecure)
        op = stub.an_rpc(request,
                         return_op: true,
                         metadata: { k1: 'v1', k2: 'v2' })
        expect(op.metadata).to be nil
        expect(op.execute).to be_a(EchoMsg)
        expect(op.trailing_metadata).to eq('k1' => 'out_v1', 'k2' => 'out_v2')
        @srv.stop
        server_thread.join
      end
    end

    context 'when call objects are used after calls have completed' do
      # Starts a server with a fresh CheckCallAfterFinishedService for every
      # example. The examples stop the server and join @srv_thd themselves.
      before(:each) do
        @srv = new_rpc_server_for_testing(poll_period: 1)
        port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
        @alt_host = "0.0.0.0:#{port}"

        @service = CheckCallAfterFinishedService.new
        @srv.handle(@service)
        @srv_thd = Thread.new { @srv.run }
        @srv.wait_till_running
      end

      # check that the server-side call is still in a usable state even
      # after it has finished
      #
      # Runs the shared checks, then verifies that peer is a String and that
      # peer_cert is nil.
      def check_single_req_view_of_finished_call(call)
        common_check_of_finished_server_call(call)

        expect(call.peer).to be_a(String)
        expect(call.peer_cert).to be(nil)
      end

      # Runs the shared checks, then verifies that trying to read further
      # requests from the finished call raises GRPC::Core::CallError.
      def check_multi_req_view_of_finished_call(call)
        common_check_of_finished_server_call(call)

        expect do
          # the block would print any message read; none is expected
          call.each_remote_read.each { |r| p r }
        end.to raise_error(GRPC::Core::CallError)
      end

      # Checks shared by all finished-call examples: which operations on a
      # finished server-side call raise, which do not, and what its readers
      # return.
      def common_check_of_finished_server_call(call)
        # adding metadata to send is expected to be rejected
        expect do
          call.merge_metadata_to_send({})
        end.to raise_error(RuntimeError)

        # sending initial metadata is expected not to raise
        expect do
          call.send_initial_metadata
        end.to_not raise_error

        expect(call.cancelled?).to be(false)
        expect(call.metadata).to be_a(Hash)
        expect(call.metadata['user-agent']).to be_a(String)

        expect(call.metadata_sent).to be(true)
        expect(call.output_metadata).to eq({})
        expect(call.metadata_to_send).to eq({})
        expect(call.deadline.is_a?(Time)).to be(true)
      end

      # Unary call: finish it, shut the server down, then inspect the call
      # object the service kept.
      it 'should not crash when call used after an unary call is finished' do
        request = EchoMsg.new
        client = CheckCallAfterFinishedServiceStub.new(
          @alt_host, :this_channel_is_insecure
        )
        reply = client.an_rpc(request)
        expect(reply).to be_a(EchoMsg)
        @srv.stop
        @srv_thd.join

        check_single_req_view_of_finished_call(@service.server_side_call)
      end

      # Client-streaming call: finish it, shut the server down, then inspect
      # the call object the service kept.
      it 'should not crash when call used after client streaming finished' do
        outgoing = [EchoMsg.new, EchoMsg.new]
        client = CheckCallAfterFinishedServiceStub.new(
          @alt_host, :this_channel_is_insecure
        )
        reply = client.a_client_streaming_rpc(outgoing)
        expect(reply).to be_a(EchoMsg)
        @srv.stop
        @srv_thd.join

        check_multi_req_view_of_finished_call(@service.server_side_call)
      end

      # Server-streaming call: consume every response, shut the server down,
      # then inspect the call object the service kept.
      it 'should not crash when call used after server streaming finished' do
        request = EchoMsg.new
        client = CheckCallAfterFinishedServiceStub.new(
          @alt_host, :this_channel_is_insecure
        )
        replies = client.a_server_streaming_rpc(request)
        replies.each { |msg| expect(msg).to be_a(EchoMsg) }
        @srv.stop
        @srv_thd.join

        check_single_req_view_of_finished_call(@service.server_side_call)
      end

      # Bidi call: consume every response, shut the server down, then inspect
      # the call object the service kept.
      it 'should not crash when call used after a bidi call is finished' do
        outgoing = [EchoMsg.new, EchoMsg.new]
        client = CheckCallAfterFinishedServiceStub.new(
          @alt_host, :this_channel_is_insecure
        )
        replies = client.a_bidi_rpc(outgoing)
        replies.each { |msg| expect(msg).to be_a(EchoMsg) }
        @srv.stop
        @srv_thd.join

        check_multi_req_view_of_finished_call(@service.server_side_call)
      end
    end
  end
end