# Plain text  |  180 lines  |  6.26 KB

# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tester feedback request multiplexer."""

from multiprocessing import reduction
import Queue
import collections
import multiprocessing
import os
import sys

import common
from autotest_lib.client.common_lib.feedback import tester_feedback_client

import input_handlers
import request
import sequenced_request


# Entry placed on the multiplexer's request queue:
#   obj                - the feedback request object, or None for the null
#                        request used to end an atomic sequence.
#   reduced_reply_pipe - reduced form of the pipe end the result is sent on,
#                        or None for the null request.
#   query_num          - number of the query the request belongs to.
#   atomic             - whether further requests of the same query should be
#                        processed without interruption.
ReqTuple = collections.namedtuple(
        'ReqTuple',
        ['obj', 'reduced_reply_pipe', 'query_num', 'atomic'])


class FeedbackRequestMultiplexer(object):
    """A feedback request multiplexer.

    Requests submitted via process_request() are put on a multiprocessing
    queue and handled one at a time by a separate request handling process,
    which is given a duplicate of standard input. A request flagged as atomic
    makes the handler serve only requests with the same query number until one
    of them (or the null request enqueued by end_atomic_seq()) clears the flag.
    """

    class RequestProcessingTerminated(Exception):
        """Used internally to signal processor termination."""


    def __init__(self):
        # Carries ReqTuple entries to the handler; None is the quit marker.
        self._request_queue = multiprocessing.Queue()
        # Requests that were dequeued but not yet handled.
        self._pending = []
        self._request_handling_process = None
        self._running = False
        # Query number of the atomic sequence in progress, or None.
        self._atomic_seq = None


    def _dequeue_request(self, block=False):
        """Moves one request from the queue to the pending list.

        @param block: Whether to wait for a request to become available.

        @return: True if a request was dequeued, False if none was available.

        @raise RequestProcessingTerminated: The quit marker (None) was
                dequeued.
        """
        try:
            req_tuple = self._request_queue.get(block=block)
        except Queue.Empty:
            return False

        if req_tuple is None:
            raise self.RequestProcessingTerminated
        self._pending.append(req_tuple)
        return True


    def _atomic_seq_cont(self):
        """Returns index of next pending request in atomic sequence, if any.

        @return: Index into the pending list of the first request whose query
                 number matches the current atomic sequence, None if there is
                 no such request.
        """
        for req_idx, req_tuple in enumerate(self._pending):
            if req_tuple.query_num == self._atomic_seq:
                return req_idx
        return None


    def _handle_requests(self, stdin):
        """Processes feedback requests until termination is signaled.

        This method is run in a separate process and needs to override stdin in
        order for raw_input() to work.

        @param stdin: File object to install as sys.stdin.
        """
        sys.stdin = stdin
        try:
            while True:
                req_idx = None

                # Wait for a (suitable) request to become available: any
                # request when no atomic sequence is in progress, otherwise
                # only one that continues the sequence.
                while True:
                    if self._atomic_seq is None:
                        if self._pending:
                            break
                    else:
                        req_idx = self._atomic_seq_cont()
                        if req_idx is not None:
                            break
                    self._dequeue_request(block=True)

                # If no request was pre-selected, prompt the user to choose one.
                # NOTE(review): the titles below are read from every pending
                # entry, so a null request (obj is None) pending outside of
                # its atomic sequence would fail here; confirm callers only
                # end sequences that are actually in progress.
                if req_idx is None:
                    raw_input('Pending feedback requests, hit Enter to '
                              'process... ')

                    # Pull all remaining queued requests.
                    while self._dequeue_request():
                        pass

                    # Select the request to process.
                    if len(self._pending) == 1:
                        print('Processing: %s' %
                              self._pending[0].obj.get_title())
                        req_idx = 0
                    else:
                        choose_req = sequenced_request.SequencedFeedbackRequest(
                                None, None, None)
                        choose_req.append_question(
                                'List of pending feedback requests:',
                                input_handlers.MultipleChoiceInputHandler(
                                        [req_tuple.obj.get_title()
                                         for req_tuple in self._pending],
                                        default=1),
                                prompt='Choose a request to process')
                        req_idx, _ = choose_req.execute()

                # Pop and handle selected request, then close pipe. A null
                # request (no object) produces no reply.
                req_tuple = self._pending.pop(req_idx)
                if req_tuple.obj is not None:
                    try:
                        ret = req_tuple.obj.execute()
                    except request.FeedbackRequestError as e:
                        ret = (tester_feedback_client.QUERY_RET_ERROR, str(e))
                    # Rebuild the reply pipe from its reduced form.
                    reply_pipe = req_tuple.reduced_reply_pipe[0](
                            *req_tuple.reduced_reply_pipe[1])
                    try:
                        reply_pipe.send(ret)
                    finally:
                        # Release the pipe end even if sending failed.
                        reply_pipe.close()

                # Set the atomic sequence if so instructed.
                self._atomic_seq = (req_tuple.query_num if req_tuple.atomic
                                    else None)

        except self.RequestProcessingTerminated:
            pass


    def start(self):
        """Starts the request multiplexer.

        Spawns the request handling process, handing it a duplicate of the
        current standard input. Does nothing if already running.
        """
        if self._running:
            return

        dup_stdin = os.fdopen(os.dup(sys.stdin.fileno()))
        try:
            self._request_handling_process = multiprocessing.Process(
                    target=self._handle_requests, args=(dup_stdin,))
            self._request_handling_process.start()
        finally:
            # The duplicate is only meant for the handler process, which
            # (with fork-based process creation) works on its own inherited
            # copy; close ours so the descriptor is not leaked.
            dup_stdin.close()

        self._running = True


    def stop(self):
        """Stops the request multiplexer.

        Enqueues the quit marker and waits for the request handling process to
        exit. Does nothing if not running.
        """
        if not self._running:
            return

        # Tell the request handler to quit.
        self._request_queue.put(None)
        self._request_handling_process.join()

        self._running = False


    def process_request(self, request, query_num, atomic):
        """Processes a feedback request and returns its result.

        This call is used by queries for submitting individual requests. It is
        a blocking call that should be called from a separate execution thread.

        @param request: The feedback request to process.
        @param query_num: The unique query number.
        @param atomic: Whether subsequent request(s) are expected and should be
                       processed without interruption.

        @return: The value sent back by the request handling process.
        """
        reply_pipe_send, reply_pipe_recv = multiprocessing.Pipe()
        try:
            # The sending end travels through the queue in reduced form and is
            # rebuilt by the handler when it replies.
            reduced_reply_pipe_send = reduction.reduce_connection(
                    reply_pipe_send)
            self._request_queue.put(ReqTuple(request, reduced_reply_pipe_send,
                                             query_num, atomic))
            return reply_pipe_recv.recv()
        finally:
            # Release both local pipe ends once the exchange is over.
            reply_pipe_send.close()
            reply_pipe_recv.close()


    def end_atomic_seq(self, query_num):
        """Ends the current atomic sequence.

        This enqueues a null request with the given query_num and atomicity set
        to False, causing the multiplexer to terminate the atomic sequence.

        @param query_num: The unique query number.
        """
        self._request_queue.put(ReqTuple(None, None, query_num, False))