# Plain text  |  325 lines  |  13.46 KB

# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import time

from commands import *

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import chromedriver
from selenium.webdriver.common.keys import Keys
from autotest_lib.client.cros.graphics import graphics_utils
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import WebDriverException

# Upper bound on how long copy_file() keeps polling the destination listing
# while waiting for the copy to finish.
TIMEOUT_TO_COPY = 1800  # in Secs. This timeout is for files beyond 1GB
# Locators (element ids, CSS selectors and class names) used to find
# elements of the Files application through the chrome driver.
SEARCH_BUTTON_ID = "search-button"
SEARCH_BOX_CSS = "div#search-box"
# NOTE(review): identifier is misspelled ("CONTAINTER"); left unchanged
# because search_file() refers to it by this name.
PAPER_CONTAINTER = "paper-input-container"
DELETE_BUTTON_ID = "delete-button"
FILE_LIST_ID = "file-list"
# NOTE(review): identifier is misspelled ("LABLE"); left unchanged because
# open_folder() refers to it by this name.
LABLE_ENTRY_CSS = "span.label.entry-name"
# Class name of the element clicked to confirm the delete dialog.
CR_DIALOG_CLASS = "cr-dialog-ok"
# Base directory under which run_once() creates the source file.
USER_LOCATION = "/home/chronos/user"
# Using graphics_utils to simulate below keys
OPEN_FILES_APPLICATION_KEYS = ["KEY_RIGHTSHIFT", "KEY_LEFTALT", "KEY_M"]
# Pressed by switch_to_files() once the Files window has been found.
SWITCH_TO_APP_KEY_COMBINATION = ["KEY_LEFTALT", 'KEY_TAB']
SELECT_ALL_KEY_COMBINATION = ["KEY_LEFTCTRL", "KEY_A"]
PASTE_KEY_COMBINATION = ["KEY_LEFTCTRL", "KEY_V"]
# Folder name run_once() passes to copy_file() as the destination.
GOOGLE_DRIVE = 'My Drive'


