#!/usr/bin/env python
#
# Copyright 2009, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests for handshake module."""
import unittest
import config # This must be imported before mod_pywebsocket.
from mod_pywebsocket import handshake
import mock
_GOOD_REQUEST = (
80,
'/demo',
{
'Upgrade':'WebSocket',
'Connection':'Upgrade',
'Host':'example.com',
'Origin':'http://example.com',
'WebSocket-Protocol':'sample',
}
)
_GOOD_RESPONSE_DEFAULT_PORT = (
'HTTP/1.1 101 Web Socket Protocol Handshake\r\n'
'Upgrade: WebSocket\r\n'
'Connection: Upgrade\r\n'
'WebSocket-Origin: http://example.com\r\n'
'WebSocket-Location: ws://example.com/demo\r\n'
'WebSocket-Protocol: sample\r\n'
'\r\n')
_GOOD_RESPONSE_SECURE = (
'HTTP/1.1 101 Web Socket Protocol Handshake\r\n'
'Upgrade: WebSocket\r\n'
'Connection: Upgrade\r\n'
'WebSocket-Origin: http://example.com\r\n'
'WebSocket-Location: wss://example.com/demo\r\n'
'WebSocket-Protocol: sample\r\n'
'\r\n')
_GOOD_REQUEST_NONDEFAULT_PORT = (
8081,
'/demo',
{
'Upgrade':'WebSocket',
'Connection':'Upgrade',
'Host':'example.com:8081',
'Origin':'http://example.com',
'WebSocket-Protocol':'sample',
}
)
_GOOD_RESPONSE_NONDEFAULT_PORT = (
'HTTP/1.1 101 Web Socket Protocol Handshake\r\n'
'Upgrade: WebSocket\r\n'
'Connection: Upgrade\r\n'
'WebSocket-Origin: http://example.com\r\n'
'WebSocket-Location: ws://example.com:8081/demo\r\n'
'WebSocket-Protocol: sample\r\n'
'\r\n')
_GOOD_RESPONSE_SECURE_NONDEF = (
'HTTP/1.1 101 Web Socket Protocol Handshake\r\n'
'Upgrade: WebSocket\r\n'
'Connection: Upgrade\r\n'
'WebSocket-Origin: http://example.com\r\n'
'WebSocket-Location: wss://example.com:8081/demo\r\n'
'WebSocket-Protocol: sample\r\n'
'\r\n')
_GOOD_REQUEST_NO_PROTOCOL = (
80,
'/demo',
{
'Upgrade':'WebSocket',
'Connection':'Upgrade',
'Host':'example.com',
'Origin':'http://example.com',
}
)
_GOOD_RESPONSE_NO_PROTOCOL = (
'HTTP/1.1 101 Web Socket Protocol Handshake\r\n'
'Upgrade: WebSocket\r\n'
'Connection: Upgrade\r\n'
'WebSocket-Origin: http://example.com\r\n'
'WebSocket-Location: ws://example.com/demo\r\n'
'\r\n')
_GOOD_REQUEST_WITH_OPTIONAL_HEADERS = (
80,
'/demo',
{
'Upgrade':'WebSocket',
'Connection':'Upgrade',
'Host':'example.com',
'Origin':'http://example.com',
'WebSocket-Protocol':'sample',
'AKey':'AValue',
'EmptyValue':'',
}
)
_BAD_REQUESTS = (
( # HTTP request
80,
'/demo',
{
'Host':'www.google.com',
'User-Agent':'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.5;'
' en-US; rv:1.9.1.3) Gecko/20090824 Firefox/3.5.3'
' GTB6 GTBA',
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,'
'*/*;q=0.8',
'Accept-Language':'en-us,en;q=0.5',
'Accept-Encoding':'gzip,deflate',
'Accept-Charset':'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
'Keep-Alive':'300',
'Connection':'keep-alive',
}
),
( # Missing Upgrade
80,
'/demo',
{
'Connection':'Upgrade',
'Host':'example.com',
'Origin':'http://example.com',
'WebSocket-Protocol':'sample',
}
),
( # Wrong Upgrade
80,
'/demo',
{
'Upgrade':'NonWebSocket',
'Connection':'Upgrade',
'Host':'example.com',
'Origin':'http://example.com',
'WebSocket-Protocol':'sample',
}
),
( # Empty WebSocket-Protocol
80,
'/demo',
{
'Upgrade':'WebSocket',
'Connection':'Upgrade',
'Host':'example.com',
'Origin':'http://example.com',
'WebSocket-Protocol':'',
}
),
( # Wrong port number format
80,
'/demo',
{
'Upgrade':'WebSocket',
'Connection':'Upgrade',
'Host':'example.com:0x50',
'Origin':'http://example.com',
'WebSocket-Protocol':'sample',
}
),
( # Header/connection port mismatch
8080,
'/demo',
{
'Upgrade':'WebSocket',
'Connection':'Upgrade',
'Host':'example.com',
'Origin':'http://example.com',
'WebSocket-Protocol':'sample',
}
),
( # Illegal WebSocket-Protocol
80,
'/demo',
{
'Upgrade':'WebSocket',
'Connection':'Upgrade',
'Host':'example.com',
'Origin':'http://example.com',
'WebSocket-Protocol':'illegal\x09protocol',
}
),
)
_STRICTLY_GOOD_REQUESTS = (
(
'GET /demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'\r\n',
),
( # WebSocket-Protocol
'GET /demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'WebSocket-Protocol: sample\r\n',
'\r\n',
),
( # WebSocket-Protocol and Cookie
'GET /demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'WebSocket-Protocol: sample\r\n',
'Cookie: xyz\r\n'
'\r\n',
),
( # Cookie
'GET /demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'Cookie: abc/xyz\r\n'
'Cookie2: $Version=1\r\n'
'Cookie: abc\r\n'
'\r\n',
),
(
'GET / HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'\r\n',
),
)
_NOT_STRICTLY_GOOD_REQUESTS = (
( # Extra space after GET
'GET /demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'\r\n',
),
( # Resource name doesn't stat with '/'
'GET demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'\r\n',
),
( # No space after :
'GET /demo HTTP/1.1\r\n',
'Upgrade:WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'\r\n',
),
( # Lower case Upgrade header
'GET /demo HTTP/1.1\r\n',
'upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'\r\n',
),
( # Connection comes before Upgrade
'GET /demo HTTP/1.1\r\n',
'Connection: Upgrade\r\n',
'Upgrade: WebSocket\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'\r\n',
),
( # Origin comes before Host
'GET /demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Origin: http://example.com\r\n',
'Host: example.com\r\n',
'\r\n',
),
( # Host continued to the next line
'GET /demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example\r\n',
' .com\r\n',
'Origin: http://example.com\r\n',
'\r\n',
),
( # Cookie comes before WebSocket-Protocol
'GET /demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'Cookie: xyz\r\n'
'WebSocket-Protocol: sample\r\n',
'\r\n',
),
( # Unknown header
'GET /demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'Content-Type: text/html\r\n'
'\r\n',
),
( # Cookie with continuation lines
'GET /demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'Cookie: xyz\r\n',
' abc\r\n',
' defg\r\n',
'\r\n',
),
( # Wrong-case cookie
'GET /demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'cookie: abc/xyz\r\n'
'\r\n',
),
( # Cookie, no space after colon
'GET /demo HTTP/1.1\r\n',
'Upgrade: WebSocket\r\n',
'Connection: Upgrade\r\n',
'Host: example.com\r\n',
'Origin: http://example.com\r\n',
'Cookie:abc/xyz\r\n'
'\r\n',
),
)
def _create_request(request_def):
conn = mock.MockConn('')
conn.local_addr = ('0.0.0.0', request_def[0])
return mock.MockRequest(
uri=request_def[1],
headers_in=request_def[2],
connection=conn)
def _create_get_memorized_lines(lines):
def get_memorized_lines():
return lines
return get_memorized_lines
def _create_requests_with_lines(request_lines_set):
requests = []
for lines in request_lines_set:
request = _create_request(_GOOD_REQUEST)
request.connection.get_memorized_lines = _create_get_memorized_lines(
lines)
requests.append(request)
return requests
class HandshakerTest(unittest.TestCase):
def test_validate_protocol(self):
handshake._validate_protocol('sample') # should succeed.
handshake._validate_protocol('Sample') # should succeed.
handshake._validate_protocol('sample\x20protocol') # should succeed.
handshake._validate_protocol('sample\x7eprotocol') # should succeed.
self.assertRaises(handshake.HandshakeError,
handshake._validate_protocol,
'')
self.assertRaises(handshake.HandshakeError,
handshake._validate_protocol,
'sample\x19protocol')
self.assertRaises(handshake.HandshakeError,
handshake._validate_protocol,
'sample\x7fprotocol')
self.assertRaises(handshake.HandshakeError,
handshake._validate_protocol,
# "Japan" in Japanese
u'\u65e5\u672c')
def test_good_request_default_port(self):
request = _create_request(_GOOD_REQUEST)
handshaker = handshake.Handshaker(request,
mock.MockDispatcher())
handshaker.do_handshake()
self.assertEqual(_GOOD_RESPONSE_DEFAULT_PORT,
request.connection.written_data())
self.assertEqual('/demo', request.ws_resource)
self.assertEqual('http://example.com', request.ws_origin)
self.assertEqual('ws://example.com/demo', request.ws_location)
self.assertEqual('sample', request.ws_protocol)
def test_good_request_secure_default_port(self):
request = _create_request(_GOOD_REQUEST)
request.connection.local_addr = ('0.0.0.0', 443)
request.is_https_ = True
handshaker = handshake.Handshaker(request,
mock.MockDispatcher())
handshaker.do_handshake()
self.assertEqual(_GOOD_RESPONSE_SECURE,
request.connection.written_data())
self.assertEqual('sample', request.ws_protocol)
def test_good_request_nondefault_port(self):
request = _create_request(_GOOD_REQUEST_NONDEFAULT_PORT)
handshaker = handshake.Handshaker(request,
mock.MockDispatcher())
handshaker.do_handshake()
self.assertEqual(_GOOD_RESPONSE_NONDEFAULT_PORT,
request.connection.written_data())
self.assertEqual('sample', request.ws_protocol)
def test_good_request_secure_non_default_port(self):
request = _create_request(_GOOD_REQUEST_NONDEFAULT_PORT)
request.is_https_ = True
handshaker = handshake.Handshaker(request,
mock.MockDispatcher())
handshaker.do_handshake()
self.assertEqual(_GOOD_RESPONSE_SECURE_NONDEF,
request.connection.written_data())
self.assertEqual('sample', request.ws_protocol)
def test_good_request_default_no_protocol(self):
request = _create_request(_GOOD_REQUEST_NO_PROTOCOL)
handshaker = handshake.Handshaker(request,
mock.MockDispatcher())
handshaker.do_handshake()
self.assertEqual(_GOOD_RESPONSE_NO_PROTOCOL,
request.connection.written_data())
self.assertEqual(None, request.ws_protocol)
def test_good_request_optional_headers(self):
request = _create_request(_GOOD_REQUEST_WITH_OPTIONAL_HEADERS)
handshaker = handshake.Handshaker(request,
mock.MockDispatcher())
handshaker.do_handshake()
self.assertEqual('AValue',
request.headers_in['AKey'])
self.assertEqual('',
request.headers_in['EmptyValue'])
def test_bad_requests(self):
for request in map(_create_request, _BAD_REQUESTS):
handshaker = handshake.Handshaker(request,
mock.MockDispatcher())
self.assertRaises(handshake.HandshakeError, handshaker.do_handshake)
def test_strictly_good_requests(self):
for request in _create_requests_with_lines(_STRICTLY_GOOD_REQUESTS):
strict_handshaker = handshake.Handshaker(request,
mock.MockDispatcher(),
True)
strict_handshaker.do_handshake()
def test_not_strictly_good_requests(self):
for request in _create_requests_with_lines(_NOT_STRICTLY_GOOD_REQUESTS):
strict_handshaker = handshake.Handshaker(request,
mock.MockDispatcher(),
True)
self.assertRaises(handshake.HandshakeError,
strict_handshaker.do_handshake)
if __name__ == '__main__':
unittest.main()
# vi:sts=4 sw=4 et