普通文本  |  3276行  |  133.72 KB

# -*- coding: utf-8 -*-
# Copyright 2011 Google Inc. All Rights Reserved.
# Copyright 2011, Nexenta Systems Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for copy functionality."""

from __future__ import absolute_import

import base64
from collections import namedtuple
import csv
import datetime
import errno
import gzip
from hashlib import md5
import json
import logging
import mimetypes
from operator import attrgetter
import os
import pickle
import random
import re
import shutil
import stat
import subprocess
import tempfile
import textwrap
import time
import traceback

from boto import config
import crcmod

import gslib
from gslib.cloud_api import ArgumentException
from gslib.cloud_api import CloudApi
from gslib.cloud_api import NotFoundException
from gslib.cloud_api import PreconditionException
from gslib.cloud_api import Preconditions
from gslib.cloud_api import ResumableDownloadException
from gslib.cloud_api import ResumableUploadAbortException
from gslib.cloud_api import ResumableUploadException
from gslib.cloud_api import ResumableUploadStartOverException
from gslib.cloud_api_helper import GetDownloadSerializationData
from gslib.commands.compose import MAX_COMPOSE_ARITY
from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE
from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD
from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE
from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS
from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD
from gslib.cs_api_map import ApiSelector
from gslib.daisy_chain_wrapper import DaisyChainWrapper
from gslib.exception import CommandException
from gslib.exception import HashMismatchException
from gslib.file_part import FilePart
from gslib.hashing_helper import Base64EncodeHash
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.hashing_helper import CalculateHashesFromContents
from gslib.hashing_helper import CHECK_HASH_IF_FAST_ELSE_FAIL
from gslib.hashing_helper import CHECK_HASH_NEVER
from gslib.hashing_helper import ConcatCrc32c
from gslib.hashing_helper import GetDownloadHashAlgs
from gslib.hashing_helper import GetUploadHashAlgs
from gslib.hashing_helper import HashingFileUploadWrapper
from gslib.parallelism_framework_util import AtomicDict
from gslib.progress_callback import ConstructAnnounceText
from gslib.progress_callback import FileProgressCallbackHandler
from gslib.progress_callback import ProgressCallbackWithBackoff
from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper
from gslib.storage_url import ContainsWildcard
from gslib.storage_url import StorageUrlFromString
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
from gslib.tracker_file import DeleteDownloadTrackerFiles
from gslib.tracker_file import DeleteTrackerFile
from gslib.tracker_file import GetTrackerFilePath
from gslib.tracker_file import RaiseUnwritableTrackerFileException
from gslib.tracker_file import ReadOrCreateDownloadTrackerFile
from gslib.tracker_file import TrackerFileType
from gslib.tracker_file import WriteDownloadComponentTrackerFile
from gslib.translation_helper import AddS3MarkerAclToObjectMetadata
from gslib.translation_helper import CopyObjectMetadata
from gslib.translation_helper import DEFAULT_CONTENT_TYPE
from gslib.translation_helper import GenerationFromUrlAndString
from gslib.translation_helper import ObjectMetadataFromHeaders
from gslib.translation_helper import PreconditionsFromHeaders
from gslib.translation_helper import S3MarkerAclFromObjectMetadata
from gslib.util import CheckFreeSpace
from gslib.util import CheckMultiprocessingAvailableAndInit
from gslib.util import CreateLock
from gslib.util import DEFAULT_FILE_BUFFER_SIZE
from gslib.util import DivideAndCeil
from gslib.util import GetCloudApiInstance
from gslib.util import GetFileSize
from gslib.util import GetJsonResumableChunkSize
from gslib.util import GetMaxRetryDelay
from gslib.util import GetNumRetries
from gslib.util import GetStreamFromFileUrl
from gslib.util import HumanReadableToBytes
from gslib.util import IS_WINDOWS
from gslib.util import IsCloudSubdirPlaceholder
from gslib.util import MakeHumanReadable
from gslib.util import MIN_SIZE_COMPUTE_LOGGING
from gslib.util import ResumableThreshold
from gslib.util import TEN_MIB
from gslib.util import UsingCrcmodExtension
from gslib.util import UTF8
from gslib.wildcard_iterator import CreateWildcardIterator

# pylint: disable=g-import-not-at-top
if IS_WINDOWS:
  import msvcrt

# Declare copy_helper_opts as a global because namedtuple isn't aware of
# assigning to a class member (which breaks pickling done by multiprocessing).
# For details see
# http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly
# pylint: disable=global-at-module-level
global global_copy_helper_opts

# In-memory map of local files that are currently opened for write. Used to
# ensure that if we write to the same file twice (say, for example, because the
# user specified two identical source URLs), the writes occur serially.
global open_files_map, open_files_lock
open_files_map = (
    AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available
    else AtomicDict(manager=gslib.util.manager))

# We don't allow multiple processes on Windows, so using a process-safe lock
# would be unnecessary.
open_files_lock = CreateLock()

# For debugging purposes; if True, files and objects that fail hash validation
# will be saved with the below suffix appended.
_RENAME_ON_HASH_MISMATCH = False
_RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt'

