#!/usr/bin/env python
# Copyright (c) 2011 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Stess Tests for Google Chrome.
This script runs 4 different stress tests:
1. Plugin stress.
2. Back and forward stress.
3. Download stress.
4. Preference stress.
After every cycle (running all 4 stress tests) it checks for crashes.
If there are any crashes, the script generates a report, uploads it to
a server and mails about the crash and the link to the report on the server.
Apart from this whenever the test stops on mac it looks for and reports
zombies.
Prerequisites:
Test needs the following files/folders in the Data dir.
1. A crash_report tool in "pyauto_private/stress/mac" folder for use on Mac.
2. A "downloads" folder containing stress_downloads and all the files
referenced in it.
3. A pref_dict file in "pyauto_private/stress/mac" folder.
4. A "plugin" folder containing doubleAnimation.xaml, flash.swf, FlashSpin.swf,
generic.html, get_flash_player.gif, js-invoker.swf, mediaplayer.wmv,
NavigatorTicker11.class, Plugins_page.html, sample5.mov, silverlight.xaml,
silverlight.js, embed.pdf, plugins_page.html and test6.swf.
5. A stress_pref file in "pyauto_private/stress".
"""
import commands
import glob
import logging
import os
import random
import re
import shutil
import sys
import time
import urllib
import test_utils
import subprocess
import pyauto_functional
import pyauto
import pyauto_utils
CRASHES = 'crashes' # Name of the folder to store crashes
class StressTest(pyauto.PyUITest):
"""Run all the stress tests."""
flash_url1 = pyauto.PyUITest.GetFileURLForPath(
os.path.join(pyauto.PyUITest.DataDir(), 'plugin', 'flash.swf'))
flash_url2 = pyauto.PyUITest.GetFileURLForPath(
os.path.join(pyauto.PyUITest.DataDir(),'plugin', 'js-invoker.swf'))
flash_url3 = pyauto.PyUITest.GetFileURLForPath(
os.path.join(pyauto.PyUITest.DataDir(), 'plugin', 'generic.html'))
plugin_url = pyauto.PyUITest.GetFileURLForPath(
os.path.join(pyauto.PyUITest.DataDir(), 'plugin', 'plugins_page.html'))
empty_url = pyauto.PyUITest.GetFileURLForPath(
os.path.join(pyauto.PyUITest.DataDir(), 'empty.html'))
download_url1 = pyauto.PyUITest.GetFileURLForPath(
os.path.join(pyauto.PyUITest.DataDir(), 'downloads', 'a_zip_file.zip'))
download_url2 = pyauto.PyUITest.GetFileURLForPath(
os.path.join(pyauto.PyUITest.DataDir(),'zip', 'test.zip'))
file_list = pyauto.PyUITest.EvalDataFrom(
os.path.join(pyauto.PyUITest.DataDir(), 'downloads', 'stress_downloads'))
symbols_dir = os.path.join(os.getcwd(), 'Build_Symbols')
stress_pref = pyauto.PyUITest.EvalDataFrom(
os.path.join(pyauto.PyUITest.DataDir(), 'pyauto_private', 'stress',
'stress_pref'))
breakpad_dir = None
chrome_version = None
bookmarks_list = []
def _FuncDir(self):
"""Returns the path to the functional dir chrome/test/functional."""
return os.path.dirname(__file__)
def _DownloadSymbols(self):
"""Downloads the symbols for the build being tested."""
download_location = os.path.join(os.getcwd(), 'Build_Symbols')
if os.path.exists(download_location):
shutil.rmtree(download_location)
os.makedirs(download_location)
url = self.stress_pref['symbols_dir'] + self.chrome_version
# TODO: Add linux symbol_files
if self.IsWin():
url = url + '/win/'
symbol_files = ['chrome.dll.pdb', 'chrome.exe.pdb']
elif self.IsMac():
url = url + '/mac/'
symbol_files = map(urllib.quote,
['Google Chrome Framework.framework',
'Google Chrome Helper.app',
'Google Chrome.app',
'crash_inspector',
'crash_report_sender',
'ffmpegsumo.so',
'libplugin_carbon_interpose.dylib'])
index = 0
symbol_files = ['%s-%s-i386.breakpad' % (sym_file, self.chrome_version) \
for sym_file in symbol_files]
logging.info(symbol_files)
for sym_file in symbol_files:
sym_url = url + sym_file
logging.info(sym_url)
download_sym_file = os.path.join(download_location, sym_file)
logging.info(download_sym_file)
urllib.urlretrieve(sym_url, download_sym_file)
def setUp(self):
pyauto.PyUITest.setUp(self)
self.breakpad_dir = self._CrashDumpFolder()
self.chrome_version = self.GetBrowserInfo()['properties']['ChromeVersion']
# Plugin stress functions
def _CheckForPluginProcess(self, plugin_name):
"""Checks if a particular plugin process exists.
Args:
plugin_name : plugin process which should be running.
"""
process = self.GetBrowserInfo()['child_processes']
self.assertTrue([x for x in process
if x['type'] == 'Plug-in' and
x['name'] == plugin_name])
def _GetPluginProcessId(self, plugin_name):
"""Get Plugin process id.
Args:
plugin_name: Plugin whose pid is expected.
Eg: "Shockwave Flash"
Returns:
Process id if the plugin process is running.
None otherwise.
"""
for process in self.GetBrowserInfo()['child_processes']:
if process['type'] == 'Plug-in' and \
re.search(plugin_name, process['name']):
return process['pid']
return None
def _CloseAllTabs(self):
"""Close all but one tab in first window."""
tab_count = self.GetTabCount(0)
for tab_index in xrange(tab_count - 1, 0, -1):
self.CloseTab(tab_index)
def _CloseAllWindows(self):
"""Close all windows except one."""
win_count = self.GetBrowserWindowCount()
for windex in xrange(win_count - 1, 0, -1):
self.RunCommand(pyauto.IDC_CLOSE_WINDOW, windex)
def _ReloadAllTabs(self):
"""Reload all the tabs in first window."""
for tab_index in range(self.GetTabCount()):
self.ReloadTab(tab_index)
def _LoadFlashInMultipleTabs(self):
"""Load Flash in multiple tabs in first window."""
self.NavigateToURL(self.empty_url)
# Open 18 tabs with flash
for _ in range(9):
self.AppendTab(pyauto.GURL(self.flash_url1))
self.AppendTab(pyauto.GURL(self.flash_url2))
def _OpenAndCloseMultipleTabsWithFlash(self):
"""Stress test for flash in multiple tabs."""
logging.info("In _OpenAndCloseMultipleWindowsWithFlash.")
self._LoadFlashInMultipleTabs()
self._CheckForPluginProcess('Shockwave Flash')
self._CloseAllTabs()
def _OpenAndCloseMultipleWindowsWithFlash(self):
"""Stress test for flash in multiple windows."""
logging.info('In _OpenAndCloseMultipleWindowsWithFlash.')
# Open 5 Normal and 4 Incognito windows
for tab_index in range(1, 10):
if tab_index < 6:
self.OpenNewBrowserWindow(True)
else:
self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW)
self.NavigateToURL(self.flash_url2, tab_index, 0)
self.AppendTab(pyauto.GURL(self.flash_url2), tab_index)
self._CloseAllWindows()
def _OpenAndCloseMultipleTabsWithMultiplePlugins(self):
"""Stress test using multiple plugins in multiple tabs."""
logging.info('In _OpenAndCloseMultipleTabsWithMultiplePlugins.')
# Append 4 tabs with URL
for _ in range(5):
self.AppendTab(pyauto.GURL(self.plugin_url))
self._CloseAllTabs()
def _OpenAndCloseMultipleWindowsWithMultiplePlugins(self):
"""Stress test using multiple plugins in multiple windows."""
logging.info('In _OpenAndCloseMultipleWindowsWithMultiplePlugins.')
# Open 4 windows with URL
for tab_index in range(1, 5):
if tab_index < 6:
self.OpenNewBrowserWindow(True)
else:
self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW)
self.NavigateToURL(self.plugin_url, tab_index, 0)
self._CloseAllWindows()
def _KillAndReloadFlash(self):
"""Stress test by killing flash process and reloading tabs."""
self._LoadFlashInMultipleTabs()
flash_process_id1 = self._GetPluginProcessId('Shockwave Flash')
self.Kill(flash_process_id1)
self._ReloadAllTabs()
self._CloseAllTabs()
def _KillAndReloadRenderersWithFlash(self):
"""Stress test by killing renderer processes and reloading tabs."""
logging.info('In _KillAndReloadRenderersWithFlash')
self._LoadFlashInMultipleTabs()
info = self.GetBrowserInfo()
# Kill all renderer processes
for tab_index in range(self.GetTabCount(0)):
self.KillRendererProcess(
info['windows'][0]['tabs'][tab_index]['renderer_pid'])
self._ReloadAllTabs()
self._CloseAllTabs()
def _TogglePlugin(self, plugin_name):
"""Toggle plugin status.
Args:
plugin_name: Name of the plugin to toggle.
"""
plugins = self.GetPluginsInfo().Plugins()
for item in range(len(plugins)):
if re.search(plugin_name, plugins[item]['name']):
if plugins[item]['enabled']:
self.DisablePlugin(plugins[item]['path'])
else:
self.EnablePlugin(plugins[item]['path'])
def _ToggleAndReloadFlashPlugin(self):
"""Toggle flash and reload all tabs."""
logging.info('In _ToggleAndReloadFlashPlugin')
for _ in range(10):
self.AppendTab(pyauto.GURL(self.flash_url3))
# Disable Flash Plugin
self._TogglePlugin('Shockwave Flash')
self._ReloadAllTabs()
# Enable Flash Plugin
self._TogglePlugin('Shockwave Flash')
self._ReloadAllTabs()
self._CloseAllTabs()
# Downloads stress functions
def _LoadDownloadsInMultipleTabs(self):
"""Load Downloads in multiple tabs in the same window."""
# Open 15 tabs with downloads
logging.info('In _LoadDownloadsInMultipleTabs')
for tab_index in range(15):
# We open an empty tab and then downlad a file from it.
self.AppendTab(pyauto.GURL(self.empty_url))
self.NavigateToURL(self.download_url1, 0, tab_index + 1)
self.AppendTab(pyauto.GURL(self.empty_url))
self.NavigateToURL(self.download_url2, 0, tab_index + 2)
def _OpenAndCloseMultipleTabsWithDownloads(self):
"""Download items in multiple tabs."""
logging.info('In _OpenAndCloseMultipleTabsWithDownloads')
self._LoadDownloadsInMultipleTabs()
self._CloseAllTabs()
def _OpenAndCloseMultipleWindowsWithDownloads(self):
"""Randomly have downloads in multiple windows."""
logging.info('In _OpenAndCloseMultipleWindowsWithDownloads')
# Open 15 Windows randomly on both regular and incognito with downloads
for window_index in range(15):
tick = round(random.random() * 100)
if tick % 2 != 0:
self.NavigateToURL(self.download_url2, 0, 0)
else:
self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW)
self.AppendTab(pyauto.GURL(self.empty_url), 1)
self.NavigateToURL(self.download_url2, 1, 1)
self._CloseAllWindows()
def _OpenAndCloseMultipleTabsWithMultipleDownloads(self):
"""Download multiple items in multiple tabs."""
logging.info('In _OpenAndCloseMultipleTabsWithMultipleDownloads')
self.NavigateToURL(self.empty_url)
for _ in range(15):
for file in self.file_list:
count = 1
url = self.GetFileURLForPath(
os.path.join(self.DataDir(), 'downloads', file))
self.AppendTab(pyauto.GURL(self.empty_url))
self.NavigateToURL(url, 0, count)
count = count + 1
self._CloseAllTabs()
def _OpenAndCloseMultipleWindowsWithMultipleDownloads(self):
"""Randomly multiple downloads in multiple windows."""
logging.info('In _OpenAndCloseMultipleWindowsWithMultipleDownloads')
for _ in range(15):
for file in self.file_list:
tick = round(random.random() * 100)
url = self.GetFileURLForPath(
os.path.join(self.DataDir(), 'downloads', file))
if tick % 2!= 0:
self.NavigateToURL(url, 0, 0)
else:
self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW)
self.AppendTab(pyauto.GURL(self.empty_url), 1)
self.NavigateToURL(url, 1, 1)
self._CloseAllWindows()
# Back and Forward stress functions
def _BrowserGoBack(self, window_index):
"""Go back in the browser history.
Chrome has limitation on going back and can only go back 49 pages.
Args:
window_index: the index of the browser window to work on.
"""
for nback in range(48): # Go back 48 times.
if nback % 4 == 0: # Bookmark every 5th url when going back.
self._BookMarkEvery5thURL(window_index)
self.TabGoBack(tab_index=0, windex=window_index)
def _BrowserGoForward(self, window_index):
"""Go Forward in the browser history.
Chrome has limitation on going back and can only go back 49 pages.
Args:
window_index: the index of the browser window to work on.
"""
for nforward in range(48): # Go back 48 times.
if nforward % 4 == 0: # Bookmark every 5th url when going Forward
self._BookMarkEvery5thURL(window_index)
self.TabGoForward(tab_index=0, windex=window_index)
def _AddToListAndBookmark(self, newname, url):
"""Bookmark the url to bookmarkbar and to he list of bookmarks.
Args:
newname: the name of the bookmark.
url: the url to bookmark.
"""
bookmarks = self.GetBookmarkModel()
bar_id = bookmarks.BookmarkBar()['id']
self.AddBookmarkURL(bar_id, 0, newname, url)
self.bookmarks_list.append(newname)
def _RemoveFromListAndBookmarkBar(self, name):
"""Remove the bookmark bor and bookmarks list.
Args:
name: the name of bookmark to remove.
"""
bookmarks = self.GetBookmarkModel()
node = bookmarks.FindByTitle(name)
self.RemoveBookmark(node[0]['id'])
self.bookmarks_list.remove(name)
def _DuplicateBookmarks(self, name):
"""Find duplicate bookmark in the bookmarks list.
Args:
name: name of the bookmark.
Returns:
True if it's a duplicate.
"""
for index in (self.bookmarks_list):
if index == name:
return True
return False
def _BookMarkEvery5thURL(self, window_index):
"""Check for duplicate in list and bookmark current url.
If its the first time and list is empty add the bookmark.
If its a duplicate remove the bookmark.
If its new tab page move over.
Args:
window_index: the index of the browser window to work on.
"""
tab_title = self.GetActiveTabTitle(window_index) # get the page title
url = self.GetActiveTabURL(window_index).spec() # get the page url
if not self.bookmarks_list:
self._AddToListAndBookmark(tab_title, url) # first run bookmark the url
return
elif self._DuplicateBookmarks(tab_title):
self._RemoveFromListAndBookmarkBar(tab_title)
return
elif tab_title == 'New Tab': # new tab page pass over
return
else:
# new bookmark add it to bookmarkbar
self._AddToListAndBookmark(tab_title, url)
return
def _ReadFileAndLoadInNormalAndIncognito(self):
"""Read urls and load them in normal and incognito window.
We load 96 urls only as we can go back and forth 48 times.
Uses time to get different urls in normal and incognito window
The source file is taken from stress folder in /data folder.
"""
# URL source from stress folder in data folder
data_file = os.path.join(self.DataDir(), 'pyauto_private', 'stress',
'urls_and_titles')
url_data = self.EvalDataFrom(data_file)
urls = url_data.keys()
i = 0
ticks = int(time.time()) # get the latest time.
for url in urls:
if i <= 96 : # load only 96 urls.
if ticks % 2 == 0: # loading in Incognito and Normal window.
self.NavigateToURL(url)
else:
self.NavigateToURL(url, 1, 0)
else:
break
ticks = ticks - 1
i += 1
return
def _StressTestNavigation(self):
""" This is the method from where various navigations are called.
First we load the urls then call navigete back and forth in
incognito window then in normal window.
"""
self._ReadFileAndLoadInNormalAndIncognito() # Load the urls.
self._BrowserGoBack(1) # Navigate back in incognito window.
self._BrowserGoForward(1) # Navigate forward in incognito window
self._BrowserGoBack(0) # Navigate back in normal window
self._BrowserGoForward(0) # Navigate forward in normal window
# Preference stress functions
def _RandomBool(self):
"""For any preferences bool value, it takes True or False value.
We are generating random True or False value.
"""
return random.randint(0, 1) == 1
def _RandomURL(self):
"""Some of preferences take string url, so generating random url here."""
# Site list
site_list = ['test1.html', 'test2.html','test3.html','test4.html',
'test5.html', 'test7.html', 'test6.html']
random_site = random.choice(site_list)
# Returning a url of random site
return self.GetFileURLForPath(os.path.join(self.DataDir(), random_site))
def _RandomURLArray(self):
"""Returns a list of 10 random URLs."""
return [self._RandomURL() for _ in range(10)]
def _RandomInt(self, max_number):
"""Some of the preferences takes integer value.
Eg: If there are three options, we generate random
value for any option.
Arg:
max_number: The number of options that a preference has.
"""
return random.randrange(1, max_number)
def _RandomDownloadDir(self):
"""Returns a random download directory."""
return random.choice(['dl_dir1', 'dl_dir2', 'dl_dir3',
'dl_dir4', 'dl_dir5'])
def _SetPref(self):
"""Reads the preferences from file and
sets the preferences to Chrome.
"""
raw_dictionary = self.EvalDataFrom(os.path.join(self.DataDir(),
'pyauto_private', 'stress', 'pref_dict'))
value_dictionary = {}
for key, value in raw_dictionary.iteritems():
if value == 'BOOL':
value_dictionary[key] = self._RandomBool()
elif value == 'STRING_URL':
value_dictionary[key] = self._RandomURL()
elif value == 'ARRAY_URL':
value_dictionary[key] = self._RandomURLArray()
elif value == 'STRING_PATH':
value_dictionary[key] = self._RandomDownloadDir()
elif value[0:3] == 'INT':
# Normally we difine INT datatype with number of options,
# so parsing number of options and selecting any of them
# randomly.
value_dictionary[key] = 1
max_number = raw_dictionary[key][3:4]
if not max_number == 1:
value_dictionary[key]= self._RandomInt(int(max_number))
self.SetPrefs(getattr(pyauto,key), value_dictionary[key])
return value_dictionary
# Crash reporting functions
def _CrashDumpFolder(self):
"""Get the breakpad folder.
Returns:
The full path of the Crash Reports folder.
"""
breakpad_folder = self.GetBrowserInfo()['properties']['DIR_CRASH_DUMPS']
self.assertTrue(breakpad_folder, 'Cannot figure crash dir')
return breakpad_folder
def _DeleteDumps(self):
"""Delete all the dump files in teh Crash Reports folder."""
# should be called at the start of stress run
if os.path.exists(self.breakpad_dir):
logging.info('xxxxxxxxxxxxxxxINSIDE DELETE DUMPSxxxxxxxxxxxxxxxxx')
if self.IsMac():
shutil.rmtree(self.breakpad_dir)
elif self.IsWin():
files = os.listdir(self.breakpad_dir)
for file in files:
os.remove(file)
first_crash = os.path.join(os.getcwd(), '1stcrash')
crashes_dir = os.path.join(os.getcwd(), 'crashes')
if (os.path.exists(crashes_dir)):
shutil.rmtree(crashes_dir)
shutil.rmtree(first_crash)
def _SymbolicateCrashDmp(self, dmp_file, symbols_dir, output_file):
"""Generate symbolicated crash report.
Args:
dmp_file: the dmp file to symbolicate.
symbols_dir: the directory containing the symbols.
output_file: the output file.
Returns:
Crash report text.
"""
report = ''
if self.IsWin():
windbg_cmd = [
os.path.join('C:', 'Program Files', 'Debugging Tools for Windows',
'windbg.exe'),
'-Q',
'-y',
'\"',
symbols_dir,
'\"',
'-c',
'\".ecxr;k50;.logclose;q\"',
'-logo',
output_file,
'-z',
'\"',
dmp_file,
'\"']
subprocess.call(windbg_cmd)
# Since we are directly writing the info into output_file,
# we just need to copy that in to report
report = open(output_file, 'r').read()
elif self.IsMac():
crash_report = os.path.join(self.DataDir(), 'pyauto_private', 'stress',
'mac', 'crash_report')
for i in range(5): # crash_report doesn't work sometimes. So we retry
report = test_utils.Shell2(
'%s -S "%s" "%s"' % (crash_report, symbols_dir, dmp_file))[0]
if len(report) < 200:
try_again = 'Try %d. crash_report didn\'t work out. Trying again', i
logging.info(try_again)
else:
break
open(output_file, 'w').write(report)
return report
def _SaveSymbols(self, symbols_dir, dump_dir=' ', multiple_dumps=True):
"""Save the symbolicated files for all crash dumps.
Args:
symbols_dir: the directory containing the symbols.
dump_dir: Path to the directory holding the crash dump files.
multiple_dumps: True if we are processing multiple dump files,
False if we are processing only the first crash.
"""
if multiple_dumps:
dump_dir = self.breakpad_dir
if not os.path.isdir(CRASHES):
os.makedirs(CRASHES)
# This will be sent to the method by the caller.
dmp_files = glob.glob(os.path.join(dump_dir, '*.dmp'))
for dmp_file in dmp_files:
dmp_id = os.path.splitext(os.path.basename(dmp_file))[0]
if multiple_dumps:
report_folder = CRASHES
else:
report_folder = dump_dir
report_fname = os.path.join(report_folder,
'%s.txt' % (dmp_id))
report = self._SymbolicateCrashDmp(dmp_file, symbols_dir,
report_fname)
if report == '':
logging.info('Crash report is empty.')
# This is for copying the original dumps.
if multiple_dumps:
shutil.copy2(dmp_file, CRASHES)
def _GetFirstCrashDir(self):
"""Get first crash file in the crash folder.
Here we create the 1stcrash directory which holds the
first crash report, which will be attached to the mail.
"""
breakpad_folder = self.breakpad_dir
dump_list = glob.glob1(breakpad_folder,'*.dmp')
dump_list.sort(key=lambda s: os.path.getmtime(os.path.join(
breakpad_folder, s)))
first_crash_file = os.path.join(breakpad_folder, dump_list[0])
if not os.path.isdir('1stcrash'):
os.makedirs('1stcrash')
shutil.copy2(first_crash_file, '1stcrash')
first_crash_dir = os.path.join(os.getcwd(), '1stcrash')
return first_crash_dir
def _GetFirstCrashFile(self):
"""Get first crash file in the crash folder."""
first_crash_dir = os.path.join(os.getcwd(), '1stcrash')
for each in os.listdir(first_crash_dir):
if each.endswith('.txt'):
first_crash_file = each
return os.path.join(first_crash_dir, first_crash_file)
def _ProcessOnlyFirstCrash(self):
""" Process only the first crash report for email."""
first_dir = self._GetFirstCrashDir()
self._SaveSymbols(self.symbols_dir, first_dir, False)
def _GetOSName(self):
"""Returns the OS type we are running this script on."""
os_name = ''
if self.IsMac():
os_number = commands.getoutput('sw_vers -productVersion | cut -c 1-4')
if os_number == '10.6':
os_name = 'Snow_Leopard'
elif os_number == '10.5':
os_name = 'Leopard'
elif self.IsWin():
# TODO: Windows team need to find the way to get OS name
os_name = 'Windows'
if platform.version()[0] == '5':
os_name = os_name + '_XP'
else:
os_name = os_name + '_Vista/Win7'
return os_name
def _ProcessUploadAndEmailCrashes(self):
"""Upload the crashes found and email the team about this."""
logging.info('#########INSIDE _ProcessUploadAndEmailCrashes#########')
try:
build_version = self.chrome_version
self._SaveSymbols(self.symbols_dir)
self._ProcessOnlyFirstCrash()
file_to_attach = self._GetFirstCrashFile()
# removing the crash_txt for now,
# since we are getting UnicodeDecodeError
# crash_txt = open(file_to_attach).read()
except ValueError:
test_utils.SendMail(self.stress_pref['mailing_address'],
self.stress_pref['mailing_address'],
"We don't have build version",
"BROWSER CRASHED, PLEASE CHECK",
self.stress_pref['smtp'])
# Move crash reports and dumps to server
os_name = self._GetOSName()
dest_dir = build_version + '_' + os_name
if (test_utils.Shell2(self.stress_pref['script'] % (CRASHES, dest_dir))):
logging.info('Copy Complete')
upload_dir= self.stress_pref['upload_dir'] + dest_dir
num_crashes = '\n \n Number of Crashes :' + \
str(len(glob.glob1(self.breakpad_dir, '*.dmp')))
mail_content = '\n\n Crash Report URL :' + upload_dir + '\n' + \
num_crashes + '\n\n' # + crash_txt
mail_subject = 'Stress Results :' + os_name + '_' + build_version
# Sending mail with first crash report, # of crashes, location of upload
test_utils.SendMail(self.stress_pref['mailing_address'],
self.stress_pref['mailing_address'],
mail_subject, mail_content,
self.stress_pref['smtp'], file_to_attach)
def _ReportCrashIfAny(self):
"""Check for browser crashes and report."""
if os.path.isdir(self.breakpad_dir):
listOfDumps = glob.glob(os.path.join(self.breakpad_dir, '*.dmp'))
if len(listOfDumps) > 0:
logging.info('========== INSIDE REPORT CRASH++++++++++++++')
# inform a method to process the dumps
self._ProcessUploadAndEmailCrashes()
# Test functions
def _PrefStress(self):
"""Stress preferences."""
default_prefs = self.GetPrefsInfo()
pref_dictionary = self._SetPref()
for key, value in pref_dictionary.iteritems():
self.assertEqual(value, self.GetPrefsInfo().Prefs(
getattr(pyauto, key)))
for key, value in pref_dictionary.iteritems():
self.SetPrefs(getattr(pyauto, key),
default_prefs.Prefs(getattr(pyauto, key)))
def _NavigationStress(self):
"""Run back and forward stress in normal and incognito window."""
self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW)
self._StressTestNavigation()
def _DownloadStress(self):
"""Run all the Download stress test."""
org_download_dir = self.GetDownloadDirectory().value()
new_dl_dir = os.path.join(org_download_dir, 'My+Downloads Folder')
os.path.exists(new_dl_dir) and shutil.rmtree(new_dl_dir)
os.makedirs(new_dl_dir)
self.SetPrefs(pyauto.kDownloadDefaultDirectory, new_dl_dir)
self._OpenAndCloseMultipleTabsWithDownloads()
self._OpenAndCloseMultipleWindowsWithDownloads()
self._OpenAndCloseMultipleTabsWithMultipleDownloads()
self._OpenAndCloseMultipleWindowsWithMultipleDownloads()
pyauto_utils.RemovePath(new_dl_dir) # cleanup
self.SetPrefs(pyauto.kDownloadDefaultDirectory, org_download_dir)
def _PluginStress(self):
"""Run all the plugin stress tests."""
self._OpenAndCloseMultipleTabsWithFlash()
self._OpenAndCloseMultipleWindowsWithFlash()
self._OpenAndCloseMultipleTabsWithMultiplePlugins()
self._OpenAndCloseMultipleWindowsWithMultiplePlugins()
self._KillAndReloadRenderersWithFlash()
self._ToggleAndReloadFlashPlugin()
def testStress(self):
"""Run all the stress tests for 24 hrs."""
if self.GetBrowserInfo()['properties']['branding'] != 'Google Chrome':
logging.info('This is not a branded build, so stopping the stress')
return 1
self._DownloadSymbols()
run_number = 1
start_time = time.time()
while True:
logging.info('run %d...' % run_number)
run_number = run_number + 1
if (time.time() - start_time) >= 24*60*60:
logging.info('Its been 24hrs, so we break now.')
break
try:
methods = [self._NavigationStress, self._DownloadStress,
self._PluginStress, self._PrefStress]
random.shuffle(methods)
for method in methods:
method()
logging.info('Method %s done' % method)
except KeyboardInterrupt:
logging.info('----------We got a KeyboardInterrupt-----------')
except Exception, error:
logging.info('-------------There was an ERROR---------------')
logging.info(error)
# Crash Reporting
self._ReportCrashIfAny()
self._DeleteDumps()
if self.IsMac():
zombie = 'ps -el | grep Chrom | grep -v grep | grep Z | wc -l'
zombie_count = int(commands.getoutput(zombie))
if zombie_count > 0:
logging.info('WE HAVE ZOMBIES = %d' % zombie_count)
if __name__ == '__main__':
pyauto_functional.Main()