普通文本  |  198行  |  7.53 KB

#
# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Integration tests for uploading and downloading to GCS.

These tests exercise most of the corner cases for upload/download of
files in apitools, via GCS. There are no performance tests here yet.
"""

import json
import os
import unittest

import six

import apitools.base.py as apitools_base
import storage

_CLIENT = None


def _GetClient():
    global _CLIENT  # pylint: disable=global-statement
    if _CLIENT is None:
        _CLIENT = storage.StorageV1()
    return _CLIENT


class DownloadsTest(unittest.TestCase):
    _DEFAULT_BUCKET = 'apitools'
    _TESTDATA_PREFIX = 'testdata'

    def setUp(self):
        self.__client = _GetClient()
        self.__ResetDownload()

    def __ResetDownload(self, auto_transfer=False):
        self.__buffer = six.StringIO()
        self.__download = storage.Download.FromStream(
            self.__buffer, auto_transfer=auto_transfer)

    def __GetTestdataFileContents(self, filename):
        file_path = os.path.join(
            os.path.dirname(__file__), self._TESTDATA_PREFIX, filename)
        file_contents = open(file_path).read()
        self.assertIsNotNone(
            file_contents, msg=('Could not read file %s' % filename))
        return file_contents

    @classmethod
    def __GetRequest(cls, filename):
        object_name = os.path.join(cls._TESTDATA_PREFIX, filename)
        return storage.StorageObjectsGetRequest(
            bucket=cls._DEFAULT_BUCKET, object=object_name)

    def __GetFile(self, request):
        response = self.__client.objects.Get(request, download=self.__download)
        self.assertIsNone(response, msg=(
            'Unexpected nonempty response for file download: %s' % response))

    def __GetAndStream(self, request):
        self.__GetFile(request)
        self.__download.StreamInChunks()

    def testZeroBytes(self):
        request = self.__GetRequest('zero_byte_file')
        self.__GetAndStream(request)
        self.assertEqual(0, self.__buffer.tell())

    def testObjectDoesNotExist(self):
        self.__ResetDownload(auto_transfer=True)
        with self.assertRaises(apitools_base.HttpError):
            self.__GetFile(self.__GetRequest('nonexistent_file'))

    def testAutoTransfer(self):
        self.__ResetDownload(auto_transfer=True)
        self.__GetFile(self.__GetRequest('fifteen_byte_file'))
        file_contents = self.__GetTestdataFileContents('fifteen_byte_file')
        self.assertEqual(15, self.__buffer.tell())
        self.__buffer.seek(0)
        self.assertEqual(file_contents, self.__buffer.read())

    def testFilenameWithSpaces(self):
        self.__ResetDownload(auto_transfer=True)
        self.__GetFile(self.__GetRequest('filename with spaces'))
        # NOTE(craigcitro): We add _ here to make this play nice with blaze.
        file_contents = self.__GetTestdataFileContents('filename_with_spaces')
        self.assertEqual(15, self.__buffer.tell())
        self.__buffer.seek(0)
        self.assertEqual(file_contents, self.__buffer.read())

    def testGetRange(self):
        # TODO(craigcitro): Test about a thousand more corner cases.
        file_contents = self.__GetTestdataFileContents('fifteen_byte_file')
        self.__GetFile(self.__GetRequest('fifteen_byte_file'))
        self.__download.GetRange(5, 10)
        self.assertEqual(6, self.__buffer.tell())
        self.__buffer.seek(0)
        self.assertEqual(file_contents[5:11], self.__buffer.read())

    def testGetRangeWithNegativeStart(self):
        file_contents = self.__GetTestdataFileContents('fifteen_byte_file')
        self.__GetFile(self.__GetRequest('fifteen_byte_file'))
        self.__download.GetRange(-3)
        self.assertEqual(3, self.__buffer.tell())
        self.__buffer.seek(0)
        self.assertEqual(file_contents[-3:], self.__buffer.read())

    def testGetRangeWithPositiveStart(self):
        file_contents = self.__GetTestdataFileContents('fifteen_byte_file')
        self.__GetFile(self.__GetRequest('fifteen_byte_file'))
        self.__download.GetRange(2)
        self.assertEqual(13, self.__buffer.tell())
        self.__buffer.seek(0)
        self.assertEqual(file_contents[2:15], self.__buffer.read())

    def testSmallChunksizes(self):
        file_contents = self.__GetTestdataFileContents('fifteen_byte_file')
        request = self.__GetRequest('fifteen_byte_file')
        for chunksize in (2, 3, 15, 100):
            self.__ResetDownload()
            self.__download.chunksize = chunksize
            self.__GetAndStream(request)
            self.assertEqual(15, self.__buffer.tell())
            self.__buffer.seek(0)
            self.assertEqual(file_contents, self.__buffer.read(15))

    def testLargeFileChunksizes(self):
        request = self.__GetRequest('thirty_meg_file')
        for chunksize in (1048576, 40 * 1048576):
            self.__ResetDownload()
            self.__download.chunksize = chunksize
            self.__GetAndStream(request)
            self.__buffer.seek(0)

    def testAutoGzipObject(self):
        # TODO(craigcitro): Move this to a new object once we have a more
        # permanent one, see: http://b/12250275
        request = storage.StorageObjectsGetRequest(
            bucket='ottenl-gzip', object='50K.txt')
        # First, try without auto-transfer.
        self.__GetFile(request)
        self.assertEqual(0, self.__buffer.tell())
        self.__download.StreamInChunks()
        self.assertEqual(50000, self.__buffer.tell())
        # Next, try with auto-transfer.
        self.__ResetDownload(auto_transfer=True)
        self.__GetFile(request)
        self.assertEqual(50000, self.__buffer.tell())

    def testSmallGzipObject(self):
        request = self.__GetRequest('zero-gzipd.html')
        self.__GetFile(request)
        self.assertEqual(0, self.__buffer.tell())
        additional_headers = {'accept-encoding': 'gzip, deflate'}
        self.__download.StreamInChunks(additional_headers=additional_headers)
        self.assertEqual(0, self.__buffer.tell())

    def testSerializedDownload(self):

        def _ProgressCallback(unused_response, download_object):
            print 'Progress %s' % download_object.progress

        file_contents = self.__GetTestdataFileContents('fifteen_byte_file')
        object_name = os.path.join(self._TESTDATA_PREFIX, 'fifteen_byte_file')
        request = storage.StorageObjectsGetRequest(
            bucket=self._DEFAULT_BUCKET, object=object_name)
        response = self.__client.objects.Get(request)
        # pylint: disable=attribute-defined-outside-init
        self.__buffer = six.StringIO()
        download_data = json.dumps({
            'auto_transfer': False,
            'progress': 0,
            'total_size': response.size,
            'url': response.mediaLink,
        })
        self.__download = storage.Download.FromData(
            self.__buffer, download_data, http=self.__client.http)
        self.__download.StreamInChunks(callback=_ProgressCallback)
        self.assertEqual(15, self.__buffer.tell())
        self.__buffer.seek(0)
        self.assertEqual(file_contents, self.__buffer.read(15))

if __name__ == '__main__':
    unittest.main()