PARALLEL_UPLOAD_TEMP_NAMESPACE = (
    u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/')

PARALLEL_UPLOAD_STATIC_SALT = u"""
PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS.
The theory is that no user will have prepended this to the front of
one of their object names and then done an MD5 hash of the name, and
then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object
name. Note that there will be no problems with object name length since we
hash the original name.
"""

# When uploading a file, get the following fields in the response for
# filling in command output and manifests.
UPLOAD_RETURN_FIELDS = ['crc32c', 'etag', 'generation', 'md5Hash', 'size']

# This tuple is used only to encapsulate the arguments needed for
# command.Apply() in the parallel composite upload case.
# Note that content_type is used instead of a full apitools Object() because
# apitools objects are not picklable.
# filename: String name of file.
# file_start: start byte of file (may be in the middle of a file for partitioned
#             files).
# file_length: length of upload (may not be the entire length of a file for
#              partitioned files).
# src_url: FileUrl describing the source file.
# dst_url: CloudUrl describing the destination component file.
# canned_acl: canned_acl to apply to the uploaded file/component.
# content_type: content-type for final object, used for setting content-type
#               of components and final object.
# tracker_file: tracker file for this component.
# tracker_file_lock: tracker file lock for tracker file(s).
PerformParallelUploadFileToObjectArgs = namedtuple(
    'PerformParallelUploadFileToObjectArgs',
    'filename file_start file_length src_url dst_url canned_acl '
    'content_type tracker_file tracker_file_lock')

PerformSlicedDownloadObjectToFileArgs = namedtuple(
    'PerformSlicedDownloadObjectToFileArgs',
    'component_num src_url src_obj_metadata dst_url download_file_name '
    'start_byte end_byte')

PerformSlicedDownloadReturnValues = namedtuple(
    'PerformSlicedDownloadReturnValues',
    'component_num crc32c bytes_transferred server_encoding')

ObjectFromTracker = namedtuple('ObjectFromTracker',
                               'object_name generation')

# TODO: Refactor this file to be less cumbersome. In particular, some of the
# different paths (e.g., uploading a file to an object vs. downloading an
# object to a file) could be split into separate files.

# Chunk size to use while zipping/unzipping gzip files.
GZIP_CHUNK_SIZE = 8192

# Number of bytes to wait before updating a sliced download component tracker
# file.
TRACKERFILE_UPDATE_THRESHOLD = TEN_MIB

PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024

# S3 requires special Multipart upload logic (that we currently don't implement)
# for files > 5GiB in size.
S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024

# TODO: Create a multiprocessing framework value allocator, then use it instead
# of a dict.
global suggested_sliced_transfers, suggested_sliced_transfers_lock
suggested_sliced_transfers = (
    AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available
    else AtomicDict(manager=gslib.util.manager))
suggested_sliced_transfers_lock = CreateLock()


class FileConcurrencySkipError(Exception):
  """Raised when skipping a file due to a concurrent, duplicate copy."""


def _RmExceptionHandler(cls, e):
  """Simple exception handler to allow post-completion status."""
  cls.logger.error(str(e))


def _ParallelCopyExceptionHandler(cls, e):
  """Simple exception handler to allow post-completion status."""
  cls.logger.error(str(e))
  cls.op_failure_count += 1
  cls.logger.debug('\n\nEncountered exception while copying:\n%s\n',
                   traceback.format_exc())


def _PerformParallelUploadFileToObject(cls, args, thread_state=None):
  """Function argument to Apply for performing parallel composite uploads.

  Args:
    cls: Calling Command class.
    args: PerformParallelUploadFileToObjectArgs tuple describing the target.
    thread_state: gsutil Cloud API instance to use for the operation.

  Returns:
    StorageUrl representing a successfully uploaded component.
  """
  fp = FilePart(args.filename, args.file_start, args.file_length)
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  with fp:
    # We take many precautions with the component names that make collisions
    # effectively impossible. Specifying preconditions will just allow us to
    # reach a state in which uploads will always fail on retries.
    preconditions = None

    # Fill in content type if one was provided.
    dst_object_metadata = apitools_messages.Object(
        name=args.dst_url.object_name,
        bucket=args.dst_url.bucket_name,
        contentType=args.content_type)

    try:
      if global_copy_helper_opts.canned_acl:
        # No canned ACL support in JSON, force XML API to be used for
        # upload/copy operations.
        orig_prefer_api = gsutil_api.prefer_api
        gsutil_api.prefer_api = ApiSelector.XML
      ret = _UploadFileToObject(args.src_url, fp, args.file_length,
                                args.dst_url, dst_object_metadata,
                                preconditions, gsutil_api, cls.logger, cls,
                                _ParallelCopyExceptionHandler,
                                gzip_exts=None, allow_splitting=False)
    finally:
      if global_copy_helper_opts.canned_acl:
        gsutil_api.prefer_api = orig_prefer_api

  component = ret[2]
  _AppendComponentTrackerToParallelUploadTrackerFile(
      args.tracker_file, component, args.tracker_file_lock)
  return ret


CopyHelperOpts = namedtuple('CopyHelperOpts', [
    'perform_mv',
    'no_clobber',
    'daisy_chain',
    'read_args_from_stdin',
    'print_ver',
    'use_manifest',
    'preserve_acl',
    'canned_acl',
    'skip_unsupported_objects',
    'test_callback_file'])


# pylint: disable=global-variable-undefined
def CreateCopyHelperOpts(perform_mv=False, no_clobber=False, daisy_chain=False,
                         read_args_from_stdin=False, print_ver=False,
                         use_manifest=False, preserve_acl=False,
                         canned_acl=None, skip_unsupported_objects=False,
                         test_callback_file=None):
  """Creates CopyHelperOpts for passing options to CopyHelper."""
  # We create a tuple with union of options needed by CopyHelper and any
  # copy-related functionality in CpCommand, RsyncCommand, or Command class.
  global global_copy_helper_opts
  global_copy_helper_opts = CopyHelperOpts(
      perform_mv=perform_mv,
      no_clobber=no_clobber,
      daisy_chain=daisy_chain,
      read_args_from_stdin=read_args_from_stdin,
      print_ver=print_ver,
      use_manifest=use_manifest,
      preserve_acl=preserve_acl,
      canned_acl=canned_acl,
      skip_unsupported_objects=skip_unsupported_objects,
      test_callback_file=test_callback_file)
  return global_copy_helper_opts


# pylint: disable=global-variable-undefined
# pylint: disable=global-variable-not-assigned
def GetCopyHelperOpts():
  """Returns namedtuple holding CopyHelper options."""
  global global_copy_helper_opts
  return global_copy_helper_opts


def _SelectDownloadStrategy(dst_url):
  """Get download strategy based on the destination object.

  Args:
    dst_url: Destination StorageUrl.

  Returns:
    gsutil Cloud API DownloadStrategy.
  """
  dst_is_special = False
  if dst_url.IsFileUrl():
    # Check explicitly first because os.stat doesn't work on 'nul' in Windows.
    if dst_url.object_name == os.devnull:
      dst_is_special = True
    try:
      mode = os.stat(dst_url.object_name).st_mode
      if stat.S_ISCHR(mode):
        dst_is_special = True
    except OSError:
      pass

  if dst_is_special:
    return CloudApi.DownloadStrategy.ONE_SHOT
  else:
    return CloudApi.DownloadStrategy.RESUMABLE


def _GetUploadTrackerData(tracker_file_name, logger):
  """Reads tracker data from an upload tracker file if it exists.

  Args:
    tracker_file_name: Tracker file name for this upload.
    logger: for outputting log messages.

  Returns:
    Serialization data if the tracker file already exists (resume existing
    upload), None otherwise.
  """
  tracker_file = None

  # If we already have a matching tracker file, get the serialization data
  # so that we can resume the upload.
  try:
    tracker_file = open(tracker_file_name, 'r')
    tracker_data = tracker_file.read()
    return tracker_data
  except IOError as e:
    # Ignore non-existent file (happens first time a upload is attempted on an
    # object, or when re-starting an upload after a
    # ResumableUploadStartOverException), but warn user for other errors.
    if e.errno != errno.ENOENT:
      logger.warn('Couldn\'t read upload tracker file (%s): %s. Restarting '
                  'upload from scratch.', tracker_file_name, e.strerror)
  finally:
    if tracker_file:
      tracker_file.close()


def InsistDstUrlNamesContainer(exp_dst_url, have_existing_dst_container,
                               command_name):
  """Ensures the destination URL names a container.

  Acceptable containers include directory, bucket, bucket
  subdir, and non-existent bucket subdir.

  Args:
    exp_dst_url: Wildcard-expanded destination StorageUrl.
    have_existing_dst_container: bool indicator of whether exp_dst_url
      names a container (directory, bucket, or existing bucket subdir).
    command_name: Name of command making call. May not be the same as the
        calling class's self.command_name in the case of commands implemented
        atop other commands (like mv command).

  Raises:
    CommandException: if the URL being checked does not name a container.
  """
  if ((exp_dst_url.IsFileUrl() and not exp_dst_url.IsDirectory()) or
      (exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket()
       and not have_existing_dst_container)):
    raise CommandException('Destination URL must name a directory, bucket, '
                           'or bucket\nsubdirectory for the multiple '
                           'source form of the %s command.' % command_name)


def _ShouldTreatDstUrlAsBucketSubDir(have_multiple_srcs, dst_url,
                                     have_existing_dest_subdir,
                                     src_url_names_container,
                                     recursion_requested):
  """Checks whether dst_url should be treated as a bucket "sub-directory".

  The decision about whether something constitutes a bucket "sub-directory"
  depends on whether there are multiple sources in this request and whether
  there is an existing bucket subdirectory. For example, when running the
  command:
    gsutil cp file gs://bucket/abc
  if there's no existing gs://bucket/abc bucket subdirectory we should copy
  file to the object gs://bucket/abc. In contrast, if
  there's an existing gs://bucket/abc bucket subdirectory we should copy
  file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc
  exists, when running the command:
    gsutil cp file1 file2 gs://bucket/abc
  we should copy file1 to gs://bucket/abc/file1 (and similarly for file2).
  Finally, for recursive copies, if the source is a container then we should
  copy to a container as the target.  For example, when running the command:
    gsutil cp -r dir1 gs://bucket/dir2
  we should copy the subtree of dir1 to gs://bucket/dir2.

  Note that we don't disallow naming a bucket "sub-directory" where there's
  already an object at that URL. For example it's legitimate (albeit
  confusing) to have an object called gs://bucket/dir and
  then run the command
  gsutil cp file1 file2 gs://bucket/dir
  Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1,
  and gs://bucket/dir/file2.

  Args:
    have_multiple_srcs: Bool indicator of whether this is a multi-source
        operation.
    dst_url: StorageUrl to check.
    have_existing_dest_subdir: bool indicator whether dest is an existing
      subdirectory.
    src_url_names_container: bool indicator of whether the source URL
      is a container.
    recursion_requested: True if a recursive operation has been requested.

  Returns:
    bool indicator.
  """
  if have_existing_dest_subdir:
    return True
  if dst_url.IsCloudUrl():
    return (have_multiple_srcs or
            (src_url_names_container and recursion_requested))


def _ShouldTreatDstUrlAsSingleton(have_multiple_srcs,
                                  have_existing_dest_subdir, dst_url,
                                  recursion_requested):
  """Checks that dst_url names a single file/object after wildcard expansion.

  It is possible that an object path might name a bucket sub-directory.

  Args:
    have_multiple_srcs: Bool indicator of whether this is a multi-source
        operation.
    have_existing_dest_subdir: bool indicator whether dest is an existing
      subdirectory.
    dst_url: StorageUrl to check.
    recursion_requested: True if a recursive operation has been requested.

  Returns:
    bool indicator.
  """
  if recursion_requested:
    return False
  if dst_url.IsFileUrl():
    return not dst_url.IsDirectory()
  else:  # dst_url.IsCloudUrl()
    return (not have_multiple_srcs and
            not have_existing_dest_subdir and
            dst_url.IsObject())


def ConstructDstUrl(src_url, exp_src_url, src_url_names_container,
                    have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
                    recursion_requested):
  """Constructs the destination URL for a given exp_src_url/exp_dst_url pair.

  Uses context-dependent naming rules that mimic Linux cp and mv behavior.

  Args:
    src_url: Source StorageUrl to be copied.
    exp_src_url: Single StorageUrl from wildcard expansion of src_url.
    src_url_names_container: True if src_url names a container (including the
        case of a wildcard-named bucket subdir (like gs://bucket/abc,
        where gs://bucket/abc/* matched some objects).
    have_multiple_srcs: True if this is a multi-source request. This can be
        true if src_url wildcard-expanded to multiple URLs or if there were
        multiple source URLs in the request.
    exp_dst_url: the expanded StorageUrl requested for the cp destination.
        Final written path is constructed from this plus a context-dependent
        variant of src_url.
    have_existing_dest_subdir: bool indicator whether dest is an existing
      subdirectory.
    recursion_requested: True if a recursive operation has been requested.

  Returns:
    StorageUrl to use for copy.

  Raises:
    CommandException if destination object name not specified for
    source and source is a stream.
  """
  if _ShouldTreatDstUrlAsSingleton(
      have_multiple_srcs, have_existing_dest_subdir, exp_dst_url,
      recursion_requested):
    # We're copying one file or object to one file or object.
    return exp_dst_url

  if exp_src_url.IsFileUrl() and exp_src_url.IsStream():
    if have_existing_dest_subdir:
      raise CommandException('Destination object name needed when '
                             'source is a stream')
    return exp_dst_url

  if not recursion_requested and not have_multiple_srcs:
    # We're copying one file or object to a subdirectory. Append final comp
    # of exp_src_url to exp_dst_url.
    src_final_comp = exp_src_url.object_name.rpartition(src_url.delim)[-1]
    return StorageUrlFromString('%s%s%s' % (
        exp_dst_url.url_string.rstrip(exp_dst_url.delim),
        exp_dst_url.delim, src_final_comp))

  # Else we're copying multiple sources to a directory, bucket, or a bucket
  # "sub-directory".

  # Ensure exp_dst_url ends in delim char if we're doing a multi-src copy or
  # a copy to a directory. (The check for copying to a directory needs
  # special-case handling so that the command:
  #   gsutil cp gs://bucket/obj dir
  # will turn into file://dir/ instead of file://dir -- the latter would cause
  # the file "dirobj" to be created.)
  # Note: need to check have_multiple_srcs or src_url.names_container()
  # because src_url could be a bucket containing a single object, named
  # as gs://bucket.
  if ((have_multiple_srcs or src_url_names_container or
       (exp_dst_url.IsFileUrl() and exp_dst_url.IsDirectory()))
      and not exp_dst_url.url_string.endswith(exp_dst_url.delim)):
    exp_dst_url = StorageUrlFromString('%s%s' % (exp_dst_url.url_string,
                                                 exp_dst_url.delim))

  # Making naming behavior match how things work with local Linux cp and mv
  # operations depends on many factors, including whether the destination is a
  # container, the plurality of the source(s), and whether the mv command is
  # being used:
  # 1. For the "mv" command that specifies a non-existent destination subdir,
  #    renaming should occur at the level of the src subdir, vs appending that
  #    subdir beneath the dst subdir like is done for copying. For example:
  #      gsutil rm -r gs://bucket
  #      gsutil cp -r dir1 gs://bucket
  #      gsutil cp -r dir2 gs://bucket/subdir1
  #      gsutil mv gs://bucket/subdir1 gs://bucket/subdir2
  #    would (if using cp naming behavior) end up with paths like:
  #      gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops
  #    whereas mv naming behavior should result in:
  #      gs://bucket/subdir2/dir2/.svn/all-wcprops
  # 2. Copying from directories, buckets, or bucket subdirs should result in
  #    objects/files mirroring the source directory hierarchy. For example:
  #      gsutil cp dir1/dir2 gs://bucket
  #    should create the object gs://bucket/dir2/file2, assuming dir1/dir2
  #    contains file2).
  #    To be consistent with Linux cp behavior, there's one more wrinkle when
  #    working with subdirs: The resulting object names depend on whether the
  #    destination subdirectory exists. For example, if gs://bucket/subdir
  #    exists, the command:
  #      gsutil cp -r dir1/dir2 gs://bucket/subdir
  #    should create objects named like gs://bucket/subdir/dir2/a/b/c. In
  #    contrast, if gs://bucket/subdir does not exist, this same command
  #    should create objects named like gs://bucket/subdir/a/b/c.
  # 3. Copying individual files or objects to dirs, buckets or bucket subdirs
  #    should result in objects/files named by the final source file name
  #    component. Example:
  #      gsutil cp dir1/*.txt gs://bucket
  #    should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt,
  #    assuming dir1 contains f1.txt and f2.txt.

  recursive_move_to_new_subdir = False
  if (global_copy_helper_opts.perform_mv and recursion_requested
      and src_url_names_container and not have_existing_dest_subdir):
    # Case 1. Handle naming rules for bucket subdir mv. Here we want to
    # line up the src_url against its expansion, to find the base to build
    # the new name. For example, running the command:
    #   gsutil mv gs://bucket/abcd gs://bucket/xyz
    # when processing exp_src_url=gs://bucket/abcd/123
    # exp_src_url_tail should become /123
    # Note: mv.py code disallows wildcard specification of source URL.
    recursive_move_to_new_subdir = True
    exp_src_url_tail = (
        exp_src_url.url_string[len(src_url.url_string):])
    dst_key_name = '%s/%s' % (exp_dst_url.object_name.rstrip('/'),
                              exp_src_url_tail.strip('/'))

  elif src_url_names_container and (exp_dst_url.IsCloudUrl() or
                                    exp_dst_url.IsDirectory()):
    # Case 2.  Container copy to a destination other than a file.
    # Build dst_key_name from subpath of exp_src_url past
    # where src_url ends. For example, for src_url=gs://bucket/ and
    # exp_src_url=gs://bucket/src_subdir/obj, dst_key_name should be
    # src_subdir/obj.
    src_url_path_sans_final_dir = GetPathBeforeFinalDir(src_url)
    dst_key_name = exp_src_url.versionless_url_string[
        len(src_url_path_sans_final_dir):].lstrip(src_url.delim)
    # Handle case where dst_url is a non-existent subdir.
    if not have_existing_dest_subdir:
      dst_key_name = dst_key_name.partition(src_url.delim)[-1]
    # Handle special case where src_url was a directory named with '.' or
    # './', so that running a command like:
    #   gsutil cp -r . gs://dest
    # will produce obj names of the form gs://dest/abc instead of
    # gs://dest/./abc.
    if dst_key_name.startswith('.%s' % os.sep):
      dst_key_name = dst_key_name[2:]

  else:
    # Case 3.
    dst_key_name = exp_src_url.object_name.rpartition(src_url.delim)[-1]

  if (not recursive_move_to_new_subdir and (
      exp_dst_url.IsFileUrl() or _ShouldTreatDstUrlAsBucketSubDir(
          have_multiple_srcs, exp_dst_url, have_existing_dest_subdir,
          src_url_names_container, recursion_requested))):
    if exp_dst_url.object_name and exp_dst_url.object_name.endswith(
        exp_dst_url.delim):
      dst_key_name = '%s%s%s' % (
          exp_dst_url.object_name.rstrip(exp_dst_url.delim),
          exp_dst_url.delim, dst_key_name)
    else:
      delim = exp_dst_url.delim if exp_dst_url.object_name else ''
      dst_key_name = '%s%s%s' % (exp_dst_url.object_name or '',
                                 delim, dst_key_name)

  new_exp_dst_url = exp_dst_url.Clone()
  new_exp_dst_url.object_name = dst_key_name.replace(src_url.delim,
                                                     exp_dst_url.delim)
  return new_exp_dst_url


def _CreateDigestsFromDigesters(digesters):
  digests = {}
  if digesters:
    for alg in digesters:
      digests[alg] = base64.encodestring(
          digesters[alg].digest()).rstrip('\n')
  return digests


def _CreateDigestsFromLocalFile(logger, algs, file_name, final_file_name,
                                src_obj_metadata):
  """Creates a base64 CRC32C and/or MD5 digest from file_name.

  Args:
    logger: For outputting log messages.
    algs: List of algorithms to compute.
    file_name: File to digest.
    final_file_name: Permanent location to be used for the downloaded file
                     after validation (used for logging).
    src_obj_metadata: Metadata of source object.

  Returns:
    Dict of algorithm name : base 64 encoded digest
  """
  hash_dict = {}
  if 'md5' in algs:
    hash_dict['md5'] = md5()
  if 'crc32c' in algs:
    hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c')
  with open(file_name, 'rb') as fp:
    CalculateHashesFromContents(
        fp, hash_dict, ProgressCallbackWithBackoff(
            src_obj_metadata.size,
            FileProgressCallbackHandler(
                ConstructAnnounceText('Hashing', final_file_name),
                logger).call))
  digests = {}
  for alg_name, digest in hash_dict.iteritems():
    digests[alg_name] = Base64EncodeHash(digest.hexdigest())
  return digests


def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
                      dst_obj_metadata):
  """Validates integrity of two cloud objects copied via daisy-chain.

  Args:
    logger: for outputting log messages.
    src_url: CloudUrl for source cloud object.
    dst_url: CloudUrl for destination cloud object.
    src_obj_metadata: Cloud Object metadata for object being downloaded from.
    dst_obj_metadata: Cloud Object metadata for object being uploaded to.

  Raises:
    CommandException: if cloud digests don't match local digests.
  """
  checked_one = False
  download_hashes = {}
  upload_hashes = {}
  if src_obj_metadata.md5Hash:
    download_hashes['md5'] = src_obj_metadata.md5Hash
  if src_obj_metadata.crc32c:
    download_hashes['crc32c'] = src_obj_metadata.crc32c
  if dst_obj_metadata.md5Hash:
    upload_hashes['md5'] = dst_obj_metadata.md5Hash
  if dst_obj_metadata.crc32c:
    upload_hashes['crc32c'] = dst_obj_metadata.crc32c

  for alg, upload_b64_digest in upload_hashes.iteritems():
    if alg not in download_hashes:
      continue

    download_b64_digest = download_hashes[alg]
    logger.debug(
        'Comparing source vs destination %s-checksum for %s. (%s/%s)', alg,
        dst_url, download_b64_digest, upload_b64_digest)
    if download_b64_digest != upload_b64_digest:
      raise HashMismatchException(
          '%s signature for source object (%s) doesn\'t match '
          'destination object digest (%s). Object (%s) will be deleted.' % (
              alg, download_b64_digest, upload_b64_digest, dst_url))
    checked_one = True
  if not checked_one:
    # One known way this can currently happen is when downloading objects larger
    # than 5 GiB from S3 (for which the etag is not an MD5).
    logger.warn(
        'WARNING: Found no hashes to validate object downloaded from %s and '
        'uploaded to %s. Integrity cannot be assured without hashes.',
        src_url, dst_url)


def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests,
                 is_upload=False):
  """Validates integrity by comparing cloud digest to local digest.

  Args:
    logger: for outputting log messages.
    obj_url: CloudUrl for cloud object.
    obj_metadata: Cloud Object being downloaded from or uploaded to.
    file_name: Local file name on disk being downloaded to or uploaded from
               (used only for logging).
    digests: Computed Digests for the object.
    is_upload: If true, comparing for an uploaded object (controls logging).

  Raises:
    CommandException: if cloud digests don't match local digests.
  """
  local_hashes = digests
  cloud_hashes = {}
  if obj_metadata.md5Hash:
    cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n')
  if obj_metadata.crc32c:
    cloud_hashes['crc32c'] = obj_metadata.crc32c.rstrip('\n')

  checked_one = False
  for alg in local_hashes:
    if alg not in cloud_hashes:
      continue

    local_b64_digest = local_hashes[alg]
    cloud_b64_digest = cloud_hashes[alg]
    logger.debug(
        'Comparing local vs cloud %s-checksum for %s. (%s/%s)', alg, file_name,
        local_b64_digest, cloud_b64_digest)
    if local_b64_digest != cloud_b64_digest:

      raise HashMismatchException(
          '%s signature computed for local file (%s) doesn\'t match '
          'cloud-supplied digest (%s). %s (%s) will be deleted.' % (
              alg, local_b64_digest, cloud_b64_digest,
              'Cloud object' if is_upload else 'Local file',
              obj_url if is_upload else file_name))
    checked_one = True
  if not checked_one:
    if is_upload:
      logger.warn(
          'WARNING: Found no hashes to validate object uploaded to %s. '
          'Integrity cannot be assured without hashes.', obj_url)
    else:
    # One known way this can currently happen is when downloading objects larger
    # than 5 GB from S3 (for which the etag is not an MD5).
      logger.warn(
          'WARNING: Found no hashes to validate object downloaded to %s. '
          'Integrity cannot be assured without hashes.', file_name)


