# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of Unix-like rsync command."""

from __future__ import absolute_import

import errno
import heapq
import io
from itertools import islice
import os
import re
import tempfile
import textwrap
import traceback
import urllib

from boto import config
import crcmod

from gslib import copy_helper
from gslib.bucket_listing_ref import BucketListingObject
from gslib.cloud_api import NotFoundException
from gslib.command import Command
from gslib.command import DummyArgChecker
from gslib.command_argument import CommandArgument
from gslib.copy_helper import CreateCopyHelperOpts
from gslib.copy_helper import SkipUnsupportedObjectError
from gslib.cs_api_map import ApiSelector
from gslib.exception import CommandException
from gslib.hashing_helper import CalculateB64EncodedCrc32cFromContents
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.hashing_helper import SLOW_CRCMOD_WARNING
from gslib.plurality_checkable_iterator import PluralityCheckableIterator
from gslib.sig_handling import GetCaughtSignals
from gslib.sig_handling import RegisterSignalHandler
from gslib.storage_url import StorageUrlFromString
from gslib.util import GetCloudApiInstance
from gslib.util import IsCloudSubdirPlaceholder
from gslib.util import TEN_MIB
from gslib.util import UsingCrcmodExtension
from gslib.util import UTF8
from gslib.wildcard_iterator import CreateWildcardIterator


_SYNOPSIS = """
  gsutil rsync [-c] [-C] [-d] [-e] [-n] [-p] [-r] [-U] [-x] src_url dst_url
"""

