# Plain text  |  120 lines  |  3.71 KB

""" Parallel execution management """

__author__ = """Copyright Andy Whitcroft 2006"""

import sys, logging, os, pickle, traceback, gc, time
from autotest_lib.client.common_lib import error, utils

def fork_start(tmp, l):
    """
    Fork a child process that runs the callable l and then exits.

    In the parent, this returns the child's pid right after the fork. The
    child never returns from this function: it calls l() and then terminates
    through os._exit(), with status 0 if l() returned normally and status 1
    if l() raised an exception.

    When l() raises, the exception (wrapped in error.UnhandledTestError
    unless it already is an error.AutotestError) is pickled to
    <tmp>/debug/error-<child pid>.

    tmp: directory under which the 'debug' directory for the pickled
         exception is created.
    l:   callable invoked with no arguments in the child.
    """
    # Flush before forking so buffered output is not emitted twice.
    sys.stdout.flush()
    sys.stderr.flush()
    pid = os.fork()
    if pid:
        # Parent
        return pid

    try:
        try:
            l()
        except error.AutotestError:
            raise
        except Exception as e:
            raise error.UnhandledTestError(e)
    except Exception as detail:
        try:
            try:
                logging.error('child process failed')
                # logging.exception() uses ERROR level, but we want DEBUG for
                # the traceback
                for line in traceback.format_exc().splitlines():
                    logging.debug(line)
            finally:
                # note that exceptions originating in this block won't make it
                # to the logs
                output_dir = os.path.join(tmp, 'debug')
                # Several children may fail at the same time, so create the
                # directory unconditionally and only complain if it still is
                # not there; an exists()-then-makedirs() check would race.
                try:
                    os.makedirs(output_dir)
                except OSError:
                    if not os.path.isdir(output_dir):
                        raise
                ename = os.path.join(output_dir, "error-%d" % os.getpid())
                # Close the file explicitly: os._exit() below skips the
                # interpreter's normal cleanup, so nothing else would
                # guarantee the pickle reaches the disk.
                with open(ename, "wb") as error_file:
                    pickle.dump(detail, error_file)

                sys.stdout.flush()
                sys.stderr.flush()
        finally:
            try:
                # clear exception information to allow garbage collection of
                # objects referenced by the exception's traceback
                sys.exc_clear()
                gc.collect()
            finally:
                # The child must never fall through into the parent's code,
                # whatever happened above.
                os._exit(1)
    else:
        try:
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            os._exit(0)


def _check_for_subprocess_exception(temp_dir, pid):
    ename = temp_dir + "/debug/error-%d" % pid
    if os.path.exists(ename):
        try:
            e = pickle.load(file(ename, 'r'))
        except ImportError:
            with open(ename, 'r') as fp:
                file_text = fp.read()
            raise error.TestError(
                    'Subprocess raised an exception that could not be '
                    'identified. The root cause exception is in the text '
                    'that follows: ' + file_text)
        finally:
            # Rename the error-pid file so that they do not affect later child
            # processes that use recycled pids.
            i = 0
            while True:
                pename = ename + ('-%d' % i)
                i += 1
                if not os.path.exists(pename):
                    break
            os.rename(ename, pename)
        raise e


def fork_waitfor(tmp, pid):
    """
    Block until the child exits, then report its failure, if any.

    An exception the child pickled under tmp is re-raised first; failing
    that, a non-zero wait status is turned into error.TestError.
    """
    reaped_pid, exit_status = os.waitpid(pid, 0)

    # A pickled exception says more than the bare status, so look for it
    # before judging the status. Use the pid waitpid() actually reported.
    _check_for_subprocess_exception(tmp, reaped_pid)

    if exit_status != 0:
        raise error.TestError("Test subprocess failed rc=%d" % (exit_status))

def fork_waitfor_timed(tmp, pid, timeout):
    """
    Wait for pid to terminate, giving up after roughly timeout seconds.

    The child is polled every 2 seconds without blocking. If it has not
    exited once the accumulated polling time reaches timeout, it is nuked,
    reaped, and error.TestError is raised. If it exits in time, any
    exception it pickled under tmp is re-raised, and a non-zero wait status
    is turned into error.TestError.
    """
    poll_interval = 2
    elapsed = 0
    while elapsed < timeout:
        time.sleep(poll_interval)
        reaped, status = os.waitpid(pid, os.WNOHANG)
        if reaped != 0 or status != 0:
            # The child is gone; skip the timeout handling below.
            break
        # (0, 0) means the child is still running.
        elapsed = elapsed + poll_interval
    else:
        # Only reached when the loop ran out of time without a break.
        logging.info('Timer expired (%d sec.), nuking pid %d', timeout, pid)
        utils.nuke_pid(pid)
        reaped, status = os.waitpid(pid, 0)
        raise error.TestError("Test timeout expired, rc=%d" % (status))

    _check_for_subprocess_exception(tmp, pid)

    if status:
        raise error.TestError("Test subprocess failed rc=%d" % (status))

def fork_nuke_subprocess(tmp, pid):
    """
    Nuke the child pid, then re-raise any exception it left behind.

    pid is handed to utils.nuke_pid() (presumably this kills the process --
    confirm against utils), after which the error file for pid under tmp is
    checked and, if present, the exception stored in it is raised.

    This function does not call os.waitpid() itself.
    """
    utils.nuke_pid(pid)
    # Raises if the child pickled an exception before it went away.
    _check_for_subprocess_exception(tmp, pid)