def IsNoClobberServerException(e):
  """Checks to see if the server attempted to clobber a file.

  In this case we specified via a precondition that we didn't want the file
  clobbered.

  Args:
    e: The Exception that was generated by a failed copy operation

  Returns:
    bool indicator - True indicates that the server did attempt to clobber
        an existing file.
  """
  return ((isinstance(e, PreconditionException)) or
          (isinstance(e, ResumableUploadException) and '412' in e.message))


def CheckForDirFileConflict(exp_src_url, dst_url):
  """Checks whether copying exp_src_url into dst_url is not possible.

     This happens if a directory exists in local file system where a file
     needs to go or vice versa. In that case we print an error message and
     exits. Example: if the file "./x" exists and you try to do:
       gsutil cp gs://mybucket/x/y .
     the request can't succeed because it requires a directory where
     the file x exists.

     Note that we don't enforce any corresponding restrictions for buckets,
     because the flat namespace semantics for buckets doesn't prohibit such
     cases the way hierarchical file systems do. For example, if a bucket
     contains an object called gs://bucket/dir and then you run the command:
       gsutil cp file1 file2 gs://bucket/dir
     you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and
     gs://bucket/dir/file2.

  Args:
    exp_src_url: Expanded source StorageUrl.
    dst_url: Destination StorageUrl.

  Raises:
    CommandException: if errors encountered.
  """
  if dst_url.IsCloudUrl():
    # The problem can only happen for file destination URLs.
    return
  dst_path = dst_url.object_name
  final_dir = os.path.dirname(dst_path)
  if os.path.isfile(final_dir):
    raise CommandException('Cannot retrieve %s because a file exists '
                           'where a directory needs to be created (%s).' %
                           (exp_src_url.url_string, final_dir))
  if os.path.isdir(dst_path):
    raise CommandException('Cannot retrieve %s because a directory exists '
                           '(%s) where the file needs to be created.' %
                           (exp_src_url.url_string, dst_path))


def _PartitionFile(fp, file_size, src_url, content_type, canned_acl,
                   dst_bucket_url, random_prefix, tracker_file,
                   tracker_file_lock):
  """Partitions a file into FilePart objects to be uploaded and later composed.

  These objects, when composed, will match the original file. This entails
  splitting the file into parts, naming and forming a destination URL for each
  part, and also providing the PerformParallelUploadFileToObjectArgs
  corresponding to each part.

  Args:
    fp: The file object to be partitioned.
    file_size: The size of fp, in bytes.
    src_url: Source FileUrl from the original command.
    content_type: content type for the component and final objects.
    canned_acl: The user-provided canned_acl, if applicable.
    dst_bucket_url: CloudUrl for the destination bucket
    random_prefix: The randomly-generated prefix used to prevent collisions
                   among the temporary component names.
    tracker_file: The path to the parallel composite upload tracker file.
    tracker_file_lock: The lock protecting access to the tracker file.

  Returns:
    dst_args: The destination URIs for the temporary component objects.
  """
  parallel_composite_upload_component_size = HumanReadableToBytes(
      config.get('GSUtil', 'parallel_composite_upload_component_size',
                 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE))
  (num_components, component_size) = _GetPartitionInfo(
      file_size, MAX_COMPOSE_ARITY, parallel_composite_upload_component_size)

  dst_args = {}  # Arguments to create commands and pass to subprocesses.
  file_names = []  # Used for the 2-step process of forming dst_args.
  for i in range(num_components):
    # "Salt" the object name with something a user is very unlikely to have
    # used in an object name, then hash the extended name to make sure
    # we don't run into problems with name length. Using a deterministic
    # naming scheme for the temporary components allows users to take
    # advantage of resumable uploads for each component.
    encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode(UTF8)
    content_md5 = md5()
    content_md5.update(encoded_name)
    digest = content_md5.hexdigest()
    temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE +
                      digest + '_' + str(i))
    tmp_dst_url = dst_bucket_url.Clone()
    tmp_dst_url.object_name = temp_file_name

    if i < (num_components - 1):
      # Every component except possibly the last is the same size.
      file_part_length = component_size
    else:
      # The last component just gets all of the remaining bytes.
      file_part_length = (file_size - ((num_components -1) * component_size))
    offset = i * component_size
    func_args = PerformParallelUploadFileToObjectArgs(
        fp.name, offset, file_part_length, src_url, tmp_dst_url, canned_acl,
        content_type, tracker_file, tracker_file_lock)
    file_names.append(temp_file_name)
    dst_args[temp_file_name] = func_args

  return dst_args


def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata,
                               canned_acl, file_size, preconditions, gsutil_api,
                               command_obj, copy_exception_handler):
  """Uploads a local file to a cloud object using parallel composite upload.

  The file is partitioned into parts, and then the parts are uploaded in
  parallel, composed to form the original destination object, and deleted.

  Args:
    fp: The file object to be uploaded.
    src_url: FileUrl representing the local file.
    dst_url: CloudUrl representing the destination file.
    dst_obj_metadata: apitools Object describing the destination object.
    canned_acl: The canned acl to apply to the object, if any.
    file_size: The size of the source file in bytes.
    preconditions: Cloud API Preconditions for the final object.
    gsutil_api: gsutil Cloud API instance to use.
    command_obj: Command object (for calling Apply).
    copy_exception_handler: Copy exception handler (for use in Apply).

  Returns:
    Elapsed upload time, uploaded Object with generation, crc32c, and size
    fields populated.
  """
  start_time = time.time()
  dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string)
  api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme)
  # Determine which components, if any, have already been successfully
  # uploaded.
  tracker_file = GetTrackerFilePath(dst_url, TrackerFileType.PARALLEL_UPLOAD,
                                    api_selector, src_url)
  tracker_file_lock = CreateLock()
  (random_prefix, existing_components) = (
      _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock))

  # Create the initial tracker file for the upload.
  _CreateParallelUploadTrackerFile(tracker_file, random_prefix,
                                   existing_components, tracker_file_lock)

  # Get the set of all components that should be uploaded.
  dst_args = _PartitionFile(
      fp, file_size, src_url, dst_obj_metadata.contentType, canned_acl,
      dst_bucket_url, random_prefix, tracker_file, tracker_file_lock)

  (components_to_upload, existing_components, existing_objects_to_delete) = (
      FilterExistingComponents(dst_args, existing_components, dst_bucket_url,
                               gsutil_api))

  # In parallel, copy all of the file parts that haven't already been
  # uploaded to temporary objects.
  cp_results = command_obj.Apply(
      _PerformParallelUploadFileToObject, components_to_upload,
      copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'),
      arg_checker=gslib.command.DummyArgChecker,
      parallel_operations_override=True, should_return_results=True)
  uploaded_components = []
  for cp_result in cp_results:
    uploaded_components.append(cp_result[2])
  components = uploaded_components + existing_components

  if len(components) == len(dst_args):
    # Only try to compose if all of the components were uploaded successfully.

    def _GetComponentNumber(component):
      return int(component.object_name[component.object_name.rfind('_')+1:])
    # Sort the components so that they will be composed in the correct order.
    components = sorted(components, key=_GetComponentNumber)

    request_components = []
    for component_url in components:
      src_obj_metadata = (
          apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
              name=component_url.object_name))
      if component_url.HasGeneration():
        src_obj_metadata.generation = long(component_url.generation)
      request_components.append(src_obj_metadata)

    composed_object = gsutil_api.ComposeObject(
        request_components, dst_obj_metadata, preconditions=preconditions,
        provider=dst_url.scheme, fields=['generation', 'crc32c', 'size'])

    try:
      # Make sure only to delete things that we know were successfully
      # uploaded (as opposed to all of the objects that we attempted to
      # create) so that we don't delete any preexisting objects, except for
      # those that were uploaded by a previous, failed run and have since
      # changed (but still have an old generation lying around).
      objects_to_delete = components + existing_objects_to_delete
      command_obj.Apply(
          _DeleteTempComponentObjectFn, objects_to_delete, _RmExceptionHandler,
          arg_checker=gslib.command.DummyArgChecker,
          parallel_operations_override=True)
    except Exception:  # pylint: disable=broad-except
      # If some of the delete calls fail, don't cause the whole command to
      # fail. The copy was successful iff the compose call succeeded, so
      # reduce this to a warning.
      logging.warning(
          'Failed to delete some of the following temporary objects:\n' +
          '\n'.join(dst_args.keys()))
    finally:
      with tracker_file_lock:
        if os.path.exists(tracker_file):
          os.unlink(tracker_file)
  else:
    # Some of the components failed to upload. In this case, we want to exit
    # without deleting the objects.
    raise CommandException(
        'Some temporary components were not uploaded successfully. '
        'Please retry this upload.')

  elapsed_time = time.time() - start_time
  return elapsed_time, composed_object


def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url,
                                     file_size, canned_acl=None):
  """Determines whether parallel composite upload strategy should be used.

  Args:
    logger: for outputting log messages.
    allow_splitting: If false, then this function returns false.
    src_url: FileUrl corresponding to a local file.
    dst_url: CloudUrl corresponding to destination cloud object.
    file_size: The size of the source file, in bytes.
    canned_acl: Canned ACL to apply to destination object, if any.

  Returns:
    True iff a parallel upload should be performed on the source file.
  """
  global suggested_slice_transfers, suggested_sliced_transfers_lock
  parallel_composite_upload_threshold = HumanReadableToBytes(config.get(
      'GSUtil', 'parallel_composite_upload_threshold',
      DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD))

  all_factors_but_size = (
      allow_splitting  # Don't split the pieces multiple times.
      and not src_url.IsStream()  # We can't partition streams.
      and dst_url.scheme == 'gs'  # Compose is only for gs.
      and not canned_acl)  # TODO: Implement canned ACL support for compose.

  # Since parallel composite uploads are disabled by default, make user aware of
  # them.
  # TODO: Once compiled crcmod is being distributed by major Linux distributions
  # remove this check.
  if (all_factors_but_size and parallel_composite_upload_threshold == 0
      and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD):
    with suggested_sliced_transfers_lock:
      if not suggested_sliced_transfers.get('suggested'):
        logger.info('\n'.join(textwrap.wrap(
            '==> NOTE: You are uploading one or more large file(s), which '
            'would run significantly faster if you enable parallel composite '
            'uploads. This feature can be enabled by editing the '
            '"parallel_composite_upload_threshold" value in your .boto '
            'configuration file. However, note that if you do this you and any '
            'users that download such composite files will need to have a '
            'compiled crcmod installed (see "gsutil help crcmod").')) + '\n')
        suggested_sliced_transfers['suggested'] = True

  return (all_factors_but_size
          and parallel_composite_upload_threshold > 0
          and file_size >= parallel_composite_upload_threshold)


def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id,
                         treat_nonexistent_object_as_subdir=False):
  """Expands wildcard if present in url_str.

  Args:
    url_str: String representation of requested url.
    gsutil_api: gsutil Cloud API instance to use.
    debug: debug level to use (for iterators).
    project_id: project ID to use (for iterators).
    treat_nonexistent_object_as_subdir: indicates if should treat a non-existent
                                        object as a subdir.

  Returns:
      (exp_url, have_existing_dst_container)
      where exp_url is a StorageUrl
      and have_existing_dst_container is a bool indicating whether
      exp_url names an existing directory, bucket, or bucket subdirectory.
      In the case where we match a subdirectory AND an object, the
      object is returned.

  Raises:
    CommandException: if url_str matched more than 1 URL.
  """
  # Handle wildcarded url case.
  if ContainsWildcard(url_str):
    blr_expansion = list(CreateWildcardIterator(url_str, gsutil_api,
                                                debug=debug,
                                                project_id=project_id))
    if len(blr_expansion) != 1:
      raise CommandException('Destination (%s) must match exactly 1 URL' %
                             url_str)
    blr = blr_expansion[0]
    # BLR is either an OBJECT, PREFIX, or BUCKET; the latter two represent
    # directories.
    return (StorageUrlFromString(blr.url_string), not blr.IsObject())

  storage_url = StorageUrlFromString(url_str)

  # Handle non-wildcarded URL.
  if storage_url.IsFileUrl():
    return (storage_url, storage_url.IsDirectory())

  # At this point we have a cloud URL.
  if storage_url.IsBucket():
    return (storage_url, True)

  # For object/prefix URLs, there are four cases that indicate the destination
  # is a cloud subdirectory; these are always considered to be an existing
  # container. Checking each case allows gsutil to provide Unix-like
  # destination folder semantics, but requires up to three HTTP calls, noted
  # below.

  # Case 1: If a placeholder object ending with '/' exists.
  if IsCloudSubdirPlaceholder(storage_url):
    return (storage_url, True)

  # HTTP call to make an eventually consistent check for a matching prefix,
  # _$folder$, or empty listing.
  expansion_empty = True
  list_iterator = gsutil_api.ListObjects(
      storage_url.bucket_name, prefix=storage_url.object_name, delimiter='/',
      provider=storage_url.scheme, fields=['prefixes', 'items/name'])
  for obj_or_prefix in list_iterator:
    # To conserve HTTP calls for the common case, we make a single listing
    # that covers prefixes and object names. Listing object names covers the
    # _$folder$ case and the nonexistent-object-as-subdir case. However, if
    # there are many existing objects for which the target URL is an exact
    # prefix, this listing could be paginated and span multiple HTTP calls.
    # If this case becomes common, we could heurestically abort the
    # listing operation after the first page of results and just query for the
    # _$folder$ object directly using GetObjectMetadata.
    expansion_empty = False

    if obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.PREFIX:
      # Case 2: If there is a matching prefix when listing the destination URL.
      return (storage_url, True)
    elif (obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.OBJECT and
          obj_or_prefix.data.name == storage_url.object_name + '_$folder$'):
      # Case 3: If a placeholder object matching destination + _$folder$
      # exists.
      return (storage_url, True)

  # Case 4: If no objects/prefixes matched, and nonexistent objects should be
  # treated as subdirectories.
  return (storage_url, expansion_empty and treat_nonexistent_object_as_subdir)


def FixWindowsNaming(src_url, dst_url):
  """Translates Windows pathnames to cloud pathnames.

  Rewrites the destination URL built by ConstructDstUrl().

  Args:
    src_url: Source StorageUrl to be copied.
    dst_url: The destination StorageUrl built by ConstructDstUrl().

  Returns:
    StorageUrl to use for copy.
  """
  if (src_url.IsFileUrl() and src_url.delim == '\\'
      and dst_url.IsCloudUrl()):
    trans_url_str = re.sub(r'\\', '/', dst_url.url_string)
    dst_url = StorageUrlFromString(trans_url_str)
  return dst_url