_DETAILED_HELP_TEXT = ("""
<B>SYNOPSIS</B>
""" + _SYNOPSIS + """


<B>DESCRIPTION</B>
  The gsutil rsync command makes the contents under dst_url the same as the
  contents under src_url, by copying any missing files/objects, and (if the
  -d option is specified) deleting any extra files/objects. For example, to
  make gs://mybucket/data match the contents of the local directory "data"
  you could do:

    gsutil rsync -d data gs://mybucket/data

  To recurse into directories use the -r option:

    gsutil rsync -d -r data gs://mybucket/data

  To copy only new/changed files without deleting extra files from
  gs://mybucket/data leave off the -d option:

    gsutil rsync -r data gs://mybucket/data

  If you have a large number of objects to synchronize you might want to use the
  gsutil -m option, to perform parallel (multi-threaded/multi-processing)
  synchronization:

    gsutil -m rsync -d -r data gs://mybucket/data

  The -m option typically will provide a large performance boost if either the
  source or destination (or both) is a cloud URL. If both source and
  destination are file URLs the -m option will typically thrash the disk and
  slow synchronization down.

  To make the local directory "data" the same as the contents of
  gs://mybucket/data:

    gsutil rsync -d -r gs://mybucket/data data

  To make the contents of gs://mybucket2 the same as gs://mybucket1:

    gsutil rsync -d -r gs://mybucket1 gs://mybucket2

  You can also mirror data across local directories:

    gsutil rsync -d -r dir1 dir2

  To mirror your content across clouds:

    gsutil rsync -d -r gs://my-gs-bucket s3://my-s3-bucket

  Note: If you are synchronizing a large amount of data between clouds you might
  consider setting up a
  `Google Compute Engine <https://cloud.google.com/products/compute-engine>`_
  account and running gsutil there. Since cross-provider gsutil data transfers
  flow through the machine where gsutil is running, doing this can make your
  transfer run significantly faster than running gsutil on your local
  workstation.


<B>BE CAREFUL WHEN USING -d OPTION!</B>
  The rsync -d option is very useful and commonly used, because it provides a
  means of making the contents of a destination bucket or directory match those
  of a source bucket or directory. However, please exercise caution when you
  use this option: It's possible to delete large amounts of data accidentally
  if, for example, you erroneously reverse source and destination. For example,
  if you meant to synchronize a local directory from a bucket in the cloud but
  instead run the command:

    gsutil -m rsync -r -d ./your-dir gs://your-bucket

  and your-dir is currently empty, you will quickly delete all of the objects in
  gs://your-bucket.

  You can also cause large amounts of data to be lost quickly by specifying a
  subdirectory of the destination as the source of an rsync. For example, the
  command:

    gsutil -m rsync -r -d gs://your-bucket/data gs://your-bucket

  would cause most or all of the objects in gs://your-bucket to be deleted
  (some objects may survive if there are any with names that sort lower than
  "data" under gs://your-bucket/data).

  In addition to paying careful attention to the source and destination you
  specify with the rsync command, there are two more safety measures your can
  take when using gsutil rsync -d:

    1. Try running the command with the rsync -n option first, to see what it
       would do without actually performing the operations. For example, if
       you run the command:

         gsutil -m rsync -r -d -n gs://your-bucket/data gs://your-bucket
       
       it will be immediately evident that running that command without the -n
       option would cause many objects to be deleted.

    2. Enable object versioning in your bucket, which will allow you to restore
       objects if you accidentally delete them. For more details see
       "gsutil help versions".


<B>IMPACT OF BUCKET LISTING EVENTUAL CONSISTENCY</B>
  The rsync command operates by listing the source and destination URLs, and
  then performing copy and remove operations according to the differences
  between these listings. Because bucket listing is eventually (not strongly)
  consistent, if you upload new objects or delete objects from a bucket and then
  immediately run gsutil rsync with that bucket as the source or destination,
  it's possible the rsync command will not see the recent updates and thus
  synchronize incorrectly. You can rerun the rsync operation again later to
  correct the incorrect synchronization.


<B>CHECKSUM VALIDATION AND FAILURE HANDLING</B>
  At the end of every upload or download, the gsutil rsync command validates
  that the checksum of the source file/object matches the checksum of the
  destination file/object. If the checksums do not match, gsutil will delete
  the invalid copy and print a warning message. This very rarely happens, but
  if it does, please contact gs-team@google.com.

  The rsync command will retry when failures occur, but if enough failures
  happen during a particular copy or delete operation the command will skip that
  object and move on. At the end of the synchronization run if any failures were
  not successfully retried, the rsync command will report the count of failures,
  and exit with non-zero status. At this point you can run the rsync command
  again, and it will attempt any remaining needed copy and/or delete operations.

  Note that there are cases where retrying will never succeed, such as if you
  don't have write permission to the destination bucket or if the destination
  path for some objects is longer than the maximum allowed length.

  For more details about gsutil's retry handling, please see
  "gsutil help retries".


<B>CHANGE DETECTION ALGORITHM</B>
  To determine if a file or object has changed gsutil rsync first checks whether
  the source and destination sizes match. If they match, it next checks if their
  checksums match, using checksums if available (see below). Unlike the Unix
  rsync command, gsutil rsync does not use timestamps to determine if the
  file/object changed, because the GCS API does not permit the caller to set an
  object's timestamp (hence, timestamps of identical files/objects cannot be
  made to match).

  Checksums will not be available in two cases:

  1. When synchronizing to or from a file system. By default, gsutil does not
     checksum files, because of the slowdown caused when working with large
     files. You can cause gsutil to checksum files by using the gsutil rsync -c
     option, at the cost of increased local disk I/O and run time when working
     with large files. You should consider using the -c option if your files can
     change without changing sizes (e.g., if you have files that contain fixed
     width data, such as timestamps).

  2. When comparing composite GCS objects with objects at a cloud provider that
     does not support CRC32C (which is the only checksum available for composite
     objects). See 'gsutil help compose' for details about composite objects.


<B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B>
  If both the source and destination URL are cloud URLs from the same provider,
  gsutil copies data "in the cloud" (i.e., without downloading to and uploading
  from the machine where you run gsutil). In addition to the performance and
  cost advantages of doing this, copying in the cloud preserves metadata (like
  Content-Type and Cache-Control). In contrast, when you download data from the
  cloud it ends up in a file, which has no associated metadata. Thus, unless you
  have some way to hold on to or re-create that metadata, synchronizing a bucket
  to a directory in the local file system will not retain the metadata.

  Note that by default, the gsutil rsync command does not copy the ACLs of
  objects being synchronized and instead will use the default bucket ACL (see
  "gsutil help defacl"). You can override this behavior with the -p option (see
  OPTIONS below).


<B>SLOW CHECKSUMS</B>
  If you find that CRC32C checksum computation runs slowly, this is likely
  because you don't have a compiled CRC32c on your system. Try running:

    gsutil ver -l

  If the output contains:

    compiled crcmod: False

  you are running a Python library for computing CRC32C, which is much slower
  than using the compiled code. For information on getting a compiled CRC32C
  implementation, see 'gsutil help crc32c'.


<B>LIMITATIONS</B>
  1. The gsutil rsync command doesn't make the destination object's timestamps
     match those of the source object (it can't; timestamp setting is not
     allowed by the GCS API).

  2. The gsutil rsync command considers only the current object generations in
     the source and destination buckets when deciding what to copy / delete. If
     versioning is enabled in the destination bucket then gsutil rsync's
     overwriting or deleting objects will end up creating versions, but the
     command doesn't try to make the archived generations match in the source
     and destination buckets.



<B>OPTIONS</B>
  -c            Causes the rsync command to compute checksums for files if the
                size of source and destination match, and then compare
                checksums.  This option increases local disk I/O and run time
                if either src_url or dst_url are on the local file system.

  -C            If an error occurs, continue to attempt to copy the remaining
                files. If errors occurred, gsutil's exit status will be non-zero
                even if this flag is set. This option is implicitly set when
                running "gsutil -m rsync...".  Note: -C only applies to the
                actual copying operation. If an error occurs while iterating
                over the files in the local directory (e.g., invalid Unicode
                file name) gsutil will print an error message and abort.

  -d            Delete extra files under dst_url not found under src_url. By
                default extra files are not deleted. Note: this option can
                delete data quickly if you specify the wrong source/destination
                combination. See the help section above,
                "BE CAREFUL WHEN USING -d OPTION!".

  -e            Exclude symlinks. When specified, symbolic links will be
                ignored.

  -n            Causes rsync to run in "dry run" mode, i.e., just outputting
                what would be copied or deleted without actually doing any
                copying/deleting.

  -p            Causes ACLs to be preserved when synchronizing in the cloud.
                Note that this option has performance and cost implications when
                using the XML API, as it requires separate HTTP calls for
                interacting with ACLs. The performance issue can be mitigated to
                some degree by using gsutil -m rsync to cause parallel
                synchronization. Also, this option only works if you have OWNER
                access to all of the objects that are copied.

                You can avoid the additional performance and cost of using
                rsync -p if you want all objects in the destination bucket to
                end up with the same ACL by setting a default object ACL on that
                bucket instead of using rsync -p. See 'help gsutil defacl'.

  -R, -r        Causes directories, buckets, and bucket subdirectories to be
                synchronized recursively. If you neglect to use this option
                gsutil will make only the top-level directory in the source
                and destination URLs match, skipping any sub-directories.

  -U            Skip objects with unsupported object types instead of failing.
                Unsupported object types are Amazon S3 Objects in the GLACIER
                storage class.

  -x pattern    Causes files/objects matching pattern to be excluded, i.e., any
                matching files/objects will not be copied or deleted. Note that
                the pattern is a Python regular expression, not a wildcard (so,
                matching any string ending in 'abc' would be specified using
                '.*abc' rather than '*abc'). Note also that the exclude path is
                always relative (similar to Unix rsync or tar exclude options).
                For example, if you run the command:

                  gsutil rsync -x 'data./.*\\.txt' dir gs://my-bucket

                it will skip the file dir/data1/a.txt.

                You can use regex alternation to specify multiple exclusions,
                for example:

                  gsutil rsync -x '.*\\.txt|.*\\.jpg' dir gs://my-bucket
""")


