#!/usr/bin/env python
#
# Copyright 2016 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for acloud.internal.lib.gcompute_client."""
# pylint: disable=too-many-lines

import copy
import os

import unittest
import mock

# pylint: disable=import-error
import apiclient.http

from acloud import errors
from acloud.internal.lib import driver_test_lib
from acloud.internal.lib import gcompute_client
from acloud.internal.lib import utils


GS_IMAGE_SOURCE_URI = "https://storage.googleapis.com/fake-bucket/fake.tar.gz"
GS_IMAGE_SOURCE_DISK = (
    "https://www.googleapis.com/compute/v1/projects/fake-project/zones/"
    "us-east1-d/disks/fake-disk")
PROJECT = "fake-project"


# pylint: disable=protected-access, too-many-public-methods
class ComputeClientTest(driver_test_lib.BaseDriverTest):
    """Test ComputeClient.

    Every test runs against a ComputeClient whose "InitResourceHandle" is
    patched out and whose "_service" attribute is replaced by a MagicMock
    (see setUp), so the tests only verify which API methods are invoked and
    with which arguments.
    """

    PROJECT_OTHER = "fake-project-other"
    INSTANCE = "fake-instance"
    IMAGE = "fake-image"
    IMAGE_URL = "http://fake-image-url"
    IMAGE_OTHER = "fake-image-other"
    MACHINE_TYPE = "fake-machine-type"
    MACHINE_TYPE_URL = "http://fake-machine-type-url"
    METADATA = ("metadata_key", "metadata_value")
    ACCELERATOR_URL = "http://speedy-gpu"
    NETWORK = "fake-network"
    NETWORK_URL = "http://fake-network-url"
    SUBNETWORK_URL = "http://fake-subnetwork-url"
    ZONE = "fake-zone"
    REGION = "fake-region"
    OPERATION_NAME = "fake-op"
    IMAGE_FINGERPRINT = "L_NWHuz7wTY="
    GPU = "fancy-graphics"
    SSHKEY = (
        "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDBkTOTRze9v2VOqkkf7RG"
        "jSkg6Z2kb9Q9UHsDGatvend3fmjIw1Tugg0O7nnjlPkskmlgyd4a/j99WOeLL"
        "CPk6xPyoVjrPUVBU/pAk09ORTC4Zqk6YjlW7LOfzvqmXhmIZfYu6Q4Yt50pZzhl"
        "lllfu26nYjY7Tg12D019nJi/kqPX5+NKgt0LGXTu8T1r2Gav/q4V7QRWQrB8Eiu"
        "pxXR7I2YhynqovkEt/OXG4qWgvLEXGsWtSQs0CtCzqEVxz0Y9ECr7er4VdjSQxV"
        "AaeLAsQsK9ROae8hMBFZ3//8zLVapBwpuffCu+fUoql9qeV9xagZcc9zj8XOUOW"
        "ApiihqNL1111 test@test1.org")
    EXTRA_SCOPES = ["scope1"]

    def setUp(self):
        """Set up test.

        Builds a ComputeClient from a mocked config, swaps its "_service"
        for a MagicMock and prepares the boot-disk arguments that the
        CreateInstance tests expect in the request body.
        """
        super(ComputeClientTest, self).setUp()
        self.Patch(gcompute_client.ComputeClient, "InitResourceHandle")
        fake_cfg = mock.MagicMock()
        fake_cfg.project = PROJECT
        fake_cfg.extra_scopes = self.EXTRA_SCOPES
        self.compute_client = gcompute_client.ComputeClient(
            fake_cfg, mock.MagicMock())
        self.compute_client._service = mock.MagicMock()

        # Deep copy so that the module-level BASE_DISK_ARGS is never mutated.
        self._disk_args = copy.deepcopy(gcompute_client.BASE_DISK_ARGS)
        self._disk_args["initializeParams"] = {"diskName": self.INSTANCE,
                                               "sourceImage": self.IMAGE_URL}

    # pylint: disable=invalid-name
    def _SetupMocksForGetOperationStatus(self, mock_result, operation_scope):
        """A helper method for setting up mocks for testGetOperationStatus*.

        Args:
            mock_result: The result to return by _GetOperationStatus.
            operation_scope: A value of OperationScope.

        Returns:
            A mock for Resource object.
        """
        resource_mock = mock.MagicMock()
        mock_api = mock.MagicMock()
        if operation_scope == gcompute_client.OperationScope.GLOBAL:
            self.compute_client._service.globalOperations = mock.MagicMock(
                return_value=resource_mock)
        elif operation_scope == gcompute_client.OperationScope.ZONE:
            self.compute_client._service.zoneOperations = mock.MagicMock(
                return_value=resource_mock)
        elif operation_scope == gcompute_client.OperationScope.REGION:
            self.compute_client._service.regionOperations = mock.MagicMock(
                return_value=resource_mock)
        resource_mock.get = mock.MagicMock(return_value=mock_api)
        mock_api.execute = mock.MagicMock(return_value=mock_result)
        return resource_mock

    def testGetOperationStatusGlobal(self):
        """Test _GetOperationStatus for global."""
        resource_mock = self._SetupMocksForGetOperationStatus(
            {"status": "GOOD"}, gcompute_client.OperationScope.GLOBAL)
        status = self.compute_client._GetOperationStatus(
            {"name": self.OPERATION_NAME},
            gcompute_client.OperationScope.GLOBAL)
        self.assertEqual(status, "GOOD")
        resource_mock.get.assert_called_with(
            project=PROJECT, operation=self.OPERATION_NAME)

    def testGetOperationStatusZone(self):
        """Test _GetOperationStatus for zone."""
        resource_mock = self._SetupMocksForGetOperationStatus(
            {"status": "GOOD"}, gcompute_client.OperationScope.ZONE)
        status = self.compute_client._GetOperationStatus(
            {"name": self.OPERATION_NAME},
            gcompute_client.OperationScope.ZONE,
            self.ZONE)
        self.assertEqual(status, "GOOD")
        resource_mock.get.assert_called_with(
            project=PROJECT, operation=self.OPERATION_NAME, zone=self.ZONE)

    def testGetOperationStatusRegion(self):
        """Test _GetOperationStatus for region."""
        resource_mock = self._SetupMocksForGetOperationStatus(
            {"status": "GOOD"}, gcompute_client.OperationScope.REGION)
        status = self.compute_client._GetOperationStatus(
            {"name": self.OPERATION_NAME},
            gcompute_client.OperationScope.REGION,
            self.REGION)
        # Check the returned status too, like the global and zone tests do.
        self.assertEqual(status, "GOOD")
        resource_mock.get.assert_called_with(
            project=PROJECT, operation=self.OPERATION_NAME,
            region=self.REGION)

    def testGetOperationStatusError(self):
        """Test _GetOperationStatus failed."""
        self._SetupMocksForGetOperationStatus(
            {"error": {"errors": ["error1", "error2"]}},
            gcompute_client.OperationScope.GLOBAL)
        self.assertRaisesRegexp(errors.DriverError,
                                "Get operation state failed.*error1.*error2",
                                self.compute_client._GetOperationStatus,
                                {"name": self.OPERATION_NAME},
                                gcompute_client.OperationScope.GLOBAL)

    @mock.patch.object(errors, "GceOperationTimeoutError")
    @mock.patch.object(utils, "PollAndWait")
    def testWaitOnOperation(self, mock_poll, mock_gce_operation_timeout_error):
        """Test WaitOnOperation."""
        mock_error = mock.MagicMock()
        mock_gce_operation_timeout_error.return_value = mock_error
        self.compute_client.WaitOnOperation(
            operation={"name": self.OPERATION_NAME},
            operation_scope=gcompute_client.OperationScope.REGION,
            scope_name=self.REGION)
        mock_poll.assert_called_with(
            func=self.compute_client._GetOperationStatus,
            expected_return="DONE",
            timeout_exception=mock_error,
            timeout_secs=self.compute_client.OPERATION_TIMEOUT_SECS,
            sleep_interval_secs=(
                self.compute_client.OPERATION_POLL_INTERVAL_SECS),
            operation={"name": self.OPERATION_NAME},
            operation_scope=gcompute_client.OperationScope.REGION,
            scope_name=self.REGION)

    def testGetImage(self):
        """Test GetImage."""
        resource_mock = mock.MagicMock()
        mock_api = mock.MagicMock()
        self.compute_client._service.images = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.get = mock.MagicMock(return_value=mock_api)
        mock_api.execute = mock.MagicMock(return_value={"name": self.IMAGE})
        result = self.compute_client.GetImage(self.IMAGE)
        self.assertEqual(result, {"name": self.IMAGE})
        resource_mock.get.assert_called_with(project=PROJECT, image=self.IMAGE)

    def testGetImageOther(self):
        """Test GetImage with other project."""
        resource_mock = mock.MagicMock()
        mock_api = mock.MagicMock()
        self.compute_client._service.images = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.get = mock.MagicMock(return_value=mock_api)
        mock_api.execute = mock.MagicMock(
            return_value={"name": self.IMAGE_OTHER})
        result = self.compute_client.GetImage(
            image_name=self.IMAGE_OTHER,
            image_project=self.PROJECT_OTHER)
        self.assertEqual(result, {"name": self.IMAGE_OTHER})
        resource_mock.get.assert_called_with(
            project=self.PROJECT_OTHER, image=self.IMAGE_OTHER)

    def testCreateImageWithSourceURI(self):
        """Test CreateImage with src uri."""
        source_uri = GS_IMAGE_SOURCE_URI
        source_disk = None
        labels = None
        expected_body = {"name": self.IMAGE,
                         "rawDisk": {"source": GS_IMAGE_SOURCE_URI}}
        mock_check = self.Patch(gcompute_client.ComputeClient,
                                "CheckImageExists",
                                return_value=False)
        mock_wait = self.Patch(gcompute_client.ComputeClient,
                               "WaitOnOperation")
        resource_mock = mock.MagicMock()
        self.compute_client._service.images = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.insert = mock.MagicMock()
        self.compute_client.CreateImage(
            image_name=self.IMAGE, source_uri=source_uri,
            source_disk=source_disk, labels=labels)
        resource_mock.insert.assert_called_with(
            project=PROJECT, body=expected_body)
        mock_wait.assert_called_with(
            operation=mock.ANY,
            operation_scope=gcompute_client.OperationScope.GLOBAL)
        mock_check.assert_called_with(self.IMAGE)

    def testCreateImageWithSourceDisk(self):
        """Test CreateImage with src disk."""
        source_uri = None
        source_disk = GS_IMAGE_SOURCE_DISK
        labels = None
        expected_body = {"name": self.IMAGE,
                         "sourceDisk": GS_IMAGE_SOURCE_DISK}
        mock_check = self.Patch(gcompute_client.ComputeClient,
                                "CheckImageExists",
                                return_value=False)
        mock_wait = self.Patch(gcompute_client.ComputeClient,
                               "WaitOnOperation")
        resource_mock = mock.MagicMock()
        self.compute_client._service.images = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.insert = mock.MagicMock()
        self.compute_client.CreateImage(
            image_name=self.IMAGE, source_uri=source_uri,
            source_disk=source_disk, labels=labels)
        resource_mock.insert.assert_called_with(
            project=PROJECT, body=expected_body)
        mock_wait.assert_called_with(
            operation=mock.ANY,
            operation_scope=gcompute_client.OperationScope.GLOBAL)
        mock_check.assert_called_with(self.IMAGE)

    def testCreateImageWithSourceDiskAndLabel(self):
        """Test CreateImage with src disk and label."""
        source_uri = None
        source_disk = GS_IMAGE_SOURCE_DISK
        labels = {"label1": "xxx"}
        expected_body = {"name": self.IMAGE,
                         "sourceDisk": GS_IMAGE_SOURCE_DISK,
                         "labels": {"label1": "xxx"}}
        mock_check = self.Patch(gcompute_client.ComputeClient,
                                "CheckImageExists",
                                return_value=False)
        mock_wait = self.Patch(gcompute_client.ComputeClient,
                               "WaitOnOperation")
        resource_mock = mock.MagicMock()
        self.compute_client._service.images = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.insert = mock.MagicMock()
        self.compute_client.CreateImage(
            image_name=self.IMAGE, source_uri=source_uri,
            source_disk=source_disk, labels=labels)
        resource_mock.insert.assert_called_with(
            project=PROJECT, body=expected_body)
        mock_wait.assert_called_with(
            operation=mock.ANY,
            operation_scope=gcompute_client.OperationScope.GLOBAL)
        mock_check.assert_called_with(self.IMAGE)

    @mock.patch.object(gcompute_client.ComputeClient, "GetImage")
    def testSetImageLabel(self, mock_get_image):
        """Test SetImageLabel."""
        # The assertion must run while "images" is still patched, so the
        # whole test body lives inside the context manager.
        with mock.patch.object(self.compute_client._service, "images",
                               return_value=mock.MagicMock(
                                   setLabels=mock.MagicMock())):
            image = {"name": self.IMAGE,
                     "sourceDisk": GS_IMAGE_SOURCE_DISK,
                     "labelFingerprint": self.IMAGE_FINGERPRINT,
                     "labels": {"a": "aaa", "b": "bbb"}}
            mock_get_image.return_value = image
            new_labels = {"a": "xxx", "c": "ccc"}
            # Test
            self.compute_client.SetImageLabels(
                self.IMAGE, new_labels)
            # Check result
            expected_labels = {"a": "xxx", "b": "bbb", "c": "ccc"}
            self.compute_client._service.images().setLabels.assert_called_with(
                project=PROJECT,
                resource=self.IMAGE,
                body={
                    "labels": expected_labels,
                    "labelFingerprint": self.IMAGE_FINGERPRINT
                })

    def testCreateImageRaiseDriverErrorWithValidInput(self):
        """Test CreateImage raises when both source uri and disk are given."""
        source_uri = GS_IMAGE_SOURCE_URI
        source_disk = GS_IMAGE_SOURCE_DISK
        self.Patch(gcompute_client.ComputeClient, "CheckImageExists",
                   return_value=False)
        self.assertRaises(errors.DriverError, self.compute_client.CreateImage,
                          image_name=self.IMAGE, source_uri=source_uri,
                          source_disk=source_disk)

    def testCreateImageRaiseDriverErrorWithInvalidInput(self):
        """Test CreateImage raises when neither source uri nor disk is given."""
        source_uri = None
        source_disk = None
        self.Patch(gcompute_client.ComputeClient, "CheckImageExists",
                   return_value=False)
        self.assertRaises(errors.DriverError, self.compute_client.CreateImage,
                          image_name=self.IMAGE, source_uri=source_uri,
                          source_disk=source_disk)

    @mock.patch.object(gcompute_client.ComputeClient, "DeleteImage")
    @mock.patch.object(gcompute_client.ComputeClient, "CheckImageExists",
                       side_effect=[False, True])
    @mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation",
                       side_effect=errors.DriverError("Expected fake error"))
    def testCreateImageFail(self, mock_wait, mock_check, mock_delete):
        """Test CreateImage fails."""
        resource_mock = mock.MagicMock()
        self.compute_client._service.images = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.insert = mock.MagicMock()

        expected_body = {
            "name": self.IMAGE,
            "rawDisk": {
                "source": GS_IMAGE_SOURCE_URI,
            },
        }
        self.assertRaisesRegexp(
            errors.DriverError,
            "Expected fake error",
            self.compute_client.CreateImage,
            image_name=self.IMAGE,
            source_uri=GS_IMAGE_SOURCE_URI)
        resource_mock.insert.assert_called_with(
            project=PROJECT, body=expected_body)
        mock_wait.assert_called_with(
            operation=mock.ANY,
            operation_scope=gcompute_client.OperationScope.GLOBAL)
        mock_check.assert_called_with(self.IMAGE)
        mock_delete.assert_called_with(self.IMAGE)

    def testCheckImageExistsTrue(self):
        """Test CheckImageExists return True."""
        resource_mock = mock.MagicMock()
        mock_api = mock.MagicMock()
        self.compute_client._service.images = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.get = mock.MagicMock(return_value=mock_api)
        mock_api.execute = mock.MagicMock(return_value={"name": self.IMAGE})
        self.assertTrue(self.compute_client.CheckImageExists(self.IMAGE))

    def testCheckImageExistsFalse(self):
        """Test CheckImageExists return False."""
        resource_mock = mock.MagicMock()
        mock_api = mock.MagicMock()
        self.compute_client._service.images = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.get = mock.MagicMock(return_value=mock_api)
        mock_api.execute = mock.MagicMock(
            side_effect=errors.ResourceNotFoundError(404, "no image"))
        self.assertFalse(self.compute_client.CheckImageExists(self.IMAGE))

    @mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation")
    def testDeleteImage(self, mock_wait):
        """Test DeleteImage."""
        resource_mock = mock.MagicMock()
        self.compute_client._service.images = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.delete = mock.MagicMock()
        self.compute_client.DeleteImage(self.IMAGE)
        resource_mock.delete.assert_called_with(
            project=PROJECT, image=self.IMAGE)
        self.assertTrue(mock_wait.called)

    def _SetupBatchHttpRequestMock(self):
        """Setup BatchHttpRequest mock.

        Patches apiclient.http.BatchHttpRequest with a fake batch whose
        add() records each (request, callback) pair by request id and whose
        execute() invokes every recorded callback with a MagicMock response
        and no exception.
        """
        requests = {}

        def _Add(request, callback, request_id):
            requests[request_id] = (request, callback)

        def _Execute():
            for rid, (_, callback) in requests.items():
                callback(
                    request_id=rid, response=mock.MagicMock(), exception=None)

        mock_batch = mock.MagicMock()
        mock_batch.add = _Add
        mock_batch.execute = _Execute
        self.Patch(apiclient.http, "BatchHttpRequest", return_value=mock_batch)

    @mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation")
    def testDeleteImages(self, mock_wait):
        """Test DeleteImages."""
        self._SetupBatchHttpRequestMock()
        fake_images = ["fake_image_1", "fake_image_2"]
        mock_api = mock.MagicMock()
        resource_mock = mock.MagicMock()
        self.compute_client._service.images = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.delete = mock.MagicMock(return_value=mock_api)
        # Call the API.
        deleted, failed, error_msgs = self.compute_client.DeleteImages(
            fake_images)
        # Verify
        calls = [
            mock.call(project=PROJECT, image="fake_image_1"),
            mock.call(project=PROJECT, image="fake_image_2")
        ]
        resource_mock.delete.assert_has_calls(calls, any_order=True)
        self.assertEqual(mock_wait.call_count, 2)
        self.assertEqual(error_msgs, [])
        self.assertEqual(failed, [])
        self.assertEqual(set(deleted), set(fake_images))

    def testListImages(self):
        """Test ListImages."""
        fake_token = "fake_next_page_token"
        image_1 = "image_1"
        image_2 = "image_2"
        response_1 = {"items": [image_1], "nextPageToken": fake_token}
        response_2 = {"items": [image_2]}
        self.Patch(
            gcompute_client.ComputeClient,
            "Execute",
            side_effect=[response_1, response_2])
        resource_mock = mock.MagicMock()
        self.compute_client._service.images = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.list = mock.MagicMock()
        images = self.compute_client.ListImages()
        calls = [
            mock.call(project=PROJECT, filter=None, pageToken=None),
            mock.call(project=PROJECT, filter=None, pageToken=fake_token)
        ]
        resource_mock.list.assert_has_calls(calls)
        self.assertEqual(images, [image_1, image_2])

    def testListImagesFromExternalProject(self):
        """Test ListImages which accepts different project."""
        image = "image_1"
        response = {"items": [image]}
        self.Patch(gcompute_client.ComputeClient, "Execute",
                   side_effect=[response])
        resource_mock = mock.MagicMock()
        self.compute_client._service.images = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.list = mock.MagicMock()
        images = self.compute_client.ListImages(
            image_project="fake-project-2")
        calls = [
            mock.call(project="fake-project-2", filter=None, pageToken=None)]
        resource_mock.list.assert_has_calls(calls)
        self.assertEqual(images, [image])

    def testGetInstance(self):
        """Test GetInstance."""
        resource_mock = mock.MagicMock()
        mock_api = mock.MagicMock()
        self.compute_client._service.instances = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.get = mock.MagicMock(return_value=mock_api)
        mock_api.execute = mock.MagicMock(return_value={"name": self.INSTANCE})
        result = self.compute_client.GetInstance(self.INSTANCE, self.ZONE)
        self.assertEqual(result, {"name": self.INSTANCE})
        resource_mock.get.assert_called_with(
            project=PROJECT, zone=self.ZONE, instance=self.INSTANCE)

    def testListInstances(self):
        """Test ListInstances."""
        fake_token = "fake_next_page_token"
        instance_1 = "instance_1"
        instance_2 = "instance_2"
        response_1 = {"items": [instance_1], "nextPageToken": fake_token}
        response_2 = {"items": [instance_2]}
        self.Patch(
            gcompute_client.ComputeClient,
            "Execute",
            side_effect=[response_1, response_2])
        resource_mock = mock.MagicMock()
        self.compute_client._service.instances = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.list = mock.MagicMock()
        instances = self.compute_client.ListInstances(self.ZONE)
        calls = [
            mock.call(
                project=PROJECT,
                zone=self.ZONE,
                filter=None,
                pageToken=None),
            mock.call(
                project=PROJECT,
                zone=self.ZONE,
                filter=None,
                pageToken=fake_token),
        ]
        resource_mock.list.assert_has_calls(calls)
        self.assertEqual(instances, [instance_1, instance_2])

    @mock.patch.object(gcompute_client.ComputeClient, "GetImage")
    @mock.patch.object(gcompute_client.ComputeClient, "GetNetworkUrl")
    @mock.patch.object(gcompute_client.ComputeClient, "GetSubnetworkUrl")
    @mock.patch.object(gcompute_client.ComputeClient, "GetMachineType")
    @mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation")
    def testCreateInstance(self, mock_wait, mock_get_mach_type,
                           mock_get_subnetwork_url, mock_get_network_url,
                           mock_get_image):
        """Test CreateInstance."""
        mock_get_mach_type.return_value = {"selfLink": self.MACHINE_TYPE_URL}
        mock_get_network_url.return_value = self.NETWORK_URL
        mock_get_subnetwork_url.return_value = self.SUBNETWORK_URL
        mock_get_image.return_value = {"selfLink": self.IMAGE_URL}
        resource_mock = mock.MagicMock()
        self.compute_client._service.instances = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.insert = mock.MagicMock()
        self.Patch(
            self.compute_client,
            "_GetExtraDiskArgs",
            return_value=[{"fake_extra_arg": "fake_extra_value"}])
        extra_disk_name = "gce-x86-userdebug-2345-abcd-data"
        # Boot disk from setUp followed by the patched extra-disk arguments.
        expected_disk_args = [self._disk_args,
                              {"fake_extra_arg": "fake_extra_value"}]
        expected_scope = (list(self.compute_client.DEFAULT_INSTANCE_SCOPE) +
                          list(self.EXTRA_SCOPES))

        expected_body = {
            "machineType": self.MACHINE_TYPE_URL,
            "name": self.INSTANCE,
            "networkInterfaces": [
                {
                    "network": self.NETWORK_URL,
                    "subnetwork": self.SUBNETWORK_URL,
                    "accessConfigs": [
                        {"name": "External NAT",
                         "type": "ONE_TO_ONE_NAT"}
                    ],
                }
            ],
            "disks": expected_disk_args,
            "serviceAccounts": [
                {"email": "default",
                 "scopes": expected_scope}
            ],
            "metadata": {
                "items": [{"key": self.METADATA[0],
                           "value": self.METADATA[1]}],
            },
        }

        self.compute_client.CreateInstance(
            instance=self.INSTANCE,
            image_name=self.IMAGE,
            machine_type=self.MACHINE_TYPE,
            metadata={self.METADATA[0]: self.METADATA[1]},
            network=self.NETWORK,
            zone=self.ZONE,
            extra_disk_name=extra_disk_name,
            extra_scopes=self.EXTRA_SCOPES)

        resource_mock.insert.assert_called_with(
            project=PROJECT, zone=self.ZONE, body=expected_body)
        mock_wait.assert_called_with(
            mock.ANY,
            operation_scope=gcompute_client.OperationScope.ZONE,
            scope_name=self.ZONE)

    @mock.patch.object(gcompute_client.ComputeClient, "GetAcceleratorUrl")
    @mock.patch.object(gcompute_client.ComputeClient, "GetImage")
    @mock.patch.object(gcompute_client.ComputeClient, "GetNetworkUrl")
    @mock.patch.object(gcompute_client.ComputeClient, "GetSubnetworkUrl")
    @mock.patch.object(gcompute_client.ComputeClient, "GetMachineType")
    @mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation")
    def testCreateInstanceWithGpu(self, mock_wait, mock_get_mach,
                                  mock_get_subnetwork, mock_get_network,
                                  mock_get_image, mock_get_accel):
        """Test CreateInstance with a GPU parameter not set to None."""
        mock_get_mach.return_value = {"selfLink": self.MACHINE_TYPE_URL}
        mock_get_network.return_value = self.NETWORK_URL
        mock_get_subnetwork.return_value = self.SUBNETWORK_URL
        mock_get_accel.return_value = self.ACCELERATOR_URL
        mock_get_image.return_value = {"selfLink": self.IMAGE_URL}

        resource_mock = mock.MagicMock()
        self.compute_client._service.instances = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.insert = mock.MagicMock()

        expected_body = {
            "machineType": self.MACHINE_TYPE_URL,
            "name": self.INSTANCE,
            "networkInterfaces": [{
                "network": self.NETWORK_URL,
                "subnetwork": self.SUBNETWORK_URL,
                "accessConfigs": [{
                    "name": "External NAT",
                    "type": "ONE_TO_ONE_NAT"
                }],
            }],
            "disks": [self._disk_args],
            "serviceAccounts": [{
                "email": "default",
                "scopes": self.compute_client.DEFAULT_INSTANCE_SCOPE
            }],
            "scheduling": {
                "onHostMaintenance": "terminate"
            },
            "guestAccelerators": [{
                "acceleratorCount": 1,
                # Same value the GetAcceleratorUrl mock returns above.
                "acceleratorType": self.ACCELERATOR_URL
            }],
            "metadata": {
                "items": [{
                    "key": self.METADATA[0],
                    "value": self.METADATA[1]
                }],
            },
        }

        self.compute_client.CreateInstance(
            instance=self.INSTANCE,
            image_name=self.IMAGE,
            machine_type=self.MACHINE_TYPE,
            metadata={self.METADATA[0]: self.METADATA[1]},
            network=self.NETWORK,
            zone=self.ZONE,
            gpu=self.GPU,
            extra_scopes=None)

        resource_mock.insert.assert_called_with(
            project=PROJECT, zone=self.ZONE, body=expected_body)
        mock_wait.assert_called_with(
            mock.ANY,
            operation_scope=gcompute_client.OperationScope.ZONE,
            scope_name=self.ZONE)

    @mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation")
    def testDeleteInstance(self, mock_wait):
        """Test DeleteInstance."""
        resource_mock = mock.MagicMock()
        self.compute_client._service.instances = mock.MagicMock(
            return_value=resource_mock)
        resource_mock.delete = mock.MagicMock()
        self.compute_client.DeleteInstance(
            instance=self.INSTANCE, zone=self.ZONE)
        resource_mock.delete.assert_called_with(
            project=PROJECT, zone=self.ZONE, instance=self.INSTANCE)
        mock_wait.assert_called_with(
            mock.ANY,
            operation_scope=gcompute_client.OperationScope.ZONE,
            scope_name=self.ZONE)

@mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation") def testDeleteInstances(self, mock_wait): """Test DeleteInstances.""" self._SetupBatchHttpRequestMock() fake_instances = ["fake_instance_1", "fake_instance_2"] mock_api = mock.MagicMock() resource_mock = mock.MagicMock() self.compute_client._service.instances = mock.MagicMock( return_value=resource_mock) resource_mock.delete = mock.MagicMock(return_value=mock_api) deleted, failed, error_msgs = self.compute_client.DeleteInstances( fake_instances, self.ZONE) calls = [ mock.call( project=PROJECT, instance="fake_instance_1", zone=self.ZONE), mock.call( project=PROJECT, instance="fake_instance_2", zone=self.ZONE) ] resource_mock.delete.assert_has_calls(calls, any_order=True) self.assertEqual(mock_wait.call_count, 2) self.assertEqual(error_msgs, []) self.assertEqual(failed, []) self.assertEqual(set(deleted), set(fake_instances)) def testCreateDiskWithProject(self): """Test CreateDisk with images using a set project.""" source_project = "fake-image-project" expected_project_to_use = "fake-image-project" mock_wait = self.Patch(gcompute_client.ComputeClient, "WaitOnOperation") resource_mock = mock.MagicMock() self.compute_client._service.disks = mock.MagicMock( return_value=resource_mock) resource_mock.insert = mock.MagicMock() self.compute_client.CreateDisk( "fake_disk", "fake_image", 10, self.ZONE, source_project=source_project) resource_mock.insert.assert_called_with( project=PROJECT, zone=self.ZONE, sourceImage="projects/%s/global/images/fake_image" % expected_project_to_use, body={ "name": "fake_disk", "sizeGb": 10, "type": "projects/%s/zones/%s/diskTypes/pd-standard" % (PROJECT, self.ZONE) }) self.assertTrue(mock_wait.called) def testCreateDiskWithNoSourceProject(self): """Test CreateDisk with images with no set project.""" source_project = None expected_project_to_use = PROJECT mock_wait = self.Patch(gcompute_client.ComputeClient, "WaitOnOperation") resource_mock = mock.MagicMock() 
self.compute_client._service.disks = mock.MagicMock( return_value=resource_mock) resource_mock.insert = mock.MagicMock() self.compute_client.CreateDisk( "fake_disk", "fake_image", 10, self.ZONE, source_project=source_project) resource_mock.insert.assert_called_with( project=PROJECT, zone=self.ZONE, sourceImage="projects/%s/global/images/fake_image" % expected_project_to_use, body={ "name": "fake_disk", "sizeGb": 10, "type": "projects/%s/zones/%s/diskTypes/pd-standard" % (PROJECT, self.ZONE) }) self.assertTrue(mock_wait.called) def testCreateDiskWithTypeStandard(self): """Test CreateDisk with images using standard.""" disk_type = gcompute_client.PersistentDiskType.STANDARD expected_disk_type_string = "pd-standard" mock_wait = self.Patch(gcompute_client.ComputeClient, "WaitOnOperation") resource_mock = mock.MagicMock() self.compute_client._service.disks = mock.MagicMock( return_value=resource_mock) resource_mock.insert = mock.MagicMock() self.compute_client.CreateDisk( "fake_disk", "fake_image", 10, self.ZONE, source_project="fake-project", disk_type=disk_type) resource_mock.insert.assert_called_with( project=PROJECT, zone=self.ZONE, sourceImage="projects/%s/global/images/fake_image" % "fake-project", body={ "name": "fake_disk", "sizeGb": 10, "type": "projects/%s/zones/%s/diskTypes/%s" % (PROJECT, self.ZONE, expected_disk_type_string) }) self.assertTrue(mock_wait.called) def testCreateDiskWithTypeSSD(self): """Test CreateDisk with images using standard.""" disk_type = gcompute_client.PersistentDiskType.SSD expected_disk_type_string = "pd-ssd" mock_wait = self.Patch(gcompute_client.ComputeClient, "WaitOnOperation") resource_mock = mock.MagicMock() self.compute_client._service.disks = mock.MagicMock( return_value=resource_mock) resource_mock.insert = mock.MagicMock() self.compute_client.CreateDisk( "fake_disk", "fake_image", 10, self.ZONE, source_project="fake-project", disk_type=disk_type) resource_mock.insert.assert_called_with( project=PROJECT, zone=self.ZONE, 
sourceImage="projects/%s/global/images/fake_image" % "fake-project", body={ "name": "fake_disk", "sizeGb": 10, "type": "projects/%s/zones/%s/diskTypes/%s" % (PROJECT, self.ZONE, expected_disk_type_string) }) self.assertTrue(mock_wait.called) @mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation") def testAttachDisk(self, mock_wait): """Test AttachDisk.""" resource_mock = mock.MagicMock() self.compute_client._service.instances = mock.MagicMock( return_value=resource_mock) resource_mock.attachDisk = mock.MagicMock() self.compute_client.AttachDisk( "fake_instance_1", self.ZONE, deviceName="fake_disk", source="fake-selfLink") resource_mock.attachDisk.assert_called_with( project=PROJECT, zone=self.ZONE, instance="fake_instance_1", body={ "deviceName": "fake_disk", "source": "fake-selfLink" }) self.assertTrue(mock_wait.called) @mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation") def testDetachDisk(self, mock_wait): """Test DetachDisk.""" resource_mock = mock.MagicMock() self.compute_client._service.instances = mock.MagicMock( return_value=resource_mock) resource_mock.detachDisk = mock.MagicMock() self.compute_client.DetachDisk("fake_instance_1", self.ZONE, "fake_disk") resource_mock.detachDisk.assert_called_with( project=PROJECT, zone=self.ZONE, instance="fake_instance_1", deviceName="fake_disk") self.assertTrue(mock_wait.called) @mock.patch.object(gcompute_client.ComputeClient, "GetAcceleratorUrl") @mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation") def testAttachAccelerator(self, mock_wait, mock_get_accel): """Test AttachAccelerator.""" mock_get_accel.return_value = self.ACCELERATOR_URL resource_mock = mock.MagicMock() self.compute_client._service.instances = mock.MagicMock( return_value=resource_mock) resource_mock.attachAccelerator = mock.MagicMock() self.compute_client.AttachAccelerator("fake_instance_1", self.ZONE, 1, "nvidia-tesla-k80") resource_mock.setMachineResources.assert_called_with( project=PROJECT, 
zone=self.ZONE, instance="fake_instance_1", body={ "guestAccelerators": [{ "acceleratorType": self.ACCELERATOR_URL, "acceleratorCount": 1 }] }) self.assertTrue(mock_wait.called) @mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation") def testBatchExecuteOnInstances(self, mock_wait): """Test BatchExecuteOnInstances.""" self._SetupBatchHttpRequestMock() action = mock.MagicMock(return_value=mock.MagicMock()) fake_instances = ["fake_instance_1", "fake_instance_2"] done, failed, error_msgs = self.compute_client._BatchExecuteOnInstances( fake_instances, self.ZONE, action) calls = [mock.call(instance="fake_instance_1"), mock.call(instance="fake_instance_2")] action.assert_has_calls(calls, any_order=True) self.assertEqual(mock_wait.call_count, 2) self.assertEqual(set(done), set(fake_instances)) self.assertEqual(error_msgs, []) self.assertEqual(failed, []) @mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation") def testResetInstance(self, mock_wait): """Test ResetInstance.""" resource_mock = mock.MagicMock() self.compute_client._service.instances = mock.MagicMock( return_value=resource_mock) resource_mock.reset = mock.MagicMock() self.compute_client.ResetInstance( instance=self.INSTANCE, zone=self.ZONE) resource_mock.reset.assert_called_with( project=PROJECT, zone=self.ZONE, instance=self.INSTANCE) mock_wait.assert_called_with( mock.ANY, operation_scope=gcompute_client.OperationScope.ZONE, scope_name=self.ZONE) def _CompareMachineSizeTestHelper(self, machine_info_1, machine_info_2, expected_result=None, expected_error_type=None): """Helper class for testing CompareMachineSize. Args: machine_info_1: A dictionary representing the first machine size. machine_info_2: A dictionary representing the second machine size. expected_result: An integer, 0, 1 or -1, or None if not set. expected_error_type: An exception type, if set will check for exception. 
""" mock_get_mach_type = self.Patch( gcompute_client.ComputeClient, "GetMachineType", side_effect=[machine_info_1, machine_info_2]) if expected_error_type: self.assertRaises(expected_error_type, self.compute_client.CompareMachineSize, "name1", "name2", self.ZONE) else: result = self.compute_client.CompareMachineSize("name1", "name2", self.ZONE) self.assertEqual(result, expected_result) mock_get_mach_type.assert_has_calls( [mock.call("name1", self.ZONE), mock.call("name2", self.ZONE)]) def testCompareMachineSizeSmall(self): """Test CompareMachineSize where the first one is smaller.""" machine_info_1 = {"guestCpus": 10, "memoryMb": 100} machine_info_2 = {"guestCpus": 10, "memoryMb": 200} self._CompareMachineSizeTestHelper(machine_info_1, machine_info_2, -1) def testCompareMachineSizeSmallSmallerOnSecond(self): """Test CompareMachineSize where the first one is smaller.""" machine_info_1 = {"guestCpus": 11, "memoryMb": 100} machine_info_2 = {"guestCpus": 10, "memoryMb": 200} self._CompareMachineSizeTestHelper(machine_info_1, machine_info_2, -1) def testCompareMachineSizeLarge(self): """Test CompareMachineSize where the first one is larger.""" machine_info_1 = {"guestCpus": 11, "memoryMb": 200} machine_info_2 = {"guestCpus": 10, "memoryMb": 100} self._CompareMachineSizeTestHelper(machine_info_1, machine_info_2, 1) def testCompareMachineSizeLargeWithEqualElement(self): """Test CompareMachineSize where the first one is larger.""" machine_info_1 = {"guestCpus": 10, "memoryMb": 200} machine_info_2 = {"guestCpus": 10, "memoryMb": 100} self._CompareMachineSizeTestHelper(machine_info_1, machine_info_2, 1) def testCompareMachineSizeEqual(self): """Test CompareMachineSize where two machine sizes are equal.""" machine_info = {"guestCpus": 10, "memoryMb": 100} self._CompareMachineSizeTestHelper(machine_info, machine_info, 0) def testCompareMachineSizeBadMetric(self): """Test CompareMachineSize with bad metric.""" machine_info = {"unknown_metric": 10, "memoryMb": 100} 
self._CompareMachineSizeTestHelper( machine_info, machine_info, expected_error_type=errors.DriverError) def testGetMachineType(self): """Test GetMachineType.""" resource_mock = mock.MagicMock() mock_api = mock.MagicMock() self.compute_client._service.machineTypes = mock.MagicMock( return_value=resource_mock) resource_mock.get = mock.MagicMock(return_value=mock_api) mock_api.execute = mock.MagicMock( return_value={"name": self.MACHINE_TYPE}) result = self.compute_client.GetMachineType(self.MACHINE_TYPE, self.ZONE) self.assertEqual(result, {"name": self.MACHINE_TYPE}) resource_mock.get.assert_called_with( project=PROJECT, zone=self.ZONE, machineType=self.MACHINE_TYPE) def _GetSerialPortOutputTestHelper(self, response): """Helper function for testing GetSerialPortOutput. Args: response: A dictionary representing a fake response. """ resource_mock = mock.MagicMock() mock_api = mock.MagicMock() self.compute_client._service.instances = mock.MagicMock( return_value=resource_mock) resource_mock.getSerialPortOutput = mock.MagicMock( return_value=mock_api) mock_api.execute = mock.MagicMock(return_value=response) if "contents" in response: result = self.compute_client.GetSerialPortOutput( instance=self.INSTANCE, zone=self.ZONE) self.assertEqual(result, "fake contents") else: self.assertRaisesRegexp( errors.DriverError, "Malformed response.*", self.compute_client.GetSerialPortOutput, instance=self.INSTANCE, zone=self.ZONE) resource_mock.getSerialPortOutput.assert_called_with( project=PROJECT, zone=self.ZONE, instance=self.INSTANCE, port=1) def testGetSerialPortOutput(self): """Test GetSerialPortOutput.""" response = {"contents": "fake contents"} self._GetSerialPortOutputTestHelper(response) def testGetSerialPortOutputFail(self): """Test GetSerialPortOutputFail.""" response = {"malformed": "fake contents"} self._GetSerialPortOutputTestHelper(response) def testGetInstanceNamesByIPs(self): """Test GetInstanceNamesByIPs.""" good_instance = { "name": "instance_1", 
"networkInterfaces": [ { "accessConfigs": [ {"natIP": "172.22.22.22"}, ], }, ], } bad_instance = {"name": "instance_2"} self.Patch( gcompute_client.ComputeClient, "ListInstances", return_value=[good_instance, bad_instance]) ip_name_map = self.compute_client.GetInstanceNamesByIPs( ips=["172.22.22.22", "172.22.22.23"], zone=self.ZONE) self.assertEqual(ip_name_map, {"172.22.22.22": "instance_1", "172.22.22.23": None}) def testRsaNotInMetadata(self): """Test rsa not in metadata.""" fake_user = "fake_user" fake_ssh_key = "fake_ssh" metadata = { "kind": "compute#metadata", "fingerprint": "a-23icsyx4E=", "items": [ { "key": "sshKeys", "value": "%s:%s" % (fake_user, self.SSHKEY) } ] } # Test rsa doesn't exist in metadata. new_entry = "%s:%s" % (fake_user, fake_ssh_key) self.assertEqual(True, gcompute_client.RsaNotInMetadata(metadata, new_entry)) # Test rsa exists in metadata. exist_entry = "%s:%s" %(fake_user, self.SSHKEY) self.assertEqual(False, gcompute_client.RsaNotInMetadata(metadata, exist_entry)) def testGetSshKeyFromMetadata(self): """Test get ssh key from metadata.""" fake_user = "fake_user" metadata_key_exist_value_is_empty = { "kind": "compute#metadata", "fingerprint": "a-23icsyx4E=", "items": [ { "key": "sshKeys", "value": "" } ] } metadata_key_exist = { "kind": "compute#metadata", "fingerprint": "a-23icsyx4E=", "items": [ { "key": "sshKeys", "value": "%s:%s" % (fake_user, self.SSHKEY) } ] } metadata_key_not_exist = { "kind": "compute#metadata", "fingerprint": "a-23icsyx4E=", "items": [ { } ] } expected_key_exist_value_is_empty = { "key": "sshKeys", "value": "" } expected_key_exist = { "key": "sshKeys", "value": "%s:%s" % (fake_user, self.SSHKEY) } self.assertEqual(expected_key_exist_value_is_empty, gcompute_client.GetSshKeyFromMetadata(metadata_key_exist_value_is_empty)) self.assertEqual(expected_key_exist, gcompute_client.GetSshKeyFromMetadata(metadata_key_exist)) self.assertEqual(None, gcompute_client.GetSshKeyFromMetadata(metadata_key_not_exist)) def 
testGetRsaKeyPathExistsFalse(self): """Test the rsa key path not exists.""" fake_ssh_rsa_path = "/path/to/test_rsa.pub" self.Patch(os.path, "exists", return_value=False) self.assertRaisesRegexp(errors.DriverError, "RSA file %s does not exist." % fake_ssh_rsa_path, gcompute_client.GetRsaKey, ssh_rsa_path=fake_ssh_rsa_path) def testGetRsaKey(self): """Test get the rsa key.""" fake_ssh_rsa_path = "/path/to/test_rsa.pub" self.Patch(os.path, "exists", return_value=True) m = mock.mock_open(read_data=self.SSHKEY) with mock.patch("__builtin__.open", m): result = gcompute_client.GetRsaKey(fake_ssh_rsa_path) self.assertEqual(self.SSHKEY, result) def testUpdateRsaInMetadata(self): """Test update rsa in metadata.""" fake_ssh_key = "fake_ssh" fake_metadata_sshkeys_not_exist = { "kind": "compute#metadata", "fingerprint": "a-23icsyx4E=", "items": [ { "key": "not_sshKeys", "value": "" } ] } new_entry = "new_user:%s" % fake_ssh_key expected = { "kind": "compute#metadata", "fingerprint": "a-23icsyx4E=", "items": [ { "key": "not_sshKeys", "value": "" }, { "key": "sshKeys", "value": new_entry } ] } self.Patch(os.path, "exists", return_value=True) self.Patch(gcompute_client.ComputeClient, "WaitOnOperation") resource_mock = mock.MagicMock() self.compute_client.SetInstanceMetadata = mock.MagicMock( return_value=resource_mock) # Test the key item not exists in the metadata. self.compute_client.UpdateRsaInMetadata( "fake_zone", "fake_instance", fake_metadata_sshkeys_not_exist, new_entry) self.compute_client.SetInstanceMetadata.assert_called_with( "fake_zone", "fake_instance", expected) # Test the key item exists in the metadata. 
fake_metadata_ssh_keys_exists = { "kind": "compute#metadata", "fingerprint": "a-23icsyx4E=", "items": [ { "key": "sshKeys", "value": "old_user:%s" % self.SSHKEY } ] } expected_ssh_keys_exists = { "kind": "compute#metadata", "fingerprint": "a-23icsyx4E=", "items": [ { "key": "sshKeys", "value": "old_user:%s\n%s" % (self.SSHKEY, new_entry) } ] } self.compute_client.UpdateRsaInMetadata( "fake_zone", "fake_instance", fake_metadata_ssh_keys_exists, new_entry) self.compute_client.SetInstanceMetadata.assert_called_with( "fake_zone", "fake_instance", expected_ssh_keys_exists) def testAddSshRsaToInstance(self): """Test add ssh rsa key to instance.""" fake_user = "fake_user" instance_metadata_key_not_exist = { "metadata": { "kind": "compute#metadata", "fingerprint": "a-23icsyx4E=", "items": [ { "key": "sshKeys", "value": "" } ] } } instance_metadata_key_exist = { "metadata": { "kind": "compute#metadata", "fingerprint": "a-23icsyx4E=", "items": [ { "key": "sshKeys", "value": "%s:%s" % (fake_user, self.SSHKEY) } ] } } expected = { "kind": "compute#metadata", "fingerprint": "a-23icsyx4E=", "items": [ { "key": "sshKeys", "value": "%s:%s" % (fake_user, self.SSHKEY) } ] } self.Patch(os.path, "exists", return_value=True) m = mock.mock_open(read_data=self.SSHKEY) self.Patch(gcompute_client.ComputeClient, "WaitOnOperation") resource_mock = mock.MagicMock() self.compute_client._service.instances = mock.MagicMock( return_value=resource_mock) resource_mock.setMetadata = mock.MagicMock() # Test the key not exists in the metadata. self.Patch( gcompute_client.ComputeClient, "GetInstance", return_value=instance_metadata_key_not_exist) with mock.patch("__builtin__.open", m): self.compute_client.AddSshRsaInstanceMetadata( "fake_zone", fake_user, "/path/to/test_rsa.pub", "fake_instance") resource_mock.setMetadata.assert_called_with( project=PROJECT, zone="fake_zone", instance="fake_instance", body=expected) # Test the key already exists in the metadata. 
resource_mock.setMetadata.call_count = 0 self.Patch( gcompute_client.ComputeClient, "GetInstance", return_value=instance_metadata_key_exist) with mock.patch("__builtin__.open", m): self.compute_client.AddSshRsaInstanceMetadata( "fake_zone", fake_user, "/path/to/test_rsa.pub", "fake_instance") resource_mock.setMetadata.assert_not_called() @mock.patch.object(gcompute_client.ComputeClient, "WaitOnOperation") def testDeleteDisks(self, mock_wait): """Test DeleteDisks.""" self._SetupBatchHttpRequestMock() fake_disks = ["fake_disk_1", "fake_disk_2"] mock_api = mock.MagicMock() resource_mock = mock.MagicMock() self.compute_client._service.disks = mock.MagicMock( return_value=resource_mock) resource_mock.delete = mock.MagicMock(return_value=mock_api) # Call the API. deleted, failed, error_msgs = self.compute_client.DeleteDisks( fake_disks, zone=self.ZONE) # Verify calls = [ mock.call(project=PROJECT, disk="fake_disk_1", zone=self.ZONE), mock.call(project=PROJECT, disk="fake_disk_2", zone=self.ZONE) ] resource_mock.delete.assert_has_calls(calls, any_order=True) self.assertEqual(mock_wait.call_count, 2) self.assertEqual(error_msgs, []) self.assertEqual(failed, []) self.assertEqual(set(deleted), set(fake_disks)) def testRetryOnFingerPrintError(self): """Test RetryOnFingerPrintError.""" @utils.RetryOnException(gcompute_client._IsFingerPrintError, 10) def Raise412(sentinel): """Raise 412 HTTP exception.""" if not sentinel.hitFingerPrintConflict.called: sentinel.hitFingerPrintConflict() raise errors.HttpError(412, "resource labels have changed") return "Passed" sentinel = mock.MagicMock() result = Raise412(sentinel) self.assertEqual(1, sentinel.hitFingerPrintConflict.call_count) self.assertEqual("Passed", result) if __name__ == "__main__": unittest.main()