def SrcDstSame(src_url, dst_url):
  """Checks if src_url and dst_url represent the same object or file.

  We don't handle anything about hard or symbolic links.

  Args:
    src_url: Source StorageUrl.
    dst_url: Destination StorageUrl.

  Returns:
    Bool indicator.
  """
  if src_url.IsFileUrl() and dst_url.IsFileUrl():
    # Translate a/b/./c to a/b/c, so src=dst comparison below works.
    new_src_path = os.path.normpath(src_url.object_name)
    new_dst_path = os.path.normpath(dst_url.object_name)
    return new_src_path == new_dst_path
  else:
    return (src_url.url_string == dst_url.url_string and
            src_url.generation == dst_url.generation)


def _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata):
  """Logs copy operation, including Content-Type if appropriate.

  Args:
    logger: logger instance to use for output.
    src_url: Source StorageUrl.
    dst_url: Destination StorageUrl.
    dst_obj_metadata: Object-specific metadata that should be overidden during
                      the copy.
  """
  if (dst_url.IsCloudUrl() and dst_obj_metadata and
      dst_obj_metadata.contentType):
    content_type_msg = ' [Content-Type=%s]' % dst_obj_metadata.contentType
  else:
    content_type_msg = ''
  if src_url.IsFileUrl() and src_url.IsStream():
    logger.info('Copying from <STDIN>%s...', content_type_msg)
  else:
    logger.info('Copying %s%s...', src_url.url_string, content_type_msg)


# pylint: disable=undefined-variable
def _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
                            dst_obj_metadata, preconditions, gsutil_api,
                            logger):
  """Performs copy-in-the cloud from specified src to dest object.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata for source object; must include etag and size.
    dst_url: Destination CloudUrl.
    dst_obj_metadata: Object-specific metadata that should be overidden during
                      the copy.
    preconditions: Preconditions to use for the copy.
    gsutil_api: gsutil Cloud API instance to use for the copy.
    logger: logging.Logger for log message output.

  Returns:
    (elapsed_time, bytes_transferred, dst_url with generation,
    md5 hash of destination) excluding overhead like initial GET.

  Raises:
    CommandException: if errors encountered.
  """
  start_time = time.time()

  progress_callback = FileProgressCallbackHandler(
      ConstructAnnounceText('Copying', dst_url.url_string), logger).call
  if global_copy_helper_opts.test_callback_file:
    with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
      progress_callback = pickle.loads(test_fp.read()).call
  dst_obj = gsutil_api.CopyObject(
      src_obj_metadata, dst_obj_metadata, src_generation=src_url.generation,
      canned_acl=global_copy_helper_opts.canned_acl,
      preconditions=preconditions, progress_callback=progress_callback,
      provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)

  end_time = time.time()

  result_url = dst_url.Clone()
  result_url.generation = GenerationFromUrlAndString(result_url,
                                                     dst_obj.generation)

  return (end_time - start_time, src_obj_metadata.size, result_url,
          dst_obj.md5Hash)


def _SetContentTypeFromFile(src_url, dst_obj_metadata):
  """Detects and sets Content-Type if src_url names a local file.

  Args:
    src_url: Source StorageUrl.
    dst_obj_metadata: Object-specific metadata that should be overidden during
                     the copy.
  """
  # contentType == '' if user requested default type.
  if (dst_obj_metadata.contentType is None and src_url.IsFileUrl()
      and not src_url.IsStream()):
    # Only do content type recognition if src_url is a file. Object-to-object
    # copies with no -h Content-Type specified re-use the content type of the
    # source object.
    object_name = src_url.object_name
    content_type = None
    # Streams (denoted by '-') are expected to be 'application/octet-stream'
    # and 'file' would partially consume them.
    if object_name != '-':
      if config.getbool('GSUtil', 'use_magicfile', False):
        p = subprocess.Popen(['file', '--mime-type', object_name],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = p.communicate()
        p.stdout.close()
        p.stderr.close()
        if p.returncode != 0 or error:
          raise CommandException(
              'Encountered error running "file --mime-type %s" '
              '(returncode=%d).\n%s' % (object_name, p.returncode, error))
        # Parse output by removing line delimiter and splitting on last ":
        content_type = output.rstrip().rpartition(': ')[2]
      else:
        content_type = mimetypes.guess_type(object_name)[0]
    if not content_type:
      content_type = DEFAULT_CONTENT_TYPE
    dst_obj_metadata.contentType = content_type


# pylint: disable=undefined-variable
def _UploadFileToObjectNonResumable(src_url, src_obj_filestream,
                                    src_obj_size, dst_url, dst_obj_metadata,
                                    preconditions, gsutil_api, logger):
  """Uploads the file using a non-resumable strategy.

  Args:
    src_url: Source StorageUrl to upload.
    src_obj_filestream: File pointer to uploadable bytes.
    src_obj_size: Size of the source object.
    dst_url: Destination StorageUrl for the upload.
    dst_obj_metadata: Metadata for the target object.
    preconditions: Preconditions for the upload, if any.
    gsutil_api: gsutil Cloud API instance to use for the upload.
    logger: For outputting log messages.

  Returns:
    Elapsed upload time, uploaded Object with generation, md5, and size fields
    populated.
  """
  progress_callback = FileProgressCallbackHandler(
      ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
  if global_copy_helper_opts.test_callback_file:
    with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
      progress_callback = pickle.loads(test_fp.read()).call
  start_time = time.time()

  if src_url.IsStream():
    # TODO: gsutil-beta: Provide progress callbacks for streaming uploads.
    uploaded_object = gsutil_api.UploadObjectStreaming(
        src_obj_filestream, object_metadata=dst_obj_metadata,
        canned_acl=global_copy_helper_opts.canned_acl,
        preconditions=preconditions, progress_callback=progress_callback,
        provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
  else:
    uploaded_object = gsutil_api.UploadObject(
        src_obj_filestream, object_metadata=dst_obj_metadata,
        canned_acl=global_copy_helper_opts.canned_acl, size=src_obj_size,
        preconditions=preconditions, progress_callback=progress_callback,
        provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS)
  end_time = time.time()
  elapsed_time = end_time - start_time

  return elapsed_time, uploaded_object


# pylint: disable=undefined-variable
def _UploadFileToObjectResumable(src_url, src_obj_filestream,
                                 src_obj_size, dst_url, dst_obj_metadata,
                                 preconditions, gsutil_api, logger):
  """Uploads the file using a resumable strategy.

  Args:
    src_url: Source FileUrl to upload.  Must not be a stream.
    src_obj_filestream: File pointer to uploadable bytes.
    src_obj_size: Size of the source object.
    dst_url: Destination StorageUrl for the upload.
    dst_obj_metadata: Metadata for the target object.
    preconditions: Preconditions for the upload, if any.
    gsutil_api: gsutil Cloud API instance to use for the upload.
    logger: for outputting log messages.

  Returns:
    Elapsed upload time, uploaded Object with generation, md5, and size fields
    populated.
  """
  tracker_file_name = GetTrackerFilePath(
      dst_url, TrackerFileType.UPLOAD,
      gsutil_api.GetApiSelector(provider=dst_url.scheme))

  def _UploadTrackerCallback(serialization_data):
    """Creates a new tracker file for starting an upload from scratch.

    This function is called by the gsutil Cloud API implementation and the
    the serialization data is implementation-specific.

    Args:
      serialization_data: Serialization data used in resuming the upload.
    """
    tracker_file = None
    try:
      tracker_file = open(tracker_file_name, 'w')
      tracker_file.write(str(serialization_data))
    except IOError as e:
      RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)
    finally:
      if tracker_file:
        tracker_file.close()

  # This contains the upload URL, which will uniquely identify the
  # destination object.
  tracker_data = _GetUploadTrackerData(tracker_file_name, logger)
  if tracker_data:
    logger.info(
        'Resuming upload for %s', src_url.url_string)

  retryable = True

  progress_callback = FileProgressCallbackHandler(
      ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
  if global_copy_helper_opts.test_callback_file:
    with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
      progress_callback = pickle.loads(test_fp.read()).call

  start_time = time.time()
  num_startover_attempts = 0
  # This loop causes us to retry when the resumable upload failed in a way that
  # requires starting over with a new upload ID. Retries within a single upload
  # ID within the current process are handled in
  # gsutil_api.UploadObjectResumable, and retries within a single upload ID
  # spanning processes happens if an exception occurs not caught below (which
  # will leave the tracker file in place, and cause the upload ID to be reused
  # the next time the user runs gsutil and attempts the same upload).
  while retryable:
    try:
      uploaded_object = gsutil_api.UploadObjectResumable(
          src_obj_filestream, object_metadata=dst_obj_metadata,
          canned_acl=global_copy_helper_opts.canned_acl,
          preconditions=preconditions, provider=dst_url.scheme,
          size=src_obj_size, serialization_data=tracker_data,
          fields=UPLOAD_RETURN_FIELDS,
          tracker_callback=_UploadTrackerCallback,
          progress_callback=progress_callback)
      retryable = False
    except ResumableUploadStartOverException, e:
      # This can happen, for example, if the server sends a 410 response code.
      # In that case the current resumable upload ID can't be reused, so delete
      # the tracker file and try again up to max retries.
      num_startover_attempts += 1
      retryable = (num_startover_attempts < GetNumRetries())
      if not retryable:
        raise

      # If the server sends a 404 response code, then the upload should only
      # be restarted if it was the object (and not the bucket) that was missing.
      try:
        gsutil_api.GetBucket(dst_obj_metadata.bucket, provider=dst_url.scheme)
      except NotFoundException:
        raise

      logger.info('Restarting upload from scratch after exception %s', e)
      DeleteTrackerFile(tracker_file_name)
      tracker_data = None
      src_obj_filestream.seek(0)
      # Reset the progress callback handler.
      progress_callback = FileProgressCallbackHandler(
          ConstructAnnounceText('Uploading', dst_url.url_string), logger).call
      logger.info('\n'.join(textwrap.wrap(
          'Resumable upload of %s failed with a response code indicating we '
          'need to start over with a new resumable upload ID. Backing off '
          'and retrying.' % src_url.url_string)))
      time.sleep(min(random.random() * (2 ** num_startover_attempts),
                     GetMaxRetryDelay()))
    except ResumableUploadAbortException:
      retryable = False
      raise
    finally:
      if not retryable:
        DeleteTrackerFile(tracker_file_name)

  end_time = time.time()
  elapsed_time = end_time - start_time

  return (elapsed_time, uploaded_object)


def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger):
  """Compresses a to-be-uploaded local file to save bandwidth.

  Args:
    src_url: Source FileUrl.
    src_obj_filestream: Read stream of the source file - will be consumed
                        and closed.
    src_obj_size: Size of the source file.
    logger: for outputting log messages.

  Returns:
    StorageUrl path to compressed file, compressed file size.
  """
  # TODO: Compress using a streaming model as opposed to all at once here.
  if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING:
    logger.info(
        'Compressing %s (to tmp)...', src_url)
  (gzip_fh, gzip_path) = tempfile.mkstemp()
  gzip_fp = None
  try:
    # Check for temp space. Assume the compressed object is at most 2x
    # the size of the object (normally should compress to smaller than
    # the object)
    if CheckFreeSpace(gzip_path) < 2*int(src_obj_size):
      raise CommandException('Inadequate temp space available to compress '
                             '%s. See the CHANGING TEMP DIRECTORIES section '
                             'of "gsutil help cp" for more info.' % src_url)
    gzip_fp = gzip.open(gzip_path, 'wb')
    data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
    while data:
      gzip_fp.write(data)
      data = src_obj_filestream.read(GZIP_CHUNK_SIZE)
  finally:
    if gzip_fp:
      gzip_fp.close()
    os.close(gzip_fh)
    src_obj_filestream.close()
  gzip_size = os.path.getsize(gzip_path)
  return StorageUrlFromString(gzip_path), gzip_size


def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size,
                        dst_url, dst_obj_metadata, preconditions, gsutil_api,
                        logger, command_obj, copy_exception_handler,
                        gzip_exts=None, allow_splitting=True):
  """Uploads a local file to an object.

  Args:
    src_url: Source FileUrl.
    src_obj_filestream: Read stream of the source file to be read and closed.
    src_obj_size: Size of the source file.
    dst_url: Destination CloudUrl.
    dst_obj_metadata: Metadata to be applied to the destination object.
    preconditions: Preconditions to use for the copy.
    gsutil_api: gsutil Cloud API to use for the copy.
    logger: for outputting log messages.
    command_obj: command object for use in Apply in parallel composite uploads.
    copy_exception_handler: For handling copy exceptions during Apply.
    gzip_exts: List of file extensions to gzip prior to upload, if any.
    allow_splitting: Whether to allow the file to be split into component
                     pieces for an parallel composite upload.

  Returns:
    (elapsed_time, bytes_transferred, dst_url with generation,
    md5 hash of destination) excluding overhead like initial GET.

  Raises:
    CommandException: if errors encountered.
  """
  if not dst_obj_metadata or not dst_obj_metadata.contentLanguage:
    content_language = config.get_value('GSUtil', 'content_language')
    if content_language:
      dst_obj_metadata.contentLanguage = content_language

  fname_parts = src_url.object_name.split('.')
  upload_url = src_url
  upload_stream = src_obj_filestream
  upload_size = src_obj_size
  zipped_file = False
  if gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts:
    upload_url, upload_size = _CompressFileForUpload(
        src_url, src_obj_filestream, src_obj_size, logger)
    upload_stream = open(upload_url.object_name, 'rb')
    dst_obj_metadata.contentEncoding = 'gzip'
    zipped_file = True

  elapsed_time = None
  uploaded_object = None
  hash_algs = GetUploadHashAlgs()
  digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})

  parallel_composite_upload = _ShouldDoParallelCompositeUpload(
      logger, allow_splitting, upload_url, dst_url, src_obj_size,
      canned_acl=global_copy_helper_opts.canned_acl)

  if (src_url.IsStream() and
      gsutil_api.GetApiSelector(provider=dst_url.scheme) == ApiSelector.JSON):
    orig_stream = upload_stream
    # Add limited seekable properties to the stream via buffering.
    upload_stream = ResumableStreamingJsonUploadWrapper(
        orig_stream, GetJsonResumableChunkSize())

  if not parallel_composite_upload and len(hash_algs):
    # Parallel composite uploads calculate hashes per-component in subsequent
    # calls to this function, but the composition of the final object is a
    # cloud-only operation.
    wrapped_filestream = HashingFileUploadWrapper(upload_stream, digesters,
                                                  hash_algs, upload_url, logger)
  else:
    wrapped_filestream = upload_stream

  try:
    if parallel_composite_upload:
      elapsed_time, uploaded_object = _DoParallelCompositeUpload(
          upload_stream, upload_url, dst_url, dst_obj_metadata,
          global_copy_helper_opts.canned_acl, upload_size, preconditions,
          gsutil_api, command_obj, copy_exception_handler)
    elif upload_size < ResumableThreshold() or src_url.IsStream():
      elapsed_time, uploaded_object = _UploadFileToObjectNonResumable(
          upload_url, wrapped_filestream, upload_size, dst_url,
          dst_obj_metadata, preconditions, gsutil_api, logger)
    else:
      elapsed_time, uploaded_object = _UploadFileToObjectResumable(
          upload_url, wrapped_filestream, upload_size, dst_url,
          dst_obj_metadata, preconditions, gsutil_api, logger)

  finally:
    if zipped_file:
      try:
        os.unlink(upload_url.object_name)
      # Windows sometimes complains the temp file is locked when you try to
      # delete it.
      except Exception:  # pylint: disable=broad-except
        logger.warning(
            'Could not delete %s. This can occur in Windows because the '
            'temporary file is still locked.', upload_url.object_name)
    # In the gzip case, this is the gzip stream.  _CompressFileForUpload will
    # have already closed the original source stream.
    upload_stream.close()

  if not parallel_composite_upload:
    try:
      digests = _CreateDigestsFromDigesters(digesters)
      _CheckHashes(logger, dst_url, uploaded_object, src_url.object_name,
                   digests, is_upload=True)
    except HashMismatchException:
      if _RENAME_ON_HASH_MISMATCH:
        corrupted_obj_metadata = apitools_messages.Object(
            name=dst_obj_metadata.name,
            bucket=dst_obj_metadata.bucket,
            etag=uploaded_object.etag)
        dst_obj_metadata.name = (dst_url.object_name +
                                 _RENAME_ON_HASH_MISMATCH_SUFFIX)
        gsutil_api.CopyObject(corrupted_obj_metadata,
                              dst_obj_metadata, provider=dst_url.scheme)
      # If the digest doesn't match, delete the object.
      gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
                              generation=uploaded_object.generation,
                              provider=dst_url.scheme)
      raise

  result_url = dst_url.Clone()

  result_url.generation = uploaded_object.generation
  result_url.generation = GenerationFromUrlAndString(
      result_url, uploaded_object.generation)

  return (elapsed_time, uploaded_object.size, result_url,
          uploaded_object.md5Hash)