class _DiffAction(object):
  COPY = 'copy'
  REMOVE = 'remove'


_NA = '-'
_OUTPUT_BUFFER_SIZE = 64 * 1024
_PROGRESS_REPORT_LISTING_COUNT = 10000


# Tracks files we need to clean up at end or if interrupted.
_tmp_files = []


# pylint: disable=unused-argument
def _HandleSignals(signal_num, cur_stack_frame):
  """Called when rsync command is killed with SIGINT, SIGQUIT or SIGTERM."""
  CleanUpTempFiles()


def CleanUpTempFiles():
  """Cleans up temp files.

  This function allows the main (RunCommand) function to clean up at end of
  operation, or if gsutil rsync is interrupted (e.g., via ^C). This is necessary
  because tempfile.NamedTemporaryFile doesn't allow the created file to be
  re-opened in read mode on Windows, so we have to use tempfile.mkstemp, which
  doesn't automatically delete temp files.
  """
  try:
    for fname in _tmp_files:
      os.unlink(fname)
  except:  # pylint: disable=bare-except
    pass


class _DiffToApply(object):
  """Class that encapsulates info needed to apply diff for one object."""

  def __init__(self, src_url_str, dst_url_str, diff_action):
    """Constructor.

    Args:
      src_url_str: The source URL string, or None if diff_action is REMOVE.
      dst_url_str: The destination URL string.
      diff_action: _DiffAction to be applied.
    """
    self.src_url_str = src_url_str
    self.dst_url_str = dst_url_str
    self.diff_action = diff_action


def _DiffToApplyArgChecker(command_instance, diff_to_apply):
  """Arg checker that skips symlinks if -e flag specified."""
  if (diff_to_apply.diff_action == _DiffAction.REMOVE
      or not command_instance.exclude_symlinks):
    # No src URL is populated for REMOVE actions.
    return True
  exp_src_url = StorageUrlFromString(diff_to_apply.src_url_str)
  if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
    command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
    return False
  return True


