# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import time
from commands import *
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import chromedriver
from selenium.webdriver.common.keys import Keys
from autotest_lib.client.cros.graphics import graphics_utils
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import WebDriverException
TIMEOUT_TO_COPY = 1800 # in Secs. This timeout is for files beyond 1GB
SEARCH_BUTTON_ID = "search-button"
SEARCH_BOX_CSS = "div#search-box"
PAPER_CONTAINTER = "paper-input-container"
DELETE_BUTTON_ID = "delete-button"
FILE_LIST_ID = "file-list"
LABLE_ENTRY_CSS = "span.label.entry-name"
CR_DIALOG_CLASS = "cr-dialog-ok"
USER_LOCATION = "/home/chronos/user"
# Using graphics_utils to simulate below keys
OPEN_FILES_APPLICATION_KEYS = ["KEY_RIGHTSHIFT", "KEY_LEFTALT", "KEY_M"]
SWITCH_TO_APP_KEY_COMBINATION = ["KEY_LEFTALT", 'KEY_TAB']
SELECT_ALL_KEY_COMBINATION = ["KEY_LEFTCTRL", "KEY_A"]
PASTE_KEY_COMBINATION = ["KEY_LEFTCTRL", "KEY_V"]
GOOGLE_DRIVE = 'My Drive'
class files_CopyFileToGoogleDriveUI(graphics_utils.GraphicsTest):
"""Copy a file from Downloads folder to Google drive"""
version = 1
TIME_DELAY = 5
_WAIT_TO_LOAD = 5
def initialize(self):
"""Autotest initialize function"""
super(files_CopyFileToGoogleDriveUI, self).initialize(
raise_error_on_hang=True)
def cleanup(self):
"""Autotest cleanup function"""
if self._GSC:
keyvals = self._GSC.get_memory_difference_keyvals()
for key, val in keyvals.iteritems():
self.output_perf_value(
description=key,
value=val,
units='bytes',
higher_is_better=False)
self.write_perf_keyval(keyvals)
super(files_CopyFileToGoogleDriveUI, self).cleanup()
# If test fails then script will collect the screen shot to know at
# which instance failure occurred.
if not self.success:
graphics_utils.take_screenshot(os.path.join(self.debugdir),
"chrome")
def switch_to_app(self, driver, title):
"""Switching to application using title
@param driver: chrome driver object
@param title: Title of the application
@return: True if the app is detected otherwise False
"""
windows = driver.window_handles
logging.debug("Windows opened: %s", windows)
# Checking current window initially..
logging.debug("Current window is %s", driver.title)
if driver.title.strip().lower() == title.lower():
return True
# Switching to all opened windows to find out the required window
for window in windows:
try:
logging.debug("Switching to window")
driver.switch_to_window(window)
logging.debug("Switched to window: %s", driver.title)
time.sleep(2)
if driver.title.strip().lower() == title.lower():
logging.info("%s application opened!", title)
return True
except WebDriverException as we:
logging.debug("Webdriver exception occurred. Exception: %s",
str(we))
except Exception as e:
logging.debug("Exception: %s", str(e))
return False
def open_files_application(self, driver):
"""Open and switch to files application using graphics_utils.py
@param driver: chrome driver object
"""
logging.info("Opening files application")
graphics_utils.press_keys(OPEN_FILES_APPLICATION_KEYS)
time.sleep(self._WAIT_TO_LOAD)
try:
self.switch_to_files(driver)
except Exception as e:
logging.error("Exception when switching files application.. %s",
str(e))
logging.error("Failed to find files application. Trying again.")
graphics_utils.press_keys(OPEN_FILES_APPLICATION_KEYS)
time.sleep(self._WAIT_TO_LOAD)
self.switch_to_files(driver)
def switch_to_files(self, driver, title="Downloads"):
"""Switch to files application
@param driver: chrome driver object
@param title: Title of the Files application
"""
logging.debug("Switching/Focus on the Files app")
if self.switch_to_app(driver, title):
logging.info("Focused on Files application")
graphics_utils.press_keys(SWITCH_TO_APP_KEY_COMBINATION)
time.sleep(1)
else:
raise error.TestFail("Failed to open on Files application")
def check_folder_opened(self, driver, title):
"""Check the selected folder is opened or not
@param driver: chrome driver object
@param title: Folder name
@return: Returns True if expected folder is opened otherwise False
"""
logging.info("Actual files application title is %s", driver.title)
logging.info("Expected files application title is %s", title)
if driver.title == title:
return True
return False
def open_folder(self, driver, folder):
"""Open given folder
@param driver: chrome driver object
@param folder: Directory name
"""
folder_webelements = driver.find_elements_by_css_selector(
LABLE_ENTRY_CSS)
for element in folder_webelements:
try:
logging.debug("Found folder name: %s", element.text.strip())
if folder == element.text.strip():
element.click()
time.sleep(3)
if self.check_folder_opened(driver, element.text.strip()):
logging.info("Folder is opened!")
return
except Exception as e:
logging.error("Exception when getting Files application "
"folders %s", str(e))
raise error.TestError("Folder :%s is not opened or found", folder)
def list_files(self, driver):
"""List files in the folder
@param driver: chrome driver object
@return: Returns list of files
"""
return driver.find_element_by_id(
FILE_LIST_ID).find_elements_by_tag_name('li')
def search_file(self, driver, file_name):
"""Search given file in Files application
@param driver: chrome driver object
@param file_name: Required file
"""
driver.find_element_by_id(SEARCH_BUTTON_ID).click()
search_box_element = driver.find_element_by_css_selector(
SEARCH_BOX_CSS)
search_box_element.find_element_by_css_selector(
PAPER_CONTAINTER).find_element_by_tag_name('input').clear()
search_box_element.find_element_by_css_selector(
PAPER_CONTAINTER).find_element_by_tag_name('input').send_keys(
file_name)
def copy_file(self, driver, source, destination, file_name, clean=True):
"""Copy file from one directory to another
@param driver: chrome driver object
@param source: Directory name from where to copy
@param destination: Directory name to where to copy
@param file_name: File to copy
@param clean: Cleans destination if True otherwise nothing
"""
self.open_folder(driver, source)
self.search_file(driver, file_name)
files = self.list_files(driver)
action_chains = ActionChains(driver)
for item in files:
logging.info("Selecting file to copy in %s", file_name)
item.click()
file_size = item.text.split()[1].strip()
file_size_units = item.text.split()[2].strip()
logging.debug("Select copy")
action_chains.move_to_element(item) \
.click(item).key_down(Keys.CONTROL) \
.send_keys("c") \
.key_up(Keys.CONTROL) \
.perform()
self.open_folder(driver, destination)
if clean:
drive_files = self.list_files(driver)
if len(drive_files) != 0:
logging.info("Removing existing files from %s",
destination)
drive_files[0].click()
logging.debug("Select all files/dirs")
graphics_utils.press_keys(SELECT_ALL_KEY_COMBINATION)
time.sleep(0.2)
driver.find_element_by_id(DELETE_BUTTON_ID).click()
driver.find_element_by_class_name(CR_DIALOG_CLASS).click()
time.sleep(self.TIME_DELAY)
logging.debug("Pressing control+v to paste the file in required "
"location")
graphics_utils.press_keys(PASTE_KEY_COMBINATION)
time.sleep(self.TIME_DELAY)
# Take dummy values initially
required_file_size = "0"
required_file_size_units = "KB"
required_file = None
# wait till the data copied
start_time = time.time()
while required_file_size != file_size and \
required_file_size_units != file_size_units and \
(time.time() - start_time <= TIMEOUT_TO_COPY):
drive_files_during_copy = self.list_files(driver)
if len(drive_files_during_copy) == 0:
raise error.TestError("File copy not started!")
for i_item in drive_files_during_copy:
if i_item.text.strip().split()[0].strip() == file_name:
logging.info("File found %s", i_item.text.split()[
0].strip())
required_file = file
if not required_file:
raise error.TestError("No such file/directory in drive, "
"%s", required_file)
logging.info(required_file.text.split())
required_file_size = required_file.text.split()[1]
required_file_size_units = required_file.text.split()[2]
time.sleep(5)
logging.debug("%s %s data copied" % (required_file_size,
required_file_size_units))
# Validation starts here
found = False
drive_files_after_copy = self.list_files(driver)
for copied_file in drive_files_after_copy:
logging.debug("File in destination: %s",
copied_file.text.strip())
if copied_file.find_element_by_class_name(
'entry-name').text.strip() == file_name:
found = True
break
if found:
logging.info("Copied the file successfully!")
else:
raise error.TestFail("File not transferred successfully!")
def catch_info_or_error_messages(self, driver):
"""Logic to catch the error
@param driver: chrome driver object
"""
errors = []
try:
driver.find_element_by_css_selector(
'div.button-frame').find_element_by_class_name('open').click()
except Exception as e:
logging.info("Error in open error messages")
logging.info(str(e))
error_elements = driver.find_elements_by_css_selector(
'div.progress-frame')
if len(error_elements) != 0:
for error_element in error_elements:
info_text = error_element.find_element_by_tag_name(
'label').text
if info_text != "":
errors.append(info_text)
return errors
def create_file(self, filename):
"""Create a file"""
status, output = getstatusoutput('dd if=/dev/zero of=%s bs=%s '
'count=1 iflag=fullblock' %
(filename, 1024))
if status:
raise error.TestError("Failed to create file")
def run_once(self, username=None, password=None, source="Downloads",
file_name='test.dat'):
"""Copy file to Google Drive in Files application
@param username: Real user(Not default autotest user)
@param password: Password for the user.
@param source: From where to copy file
@param file_name: File name
"""
self.success = False # Used to capture the screenshot if the TC fails
with chromedriver.chromedriver(username=username,
password=password,
disable_default_apps=False,
gaia_login=True) as cr_instance:
driver = cr_instance.driver
self.open_files_application(driver)
self.create_file(os.path.join(os.path.join(USER_LOCATION,
source), file_name))
self.copy_file(driver, source, GOOGLE_DRIVE, file_name)
errors = self.catch_info_or_error_messages(driver)
if len(errors):
raise error.TestFail("Test failed with the following"
" errors. %s", errors)
self.success = True