def _GetDownloadFile(dst_url, src_obj_metadata, logger):
  """Creates a new download file, and deletes the file that will be replaced.

  Names and creates a temporary file for this download. Also, if there is an
  existing file at the path where this file will be placed after the download
  is completed, that file will be deleted.

  Args:
    dst_url: Destination FileUrl.
    src_obj_metadata: Metadata from the source object.
    logger: for outputting log messages.

  Returns:
    (download_file_name, need_to_unzip)
    download_file_name: The name of the temporary file to which the object will
                        be downloaded.
    need_to_unzip: If true, a temporary zip file was used and must be
                   uncompressed as part of validation.
  """
  dir_name = os.path.dirname(dst_url.object_name)
  if dir_name and not os.path.exists(dir_name):
    # Do dir creation in try block so can ignore case where dir already
    # exists. This is needed to avoid a race condition when running gsutil
    # -m cp.
    try:
      os.makedirs(dir_name)
    except OSError, e:
      if e.errno != errno.EEXIST:
        raise

  need_to_unzip = False
  # For gzipped objects download to a temp file and unzip. For the XML API,
  # this represents the result of a HEAD request. For the JSON API, this is
  # the stored encoding which the service may not respect. However, if the
  # server sends decompressed bytes for a file that is stored compressed
  # (double compressed case), there is no way we can validate the hash and
  # we will fail our hash check for the object.
  if (src_obj_metadata.contentEncoding and
      src_obj_metadata.contentEncoding.lower().endswith('gzip')):
    need_to_unzip = True
    download_file_name = _GetDownloadTempZipFileName(dst_url)
    logger.info(
        'Downloading to temp gzip filename %s', download_file_name)
  else:
    download_file_name = _GetDownloadTempFileName(dst_url)

  # If a file exists at the permanent destination (where the file will be moved
  # after the download is completed), delete it here to reduce disk space
  # requirements.
  if os.path.exists(dst_url.object_name):
    os.unlink(dst_url.object_name)

  # Downloads open the temporary download file in r+b mode, which requires it
  # to already exist, so we create it here if it doesn't exist already.
  fp = open(download_file_name, 'ab')
  fp.close()
  return download_file_name, need_to_unzip


def _ShouldDoSlicedDownload(download_strategy, src_obj_metadata,
                            allow_splitting, logger):
  """Determines whether the sliced download strategy should be used.

  Args:
    download_strategy: CloudApi download strategy.
    src_obj_metadata: Metadata from the source object.
    allow_splitting: If false, then this function returns false.
    logger: logging.Logger for log message output.

  Returns:
    True iff a sliced download should be performed on the source file.
  """
  sliced_object_download_threshold = HumanReadableToBytes(config.get(
      'GSUtil', 'sliced_object_download_threshold',
      DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD))

  max_components = config.getint(
      'GSUtil', 'sliced_object_download_max_components',
      DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)

  # Don't use sliced download if it will prevent us from performing an
  # integrity check.
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  parallel_hashing = src_obj_metadata.crc32c and UsingCrcmodExtension(crcmod)
  hashing_okay = parallel_hashing or check_hashes_config == CHECK_HASH_NEVER

  use_slice = (
      allow_splitting
      and download_strategy is not CloudApi.DownloadStrategy.ONE_SHOT
      and max_components > 1
      and hashing_okay
      and sliced_object_download_threshold > 0
      and src_obj_metadata.size >= sliced_object_download_threshold)

  if (not use_slice
      and src_obj_metadata.size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD
      and not UsingCrcmodExtension(crcmod)
      and check_hashes_config != CHECK_HASH_NEVER):
    with suggested_sliced_transfers_lock:
      if not suggested_sliced_transfers.get('suggested'):
        logger.info('\n'.join(textwrap.wrap(
            '==> NOTE: You are downloading one or more large file(s), which '
            'would run significantly faster if you enabled sliced object '
            'uploads. This feature is enabled by default but requires that '
            'compiled crcmod be installed (see "gsutil help crcmod").')) + '\n')
        suggested_sliced_transfers['suggested'] = True

  return use_slice


def _PerformSlicedDownloadObjectToFile(cls, args, thread_state=None):
  """Function argument to Apply for performing sliced downloads.

  Args:
    cls: Calling Command class.
    args: PerformSlicedDownloadObjectToFileArgs tuple describing the target.
    thread_state: gsutil Cloud API instance to use for the operation.

  Returns:
    PerformSlicedDownloadReturnValues named-tuple filled with:
    component_num: The component number for this download.
    crc32c: CRC32C hash value (integer) of the downloaded bytes.
    bytes_transferred: The number of bytes transferred, potentially less
                       than the component size if the download was resumed.
  """
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  hash_algs = GetDownloadHashAlgs(
      cls.logger, consider_crc32c=args.src_obj_metadata.crc32c)
  digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})

  (bytes_transferred, server_encoding) = (
      _DownloadObjectToFileResumable(args.src_url, args.src_obj_metadata,
                                     args.dst_url, args.download_file_name,
                                     gsutil_api, cls.logger, digesters,
                                     component_num=args.component_num,
                                     start_byte=args.start_byte,
                                     end_byte=args.end_byte))

  crc32c_val = None
  if 'crc32c' in digesters:
    crc32c_val = digesters['crc32c'].crcValue
  return PerformSlicedDownloadReturnValues(
      args.component_num, crc32c_val, bytes_transferred, server_encoding)


def _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url,
                                        download_file_name, logger,
                                        api_selector, num_components):
  """Maintains sliced download tracker files in order to permit resumability.

  Reads or creates a sliced download tracker file representing this object
  download. Upon an attempt at cross-process resumption, the contents of the
  sliced download tracker file are verified to make sure a resumption is
  possible and appropriate. In the case that a resumption should not be
  attempted, existing component tracker files are deleted (to prevent child
  processes from attempting resumption), and a new sliced download tracker
  file is created.

  Args:
    src_obj_metadata: Metadata from the source object. Must include etag and
                      generation.
    dst_url: Destination FileUrl.
    download_file_name: Temporary file name to be used for the download.
    logger: for outputting log messages.
    api_selector: The Cloud API implementation used.
    num_components: The number of components to perform this download with.
  """
  assert src_obj_metadata.etag
  tracker_file = None

  # Only can happen if the resumable threshold is set higher than the
  # parallel transfer threshold.
  if src_obj_metadata.size < ResumableThreshold():
    return

  tracker_file_name = GetTrackerFilePath(dst_url,
                                         TrackerFileType.SLICED_DOWNLOAD,
                                         api_selector)

  # Check to see if we should attempt resuming the download.
  try:
    fp = open(download_file_name, 'rb')
    existing_file_size = GetFileSize(fp)
    # A parallel resumption should be attempted only if the destination file
    # size is exactly the same as the source size and the tracker file matches.
    if existing_file_size == src_obj_metadata.size:
      tracker_file = open(tracker_file_name, 'r')
      tracker_file_data = json.load(tracker_file)
      if (tracker_file_data['etag'] == src_obj_metadata.etag and
          tracker_file_data['generation'] == src_obj_metadata.generation and
          tracker_file_data['num_components'] == num_components):
        return
      else:
        tracker_file.close()
        logger.warn('Sliced download tracker file doesn\'t match for '
                    'download of %s. Restarting download from scratch.' %
                    dst_url.object_name)

  except (IOError, ValueError) as e:
    # Ignore non-existent file (happens first time a download
    # is attempted on an object), but warn user for other errors.
    if isinstance(e, ValueError) or e.errno != errno.ENOENT:
      logger.warn('Couldn\'t read sliced download tracker file (%s): %s. '
                  'Restarting download from scratch.' %
                  (tracker_file_name, str(e)))
  finally:
    if fp:
      fp.close()
    if tracker_file:
      tracker_file.close()

  # Delete component tracker files to guarantee download starts from scratch.
  DeleteDownloadTrackerFiles(dst_url, api_selector)

  # Create a new sliced download tracker file to represent this download.
  try:
    with open(tracker_file_name, 'w') as tracker_file:
      tracker_file_data = {'etag': src_obj_metadata.etag,
                           'generation': src_obj_metadata.generation,
                           'num_components': num_components}
      tracker_file.write(json.dumps(tracker_file_data))
  except IOError as e:
    RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)


class SlicedDownloadFileWrapper(object):
  """Wraps a file object to be used in GetObjectMedia for sliced downloads.

  In order to allow resumability, the file object used by each thread in a
  sliced object download should be wrapped using SlicedDownloadFileWrapper.
  Passing a SlicedDownloadFileWrapper object to GetObjectMedia will allow the
  download component tracker file for this component to be updated periodically,
  while the downloaded bytes are normally written to file.
  """

  def __init__(self, fp, tracker_file_name, src_obj_metadata, start_byte,
               end_byte):
    """Initializes the SlicedDownloadFileWrapper.

    Args:
      fp: The already-open file object to be used for writing in
          GetObjectMedia. Data will be written to file starting at the current
          seek position.
      tracker_file_name: The name of the tracker file for this component.
      src_obj_metadata: Metadata from the source object. Must include etag and
                        generation.
      start_byte: The first byte to be downloaded for this parallel component.
      end_byte: The last byte to be downloaded for this parallel component.
    """
    self._orig_fp = fp
    self._tracker_file_name = tracker_file_name
    self._src_obj_metadata = src_obj_metadata
    self._last_tracker_file_byte = None
    self._start_byte = start_byte
    self._end_byte = end_byte

  def write(self, data):  # pylint: disable=invalid-name
    current_file_pos = self._orig_fp.tell()
    assert (self._start_byte <= current_file_pos and
            current_file_pos + len(data) <= self._end_byte + 1)

    self._orig_fp.write(data)
    current_file_pos = self._orig_fp.tell()

    threshold = TRACKERFILE_UPDATE_THRESHOLD
    if (self._last_tracker_file_byte is None or
        current_file_pos - self._last_tracker_file_byte > threshold or
        current_file_pos == self._end_byte + 1):
      WriteDownloadComponentTrackerFile(
          self._tracker_file_name, self._src_obj_metadata, current_file_pos)
      self._last_tracker_file_byte = current_file_pos

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    if whence == os.SEEK_END:
      self._orig_fp.seek(offset + self._end_byte + 1)
    else:
      self._orig_fp.seek(offset, whence)
    assert self._start_byte <= self._orig_fp.tell() <= self._end_byte + 1

  def tell(self):  # pylint: disable=invalid-name
    return self._orig_fp.tell()

  def flush(self):  # pylint: disable=invalid-name
    self._orig_fp.flush()

  def close(self):  # pylint: disable=invalid-name
    if self._orig_fp:
      self._orig_fp.close()


def _PartitionObject(src_url, src_obj_metadata, dst_url,
                     download_file_name):
  """Partitions an object into components to be downloaded.

  Each component is a byte range of the object. The byte ranges
  of the returned components are mutually exclusive and collectively
  exhaustive. The byte ranges are inclusive at both end points.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata from the source object.
    dst_url: Destination FileUrl.
    download_file_name: Temporary file name to be used for the download.

  Returns:
    components_to_download: A list of PerformSlicedDownloadObjectToFileArgs
                            to be used in Apply for the sliced download.
  """
  sliced_download_component_size = HumanReadableToBytes(
      config.get('GSUtil', 'sliced_object_download_component_size',
                 DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE))

  max_components = config.getint(
      'GSUtil', 'sliced_object_download_max_components',
      DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS)

  num_components, component_size = _GetPartitionInfo(
      src_obj_metadata.size, max_components, sliced_download_component_size)

  components_to_download = []
  component_lengths = []
  for i in range(num_components):
    start_byte = i * component_size
    end_byte = min((i + 1) * (component_size) - 1, src_obj_metadata.size - 1)
    component_lengths.append(end_byte - start_byte + 1)
    components_to_download.append(
        PerformSlicedDownloadObjectToFileArgs(
            i, src_url, src_obj_metadata, dst_url, download_file_name,
            start_byte, end_byte))
  return components_to_download, component_lengths


