#!/usr/bin/env python
#
# Copyright 2012, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test for end-to-end."""
import logging
import os
import signal
import socket
import subprocess
import sys
import time
import unittest
import set_sys_path # Update sys.path to locate mod_pywebsocket module.
from test import client_for_testing
from test import mux_client_for_testing
# Special message that tells the echo server to start closing handshake
_GOODBYE_MESSAGE = 'Goodbye'
# If you want to use external server to run end to end tests, set following
# parameters correctly.
_use_external_server = False
_external_server_port = 0
# Test body functions
def _echo_check_procedure(client):
client.connect()
client.send_message('test')
client.assert_receive('test')
client.send_message('helloworld')
client.assert_receive('helloworld')
client.send_close()
client.assert_receive_close()
client.assert_connection_closed()
def _echo_check_procedure_with_binary(client):
client.connect()
client.send_message('binary', binary=True)
client.assert_receive('binary', binary=True)
client.send_message('\x00\x80\xfe\xff\x00\x80', binary=True)
client.assert_receive('\x00\x80\xfe\xff\x00\x80', binary=True)
client.send_close()
client.assert_receive_close()
client.assert_connection_closed()
def _echo_check_procedure_with_goodbye(client):
client.connect()
client.send_message('test')
client.assert_receive('test')
client.send_message(_GOODBYE_MESSAGE)
client.assert_receive(_GOODBYE_MESSAGE)
client.assert_receive_close()
client.send_close()
client.assert_connection_closed()
def _echo_check_procedure_with_code_and_reason(client, code, reason):
client.connect()
client.send_close(code, reason)
client.assert_receive_close(code, reason)
client.assert_connection_closed()
def _unmasked_frame_check_procedure(client):
client.connect()
client.send_message('test', mask=False)
client.assert_receive_close(client_for_testing.STATUS_PROTOCOL_ERROR, '')
client.assert_connection_closed()
def _mux_echo_check_procedure(mux_client):
mux_client.connect()
mux_client.send_flow_control(1, 1024)
logical_channel_options = client_for_testing.ClientOptions()
logical_channel_options.server_host = 'localhost'
logical_channel_options.server_port = 80
logical_channel_options.origin = 'http://localhost'
logical_channel_options.resource = '/echo'
mux_client.add_channel(2, logical_channel_options)
mux_client.send_flow_control(2, 1024)
mux_client.send_message(2, 'test')
mux_client.assert_receive(2, 'test')
mux_client.add_channel(3, logical_channel_options)
mux_client.send_flow_control(3, 1024)
mux_client.send_message(2, 'hello')
mux_client.send_message(3, 'world')
mux_client.assert_receive(2, 'hello')
mux_client.assert_receive(3, 'world')
# Don't send close message on channel id 1 so that server-initiated
# closing handshake won't occur.
mux_client.send_close(2)
mux_client.send_close(3)
mux_client.assert_receive_close(2)
mux_client.assert_receive_close(3)
mux_client.send_physical_connection_close()
mux_client.assert_physical_connection_receive_close()
class EndToEndTest(unittest.TestCase):
"""An end-to-end test that launches pywebsocket standalone server as a
separate process, connects to it using the client_for_testing module, and
checks if the server behaves correctly by exchanging opening handshake and
frames over a TCP connection.
"""
def setUp(self):
self.server_stderr = None
self.top_dir = os.path.join(os.path.split(__file__)[0], '..')
os.putenv('PYTHONPATH', os.path.pathsep.join(sys.path))
self.standalone_command = os.path.join(
self.top_dir, 'mod_pywebsocket', 'standalone.py')
self.document_root = os.path.join(self.top_dir, 'example')
s = socket.socket()
s.bind(('localhost', 0))
(_, self.test_port) = s.getsockname()
s.close()
self._options = client_for_testing.ClientOptions()
self._options.server_host = 'localhost'
self._options.origin = 'http://localhost'
self._options.resource = '/echo'
# TODO(toyoshim): Eliminate launching a standalone server on using
# external server.
if _use_external_server:
self._options.server_port = _external_server_port
else:
self._options.server_port = self.test_port
def _run_python_command(self, commandline, stdout=None, stderr=None):
return subprocess.Popen([sys.executable] + commandline, close_fds=True,
stdout=stdout, stderr=stderr)
def _run_server(self, allow_draft75=False):
args = [self.standalone_command,
'-H', 'localhost',
'-V', 'localhost',
'-p', str(self.test_port),
'-P', str(self.test_port),
'-d', self.document_root]
# Inherit the level set to the root logger by test runner.
root_logger = logging.getLogger()
log_level = root_logger.getEffectiveLevel()
if log_level != logging.NOTSET:
args.append('--log-level')
args.append(logging.getLevelName(log_level).lower())
if allow_draft75:
args.append('--allow-draft75')
return self._run_python_command(args,
stderr=self.server_stderr)
def _kill_process(self, pid):
if sys.platform in ('win32', 'cygwin'):
subprocess.call(
('taskkill.exe', '/f', '/pid', str(pid)), close_fds=True)
else:
os.kill(pid, signal.SIGKILL)
def _run_hybi_test_with_client_options(self, test_function, options):
server = self._run_server()
try:
# TODO(tyoshino): add some logic to poll the server until it
# becomes ready
time.sleep(0.2)
client = client_for_testing.create_client(options)
try:
test_function(client)
finally:
client.close_socket()
finally:
self._kill_process(server.pid)
def _run_hybi_test(self, test_function):
self._run_hybi_test_with_client_options(test_function, self._options)
def _run_hybi_deflate_test(self, test_function):
server = self._run_server()
try:
time.sleep(0.2)
self._options.enable_deflate_stream()
client = client_for_testing.create_client(self._options)
try:
test_function(client)
finally:
client.close_socket()
finally:
self._kill_process(server.pid)
def _run_hybi_deflate_frame_test(self, test_function):
server = self._run_server()
try:
time.sleep(0.2)
self._options.enable_deflate_frame()
client = client_for_testing.create_client(self._options)
try:
test_function(client)
finally:
client.close_socket()
finally:
self._kill_process(server.pid)
def _run_hybi_close_with_code_and_reason_test(self, test_function, code,
reason):
server = self._run_server()
try:
time.sleep(0.2)
client = client_for_testing.create_client(self._options)
try:
test_function(client, code, reason)
finally:
client.close_socket()
finally:
self._kill_process(server.pid)
def _run_hybi_http_fallback_test(self, options, status):
server = self._run_server()
try:
time.sleep(0.2)
client = client_for_testing.create_client(options)
try:
client.connect()
self.fail('Could not catch HttpStatusException')
except client_for_testing.HttpStatusException, e:
self.assertEqual(status, e.status)
except Exception, e:
self.fail('Catch unexpected exception')
finally:
client.close_socket()
finally:
self._kill_process(server.pid)
def _run_hybi_mux_test(self, test_function):
server = self._run_server()
try:
time.sleep(0.2)
client = mux_client_for_testing.MuxClient(self._options)
try:
test_function(client)
finally:
client.close_socket()
finally:
self._kill_process(server.pid)
def test_echo(self):
self._run_hybi_test(_echo_check_procedure)
def test_echo_binary(self):
self._run_hybi_test(_echo_check_procedure_with_binary)
def test_echo_server_close(self):
self._run_hybi_test(_echo_check_procedure_with_goodbye)
def test_unmasked_frame(self):
self._run_hybi_test(_unmasked_frame_check_procedure)
def test_echo_deflate(self):
self._run_hybi_deflate_test(_echo_check_procedure)
def test_echo_deflate_server_close(self):
self._run_hybi_deflate_test(_echo_check_procedure_with_goodbye)
def test_echo_deflate_frame(self):
self._run_hybi_deflate_frame_test(_echo_check_procedure)
def test_echo_deflate_frame_server_close(self):
self._run_hybi_deflate_frame_test(
_echo_check_procedure_with_goodbye)
def test_echo_close_with_code_and_reason(self):
self._options.resource = '/close'
self._run_hybi_close_with_code_and_reason_test(
_echo_check_procedure_with_code_and_reason, 3333, 'sunsunsunsun')
def test_echo_close_with_empty_body(self):
self._options.resource = '/close'
self._run_hybi_close_with_code_and_reason_test(
_echo_check_procedure_with_code_and_reason, None, '')
def test_mux_echo(self):
self._run_hybi_mux_test(_mux_echo_check_procedure)
def test_close_on_protocol_error(self):
"""Tests that the server sends a close frame with protocol error status
code when the client sends data with some protocol error.
"""
def test_function(client):
client.connect()
# Intermediate frame without any preceding start of fragmentation
# frame.
client.send_frame_of_arbitrary_bytes('\x80\x80', '')
client.assert_receive_close(
client_for_testing.STATUS_PROTOCOL_ERROR)
self._run_hybi_test(test_function)
def test_close_on_unsupported_frame(self):
"""Tests that the server sends a close frame with unsupported operation
status code when the client sends data asking some operation that is
not supported by the server.
"""
def test_function(client):
client.connect()
# Text frame with RSV3 bit raised.
client.send_frame_of_arbitrary_bytes('\x91\x80', '')
client.assert_receive_close(
client_for_testing.STATUS_UNSUPPORTED_DATA)
self._run_hybi_test(test_function)
def test_close_on_invalid_frame(self):
"""Tests that the server sends a close frame with invalid frame payload
data status code when the client sends an invalid frame like containing
invalid UTF-8 character.
"""
def test_function(client):
client.connect()
# Text frame with invalid UTF-8 string.
client.send_message('\x80', raw=True)
client.assert_receive_close(
client_for_testing.STATUS_INVALID_FRAME_PAYLOAD_DATA)
self._run_hybi_test(test_function)
def _run_hybi00_test(self, test_function):
server = self._run_server()
try:
time.sleep(0.2)
client = client_for_testing.create_client_hybi00(self._options)
try:
test_function(client)
finally:
client.close_socket()
finally:
self._kill_process(server.pid)
def test_echo_hybi00(self):
self._run_hybi00_test(_echo_check_procedure)
def test_echo_server_close_hybi00(self):
self._run_hybi00_test(_echo_check_procedure_with_goodbye)
def _run_hixie75_test(self, test_function):
server = self._run_server(allow_draft75=True)
try:
time.sleep(0.2)
client = client_for_testing.create_client_hixie75(self._options)
try:
test_function(client)
finally:
client.close_socket()
finally:
self._kill_process(server.pid)
def test_echo_hixie75(self):
"""Tests that the server can talk draft-hixie-thewebsocketprotocol-75
protocol.
"""
def test_function(client):
client.connect()
client.send_message('test')
client.assert_receive('test')
self._run_hixie75_test(test_function)
def test_echo_server_close_hixie75(self):
"""Tests that the server can talk draft-hixie-thewebsocketprotocol-75
protocol. At the end of message exchanging, the client sends a keyword
message that requests the server to close the connection, and then
checks if the connection is really closed.
"""
def test_function(client):
client.connect()
client.send_message('test')
client.assert_receive('test')
client.send_message(_GOODBYE_MESSAGE)
client.assert_receive(_GOODBYE_MESSAGE)
self._run_hixie75_test(test_function)
# TODO(toyoshim): Add tests to verify invalid absolute uri handling like
# host unmatch, port unmatch and invalid port description (':' without port
# number).
def test_absolute_uri(self):
"""Tests absolute uri request."""
options = self._options
options.resource = 'ws://localhost:%d/echo' % options.server_port
self._run_hybi_test_with_client_options(_echo_check_procedure, options)
def test_origin_check(self):
"""Tests http fallback on origin check fail."""
options = self._options
options.resource = '/origin_check'
# Server shows warning message for http 403 fallback. This warning
# message is confusing. Following pipe disposes warning messages.
self.server_stderr = subprocess.PIPE
self._run_hybi_http_fallback_test(options, 403)
def test_version_check(self):
"""Tests http fallback on version check fail."""
options = self._options
options.version = 99
self.server_stderr = subprocess.PIPE
self._run_hybi_http_fallback_test(options, 400)
def _check_example_echo_client_result(
self, expected, stdoutdata, stderrdata):
actual = stdoutdata.decode("utf-8")
if actual != expected:
raise Exception('Unexpected result on example echo client: '
'%r (expected) vs %r (actual)' %
(expected, actual))
if stderrdata is not None:
raise Exception('Unexpected error message on example echo '
'client: %r' % stderrdata)
def test_example_echo_client(self):
"""Tests that the echo_client.py example can talk with the server."""
server = self._run_server()
try:
time.sleep(0.2)
client_command = os.path.join(
self.top_dir, 'example', 'echo_client.py')
args = [client_command,
'-p', str(self._options.server_port)]
client = self._run_python_command(args, stdout=subprocess.PIPE)
stdoutdata, stderrdata = client.communicate()
expected = ('Send: Hello\n' 'Recv: Hello\n'
u'Send: \u65e5\u672c\n' u'Recv: \u65e5\u672c\n'
'Send close\n' 'Recv ack\n')
self._check_example_echo_client_result(
expected, stdoutdata, stderrdata)
# Process a big message for which extended payload length is used.
# To handle extended payload length, ws_version attribute will be
# accessed. This test checks that ws_version is correctly set.
big_message = 'a' * 1024
args = [client_command,
'-p', str(self._options.server_port),
'-m', big_message]
client = self._run_python_command(args, stdout=subprocess.PIPE)
stdoutdata, stderrdata = client.communicate()
expected = ('Send: %s\nRecv: %s\nSend close\nRecv ack\n' %
(big_message, big_message))
self._check_example_echo_client_result(
expected, stdoutdata, stderrdata)
finally:
self._kill_process(server.pid)
if __name__ == '__main__':
unittest.main()
# vi:sts=4 sw=4 et