# Copyright 2012 Google Inc. All Rights Reserved.
# Author: mrdmnd@ (Matt Redmond)
"""A unit test for sending data to Bartlett. Requires poster module."""
import cookielib
import os
import signal
import subprocess
import tempfile
import time
import unittest
import urllib2
from poster.encode import multipart_encode
from poster.streaminghttp import register_openers
SERVER_DIR = '../.'
SERVER_URL = 'http://localhost:8080/'
GET = '_ah/login?email=googler@google.com&action=Login&continue=%s'
AUTH_URL = SERVER_URL + GET
class ServerTest(unittest.TestCase):
"""A unit test for the bartlett server. Tests upload, serve, and delete."""
def setUp(self):
"""Instantiate the files and server needed to test upload functionality."""
self._server_proc = LaunchLocalServer()
self._jar = cookielib.LWPCookieJar()
self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._jar))
# We need these files to not delete when closed, because we have to reopen
# them in read mode after we write and close them.
self.profile_data = tempfile.NamedTemporaryFile(delete=False)
size = 16 * 1024
self.profile_data.write(os.urandom(size))
def tearDown(self):
self.profile_data.close()
os.remove(self.profile_data.name)
os.kill(self._server_proc.pid, signal.SIGINT)
def testIntegration(self): # pylint: disable-msg=C6409
key = self._testUpload()
self._testListAll()
self._testServeKey(key)
self._testDelKey(key)
def _testUpload(self): # pylint: disable-msg=C6409
register_openers()
data = {'profile_data': self.profile_data,
'board': 'x86-zgb',
'chromeos_version': '2409.0.2012_06_08_1114'}
datagen, headers = multipart_encode(data)
request = urllib2.Request(SERVER_URL + 'upload', datagen, headers)
response = urllib2.urlopen(request).read()
self.assertTrue(response)
return response
def _testListAll(self): # pylint: disable-msg=C6409
request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve'))
response = self.opener.open(request).read()
self.assertTrue(response)
def _testServeKey(self, key): # pylint: disable-msg=C6409
request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve/' + key))
response = self.opener.open(request).read()
self.assertTrue(response)
def _testDelKey(self, key): # pylint: disable-msg=C6409
# There is no response to a delete request.
# We will check the listAll page to ensure there is no data.
request = urllib2.Request(AUTH_URL % (SERVER_URL + 'del/' + key))
response = self.opener.open(request).read()
request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve'))
response = self.opener.open(request).read()
self.assertFalse(response)
def LaunchLocalServer():
"""Launch and store an authentication cookie with a local server."""
proc = subprocess.Popen(
['dev_appserver.py', '--clear_datastore', SERVER_DIR],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# Wait for server to come up
while True:
time.sleep(1)
try:
request = urllib2.Request(SERVER_URL + 'serve')
response = urllib2.urlopen(request).read()
if response:
break
except urllib2.URLError:
continue
return proc
if __name__ == '__main__':
unittest.main()