def _DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name,
                      command_obj, logger, copy_exception_handler,
                      api_selector):
  """Downloads a cloud object to a local file using sliced download.

  Byte ranges are decided for each thread/process, and then the parts are
  downloaded in parallel.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata from the source object.
    dst_url: Destination FileUrl.
    download_file_name: Temporary file name to be used for download.
    command_obj: command object for use in Apply in parallel composite uploads.
    logger: for outputting log messages.
    copy_exception_handler: For handling copy exceptions during Apply.
    api_selector: The Cloud API implementation used.

  Returns:
    (bytes_transferred, crc32c)
    bytes_transferred: Number of bytes transferred from server this call.
    crc32c: a crc32c hash value (integer) for the downloaded bytes, or None if
            crc32c hashing wasn't performed.
  """
  components_to_download, component_lengths = _PartitionObject(
      src_url, src_obj_metadata, dst_url, download_file_name)

  num_components = len(components_to_download)
  _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url,
                                      download_file_name, logger,
                                      api_selector, num_components)

  # Resize the download file so each child process can seek to its start byte.
  with open(download_file_name, 'ab') as fp:
    fp.truncate(src_obj_metadata.size)

  cp_results = command_obj.Apply(
      _PerformSlicedDownloadObjectToFile, components_to_download,
      copy_exception_handler, arg_checker=gslib.command.DummyArgChecker,
      parallel_operations_override=True, should_return_results=True)

  if len(cp_results) < num_components:
    raise CommandException(
        'Some components of %s were not downloaded successfully. '
        'Please retry this download.' % dst_url.object_name)

  # Crc32c hashes have to be concatenated in the correct order.
  cp_results = sorted(cp_results, key=attrgetter('component_num'))
  crc32c = cp_results[0].crc32c
  if crc32c is not None:
    for i in range(1, num_components):
      crc32c = ConcatCrc32c(crc32c, cp_results[i].crc32c,
                            component_lengths[i])

  bytes_transferred = 0
  expect_gzip = (src_obj_metadata.contentEncoding and
                 src_obj_metadata.contentEncoding.lower().endswith('gzip'))
  for cp_result in cp_results:
    bytes_transferred += cp_result.bytes_transferred
    server_gzip = (cp_result.server_encoding and
                   cp_result.server_encoding.lower().endswith('gzip'))
    # If the server gzipped any components on the fly, we will have no chance of
    # properly reconstructing the file.
    if server_gzip and not expect_gzip:
      raise CommandException(
          'Download of %s failed because the server sent back data with an '
          'unexpected encoding.' % dst_url.object_name)

  return bytes_transferred, crc32c


def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
                                   download_file_name, gsutil_api, logger,
                                   digesters, component_num=None, start_byte=0,
                                   end_byte=None):
  """Downloads an object to a local file using the resumable strategy.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata from the source object.
    dst_url: Destination FileUrl.
    download_file_name: Temporary file name to be used for download.
    gsutil_api: gsutil Cloud API instance to use for the download.
    logger: for outputting log messages.
    digesters: Digesters corresponding to the hash algorithms that will be used
               for validation.
    component_num: Which component of a sliced download this call is for, or
                   None if this is not a sliced download.
    start_byte: The first byte of a byte range for a sliced download.
    end_byte: The last byte of a byte range for a sliced download.

  Returns:
    (bytes_transferred, server_encoding)
    bytes_transferred: Number of bytes transferred from server this call.
    server_encoding: Content-encoding string if it was detected that the server
                     sent encoded bytes during transfer, None otherwise.
  """
  if end_byte is None:
    end_byte = src_obj_metadata.size - 1
  download_size = end_byte - start_byte + 1

  is_sliced = component_num is not None
  api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
  server_encoding = None

  # Used for logging
  download_name = dst_url.object_name
  if is_sliced:
    download_name += ' component %d' % component_num

  try:
    fp = open(download_file_name, 'r+b')
    fp.seek(start_byte)
    api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
    existing_file_size = GetFileSize(fp)

    tracker_file_name, download_start_byte = (
        ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger,
                                        api_selector, start_byte,
                                        existing_file_size, component_num))

    if download_start_byte < start_byte or download_start_byte > end_byte + 1:
      DeleteTrackerFile(tracker_file_name)
      raise CommandException(
          'Resumable download start point for %s is not in the correct byte '
          'range. Deleting tracker file, so if you re-try this download it '
          'will start from scratch' % download_name)

    download_complete = (download_start_byte == start_byte + download_size)
    resuming = (download_start_byte != start_byte) and not download_complete
    if resuming:
      logger.info('Resuming download for %s', download_name)
    elif download_complete:
      logger.info(
          'Download already complete for %s, skipping download but '
          'will run integrity checks.', download_name)

    # This is used for resuming downloads, but also for passing the mediaLink
    # and size into the download for new downloads so that we can avoid
    # making an extra HTTP call.
    serialization_data = GetDownloadSerializationData(
        src_obj_metadata, progress=download_start_byte)

    if resuming or download_complete:
      # Catch up our digester with the hash data.
      bytes_digested = 0
      total_bytes_to_digest = download_start_byte - start_byte
      hash_callback = ProgressCallbackWithBackoff(
          total_bytes_to_digest,
          FileProgressCallbackHandler(
              ConstructAnnounceText('Hashing',
                                    dst_url.url_string), logger).call)

      while bytes_digested < total_bytes_to_digest:
        bytes_to_read = min(DEFAULT_FILE_BUFFER_SIZE,
                            total_bytes_to_digest - bytes_digested)
        data = fp.read(bytes_to_read)
        bytes_digested += bytes_to_read
        for alg_name in digesters:
          digesters[alg_name].update(data)
        hash_callback.Progress(len(data))

    elif not is_sliced:
      # Delete file contents and start entire object download from scratch.
      fp.truncate(0)
      existing_file_size = 0

    progress_callback = FileProgressCallbackHandler(
        ConstructAnnounceText('Downloading', dst_url.url_string), logger,
        start_byte, download_size).call

    if global_copy_helper_opts.test_callback_file:
      with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
        progress_callback = pickle.loads(test_fp.read()).call

    if is_sliced and src_obj_metadata.size >= ResumableThreshold():
      fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata,
                                     start_byte, end_byte)

    # TODO: With gzip encoding (which may occur on-the-fly and not be part of
    # the object's metadata), when we request a range to resume, it's possible
    # that the server will just resend the entire object, which means our
    # caught-up hash will be incorrect.  We recalculate the hash on
    # the local file in the case of a failed gzip hash anyway, but it would
    # be better if we actively detected this case.
    if not download_complete:
      fp.seek(download_start_byte)
      server_encoding = gsutil_api.GetObjectMedia(
          src_url.bucket_name, src_url.object_name, fp,
          start_byte=download_start_byte, end_byte=end_byte,
          generation=src_url.generation, object_size=src_obj_metadata.size,
          download_strategy=CloudApi.DownloadStrategy.RESUMABLE,
          provider=src_url.scheme, serialization_data=serialization_data,
          digesters=digesters, progress_callback=progress_callback)

  except ResumableDownloadException as e:
    logger.warning('Caught ResumableDownloadException (%s) for download of %s.',
                   e.reason, download_name)
    raise
  finally:
    if fp:
      fp.close()

  bytes_transferred = end_byte - download_start_byte + 1
  return bytes_transferred, server_encoding


def _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url,
                                      download_file_name, gsutil_api, logger,
                                      digesters):
  """Downloads an object to a local file using the non-resumable strategy.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata from the source object.
    dst_url: Destination FileUrl.
    download_file_name: Temporary file name to be used for download.
    gsutil_api: gsutil Cloud API instance to use for the download.
    logger: for outputting log messages.
    digesters: Digesters corresponding to the hash algorithms that will be used
               for validation.
  Returns:
    (bytes_transferred, server_encoding)
    bytes_transferred: Number of bytes transferred from server this call.
    server_encoding: Content-encoding string if it was detected that the server
                     sent encoded bytes during transfer, None otherwise.
  """
  try:
    fp = open(download_file_name, 'w')

    # This is used to pass the mediaLink and the size into the download so that
    # we can avoid making an extra HTTP call.
    serialization_data = GetDownloadSerializationData(src_obj_metadata)

    progress_callback = FileProgressCallbackHandler(
        ConstructAnnounceText('Downloading', dst_url.url_string), logger).call

    if global_copy_helper_opts.test_callback_file:
      with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
        progress_callback = pickle.loads(test_fp.read()).call

    server_encoding = gsutil_api.GetObjectMedia(
        src_url.bucket_name, src_url.object_name, fp,
        generation=src_url.generation, object_size=src_obj_metadata.size,
        download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
        provider=src_url.scheme, serialization_data=serialization_data,
        digesters=digesters, progress_callback=progress_callback)
  finally:
    if fp:
      fp.close()

  return src_obj_metadata.size, server_encoding


def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
                          gsutil_api, logger, command_obj,
                          copy_exception_handler, allow_splitting=True):
  """Downloads an object to a local file.

  Args:
    src_url: Source CloudUrl.
    src_obj_metadata: Metadata from the source object.
    dst_url: Destination FileUrl.
    gsutil_api: gsutil Cloud API instance to use for the download.
    logger: for outputting log messages.
    command_obj: command object for use in Apply in sliced downloads.
    copy_exception_handler: For handling copy exceptions during Apply.
    allow_splitting: Whether or not to allow sliced download.
  Returns:
    (elapsed_time, bytes_transferred, dst_url, md5), where time elapsed
    excludes initial GET.

  Raises:
    FileConcurrencySkipError: if this download is already in progress.
    CommandException: if other errors encountered.
  """
  global open_files_map, open_files_lock
  if dst_url.object_name.endswith(dst_url.delim):
    logger.warn('\n'.join(textwrap.wrap(
        'Skipping attempt to download to filename ending with slash (%s). This '
        'typically happens when using gsutil to download from a subdirectory '
        'created by the Cloud Console (https://cloud.google.com/console)'
        % dst_url.object_name)))
    return (0, 0, dst_url, '')

  api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme)
  download_strategy = _SelectDownloadStrategy(dst_url)
  sliced_download = _ShouldDoSlicedDownload(
      download_strategy, src_obj_metadata, allow_splitting, logger)

  download_file_name, need_to_unzip = _GetDownloadFile(
      dst_url, src_obj_metadata, logger)

  # Ensure another process/thread is not already writing to this file.
  with open_files_lock:
    if open_files_map.get(download_file_name, False):
      raise FileConcurrencySkipError
    open_files_map[download_file_name] = True

  # Set up hash digesters.
  consider_md5 = src_obj_metadata.md5Hash and not sliced_download
  hash_algs = GetDownloadHashAlgs(logger, consider_md5=consider_md5,
                                  consider_crc32c=src_obj_metadata.crc32c)
  digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})

  # Tracks whether the server used a gzip encoding.
  server_encoding = None
  download_complete = (src_obj_metadata.size == 0)
  bytes_transferred = 0

  start_time = time.time()
  if not download_complete:
    if sliced_download:
      (bytes_transferred, crc32c) = (
          _DoSlicedDownload(src_url, src_obj_metadata, dst_url,
                            download_file_name, command_obj, logger,
                            copy_exception_handler, api_selector))
      if 'crc32c' in digesters:
        digesters['crc32c'].crcValue = crc32c
    elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT:
      (bytes_transferred, server_encoding) = (
          _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url,
                                            download_file_name, gsutil_api,
                                            logger, digesters))
    elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE:
      (bytes_transferred, server_encoding) = (
          _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
                                         download_file_name, gsutil_api, logger,
                                         digesters))
    else:
      raise CommandException('Invalid download strategy %s chosen for'
                             'file %s' % (download_strategy,
                                          download_file_name))
  end_time = time.time()

  server_gzip = server_encoding and server_encoding.lower().endswith('gzip')
  local_md5 = _ValidateAndCompleteDownload(
      logger, src_url, src_obj_metadata, dst_url, need_to_unzip, server_gzip,
      digesters, hash_algs, download_file_name, api_selector, bytes_transferred)

  with open_files_lock:
    open_files_map.delete(download_file_name)

  return (end_time - start_time, bytes_transferred, dst_url, local_md5)


def _GetDownloadTempZipFileName(dst_url):
  """Returns temporary file name for a temporarily compressed download."""
  return '%s_.gztmp' % dst_url.object_name


def _GetDownloadTempFileName(dst_url):
  """Returns temporary download file name for uncompressed downloads."""
  return '%s_.gstmp' % dst_url.object_name


