普通文本  |  157行  |  5.56 KB

#!/usr/bin/env python
#
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os
import unittest

import mox

from apiclient import discovery
from oauth2client.client import ApplicationDefaultCredentialsError
from oauth2client.client import GoogleCredentials
from googleapiclient.errors import UnknownApiNameOrVersion

import common
import pubsub_utils


class MockedPubSub(object):
    """A mocked PubSub handle."""
    def __init__(self, test, topic, msg, retry, ret_val=None,
            raise_except=False):
        self.test = test
        self.topic = topic
        self.msg = msg
        self.retry = retry
        self.ret_val = ret_val
        self.raise_except = raise_except

    def projects(self):
        """Mocked PubSub projects."""
        return self

    def topics(self):
        """Mocked PubSub topics."""
        return self

    def publish(self, topic, body):
        """Mocked PubSub publish method.

        @param topic: PubSub topic string.
        @param body: PubSub notification body.
        """
        self.test.assertEquals(self.topic, topic)
        self.test.assertEquals(self.msg, body['messages'][0])
        return self

    def execute(self, num_retries):
        """Mocked PubSub execute method.

        @param num_retries: Number of retries.
        """
        self.test.assertEquals(self.num_retries, num_retries)
        if self.raise_except:
            raise Exception()
        return self.ret


def _create_sample_message():
    """Creates a sample pubsub message."""
    msg_payload = {'data': 'sample data'}
    msg_attributes = {}
    msg_attributes['var'] = 'value'
    msg_payload['attributes'] = msg_attributes

    return msg_payload


class PubSubTests(mox.MoxTestBase):
    """Tests for pubsub related functios."""

    def test_get_pubsub_service_no_service_account(self):
        """Test getting the pubsub service"""
        self.mox.StubOutWithMock(os.path, 'isfile')
        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(False)
        self.mox.ReplayAll()
        pubsub = pubsub_utils._get_pubsub_service()
        self.assertIsNone(pubsub)
        self.mox.VerifyAll()

    def test_get_pubsub_service_with_invalid_service_account(self):
        """Test getting the pubsub service"""
        self.mox.StubOutWithMock(os.path, 'isfile')
        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
        credentials = self.mox.CreateMock(GoogleCredentials)
        GoogleCredentials.from_stream(
                pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndRaise(
                        ApplicationDefaultCredentialsError())
        self.mox.ReplayAll()
        pubsub = pubsub_utils._get_pubsub_service()
        self.assertIsNone(pubsub)
        self.mox.VerifyAll()

    def test_get_pubsub_service_with_invalid_service_account(self):
        """Test getting the pubsub service"""
        self.mox.StubOutWithMock(os.path, 'isfile')
        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
        credentials = self.mox.CreateMock(GoogleCredentials)
        GoogleCredentials.from_stream(
                pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
        credentials.create_scoped_required().AndReturn(True)
        credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
                credentials)
        self.mox.StubOutWithMock(discovery, 'build')
        discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
                pubsub_utils.PUBSUB_VERSION,
                credentials=credentials).AndRaise(UnknownApiNameOrVersion())
        self.mox.ReplayAll()
        pubsub = pubsub_utils._get_pubsub_service()
        self.assertIsNone(pubsub)
        self.mox.VerifyAll()

    def test_get_pubsub_service_with_service_account(self):
        """Test getting the pubsub service"""
        self.mox.StubOutWithMock(os.path, 'isfile')
        self.mox.StubOutWithMock(GoogleCredentials, 'from_stream')
        os.path.isfile(pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(True)
        credentials = self.mox.CreateMock(GoogleCredentials)
        GoogleCredentials.from_stream(
                pubsub_utils.CLOUD_SERVICE_ACCOUNT_FILE).AndReturn(credentials)
        credentials.create_scoped_required().AndReturn(True)
        credentials.create_scoped(pubsub_utils.PUBSUB_SCOPES).AndReturn(
                credentials)
        self.mox.StubOutWithMock(discovery, 'build')
        discovery.build(pubsub_utils.PUBSUB_SERVICE_NAME,
                pubsub_utils.PUBSUB_VERSION,
                credentials=credentials).AndReturn(1)
        self.mox.ReplayAll()
        pubsub = pubsub_utils._get_pubsub_service()
        self.assertIsNotNone(pubsub)
        self.mox.VerifyAll()

    def test_publish_notifications(self):
        """Tests publish notifications."""
        self.mox.StubOutWithMock(pubsub_utils, '_get_pubsub_service')
        msg = _create_sample_message()
        pubsub_utils._get_pubsub_service().AndReturn(MockedPubSub(
            self,
            'test_topic',
            msg,
            pubsub_utils._PUBSUB_NUM_RETRIES,
            # use tuple ('123') instead of list just for easy to write the test.
            ret_val = {'messageIds', ('123')}))

        self.mox.ReplayAll()
        pubsub_utils.publish_notifications(
                'test_topic', [msg])
        self.mox.VerifyAll()


if __name__ == '__main__':
    unittest.main()