"""
Test suite for SocketServer.py.
"""
import contextlib
import imp
import os
import select
import signal
import socket
import select
import errno
import tempfile
import unittest
import SocketServer
import test.test_support
from test.test_support import reap_children, reap_threads, verbose
try:
import threading
except ImportError:
threading = None
test.test_support.requires("network")
TEST_STR = "hello world\n"
HOST = test.test_support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
def signal_alarm(n):
"""Call signal.alarm when it exists (i.e. not on Windows)."""
if hasattr(signal, 'alarm'):
signal.alarm(n)
# Remember real select() to avoid interferences with mocking
_real_select = select.select
def receive(sock, n, timeout=20):
r, w, x = _real_select([sock], [], [], timeout)
if sock in r:
return sock.recv(n)
else:
raise RuntimeError, "timed out on %r" % (sock,)
if HAVE_UNIX_SOCKETS:
class ForkingUnixStreamServer(SocketServer.ForkingMixIn,
SocketServer.UnixStreamServer):
pass
class ForkingUnixDatagramServer(SocketServer.ForkingMixIn,
SocketServer.UnixDatagramServer):
pass
@contextlib.contextmanager
def simple_subprocess(testcase):
pid = os.fork()
if pid == 0:
# Don't raise an exception; it would be caught by the test harness.
os._exit(72)
yield None
pid2, status = os.waitpid(pid, 0)
testcase.assertEqual(pid2, pid)
testcase.assertEqual(72 << 8, status)
@unittest.skipUnless(threading, 'Threading required for this test.')
class SocketServerTest(unittest.TestCase):
"""Test all socket servers."""
def setUp(self):
signal_alarm(60) # Kill deadlocks after 60 seconds.
self.port_seed = 0
self.test_files = []
def tearDown(self):
signal_alarm(0) # Didn't deadlock.
reap_children()
for fn in self.test_files:
try:
os.remove(fn)
except os.error:
pass
self.test_files[:] = []
def pickaddr(self, proto):
if proto == socket.AF_INET:
return (HOST, 0)
else:
# XXX: We need a way to tell AF_UNIX to pick its own name
# like AF_INET provides port==0.
dir = None
if os.name == 'os2':
dir = '\socket'
fn = tempfile.mktemp(prefix='unix_socket.', dir=dir)
if os.name == 'os2':
# AF_UNIX socket names on OS/2 require a specific prefix
# which can't include a drive letter and must also use
# backslashes as directory separators
if fn[1] == ':':
fn = fn[2:]
if fn[0] in (os.sep, os.altsep):
fn = fn[1:]
if os.sep == '/':
fn = fn.replace(os.sep, os.altsep)
else:
fn = fn.replace(os.altsep, os.sep)
self.test_files.append(fn)
return fn
def make_server(self, addr, svrcls, hdlrbase):
class MyServer(svrcls):
def handle_error(self, request, client_address):
self.close_request(request)
self.server_close()
raise
class MyHandler(hdlrbase):
def handle(self):
line = self.rfile.readline()
self.wfile.write(line)
if verbose: print "creating server"
server = MyServer(addr, MyHandler)
self.assertEqual(server.server_address, server.socket.getsockname())
return server
@reap_threads
def run_server(self, svrcls, hdlrbase, testfunc):
server = self.make_server(self.pickaddr(svrcls.address_family),
svrcls, hdlrbase)
# We had the OS pick a port, so pull the real address out of
# the server.
addr = server.server_address
if verbose:
print "server created"
print "ADDR =", addr
print "CLASS =", svrcls
t = threading.Thread(
name='%s serving' % svrcls,
target=server.serve_forever,
# Short poll interval to make the test finish quickly.
# Time between requests is short enough that we won't wake
# up spuriously too many times.
kwargs={'poll_interval':0.01})
t.daemon = True # In case this function raises.
t.start()
if verbose: print "server running"
for i in range(3):
if verbose: print "test client", i
testfunc(svrcls.address_family, addr)
if verbose: print "waiting for server"
server.shutdown()
t.join()
if verbose: print "done"
def stream_examine(self, proto, addr):
s = socket.socket(proto, socket.SOCK_STREAM)
s.connect(addr)
s.sendall(TEST_STR)
buf = data = receive(s, 100)
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
self.assertEqual(buf, TEST_STR)
s.close()
def dgram_examine(self, proto, addr):
s = socket.socket(proto, socket.SOCK_DGRAM)
s.sendto(TEST_STR, addr)
buf = data = receive(s, 100)
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
self.assertEqual(buf, TEST_STR)
s.close()
def test_TCPServer(self):
self.run_server(SocketServer.TCPServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
def test_ThreadingTCPServer(self):
self.run_server(SocketServer.ThreadingTCPServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
if HAVE_FORKING:
def test_ForkingTCPServer(self):
with simple_subprocess(self):
self.run_server(SocketServer.ForkingTCPServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
if HAVE_UNIX_SOCKETS:
def test_UnixStreamServer(self):
self.run_server(SocketServer.UnixStreamServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
def test_ThreadingUnixStreamServer(self):
self.run_server(SocketServer.ThreadingUnixStreamServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
if HAVE_FORKING:
def test_ForkingUnixStreamServer(self):
with simple_subprocess(self):
self.run_server(ForkingUnixStreamServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
def test_UDPServer(self):
self.run_server(SocketServer.UDPServer,
SocketServer.DatagramRequestHandler,
self.dgram_examine)
def test_ThreadingUDPServer(self):
self.run_server(SocketServer.ThreadingUDPServer,
SocketServer.DatagramRequestHandler,
self.dgram_examine)
if HAVE_FORKING:
def test_ForkingUDPServer(self):
with simple_subprocess(self):
self.run_server(SocketServer.ForkingUDPServer,
SocketServer.DatagramRequestHandler,
self.dgram_examine)
@contextlib.contextmanager
def mocked_select_module(self):
"""Mocks the select.select() call to raise EINTR for first call"""
old_select = select.select
class MockSelect:
def __init__(self):
self.called = 0
def __call__(self, *args):
self.called += 1
if self.called == 1:
# raise the exception on first call
raise select.error(errno.EINTR, os.strerror(errno.EINTR))
else:
# Return real select value for consecutive calls
return old_select(*args)
select.select = MockSelect()
try:
yield select.select
finally:
select.select = old_select
def test_InterruptServerSelectCall(self):
with self.mocked_select_module() as mock_select:
pid = self.run_server(SocketServer.TCPServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
# Make sure select was called again:
self.assertGreater(mock_select.called, 1)
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work:
# if HAVE_UNIX_SOCKETS:
# def test_UnixDatagramServer(self):
# self.run_server(SocketServer.UnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
#
# def test_ThreadingUnixDatagramServer(self):
# self.run_server(SocketServer.ThreadingUnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
#
# if HAVE_FORKING:
# def test_ForkingUnixDatagramServer(self):
# self.run_server(SocketServer.ForkingUnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
@reap_threads
def test_shutdown(self):
# Issue #2302: shutdown() should always succeed in making an
# other thread leave serve_forever().
class MyServer(SocketServer.TCPServer):
pass
class MyHandler(SocketServer.StreamRequestHandler):
pass
threads = []
for i in range(20):
s = MyServer((HOST, 0), MyHandler)
t = threading.Thread(
name='MyServer serving',
target=s.serve_forever,
kwargs={'poll_interval':0.01})
t.daemon = True # In case this function raises.
threads.append((t, s))
for t, s in threads:
t.start()
s.shutdown()
for t, s in threads:
t.join()
def test_main():
if imp.lock_held():
# If the import lock is held, the threads will hang
raise unittest.SkipTest("can't run when import lock is held")
test.test_support.run_unittest(SocketServerTest)
if __name__ == "__main__":
test_main()