def _ValidateAndCompleteDownload(logger, src_url, src_obj_metadata, dst_url,
                                 need_to_unzip, server_gzip, digesters,
                                 hash_algs, download_file_name,
                                 api_selector, bytes_transferred):
  """Validates and performs necessary operations on a downloaded file.

  Validates the integrity of the downloaded file using hash_algs. If the file
  was compressed (temporarily), the file will be decompressed. Then, if the
  integrity of the file was successfully validated, the file will be moved
  from its temporary download location to its permanent location on disk.

  Args:
    logger: For outputting log messages.
    src_url: StorageUrl for the source object.
    src_obj_metadata: Metadata for the source object, potentially containing
                      hash values.
    dst_url: StorageUrl describing the destination file.
    need_to_unzip: If true, a temporary zip file was used and must be
                   uncompressed as part of validation.
    server_gzip: If true, the server gzipped the bytes (regardless of whether
                 the object metadata claimed it was gzipped).
    digesters: dict of {string, hash digester} that contains up-to-date digests
               computed during the download. If a digester for a particular
               algorithm is None, an up-to-date digest is not available and the
               hash must be recomputed from the local file.
    hash_algs: dict of {string, hash algorithm} that can be used if digesters
               don't have up-to-date digests.
    download_file_name: Temporary file name that was used for download.
    api_selector: The Cloud API implementation used (used tracker file naming).
    bytes_transferred: Number of bytes downloaded (used for logging).

  Returns:
    An MD5 of the local file, if one was calculated as part of the integrity
    check.
  """
  final_file_name = dst_url.object_name
  file_name = download_file_name
  digesters_succeeded = True

  for alg in digesters:
    # If we get a digester with a None algorithm, the underlying
    # implementation failed to calculate a digest, so we will need to
    # calculate one from scratch.
    if not digesters[alg]:
      digesters_succeeded = False
      break

  if digesters_succeeded:
    local_hashes = _CreateDigestsFromDigesters(digesters)
  else:
    local_hashes = _CreateDigestsFromLocalFile(
        logger, hash_algs, file_name, final_file_name, src_obj_metadata)

  digest_verified = True
  hash_invalid_exception = None
  try:
    _CheckHashes(logger, src_url, src_obj_metadata, final_file_name,
                 local_hashes)
    DeleteDownloadTrackerFiles(dst_url, api_selector)
  except HashMismatchException, e:
    # If an non-gzipped object gets sent with gzip content encoding, the hash
    # we calculate will match the gzipped bytes, not the original object. Thus,
    # we'll need to calculate and check it after unzipping.
    if server_gzip:
      logger.debug(
          'Hash did not match but server gzipped the content, will '
          'recalculate.')
      digest_verified = False
    elif api_selector == ApiSelector.XML:
      logger.debug(
          'Hash did not match but server may have gzipped the content, will '
          'recalculate.')
      # Save off the exception in case this isn't a gzipped file.
      hash_invalid_exception = e
      digest_verified = False
    else:
      DeleteDownloadTrackerFiles(dst_url, api_selector)
      if _RENAME_ON_HASH_MISMATCH:
        os.rename(file_name,
                  final_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
      else:
        os.unlink(file_name)
      raise

  if need_to_unzip or server_gzip:
    # Log that we're uncompressing if the file is big enough that
    # decompressing would make it look like the transfer "stalled" at the end.
    if bytes_transferred > TEN_MIB:
      logger.info(
          'Uncompressing temporarily gzipped file to %s...', final_file_name)

    gzip_fp = None
    try:
      # Downloaded temporarily gzipped file, unzip to file without '_.gztmp'
      # suffix.
      gzip_fp = gzip.open(file_name, 'rb')
      with open(final_file_name, 'wb') as f_out:
        data = gzip_fp.read(GZIP_CHUNK_SIZE)
        while data:
          f_out.write(data)
          data = gzip_fp.read(GZIP_CHUNK_SIZE)
    except IOError, e:
      # In the XML case where we don't know if the file was gzipped, raise
      # the original hash exception if we find that it wasn't.
      if 'Not a gzipped file' in str(e) and hash_invalid_exception:
        # Linter improperly thinks we're raising None despite the above check.
        # pylint: disable=raising-bad-type
        raise hash_invalid_exception
    finally:
      if gzip_fp:
        gzip_fp.close()

    os.unlink(file_name)
    file_name = final_file_name

  if not digest_verified:
    try:
      # Recalculate hashes on the unzipped local file.
      local_hashes = _CreateDigestsFromLocalFile(
          logger, hash_algs, file_name, final_file_name, src_obj_metadata)
      _CheckHashes(logger, src_url, src_obj_metadata, final_file_name,
                   local_hashes)
      DeleteDownloadTrackerFiles(dst_url, api_selector)
    except HashMismatchException:
      DeleteDownloadTrackerFiles(dst_url, api_selector)
      if _RENAME_ON_HASH_MISMATCH:
        os.rename(file_name,
                  file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX)
      else:
        os.unlink(file_name)
      raise

  if file_name != final_file_name:
    # Data is still in a temporary file, so move it to a permanent location.
    if os.path.exists(final_file_name):
      os.unlink(final_file_name)
    os.rename(file_name,
              final_file_name)

  if 'md5' in local_hashes:
    return local_hashes['md5']


def _CopyFileToFile(src_url, dst_url):
  """Copies a local file to a local file.

  Args:
    src_url: Source FileUrl.
    dst_url: Destination FileUrl.
  Returns:
    (elapsed_time, bytes_transferred, dst_url, md5=None).

  Raises:
    CommandException: if errors encountered.
  """
  src_fp = GetStreamFromFileUrl(src_url)
  dir_name = os.path.dirname(dst_url.object_name)
  if dir_name and not os.path.exists(dir_name):
    os.makedirs(dir_name)
  dst_fp = open(dst_url.object_name, 'wb')
  start_time = time.time()
  shutil.copyfileobj(src_fp, dst_fp)
  end_time = time.time()
  return (end_time - start_time, os.path.getsize(dst_url.object_name),
          dst_url, None)


def _DummyTrackerCallback(_):
  pass


# pylint: disable=undefined-variable
def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url,
                                dst_obj_metadata, preconditions, gsutil_api,
                                logger):
  """Copies from src_url to dst_url in "daisy chain" mode.

  See -D OPTION documentation about what daisy chain mode is.

  Args:
    src_url: Source CloudUrl
    src_obj_metadata: Metadata from source object
    dst_url: Destination CloudUrl
    dst_obj_metadata: Object-specific metadata that should be overidden during
                      the copy.
    preconditions: Preconditions to use for the copy.
    gsutil_api: gsutil Cloud API to use for the copy.
    logger: For outputting log messages.

  Returns:
    (elapsed_time, bytes_transferred, dst_url with generation,
    md5 hash of destination) excluding overhead like initial GET.

  Raises:
    CommandException: if errors encountered.
  """
  # We don't attempt to preserve ACLs across providers because
  # GCS and S3 support different ACLs and disjoint principals.
  if (global_copy_helper_opts.preserve_acl
      and src_url.scheme != dst_url.scheme):
    raise NotImplementedError(
        'Cross-provider cp -p not supported')
  if not global_copy_helper_opts.preserve_acl:
    dst_obj_metadata.acl = []

  # Don't use callbacks for downloads on the daisy chain wrapper because
  # upload callbacks will output progress, but respect test hooks if present.
  progress_callback = None
  if global_copy_helper_opts.test_callback_file:
    with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
      progress_callback = pickle.loads(test_fp.read()).call

  start_time = time.time()
  upload_fp = DaisyChainWrapper(src_url, src_obj_metadata.size, gsutil_api,
                                progress_callback=progress_callback)
  uploaded_object = None
  if src_obj_metadata.size == 0:
    # Resumable uploads of size 0 are not supported.
    uploaded_object = gsutil_api.UploadObject(
        upload_fp, object_metadata=dst_obj_metadata,
        canned_acl=global_copy_helper_opts.canned_acl,
        preconditions=preconditions, provider=dst_url.scheme,
        fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size)
  else:
    # TODO: Support process-break resumes. This will resume across connection
    # breaks and server errors, but the tracker callback is a no-op so this
    # won't resume across gsutil runs.
    # TODO: Test retries via test_callback_file.
    uploaded_object = gsutil_api.UploadObjectResumable(
        upload_fp, object_metadata=dst_obj_metadata,
        canned_acl=global_copy_helper_opts.canned_acl,
        preconditions=preconditions, provider=dst_url.scheme,
        fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size,
        progress_callback=FileProgressCallbackHandler(
            ConstructAnnounceText('Uploading', dst_url.url_string),
            logger).call,
        tracker_callback=_DummyTrackerCallback)
  end_time = time.time()

  try:
    _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata,
                      uploaded_object)
  except HashMismatchException:
    if _RENAME_ON_HASH_MISMATCH:
      corrupted_obj_metadata = apitools_messages.Object(
          name=dst_obj_metadata.name,
          bucket=dst_obj_metadata.bucket,
          etag=uploaded_object.etag)
      dst_obj_metadata.name = (dst_url.object_name +
                               _RENAME_ON_HASH_MISMATCH_SUFFIX)
      gsutil_api.CopyObject(corrupted_obj_metadata,
                            dst_obj_metadata, provider=dst_url.scheme)
    # If the digest doesn't match, delete the object.
    gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name,
                            generation=uploaded_object.generation,
                            provider=dst_url.scheme)
    raise

  result_url = dst_url.Clone()
  result_url.generation = GenerationFromUrlAndString(
      result_url, uploaded_object.generation)

  return (end_time - start_time, src_obj_metadata.size, result_url,
          uploaded_object.md5Hash)


# pylint: disable=undefined-variable
# pylint: disable=too-many-statements
def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj,
                copy_exception_handler, allow_splitting=True,
                headers=None, manifest=None, gzip_exts=None):
  """Performs copy from src_url to dst_url, handling various special cases.

  Args:
    logger: for outputting log messages.
    src_url: Source StorageUrl.
    dst_url: Destination StorageUrl.
    gsutil_api: gsutil Cloud API instance to use for the copy.
    command_obj: command object for use in Apply in parallel composite uploads
        and sliced object downloads.
    copy_exception_handler: for handling copy exceptions during Apply.
    allow_splitting: Whether to allow the file to be split into component
                     pieces for an parallel composite upload or download.
    headers: optional headers to use for the copy operation.
    manifest: optional manifest for tracking copy operations.
    gzip_exts: List of file extensions to gzip for uploads, if any.

  Returns:
    (elapsed_time, bytes_transferred, version-specific dst_url) excluding
    overhead like initial GET.

  Raises:
    ItemExistsError: if no clobber flag is specified and the destination
        object already exists.
    SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified
        and the source is an unsupported type.
    CommandException: if other errors encountered.
  """
  if headers:
    dst_obj_headers = headers.copy()
  else:
    dst_obj_headers = {}

  # Create a metadata instance for each destination object so metadata
  # such as content-type can be applied per-object.
  # Initialize metadata from any headers passed in via -h.
  dst_obj_metadata = ObjectMetadataFromHeaders(dst_obj_headers)

  if dst_url.IsCloudUrl() and dst_url.scheme == 'gs':
    preconditions = PreconditionsFromHeaders(dst_obj_headers)
  else:
    preconditions = Preconditions()

  src_obj_metadata = None
  src_obj_filestream = None
  if src_url.IsCloudUrl():
    src_obj_fields = None
    if dst_url.IsCloudUrl():
      # For cloud or daisy chain copy, we need every copyable field.
      # If we're not modifying or overriding any of the fields, we can get
      # away without retrieving the object metadata because the copy
      # operation can succeed with just the destination bucket and object
      # name.  But if we are sending any metadata, the JSON API will expect a
      # complete object resource.  Since we want metadata like the object size
      # for our own tracking, we just get all of the metadata here.
      src_obj_fields = ['cacheControl', 'componentCount',
                        'contentDisposition', 'contentEncoding',
                        'contentLanguage', 'contentType', 'crc32c',
                        'etag', 'generation', 'md5Hash', 'mediaLink',
                        'metadata', 'metageneration', 'size']
      # We only need the ACL if we're going to preserve it.
      if global_copy_helper_opts.preserve_acl:
        src_obj_fields.append('acl')
      if (src_url.scheme == dst_url.scheme
          and not global_copy_helper_opts.daisy_chain):
        copy_in_the_cloud = True
      else:
        copy_in_the_cloud = False
    else:
      # Just get the fields needed to validate the download.
      src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag',
                        'mediaLink', 'md5Hash', 'size', 'generation']

    if (src_url.scheme == 's3' and
        global_copy_helper_opts.skip_unsupported_objects):
      src_obj_fields.append('storageClass')

    try:
      src_generation = GenerationFromUrlAndString(src_url, src_url.generation)
      src_obj_metadata = gsutil_api.GetObjectMetadata(
          src_url.bucket_name, src_url.object_name,
          generation=src_generation, provider=src_url.scheme,
          fields=src_obj_fields)
    except NotFoundException:
      raise CommandException(
          'NotFoundException: Could not retrieve source object %s.' %
          src_url.url_string)
    if (src_url.scheme == 's3' and
        global_copy_helper_opts.skip_unsupported_objects and
        src_obj_metadata.storageClass == 'GLACIER'):
      raise SkipGlacierError()

    src_obj_size = src_obj_metadata.size
    dst_obj_metadata.contentType = src_obj_metadata.contentType
    if global_copy_helper_opts.preserve_acl:
      dst_obj_metadata.acl = src_obj_metadata.acl
      # Special case for S3-to-S3 copy URLs using
      # global_copy_helper_opts.preserve_acl.
      # dst_url will be verified in _CopyObjToObjDaisyChainMode if it
      # is not s3 (and thus differs from src_url).
      if src_url.scheme == 's3':
        acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata)
        if acl_text:
          AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text)
  else:
    try:
      src_obj_filestream = GetStreamFromFileUrl(src_url)
    except Exception, e:  # pylint: disable=broad-except
      if command_obj.continue_on_error:
        message = 'Error copying %s: %s' % (src_url, str(e))
        command_obj.op_failure_count += 1
        logger.error(message)
        return
      else:
        raise CommandException('Error opening file "%s": %s.' % (src_url,
                                                                 e.message))
    if src_url.IsStream():
      src_obj_size = None
    else:
      src_obj_size = os.path.getsize(src_url.object_name)

  if global_copy_helper_opts.use_manifest:
    # Set the source size in the manifest.
    manifest.Set(src_url.url_string, 'size', src_obj_size)

  if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE
      and src_url != 's3'):
    raise CommandException(
        '"%s" exceeds the maximum gsutil-supported size for an S3 upload. S3 '
        'objects greater than %s in size require multipart uploads, which '
        'gsutil does not support.' % (src_url,
                                      MakeHumanReadable(S3_MAX_UPLOAD_SIZE)))

  # On Windows, stdin is opened as text mode instead of binary which causes
  # problems when piping a binary file, so this switches it to binary mode.
  if IS_WINDOWS and src_url.IsFileUrl() and src_url.IsStream():
    msvcrt.setmode(GetStreamFromFileUrl(src_url).fileno(), os.O_BINARY)

  if global_copy_helper_opts.no_clobber:
    # There are two checks to prevent clobbering:
    # 1) The first check is to see if the URL
    #    already exists at the destination and prevent the upload/download
    #    from happening. This is done by the exists() call.
    # 2) The second check is only relevant if we are writing to gs. We can
    #    enforce that the server only writes the object if it doesn't exist
    #    by specifying the header below. This check only happens at the
    #    server after the complete file has been uploaded. We specify this
    #    header to prevent a race condition where a destination file may
    #    be created after the first check and before the file is fully
    #    uploaded.
    # In order to save on unnecessary uploads/downloads we perform both
    # checks. However, this may come at the cost of additional HTTP calls.
    if preconditions.gen_match:
      raise ArgumentException('Specifying x-goog-if-generation-match is '
                              'not supported with cp -n')
    else:
      preconditions.gen_match = 0
    if dst_url.IsFileUrl() and os.path.exists(dst_url.object_name):
      # The local file may be a partial. Check the file sizes.
      if src_obj_size == os.path.getsize(dst_url.object_name):
        raise ItemExistsError()
    elif dst_url.IsCloudUrl():
      try:
        dst_object = gsutil_api.GetObjectMetadata(
            dst_url.bucket_name, dst_url.object_name, provider=dst_url.scheme)
      except NotFoundException:
        dst_object = None
      if dst_object:
        raise ItemExistsError()

  if dst_url.IsCloudUrl():
    # Cloud storage API gets object and bucket name from metadata.
    dst_obj_metadata.name = dst_url.object_name
    dst_obj_metadata.bucket = dst_url.bucket_name
    if src_url.IsCloudUrl():
      # Preserve relevant metadata from the source object if it's not already
      # provided from the headers.
      CopyObjectMetadata(src_obj_metadata, dst_obj_metadata, override=False)
      src_obj_metadata.name = src_url.object_name
      src_obj_metadata.bucket = src_url.bucket_name
    else:
      _SetContentTypeFromFile(src_url, dst_obj_metadata)
  else:
    # Files don't have Cloud API metadata.
    dst_obj_metadata = None

  _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata)

  if src_url.IsCloudUrl():
    if dst_url.IsFileUrl():
      return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url,
                                   gsutil_api, logger, command_obj,
                                   copy_exception_handler,
                                   allow_splitting=allow_splitting)
    elif copy_in_the_cloud:
      return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url,
                                     dst_obj_metadata, preconditions,
                                     gsutil_api, logger)
    else:
      return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata,
                                         dst_url, dst_obj_metadata,
                                         preconditions, gsutil_api, logger)
  else:  # src_url.IsFileUrl()
    if dst_url.IsCloudUrl():
      return _UploadFileToObject(
          src_url, src_obj_filestream, src_obj_size, dst_url,
          dst_obj_metadata, preconditions, gsutil_api, logger, command_obj,
          copy_exception_handler, gzip_exts=gzip_exts,
          allow_splitting=allow_splitting)
    else:  # dst_url.IsFileUrl()
      return _CopyFileToFile(src_url, dst_url)