class files_CopyFileToGoogleDriveUI(graphics_utils.GraphicsTest):

    """Copy a file from Downloads folder to Google drive

    The test drives the Files application UI through a chrome driver
    object and simulates keyboard shortcuts with graphics_utils.press_keys.
    """

    version = 1
    # Seconds to sleep after deleting existing files and after pasting.
    TIME_DELAY = 5
    # Seconds to sleep after pressing the shortcut that opens the Files app.
    _WAIT_TO_LOAD = 5

    def initialize(self):
        """Autotest initialize function

        Forwards to the parent class initialize() with
        raise_error_on_hang=True.
        """
        parent = super(files_CopyFileToGoogleDriveUI, self)
        parent.initialize(raise_error_on_hang=True)

    def cleanup(self):
        """Autotest cleanup function

        Reports the memory-difference keyvals obtained from self._GSC (when
        it is set) as perf values, runs the parent class cleanup and, when
        the test did not finish successfully, saves a screenshot into
        self.debugdir.
        """
        if self._GSC:
            keyvals = self._GSC.get_memory_difference_keyvals()
            # items() exists on both Python 2 and Python 3 dictionaries,
            # unlike iteritems().
            for key, val in keyvals.items():
                self.output_perf_value(
                    description=key,
                    value=val,
                    units='bytes',
                    higher_is_better=False)
            self.write_perf_keyval(keyvals)
        super(files_CopyFileToGoogleDriveUI, self).cleanup()
        # If test fails then script will collect the screen shot to know at
        # which instance failure occurred. self.success is only assigned in
        # run_once(), so a missing attribute is treated as a failure rather
        # than letting cleanup die with AttributeError.
        if not getattr(self, 'success', False):
            graphics_utils.take_screenshot(self.debugdir, "chrome")

    def switch_to_app(self, driver, title):
        """Focus the window whose title equals the given title

        The comparison ignores case and surrounding whitespace of the
        window title. The current window is checked first, then every
        window handle known to the driver is tried in turn.

        @param driver: chrome driver object
        @param title: Title of the application
        @return: True if the app is detected otherwise False
        """
        def title_matches():
            """Compare the driver's current window title with title."""
            return driver.title.strip().lower() == title.lower()

        handles = driver.window_handles
        logging.debug("Windows opened: %s", handles)
        # Checking current window initially..
        logging.debug("Current window is %s", driver.title)
        if title_matches():
            return True
        # Walk through all opened windows looking for the required one.
        for handle in handles:
            try:
                logging.debug("Switching to window")
                driver.switch_to_window(handle)
                logging.debug("Switched to window: %s", driver.title)
                time.sleep(2)
                if not title_matches():
                    continue
                logging.info("%s application opened!", title)
                return True
            except WebDriverException as wd_error:
                logging.debug("Webdriver exception occurred. Exception: %s",
                              str(wd_error))
            except Exception as other_error:
                logging.debug("Exception: %s", str(other_error))
        return False

    def open_files_application(self, driver):
        """Launch the Files application and give it focus

        The keyboard shortcut is simulated with graphics_utils. When
        switching to the Files window fails, the shortcut is pressed once
        more and the switch is retried; a failure of the retry propagates.

        @param driver: chrome driver object
        """
        def press_shortcut_and_wait():
            """Send the open-Files shortcut and wait for the app to load."""
            graphics_utils.press_keys(OPEN_FILES_APPLICATION_KEYS)
            time.sleep(self._WAIT_TO_LOAD)

        logging.info("Opening files application")
        press_shortcut_and_wait()
        try:
            self.switch_to_files(driver)
        except Exception as first_failure:
            logging.error("Exception when switching files application.. %s",
                          str(first_failure))
            logging.error("Failed to find files application. Trying again.")
            press_shortcut_and_wait()
            self.switch_to_files(driver)

    def switch_to_files(self, driver, title="Downloads"):
        """Bring the Files application window into focus

        @param driver: chrome driver object
        @param title: Title of the Files application
        @raises error.TestFail: if no window with the given title is found
        """
        logging.debug("Switching/Focus on the Files app")
        located = self.switch_to_app(driver, title)
        if not located:
            raise error.TestFail("Failed to open on Files application")
        logging.info("Focused on Files application")
        graphics_utils.press_keys(SWITCH_TO_APP_KEY_COMBINATION)
        time.sleep(1)

    def check_folder_opened(self, driver, title):
        """Tell whether the window title equals the expected folder name

        The comparison is exact (case sensitive, no stripping).

        @param driver: chrome driver object
        @param title: Folder name
        @return: Returns True if expected folder is opened otherwise False
        """
        logging.info("Actual files application title is %s", driver.title)
        logging.info("Expected files application title is %s", title)
        return bool(driver.title == title)

    def open_folder(self, driver, folder):
        """Open given folder

        Clicks each listed entry whose label equals the folder name and
        returns as soon as the window title confirms the folder is opened.
        Exceptions raised while handling an entry are logged and the next
        entry is tried.

        @param driver: chrome driver object
        @param folder: Directory name
        @raises error.TestError: if no matching entry could be opened
        """
        folder_webelements = driver.find_elements_by_css_selector(
            LABLE_ENTRY_CSS)
        for element in folder_webelements:
            try:
                # Read the label once, before clicking: the click may
                # change the page, so the element is not queried again.
                name = element.text.strip()
                logging.debug("Found folder name: %s", name)
                if name != folder:
                    continue
                element.click()
                time.sleep(3)
                if self.check_folder_opened(driver, name):
                    logging.info("Folder is opened!")
                    return
            except Exception as e:
                logging.error("Exception when getting Files application "
                              "folders %s", str(e))
        # Format the message explicitly; exception constructors do not
        # apply %-style arguments the way logging calls do.
        raise error.TestError("Folder :%s is not opened or found" % folder)

    def list_files(self, driver):
        """Collect the entries shown in the currently opened folder

        @param driver: chrome driver object
        @return: Returns list of 'li' elements found inside the file list
        """
        file_list = driver.find_element_by_id(FILE_LIST_ID)
        return file_list.find_elements_by_tag_name('li')

    def search_file(self, driver, file_name):
        """Type the given file name into the Files application search box

        @param driver: chrome driver object
        @param file_name: Required file
        """
        driver.find_element_by_id(SEARCH_BUTTON_ID).click()
        search_box = driver.find_element_by_css_selector(SEARCH_BOX_CSS)

        def locate_input():
            """Look up the text input inside the search box container."""
            container = search_box.find_element_by_css_selector(
                PAPER_CONTAINTER)
            return container.find_element_by_tag_name('input')

        # The input is looked up separately for each operation.
        locate_input().clear()
        locate_input().send_keys(file_name)

    def copy_file(self, driver, source, destination, file_name, clean=True):
        """Copy file from one directory to another

        Opens the source folder, searches for the file, and for every
        listed result copies it with Ctrl+C, opens the destination folder,
        optionally empties it, pastes with Ctrl+V, waits for the copy to
        finish and finally checks that the file is listed in the
        destination.

        @param driver: chrome driver object
        @param source: Directory name from where to copy
        @param destination: Directory name to where to copy
        @param file_name: File to copy
        @param clean: Cleans destination if True otherwise nothing
        @raises error.TestError: if the copy does not start or the file
                never shows up in the destination while waiting
        @raises error.TestFail: if the file is not listed in the
                destination after the wait
        """
        self.open_folder(driver, source)
        self.search_file(driver, file_name)
        files = self.list_files(driver)

        for item in files:
            logging.info("Selecting file to copy in %s", file_name)
            item.click()
            # The entry text is split on whitespace; the second and third
            # fields are taken as the size value and the size units.
            item_fields = item.text.split()
            file_size = item_fields[1]
            file_size_units = item_fields[2]
            logging.debug("Select copy")
            # A fresh chain per file, so actions queued for a previous
            # file are not carried over into this one.
            ActionChains(driver).move_to_element(item) \
                .click(item).key_down(Keys.CONTROL) \
                .send_keys("c") \
                .key_up(Keys.CONTROL) \
                .perform()
            self.open_folder(driver, destination)
            if clean:
                self._remove_existing_files(driver, destination)
            logging.debug("Pressing control+v to paste the file in required "
                          "location")
            graphics_utils.press_keys(PASTE_KEY_COMBINATION)
            time.sleep(self.TIME_DELAY)
            self._wait_for_copy(driver, file_name, file_size,
                                file_size_units)
            # Validation starts here
            found = False
            for copied_file in self.list_files(driver):
                logging.debug("File in destination: %s",
                              copied_file.text.strip())
                if copied_file.find_element_by_class_name(
                        'entry-name').text.strip() == file_name:
                    found = True
                    break

            if found:
                logging.info("Copied the file successfully!")
            else:
                raise error.TestFail("File not transferred successfully!")

    def _remove_existing_files(self, driver, destination):
        """Delete every entry listed in the currently opened folder

        @param driver: chrome driver object
        @param destination: Folder name, used for logging only
        """
        drive_files = self.list_files(driver)
        if not drive_files:
            return
        logging.info("Removing existing files from %s", destination)
        # Click an entry first so the select-all shortcut acts on the list.
        drive_files[0].click()
        logging.debug("Select all files/dirs")
        graphics_utils.press_keys(SELECT_ALL_KEY_COMBINATION)
        time.sleep(0.2)
        driver.find_element_by_id(DELETE_BUTTON_ID).click()
        driver.find_element_by_class_name(CR_DIALOG_CLASS).click()
        time.sleep(self.TIME_DELAY)

    def _wait_for_copy(self, driver, file_name, file_size, file_size_units):
        """Poll the opened folder until the file shows the expected size

        Returns once both the size value and the size units listed for the
        file equal the expected ones, or once TIMEOUT_TO_COPY seconds have
        passed (a warning is logged in that case).

        @param driver: chrome driver object
        @param file_name: Name of the file being copied
        @param file_size: Size value listed for the source file
        @param file_size_units: Size units listed for the source file
        @raises error.TestError: if the folder is empty or the file is not
                listed in it
        """
        # Take dummy values initially
        copied_size = "0"
        copied_units = "KB"
        start_time = time.time()
        # Keep waiting while EITHER the value or the units still differ;
        # requiring both to differ would skip the wait whenever one of
        # them happens to equal its dummy initial value.
        while ((copied_size != file_size or
                copied_units != file_size_units) and
               (time.time() - start_time <= TIMEOUT_TO_COPY)):
            listing = self.list_files(driver)
            if not listing:
                raise error.TestError("File copy not started!")
            required_file = None
            for entry in listing:
                fields = entry.text.split()
                if fields and fields[0] == file_name:
                    logging.info("File found %s", fields[0])
                    required_file = entry
            if required_file is None:
                raise error.TestError("No such file/directory in drive, "
                                      "%s" % file_name)
            required_fields = required_file.text.split()
            logging.info(required_fields)
            copied_size = required_fields[1]
            copied_units = required_fields[2]
            time.sleep(5)
            logging.debug("%s %s data copied", copied_size, copied_units)
        if copied_size != file_size or copied_units != file_size_units:
            # Timing out is not fatal here; the caller's validation decides
            # whether the test passes.
            logging.warning("Timed out after %s seconds waiting for %s to "
                            "reach %s %s", TIMEOUT_TO_COPY, file_name,
                            file_size, file_size_units)

    def catch_info_or_error_messages(self, driver):
        """Gather the messages shown in the progress frames

        First tries to click the 'open' element inside 'div.button-frame';
        a failure to do so is only logged. Then the label text of every
        'div.progress-frame' element is read.

        @param driver: chrome driver object
        @return: List of the non-empty label texts
        """
        try:
            button_frame = driver.find_element_by_css_selector(
                'div.button-frame')
            button_frame.find_element_by_class_name('open').click()
        except Exception as open_failure:
            logging.info("Error in open error messages")
            logging.info(str(open_failure))
        progress_frames = driver.find_elements_by_css_selector(
            'div.progress-frame')
        label_texts = [frame.find_element_by_tag_name('label').text
                       for frame in progress_frames]
        return [text for text in label_texts if text != ""]

    def create_file(self, filename):
        """Create a file

        Runs dd to write a single 1024-byte block read from /dev/zero to
        the given path.

        @param filename: Path of the file to create
        @raises error.TestError: if dd exits with a non-zero status
        """
        # NOTE(review): filename is interpolated into a shell command
        # unquoted, so paths containing spaces or shell metacharacters
        # will not work.
        status, output = getstatusoutput('dd if=/dev/zero of=%s bs=%s '
                                         'count=1 iflag=fullblock' %
                                         (filename, 1024))
        if status:
            # Include the command output so the cause is visible in logs.
            raise error.TestError("Failed to create file %s: %s" %
                                  (filename, output))

    def run_once(self, username=None, password=None, source="Downloads",
                 file_name='test.dat'):
        """Copy file to Google Drive in Files application

        Logs in through chromedriver, opens the Files application, creates
        the file under USER_LOCATION/<source>, copies it to Google Drive
        and fails if any message is reported afterwards.

        @param username: Real user(Not default autotest user)
        @param password: Password for the user.
        @param source: From where to copy file
        @param file_name: File name
        @raises error.TestFail: if messages are collected after the copy
        """
        self.success = False  # Used to capture the screenshot if the TC fails
        with chromedriver.chromedriver(username=username,
                                       password=password,
                                       disable_default_apps=False,
                                       gaia_login=True) as cr_instance:
            driver = cr_instance.driver
            self.open_files_application(driver)
            self.create_file(os.path.join(USER_LOCATION, source, file_name))
            self.copy_file(driver, source, GOOGLE_DRIVE, file_name)
            errors = self.catch_info_or_error_messages(driver)
            if errors:
                # Format the message explicitly so the collected messages
                # appear in the failure reason.
                raise error.TestFail("Test failed with the following"
                                     " errors. %s" % (errors,))
        self.success = True