def _ComputeNeededFileChecksums(logger, src_url_str, src_size, src_crc32c,
                                src_md5, dst_url_str, dst_size, dst_crc32c,
                                dst_md5):
  """Computes any file checksums needed by _ObjectsMatch.

  Args:
    logger: logging.logger for outputting log messages.
    src_url_str: Source URL string.
    src_size: Source size
    src_crc32c: Source CRC32c.
    src_md5: Source MD5.
    dst_url_str: Destination URL string.
    dst_size: Destination size
    dst_crc32c: Destination CRC32c.
    dst_md5: Destination MD5.

  Returns:
    (src_crc32c, src_md5, dst_crc32c, dst_md5)
  """
  src_url = StorageUrlFromString(src_url_str)
  dst_url = StorageUrlFromString(dst_url_str)
  if src_url.IsFileUrl():
    if dst_crc32c != _NA or dst_url.IsFileUrl():
      if src_size > TEN_MIB:
        logger.info('Computing MD5 for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif dst_md5 != _NA or dst_url.IsFileUrl():
      if dst_size > TEN_MIB:
        logger.info('Computing MD5 for %s...', dst_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_md5 = CalculateB64EncodedMd5FromContents(fp)
  if dst_url.IsFileUrl():
    if src_crc32c != _NA:
      if src_size > TEN_MIB:
        logger.info('Computing CRC32C for %s...', src_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif src_md5 != _NA:
      if dst_size > TEN_MIB:
        logger.info('Computing CRC32C for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_md5 = CalculateB64EncodedMd5FromContents(fp)
  return (src_crc32c, src_md5, dst_crc32c, dst_md5)


def _ListUrlRootFunc(cls, args_tuple, thread_state=None):
  """Worker function for listing files/objects under to be sync'd.

  Outputs sorted list to out_file_name, formatted per _BuildTmpOutputLine. We
  sort the listed URLs because we don't want to depend on consistent sort
  order across file systems and cloud providers.

  Args:
    cls: Command instance.
    args_tuple: (base_url_str, out_file_name, desc), where base_url_str is
                top-level URL string to list; out_filename is name of file to
                which sorted output should be written; desc is 'source' or
                'destination'.
    thread_state: gsutil Cloud API instance to use.
  """
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  (base_url_str, out_filename, desc) = args_tuple
  # We sort while iterating over base_url_str, allowing parallelism of batched
  # sorting with collecting the listing.
  out_file = io.open(out_filename, mode='w', encoding=UTF8)
  try:
    _BatchSort(_FieldedListingIterator(cls, gsutil_api, base_url_str, desc),
               out_file)
  except Exception as e:  # pylint: disable=broad-except
    # Abandon rsync if an exception percolates up to this layer - retryable
    # exceptions are handled in the lower layers, so we got a non-retryable
    # exception (like 404 bucket not found) and proceeding would either be
    # futile or could result in data loss - for example:
    #     gsutil rsync -d gs://non-existent-bucket ./localdir
    # would delete files from localdir.
    cls.logger.error(
        'Caught non-retryable exception while listing %s: %s' %
        (base_url_str, e))
    cls.non_retryable_listing_failures = 1
  out_file.close()


def _LocalDirIterator(base_url):
  """A generator that yields a BLR for each file in a local directory.

     We use this function instead of WildcardIterator for listing a local
     directory without recursion, because the glob.globi implementation called
     by WildcardIterator skips "dot" files (which we don't want to do when
     synchronizing to or from a local directory).

  Args:
    base_url: URL for the directory over which to iterate.

  Yields:
    BucketListingObject for each file in the directory.
  """
  for filename in os.listdir(base_url.object_name):
    filename = os.path.join(base_url.object_name, filename)
    if os.path.isfile(filename):
      yield BucketListingObject(StorageUrlFromString(filename), None)


def _FieldedListingIterator(cls, gsutil_api, base_url_str, desc):
  """Iterator over base_url_str formatting output per _BuildTmpOutputLine.

  Args:
    cls: Command instance.
    gsutil_api: gsutil Cloud API instance to use for bucket listing.
    base_url_str: The top-level URL string over which to iterate.
    desc: 'source' or 'destination'.

  Yields:
    Output line formatted per _BuildTmpOutputLine.
  """
  base_url = StorageUrlFromString(base_url_str)
  if base_url.scheme == 'file' and not cls.recursion_requested:
    iterator = _LocalDirIterator(base_url)
  else:
    if cls.recursion_requested:
      wildcard = '%s/**' % base_url_str.rstrip('/\\')
    else:
      wildcard = '%s/*' % base_url_str.rstrip('/\\')
    iterator = CreateWildcardIterator(
        wildcard, gsutil_api, debug=cls.debug,
        project_id=cls.project_id).IterObjects(
            # Request just the needed fields, to reduce bandwidth usage.
            bucket_listing_fields=['crc32c', 'md5Hash', 'name', 'size'])

  i = 0
  for blr in iterator:
    # Various GUI tools (like the GCS web console) create placeholder objects
    # ending with '/' when the user creates an empty directory. Normally these
    # tools should delete those placeholders once objects have been written
    # "under" the directory, but sometimes the placeholders are left around.
    # We need to filter them out here, otherwise if the user tries to rsync
    # from GCS to a local directory it will result in a directory/file
    # conflict (e.g., trying to download an object called "mydata/" where the
    # local directory "mydata" exists).
    url = blr.storage_url
    if IsCloudSubdirPlaceholder(url, blr=blr):
      # We used to output the message 'Skipping cloud sub-directory placeholder
      # object...' but we no longer do so because it caused customer confusion.
      continue
    if (cls.exclude_symlinks and url.IsFileUrl()
        and os.path.islink(url.object_name)):
      continue
    if cls.exclude_pattern:
      str_to_check = url.url_string[len(base_url_str):]
      if str_to_check.startswith(url.delim):
        str_to_check = str_to_check[1:]
      if cls.exclude_pattern.match(str_to_check):
        continue
    i += 1
    if i % _PROGRESS_REPORT_LISTING_COUNT == 0:
      cls.logger.info('At %s listing %d...', desc, i)
    yield _BuildTmpOutputLine(blr)


def _BuildTmpOutputLine(blr):
  """Builds line to output to temp file for given BucketListingRef.

  Args:
    blr: The BucketListingRef.

  Returns:
    The output line, formatted as _EncodeUrl(URL)<sp>size<sp>crc32c<sp>md5
    where crc32c will only be present for GCS URLs, and md5 will only be
    present for cloud URLs that aren't composite objects. A missing field is
    populated with '-'.
  """
  crc32c = _NA
  md5 = _NA
  url = blr.storage_url
  if url.IsFileUrl():
    size = os.path.getsize(url.object_name)
  elif url.IsCloudUrl():
    size = blr.root_object.size
    crc32c = blr.root_object.crc32c or _NA
    md5 = blr.root_object.md5Hash or _NA
  else:
    raise CommandException('Got unexpected URL type (%s)' % url.scheme)
  return '%s %d %s %s\n' % (_EncodeUrl(url.url_string), size, crc32c, md5)


def _EncodeUrl(url_string):
  """Encodes url_str with quote plus encoding and UTF8 character encoding.

  We use this for all URL encodings.

  Args:
    url_string: String URL to encode.

  Returns:
    encoded URL.
  """
  return urllib.quote_plus(url_string.encode(UTF8))


def _DecodeUrl(enc_url_string):
  """Inverts encoding from EncodeUrl.

  Args:
    enc_url_string: String URL to decode.

  Returns:
    decoded URL.
  """
  return urllib.unquote_plus(enc_url_string).decode(UTF8)


# pylint: disable=bare-except
def _BatchSort(in_iter, out_file):
  """Sorts input lines from in_iter and outputs to out_file.

  Sorts in batches as input arrives, so input file does not need to be loaded
  into memory all at once. Derived from Python Recipe 466302: Sorting big
  files the Python 2.4 way by Nicolas Lehuen.

  Sorted format is per _BuildTmpOutputLine. We're sorting on the entire line
  when we could just sort on the first record (URL); but the sort order is
  identical either way.

  Args:
    in_iter: Input iterator.
    out_file: Output file.
  """
  # Note: If chunk_files gets very large we can run out of open FDs. See .boto
  # file comments about rsync_buffer_lines. If increasing rsync_buffer_lines
  # doesn't suffice (e.g., for someone synchronizing with a really large
  # bucket), an option would be to make gsutil merge in passes, never
  # opening all chunk files simultaneously.
  buffer_size = config.getint('GSUtil', 'rsync_buffer_lines', 32000)
  chunk_files = []
  try:
    while True:
      current_chunk = sorted(islice(in_iter, buffer_size))
      if not current_chunk:
        break
      output_chunk = io.open('%s-%06i' % (out_file.name, len(chunk_files)),
                             mode='w+', encoding=UTF8)
      chunk_files.append(output_chunk)
      output_chunk.writelines(unicode(''.join(current_chunk)))
      output_chunk.flush()
      output_chunk.seek(0)
    out_file.writelines(heapq.merge(*chunk_files))
  except IOError as e:
    if e.errno == errno.EMFILE:
      raise CommandException('\n'.join(textwrap.wrap(
          'Synchronization failed because too many open file handles were '
          'needed while building synchronization state. Please see the '
          'comments about rsync_buffer_lines in your .boto config file for a '
          'possible way to address this problem.')))
    raise
  finally:
    for chunk_file in chunk_files:
      try:
        chunk_file.close()
        os.remove(chunk_file.name)
      except:
        pass


class _DiffIterator(object):
  """Iterator yielding sequence of _DiffToApply objects."""

  def __init__(self, command_obj, base_src_url, base_dst_url):
    self.command_obj = command_obj
    self.compute_file_checksums = command_obj.compute_file_checksums
    self.delete_extras = command_obj.delete_extras
    self.recursion_requested = command_obj.recursion_requested
    self.logger = self.command_obj.logger
    self.base_src_url = base_src_url
    self.base_dst_url = base_dst_url
    self.logger.info('Building synchronization state...')

    (src_fh, self.sorted_list_src_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-src-')
    _tmp_files.append(self.sorted_list_src_file_name)
    (dst_fh, self.sorted_list_dst_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-dst-')
    _tmp_files.append(self.sorted_list_dst_file_name)
    # Close the file handles; the file will be opened in write mode by
    # _ListUrlRootFunc.
    os.close(src_fh)
    os.close(dst_fh)

    # Build sorted lists of src and dst URLs in parallel. To do this, pass args
    # to _ListUrlRootFunc as tuple (base_url_str, out_filename, desc)
    # where base_url_str is the starting URL string for listing.
    args_iter = iter([
        (self.base_src_url.url_string, self.sorted_list_src_file_name,
         'source'),
        (self.base_dst_url.url_string, self.sorted_list_dst_file_name,
         'destination')
    ])

    # Contains error message from non-retryable listing failure.
    command_obj.non_retryable_listing_failures = 0
    shared_attrs = ['non_retryable_listing_failures']
    command_obj.Apply(_ListUrlRootFunc, args_iter, _RootListingExceptionHandler,
                      shared_attrs, arg_checker=DummyArgChecker,
                      parallel_operations_override=True,
                      fail_on_error=True)

    if command_obj.non_retryable_listing_failures:
      raise CommandException('Caught non-retryable exception - aborting rsync')

    self.sorted_list_src_file = open(self.sorted_list_src_file_name, 'r')
    self.sorted_list_dst_file = open(self.sorted_list_dst_file_name, 'r')

    # Wrap iterators in PluralityCheckableIterator so we can check emptiness.
    self.sorted_src_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_src_file))
    self.sorted_dst_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_dst_file))

  def _ParseTmpFileLine(self, line):
    """Parses output from _BuildTmpOutputLine.

    Parses into tuple:
      (URL, size, crc32c, md5)
    where crc32c and/or md5 can be _NA.

    Args:
      line: The line to parse.

    Returns:
      Parsed tuple: (url, size, crc32c, md5)
    """
    (encoded_url, size, crc32c, md5) = line.split()
    return (_DecodeUrl(encoded_url), int(size), crc32c, md5.strip())

  def _WarnIfMissingCloudHash(self, url_str, crc32c, md5):
    """Warns if given url_str is a cloud URL and is missing both crc32c and md5.

    Args:
      url_str: Destination URL string.
      crc32c: Destination CRC32c.
      md5: Destination MD5.

    Returns:
      True if issued warning.
    """
    # One known way this can currently happen is when rsync'ing objects larger
    # than 5 GB from S3 (for which the etag is not an MD5).
    if (StorageUrlFromString(url_str).IsCloudUrl()
        and crc32c == _NA and md5 == _NA):
      self.logger.warn(
          'Found no hashes to validate %s. Integrity cannot be assured without '
          'hashes.', url_str)
      return True
    return False

  def _ObjectsMatch(self, src_url_str, src_size, src_crc32c, src_md5,
                    dst_url_str, dst_size, dst_crc32c, dst_md5):
    """Returns True if src and dst objects are the same.

    Uses size plus whatever checksums are available.

    Args:
      src_url_str: Source URL string.
      src_size: Source size
      src_crc32c: Source CRC32c.
      src_md5: Source MD5.
      dst_url_str: Destination URL string.
      dst_size: Destination size
      dst_crc32c: Destination CRC32c.
      dst_md5: Destination MD5.

    Returns:
      True/False.
    """
    # Note: This function is called from __iter__, which is called from the
    # Command.Apply driver. Thus, all checksum computation will be run in a
    # single thread, which is good (having multiple threads concurrently
    # computing checksums would thrash the disk).
    if src_size != dst_size:
      return False
    if self.compute_file_checksums:
      (src_crc32c, src_md5, dst_crc32c, dst_md5) = _ComputeNeededFileChecksums(
          self.logger, src_url_str, src_size, src_crc32c, src_md5, dst_url_str,
          dst_size, dst_crc32c, dst_md5)
    if src_md5 != _NA and dst_md5 != _NA:
      self.logger.debug('Comparing md5 for %s and %s', src_url_str, dst_url_str)
      return src_md5 == dst_md5
    if src_crc32c != _NA and dst_crc32c != _NA:
      self.logger.debug(
          'Comparing crc32c for %s and %s', src_url_str, dst_url_str)
      return src_crc32c == dst_crc32c
    if not self._WarnIfMissingCloudHash(src_url_str, src_crc32c, src_md5):
      self._WarnIfMissingCloudHash(dst_url_str, dst_crc32c, dst_md5)
    # Without checksums to compare we depend only on basic size comparison.
    return True

  def __iter__(self):
    """Iterates over src/dst URLs and produces a _DiffToApply sequence.

    Yields:
      The _DiffToApply.
    """
    # Strip trailing slashes, if any, so we compute tail length against
    # consistent position regardless of whether trailing slashes were included
    # or not in URL.
    base_src_url_len = len(self.base_src_url.url_string.rstrip('/\\'))
    base_dst_url_len = len(self.base_dst_url.url_string.rstrip('/\\'))
    src_url_str = dst_url_str = None
    # Invariant: After each yield, the URLs in src_url_str, dst_url_str,
    # self.sorted_src_urls_it, and self.sorted_dst_urls_it are not yet
    # processed. Each time we encounter None in src_url_str or dst_url_str we
    # populate from the respective iterator, and we reset one or the other value
    # to None after yielding an action that disposes of that URL.
    while not self.sorted_src_urls_it.IsEmpty() or src_url_str is not None:
      if src_url_str is None:
        (src_url_str, src_size, src_crc32c, src_md5) = self._ParseTmpFileLine(
            self.sorted_src_urls_it.next())
        # Skip past base URL and normalize slashes so we can compare across
        # clouds/file systems (including Windows).
        src_url_str_to_check = _EncodeUrl(
            src_url_str[base_src_url_len:].replace('\\', '/'))
        dst_url_str_would_copy_to = copy_helper.ConstructDstUrl(
            self.base_src_url, StorageUrlFromString(src_url_str), True, True,
            self.base_dst_url, False, self.recursion_requested).url_string
      if self.sorted_dst_urls_it.IsEmpty():
        # We've reached end of dst URLs, so copy src to dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
        continue
      if not dst_url_str:
        (dst_url_str, dst_size, dst_crc32c, dst_md5) = (
            self._ParseTmpFileLine(self.sorted_dst_urls_it.next()))
        # Skip past base URL and normalize slashes so we can compare acros
        # clouds/file systems (including Windows).
        dst_url_str_to_check = _EncodeUrl(
            dst_url_str[base_dst_url_len:].replace('\\', '/'))

      if src_url_str_to_check < dst_url_str_to_check:
        # There's no dst object corresponding to src object, so copy src to dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
      elif src_url_str_to_check > dst_url_str_to_check:
        # dst object without a corresponding src object, so remove dst if -d
        # option was specified.
        if self.delete_extras:
          yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
        dst_url_str = None
      else:
        # There is a dst object corresponding to src object, so check if objects
        # match.
        if self._ObjectsMatch(
            src_url_str, src_size, src_crc32c, src_md5,
            dst_url_str, dst_size, dst_crc32c, dst_md5):
          # Continue iterating without yielding a _DiffToApply.
          pass
        else:
          yield _DiffToApply(src_url_str, dst_url_str, _DiffAction.COPY)
        src_url_str = None
        dst_url_str = None

    # If -d option specified any files/objects left in dst iteration should be
    # removed.
    if not self.delete_extras:
      return
    if dst_url_str:
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
      dst_url_str = None
    for line in self.sorted_dst_urls_it:
      (dst_url_str, _, _, _) = self._ParseTmpFileLine(line)
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)


def _RsyncFunc(cls, diff_to_apply, thread_state=None):
  """Worker function for performing the actual copy and remove operations."""
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  dst_url_str = diff_to_apply.dst_url_str
  dst_url = StorageUrlFromString(dst_url_str)
  if diff_to_apply.diff_action == _DiffAction.REMOVE:
    if cls.dryrun:
      cls.logger.info('Would remove %s', dst_url)
    else:
      cls.logger.info('Removing %s', dst_url)
      if dst_url.IsFileUrl():
        os.unlink(dst_url.object_name)
      else:
        try:
          gsutil_api.DeleteObject(
              dst_url.bucket_name, dst_url.object_name,
              generation=dst_url.generation, provider=dst_url.scheme)
        except NotFoundException:
          # If the object happened to be deleted by an external process, this
          # is fine because it moves us closer to the desired state.
          pass
  elif diff_to_apply.diff_action == _DiffAction.COPY:
    src_url_str = diff_to_apply.src_url_str
    src_url = StorageUrlFromString(src_url_str)
    if cls.dryrun:
      cls.logger.info('Would copy %s to %s', src_url, dst_url)
    else:
      try:
        copy_helper.PerformCopy(cls.logger, src_url, dst_url, gsutil_api, cls,
                                _RsyncExceptionHandler,
                                headers=cls.headers)
      except SkipUnsupportedObjectError, e:
        cls.logger.info('Skipping item %s with unsupported object type %s',
                        src_url, e.unsupported_type)

  else:
    raise CommandException('Got unexpected DiffAction (%d)'
                           % diff_to_apply.diff_action)


def _RootListingExceptionHandler(cls, e):
  """Simple exception handler for exceptions during listing URLs to sync."""
  cls.logger.error(str(e))


def _RsyncExceptionHandler(cls, e):
  """Simple exception handler to allow post-completion status."""
  cls.logger.error(str(e))
  cls.op_failure_count += 1
  cls.logger.debug('\n\nEncountered exception while syncing:\n%s\n',
                   traceback.format_exc())


class RsyncCommand(Command):
  """Implementation of gsutil rsync command."""

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'rsync',
      command_name_aliases=[],
      usage_synopsis=_SYNOPSIS,
      min_args=2,
      max_args=2,
      supported_sub_args='cCdenprRUx:',
      file_url_ok=True,
      provider_url_ok=False,
      urls_start_arg=0,
      gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
      gs_default_api=ApiSelector.JSON,
      argparse_arguments=[
          CommandArgument.MakeNCloudOrFileURLsArgument(2)
      ]
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='rsync',
      help_name_aliases=['sync', 'synchronize'],
      help_type='command_help',
      help_one_line_summary='Synchronize content of two buckets/directories',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )
  total_bytes_transferred = 0

  def _InsistContainer(self, url_str, treat_nonexistent_object_as_subdir):
    """Sanity checks that URL names an existing container.

    Args:
      url_str: URL string to check.
      treat_nonexistent_object_as_subdir: indicates if should treat a
                                          non-existent object as a subdir.

    Returns:
      URL for checked string.

    Raises:
      CommandException if url_str doesn't name an existing container.
    """
    (url, have_existing_container) = (
        copy_helper.ExpandUrlToSingleBlr(url_str, self.gsutil_api, self.debug,
                                         self.project_id,
                                         treat_nonexistent_object_as_subdir))
    if not have_existing_container:
      raise CommandException(
          'arg (%s) does not name a directory, bucket, or bucket subdir.'
          % url_str)
    return url

  def RunCommand(self):
    """Command entry point for the rsync command."""
    self._ParseOpts()
    if self.compute_file_checksums and not UsingCrcmodExtension(crcmod):
      self.logger.warn(SLOW_CRCMOD_WARNING)

    src_url = self._InsistContainer(self.args[0], False)
    dst_url = self._InsistContainer(self.args[1], True)

    # Tracks if any copy or rm operations failed.
    self.op_failure_count = 0

    # List of attributes to share/manage across multiple processes in
    # parallel (-m) mode.
    shared_attrs = ['op_failure_count']

    for signal_num in GetCaughtSignals():
      RegisterSignalHandler(signal_num, _HandleSignals)

    # Perform sync requests in parallel (-m) mode, if requested, using
    # configured number of parallel processes and threads. Otherwise,
    # perform requests with sequential function calls in current process.
    diff_iterator = _DiffIterator(self, src_url, dst_url)
    self.logger.info('Starting synchronization')
    try:
      self.Apply(_RsyncFunc, diff_iterator, _RsyncExceptionHandler,
                 shared_attrs, arg_checker=_DiffToApplyArgChecker,
                 fail_on_error=True)
    finally:
      CleanUpTempFiles()

    if self.op_failure_count:
      plural_str = 's' if self.op_failure_count else ''
      raise CommandException(
          '%d file%s/object%s could not be copied/removed.' %
          (self.op_failure_count, plural_str, plural_str))

  def _ParseOpts(self):
    # exclude_symlinks is handled by Command parent class, so save in Command
    # state rather than CopyHelperOpts.
    self.exclude_symlinks = False
    # continue_on_error is handled by Command parent class, so save in Command
    # state rather than CopyHelperOpts.
    self.continue_on_error = False
    self.delete_extras = False
    preserve_acl = False
    self.compute_file_checksums = False
    self.dryrun = False
    self.exclude_pattern = None
    self.skip_unsupported_objects = False
    # self.recursion_requested is initialized in command.py (so it can be
    # checked in parent class for all commands).

    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-c':
          self.compute_file_checksums = True
        # Note: In gsutil cp command this is specified using -c but here we use
        # -C so we can use -c for checksum arg (to be consistent with Unix rsync
        # command options).
        elif o == '-C':
          self.continue_on_error = True
        elif o == '-d':
          self.delete_extras = True
        elif o == '-e':
          self.exclude_symlinks = True
        elif o == '-n':
          self.dryrun = True
        elif o == '-p':
          preserve_acl = True
        elif o == '-r' or o == '-R':
          self.recursion_requested = True
        elif o == '-U':
          self.skip_unsupported_objects = True
        elif o == '-x':
          if not a:
            raise CommandException('Invalid blank exclude filter')
          try:
            self.exclude_pattern = re.compile(a)
          except re.error:
            raise CommandException('Invalid exclude filter (%s)' % a)
    return CreateCopyHelperOpts(
        preserve_acl=preserve_acl,
        skip_unsupported_objects=self.skip_unsupported_objects)