class Manifest(object):
  """Stores the manifest items for the CpCommand class."""

  def __init__(self, path):
    # self.items contains a dictionary of rows
    self.items = {}
    self.manifest_filter = {}
    self.lock = CreateLock()

    self.manifest_path = os.path.expanduser(path)
    self._ParseManifest()
    self._CreateManifestFile()

  def _ParseManifest(self):
    """Load and parse a manifest file.

    This information will be used to skip any files that have a skip or OK
    status.
    """
    try:
      if os.path.exists(self.manifest_path):
        with open(self.manifest_path, 'rb') as f:
          first_row = True
          reader = csv.reader(f)
          for row in reader:
            if first_row:
              try:
                source_index = row.index('Source')
                result_index = row.index('Result')
              except ValueError:
                # No header and thus not a valid manifest file.
                raise CommandException(
                    'Missing headers in manifest file: %s' % self.manifest_path)
            first_row = False
            source = row[source_index]
            result = row[result_index]
            if result in ['OK', 'skip']:
              # We're always guaranteed to take the last result of a specific
              # source url.
              self.manifest_filter[source] = result
    except IOError:
      raise CommandException('Could not parse %s' % self.manifest_path)

  def WasSuccessful(self, src):
    """Returns whether the specified src url was marked as successful."""
    return src in self.manifest_filter

  def _CreateManifestFile(self):
    """Opens the manifest file and assigns it to the file pointer."""
    try:
      if ((not os.path.exists(self.manifest_path))
          or (os.stat(self.manifest_path).st_size == 0)):
        # Add headers to the new file.
        with open(self.manifest_path, 'wb', 1) as f:
          writer = csv.writer(f)
          writer.writerow(['Source',
                           'Destination',
                           'Start',
                           'End',
                           'Md5',
                           'UploadId',
                           'Source Size',
                           'Bytes Transferred',
                           'Result',
                           'Description'])
    except IOError:
      raise CommandException('Could not create manifest file.')

  def Set(self, url, key, value):
    if value is None:
      # In case we don't have any information to set we bail out here.
      # This is so that we don't clobber existing information.
      # To zero information pass '' instead of None.
      return
    if url in self.items:
      self.items[url][key] = value
    else:
      self.items[url] = {key: value}

  def Initialize(self, source_url, destination_url):
    # Always use the source_url as the key for the item. This is unique.
    self.Set(source_url, 'source_uri', source_url)
    self.Set(source_url, 'destination_uri', destination_url)
    self.Set(source_url, 'start_time', datetime.datetime.utcnow())

  def SetResult(self, source_url, bytes_transferred, result,
                description=''):
    self.Set(source_url, 'bytes', bytes_transferred)
    self.Set(source_url, 'result', result)
    self.Set(source_url, 'description', description)
    self.Set(source_url, 'end_time', datetime.datetime.utcnow())
    self._WriteRowToManifestFile(source_url)
    self._RemoveItemFromManifest(source_url)

  def _WriteRowToManifestFile(self, url):
    """Writes a manifest entry to the manifest file for the url argument."""
    row_item = self.items[url]
    data = [
        str(row_item['source_uri'].encode(UTF8)),
        str(row_item['destination_uri'].encode(UTF8)),
        '%sZ' % row_item['start_time'].isoformat(),
        '%sZ' % row_item['end_time'].isoformat(),
        row_item['md5'] if 'md5' in row_item else '',
        row_item['upload_id'] if 'upload_id' in row_item else '',
        str(row_item['size']) if 'size' in row_item else '',
        str(row_item['bytes']) if 'bytes' in row_item else '',
        row_item['result'],
        row_item['description'].encode(UTF8)]

    # Aquire a lock to prevent multiple threads writing to the same file at
    # the same time. This would cause a garbled mess in the manifest file.
    with self.lock:
      with open(self.manifest_path, 'a', 1) as f:  # 1 == line buffered
        writer = csv.writer(f)
        writer.writerow(data)

  def _RemoveItemFromManifest(self, url):
    # Remove the item from the dictionary since we're done with it and
    # we don't want the dictionary to grow too large in memory for no good
    # reason.
    del self.items[url]


class ItemExistsError(Exception):
  """Exception class for objects that are skipped because they already exist."""
  pass


class SkipUnsupportedObjectError(Exception):
  """Exception for objects skipped because they are an unsupported type."""

  def __init__(self):
    super(SkipUnsupportedObjectError, self).__init__()
    self.unsupported_type = 'Unknown'


class SkipGlacierError(SkipUnsupportedObjectError):
  """Exception for objects skipped because they are an unsupported type."""

  def __init__(self):
    super(SkipGlacierError, self).__init__()
    self.unsupported_type = 'GLACIER'


def GetPathBeforeFinalDir(url):
  """Returns the path section before the final directory component of the URL.

  This handles cases for file system directories, bucket, and bucket
  subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket',
  and for file://dir we'll return file://

  Args:
    url: StorageUrl representing a filesystem directory, cloud bucket or
         bucket subdir.

  Returns:
    String name of above-described path, sans final path separator.
  """
  sep = url.delim
  if url.IsFileUrl():
    past_scheme = url.url_string[len('file://'):]
    if past_scheme.find(sep) == -1:
      return 'file://'
    else:
      return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0]
  if url.IsBucket():
    return '%s://' % url.scheme
  # Else it names a bucket subdir.
  return url.url_string.rstrip(sep).rpartition(sep)[0]


def _GetPartitionInfo(file_size, max_components, default_component_size):
  """Gets info about a file partition for parallel file/object transfers.

  Args:
    file_size: The number of bytes in the file to be partitioned.
    max_components: The maximum number of components that can be composed.
    default_component_size: The size of a component, assuming that
                            max_components is infinite.
  Returns:
    The number of components in the partitioned file, and the size of each
    component (except the last, which will have a different size iff
    file_size != 0 (mod num_components)).
  """
  # num_components = ceil(file_size / default_component_size)
  num_components = DivideAndCeil(file_size, default_component_size)

  # num_components must be in the range [2, max_components]
  num_components = max(min(num_components, max_components), 2)

  # component_size = ceil(file_size / num_components)
  component_size = DivideAndCeil(file_size, num_components)
  return (num_components, component_size)


def _DeleteTempComponentObjectFn(cls, url_to_delete, thread_state=None):
  """Wrapper func to be used with command.Apply to delete temporary objects."""
  gsutil_api = GetCloudApiInstance(cls, thread_state)
  try:
    gsutil_api.DeleteObject(
        url_to_delete.bucket_name, url_to_delete.object_name,
        generation=url_to_delete.generation, provider=url_to_delete.scheme)
  except NotFoundException:
    # The temporary object could already be gone if a retry was
    # issued at a lower layer but the original request succeeded.
    # Barring other errors, the top-level command should still report success,
    # so don't raise here.
    pass


def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock):
  """Parse the tracker file from the last parallel composite upload attempt.

  If it exists, the tracker file is of the format described in
  _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be
  read, then the upload will start from the beginning.

  Args:
    tracker_file: The name of the file to parse.
    tracker_file_lock: Lock protecting access to the tracker file.

  Returns:
    random_prefix: A randomly-generated prefix to the name of the
                   temporary components.
    existing_objects: A list of ObjectFromTracker objects representing
                      the set of files that have already been uploaded.
  """

  def GenerateRandomPrefix():
    return str(random.randint(1, (10 ** 10) - 1))

  existing_objects = []
  try:
    with tracker_file_lock:
      with open(tracker_file, 'r') as fp:
        lines = fp.readlines()
        lines = [line.strip() for line in lines]
        if not lines:
          print('Parallel upload tracker file (%s) was invalid. '
                'Restarting upload from scratch.' % tracker_file)
          lines = [GenerateRandomPrefix()]

  except IOError as e:
    # We can't read the tracker file, so generate a new random prefix.
    lines = [GenerateRandomPrefix()]

    # Ignore non-existent file (happens first time an upload
    # is attempted on a file), but warn user for other errors.
    if e.errno != errno.ENOENT:
      # Will restart because we failed to read in the file.
      print('Couldn\'t read parallel upload tracker file (%s): %s. '
            'Restarting upload from scratch.' % (tracker_file, e.strerror))

  # The first line contains the randomly-generated prefix.
  random_prefix = lines[0]

  # The remaining lines were written in pairs to describe a single component
  # in the form:
  #   object_name (without random prefix)
  #   generation
  # Newlines are used as the delimiter because only newlines and carriage
  # returns are invalid characters in object names, and users can specify
  # a custom prefix in the config file.
  i = 1
  while i < len(lines):
    (name, generation) = (lines[i], lines[i+1])
    if not generation:
      # Cover the '' case.
      generation = None
    existing_objects.append(ObjectFromTracker(name, generation))
    i += 2
  return (random_prefix, existing_objects)


def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component,
                                                       tracker_file_lock):
  """Appends info about the uploaded component to an existing tracker file.

  Follows the format described in _CreateParallelUploadTrackerFile.

  Args:
    tracker_file: Tracker file to append to.
    component: Component that was uploaded.
    tracker_file_lock: Thread and process-safe Lock for the tracker file.
  """
  lines = _GetParallelUploadTrackerFileLinesForComponents([component])
  lines = [line + '\n' for line in lines]
  with tracker_file_lock:
    with open(tracker_file, 'a') as f:
      f.writelines(lines)


def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components,
                                     tracker_file_lock):
  """Writes information about components that were successfully uploaded.

  This way the upload can be resumed at a later date. The tracker file has
  the format:
    random_prefix
    temp_object_1_name
    temp_object_1_generation
    .
    .
    .
    temp_object_N_name
    temp_object_N_generation
    where N is the number of components that have been successfully uploaded.

  Args:
    tracker_file: The name of the parallel upload tracker file.
    random_prefix: The randomly-generated prefix that was used for
                   for uploading any existing components.
    components: A list of ObjectFromTracker objects that were uploaded.
    tracker_file_lock: The lock protecting access to the tracker file.
  """
  lines = [random_prefix]
  lines += _GetParallelUploadTrackerFileLinesForComponents(components)
  lines = [line + '\n' for line in lines]
  try:
    with tracker_file_lock:
      open(tracker_file, 'w').close()  # Clear the file.
      with open(tracker_file, 'w') as f:
        f.writelines(lines)
  except IOError as e:
    RaiseUnwritableTrackerFileException(tracker_file, e.strerror)


def _GetParallelUploadTrackerFileLinesForComponents(components):
  """Return a list of the lines for use in a parallel upload tracker file.

  The lines represent the given components, using the format as described in
  _CreateParallelUploadTrackerFile.

  Args:
    components: A list of ObjectFromTracker objects that were uploaded.

  Returns:
    Lines describing components with their generation for outputting to the
    tracker file.
  """
  lines = []
  for component in components:
    generation = None
    generation = component.generation
    if not generation:
      generation = ''
    lines += [component.object_name, str(generation)]
  return lines


def FilterExistingComponents(dst_args, existing_components, bucket_url,
                             gsutil_api):
  """Determines course of action for component objects.

  Given the list of all target objects based on partitioning the file and
  the list of objects that have already been uploaded successfully,
  this function determines which objects should be uploaded, which
  existing components are still valid, and which existing components should
  be deleted.

  Args:
    dst_args: The map of file_name -> PerformParallelUploadFileToObjectArgs
              calculated by partitioning the file.
    existing_components: A list of ObjectFromTracker objects that have been
                         uploaded in the past.
    bucket_url: CloudUrl of the bucket in which the components exist.
    gsutil_api: gsutil Cloud API instance to use for retrieving object metadata.

  Returns:
    components_to_upload: List of components that need to be uploaded.
    uploaded_components: List of components that have already been
                         uploaded and are still valid.
    existing_objects_to_delete: List of components that have already
                                been uploaded, but are no longer valid
                                and are in a versioned bucket, and
                                therefore should be deleted.
  """
  components_to_upload = []
  existing_component_names = [component.object_name
                              for component in existing_components]
  for component_name in dst_args:
    if component_name not in existing_component_names:
      components_to_upload.append(dst_args[component_name])

  objects_already_chosen = []

  # Don't reuse any temporary components whose MD5 doesn't match the current
  # MD5 of the corresponding part of the file. If the bucket is versioned,
  # also make sure that we delete the existing temporary version.
  existing_objects_to_delete = []
  uploaded_components = []
  for tracker_object in existing_components:
    if (tracker_object.object_name not in dst_args.keys()
        or tracker_object.object_name in objects_already_chosen):
      # This could happen if the component size has changed. This also serves
      # to handle object names that get duplicated in the tracker file due
      # to people doing things they shouldn't (e.g., overwriting an existing
      # temporary component in a versioned bucket).

      url = bucket_url.Clone()
      url.object_name = tracker_object.object_name
      url.generation = tracker_object.generation
      existing_objects_to_delete.append(url)
      continue

    dst_arg = dst_args[tracker_object.object_name]
    file_part = FilePart(dst_arg.filename, dst_arg.file_start,
                         dst_arg.file_length)
    # TODO: calculate MD5's in parallel when possible.
    content_md5 = CalculateB64EncodedMd5FromContents(file_part)

    try:
      # Get the MD5 of the currently-existing component.
      dst_url = dst_arg.dst_url
      dst_metadata = gsutil_api.GetObjectMetadata(
          dst_url.bucket_name, dst_url.object_name,
          generation=dst_url.generation, provider=dst_url.scheme,
          fields=['md5Hash', 'etag'])
      cloud_md5 = dst_metadata.md5Hash
    except Exception:  # pylint: disable=broad-except
      # We don't actually care what went wrong - we couldn't retrieve the
      # object to check the MD5, so just upload it again.
      cloud_md5 = None

    if cloud_md5 != content_md5:
      components_to_upload.append(dst_arg)
      objects_already_chosen.append(tracker_object.object_name)
      if tracker_object.generation:
        # If the old object doesn't have a generation (i.e., it isn't in a
        # versioned bucket), then we will just overwrite it anyway.
        invalid_component_with_generation = dst_arg.dst_url.Clone()
        invalid_component_with_generation.generation = tracker_object.generation
        existing_objects_to_delete.append(invalid_component_with_generation)
    else:
      url = dst_arg.dst_url.Clone()
      url.generation = tracker_object.generation
      uploaded_components.append(url)
      objects_already_chosen.append(tracker_object.object_name)

  if uploaded_components:
    logging.info('Found %d existing temporary components to reuse.',
                 len(uploaded_components))

  return (components_to_upload, uploaded_components,
          existing_objects_to_delete)