普通文本  |  457行  |  17.54 KB

# -*- coding: utf-8 -*-
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for tracker file functionality."""

import errno
import hashlib
import json
import os
import re

from boto import config
from gslib.exception import CommandException
from gslib.util import CreateDirIfNeeded
from gslib.util import GetGsutilStateDir
from gslib.util import ResumableThreshold
from gslib.util import UTF8

# The maximum length of a file name can vary wildly between different
# operating systems, so we always ensure that tracker files are less
# than 100 characters in order to avoid any such issues.
MAX_TRACKER_FILE_NAME_LENGTH = 100


TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT = (
    'Couldn\'t write tracker file (%s): %s. This can happen if gsutil is '
    'configured to save tracker files to an unwritable directory)')


class TrackerFileType(object):
  UPLOAD = 'upload'
  DOWNLOAD = 'download'
  DOWNLOAD_COMPONENT = 'download_component'
  PARALLEL_UPLOAD = 'parallel_upload'
  SLICED_DOWNLOAD = 'sliced_download'
  REWRITE = 'rewrite'


def _HashFilename(filename):
  """Apply a hash function (SHA1) to shorten the passed file name.

  The spec for the hashed file name is as follows:

      TRACKER_<hash>_<trailing>

  where hash is a SHA1 hash on the original file name and trailing is
  the last 16 chars from the original file name. Max file name lengths
  vary by operating system so the goal of this function is to ensure
  the hashed version takes fewer than 100 characters.

  Args:
    filename: file name to be hashed.

  Returns:
    shorter, hashed version of passed file name
  """
  if isinstance(filename, unicode):
    filename = filename.encode(UTF8)
  else:
    filename = unicode(filename, UTF8).encode(UTF8)
  m = hashlib.sha1(filename)
  return 'TRACKER_' + m.hexdigest() + '.' + filename[-16:]


def CreateTrackerDirIfNeeded():
  """Looks up or creates the gsutil tracker file directory.

  This is the configured directory where gsutil keeps its resumable transfer
  tracker files. This function creates it if it doesn't already exist.

  Returns:
    The pathname to the tracker directory.
  """
  tracker_dir = config.get(
      'GSUtil', 'resumable_tracker_dir',
      os.path.join(GetGsutilStateDir(), 'tracker-files'))
  CreateDirIfNeeded(tracker_dir)
  return tracker_dir


def GetRewriteTrackerFilePath(src_bucket_name, src_obj_name, dst_bucket_name,
                              dst_obj_name, api_selector):
  """Gets the tracker file name described by the arguments.

  Args:
    src_bucket_name: Source bucket (string).
    src_obj_name: Source object (string).
    dst_bucket_name: Destination bucket (string).
    dst_obj_name: Destination object (string)
    api_selector: API to use for this operation.

  Returns:
    File path to tracker file.
  """
  # Encode the src and dest bucket and object names into the tracker file
  # name.
  res_tracker_file_name = (
      re.sub('[/\\\\]', '_', 'rewrite__%s__%s__%s__%s__%s.token' %
             (src_bucket_name, src_obj_name, dst_bucket_name,
              dst_obj_name, api_selector)))

  return _HashAndReturnPath(res_tracker_file_name, TrackerFileType.REWRITE)


def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None,
                       component_num=None):
  """Gets the tracker file name described by the arguments.

  Args:
    dst_url: Destination URL for tracker file.
    tracker_file_type: TrackerFileType for this operation.
    api_selector: API to use for this operation.
    src_url: Source URL for the source file name for parallel uploads.
    component_num: Component number if this is a download component, else None.

  Returns:
    File path to tracker file.
  """
  if tracker_file_type == TrackerFileType.UPLOAD:
    # Encode the dest bucket and object name into the tracker file name.
    res_tracker_file_name = (
        re.sub('[/\\\\]', '_', 'resumable_upload__%s__%s__%s.url' %
               (dst_url.bucket_name, dst_url.object_name, api_selector)))
  elif tracker_file_type == TrackerFileType.DOWNLOAD:
    # Encode the fully-qualified dest file name into the tracker file name.
    res_tracker_file_name = (
        re.sub('[/\\\\]', '_', 'resumable_download__%s__%s.etag' %
               (os.path.realpath(dst_url.object_name), api_selector)))
  elif tracker_file_type == TrackerFileType.DOWNLOAD_COMPONENT:
    # Encode the fully-qualified dest file name and the component number
    # into the tracker file name.
    res_tracker_file_name = (
        re.sub('[/\\\\]', '_', 'resumable_download__%s__%s__%d.etag' %
               (os.path.realpath(dst_url.object_name), api_selector,
                component_num)))
  elif tracker_file_type == TrackerFileType.PARALLEL_UPLOAD:
    # Encode the dest bucket and object names as well as the source file name
    # into the tracker file name.
    res_tracker_file_name = (
        re.sub('[/\\\\]', '_', 'parallel_upload__%s__%s__%s__%s.url' %
               (dst_url.bucket_name, dst_url.object_name,
                src_url, api_selector)))
  elif tracker_file_type == TrackerFileType.SLICED_DOWNLOAD:
    # Encode the fully-qualified dest file name into the tracker file name.
    res_tracker_file_name = (
        re.sub('[/\\\\]', '_', 'sliced_download__%s__%s.etag' %
               (os.path.realpath(dst_url.object_name), api_selector)))
  elif tracker_file_type == TrackerFileType.REWRITE:
    # Should use GetRewriteTrackerFilePath instead.
    raise NotImplementedError()

  return _HashAndReturnPath(res_tracker_file_name, tracker_file_type)


def DeleteDownloadTrackerFiles(dst_url, api_selector):
  """Deletes all tracker files corresponding to an object download.

  Args:
    dst_url: StorageUrl describing the destination file.
    api_selector: The Cloud API implementation used.
  """
  # Delete non-sliced download tracker file.
  DeleteTrackerFile(GetTrackerFilePath(dst_url, TrackerFileType.DOWNLOAD,
                                       api_selector))

  # Delete all sliced download tracker files.
  tracker_files = GetSlicedDownloadTrackerFilePaths(dst_url, api_selector)
  for tracker_file in tracker_files:
    DeleteTrackerFile(tracker_file)


def GetSlicedDownloadTrackerFilePaths(dst_url, api_selector,
                                      num_components=None):
  """Gets a list of sliced download tracker file paths.

  The list consists of the parent tracker file path in index 0, and then
  any existing component tracker files in [1:].

  Args:
    dst_url: Destination URL for tracker file.
    api_selector: API to use for this operation.
    num_components: The number of component tracker files, if already known.
                    If not known, the number will be retrieved from the parent
                    tracker file on disk.
  Returns:
    File path to tracker file.
  """
  parallel_tracker_file_path = GetTrackerFilePath(
      dst_url, TrackerFileType.SLICED_DOWNLOAD, api_selector)
  tracker_file_paths = [parallel_tracker_file_path]

  # If we don't know the number of components, check the tracker file.
  if num_components is None:
    tracker_file = None
    try:
      tracker_file = open(parallel_tracker_file_path, 'r')
      num_components = json.load(tracker_file)['num_components']
    except (IOError, ValueError):
      return tracker_file_paths
    finally:
      if tracker_file:
        tracker_file.close()

  for i in range(num_components):
    tracker_file_paths.append(GetTrackerFilePath(
        dst_url, TrackerFileType.DOWNLOAD_COMPONENT, api_selector,
        component_num=i))

  return tracker_file_paths


def _HashAndReturnPath(res_tracker_file_name, tracker_file_type):
  """Hashes and returns a tracker file path.

  Args:
    res_tracker_file_name: The tracker file name prior to it being hashed.
    tracker_file_type: The TrackerFileType of res_tracker_file_name.

  Returns:
    Final (hashed) tracker file path.
  """
  resumable_tracker_dir = CreateTrackerDirIfNeeded()
  hashed_tracker_file_name = _HashFilename(res_tracker_file_name)
  tracker_file_name = '%s_%s' % (str(tracker_file_type).lower(),
                                 hashed_tracker_file_name)
  tracker_file_path = '%s%s%s' % (resumable_tracker_dir, os.sep,
                                  tracker_file_name)
  assert len(tracker_file_name) < MAX_TRACKER_FILE_NAME_LENGTH
  return tracker_file_path


def DeleteTrackerFile(tracker_file_name):
  if tracker_file_name and os.path.exists(tracker_file_name):
    os.unlink(tracker_file_name)


def HashRewriteParameters(
    src_obj_metadata, dst_obj_metadata, projection, src_generation=None,
    gen_match=None, meta_gen_match=None, canned_acl=None, fields=None,
    max_bytes_per_call=None):
  """Creates an MD5 hex digest of the parameters for a rewrite call.

  Resuming rewrites requires that the input parameters are identical. Thus,
  the rewrite tracker file needs to represent the input parameters. For
  easy comparison, hash the input values. If a user does a performs a
  same-source/same-destination rewrite via a different command (for example,
  with a changed ACL), the hashes will not match and we will restart the
  rewrite from the beginning.

  Args:
    src_obj_metadata: apitools Object describing source object. Must include
      bucket, name, and etag.
    dst_obj_metadata: apitools Object describing destination object. Must
      include bucket and object name
    projection: Projection used for the API call.
    src_generation: Optional source generation.
    gen_match: Optional generation precondition.
    meta_gen_match: Optional metageneration precondition.
    canned_acl: Optional canned ACL string.
    fields: Optional fields to include in response.
    max_bytes_per_call: Optional maximum bytes rewritten per call.

  Returns:
    MD5 hex digest Hash of the input parameters, or None if required parameters
    are missing.
  """
  if (not src_obj_metadata or
      not src_obj_metadata.bucket or
      not src_obj_metadata.name or
      not src_obj_metadata.etag or
      not dst_obj_metadata or
      not dst_obj_metadata.bucket or
      not dst_obj_metadata.name or
      not projection):
    return
  md5_hash = hashlib.md5()
  for input_param in (
      src_obj_metadata, dst_obj_metadata, projection, src_generation,
      gen_match, meta_gen_match, canned_acl, fields, max_bytes_per_call):
    md5_hash.update(str(input_param))
  return md5_hash.hexdigest()


def ReadRewriteTrackerFile(tracker_file_name, rewrite_params_hash):
  """Attempts to read a rewrite tracker file.

  Args:
    tracker_file_name: Tracker file path string.
    rewrite_params_hash: MD5 hex digest of rewrite call parameters constructed
        by HashRewriteParameters.

  Returns:
    String rewrite_token for resuming rewrite requests if a matching tracker
    file exists, None otherwise (which will result in starting a new rewrite).
  """
  # Check to see if we already have a matching tracker file.
  tracker_file = None
  if not rewrite_params_hash:
    return
  try:
    tracker_file = open(tracker_file_name, 'r')
    existing_hash = tracker_file.readline().rstrip('\n')
    if existing_hash == rewrite_params_hash:
      # Next line is the rewrite token.
      return tracker_file.readline().rstrip('\n')
  except IOError as e:
    # Ignore non-existent file (happens first time a rewrite is attempted.
    if e.errno != errno.ENOENT:
      print('Couldn\'t read Copy tracker file (%s): %s. Restarting copy '
            'from scratch.' %
            (tracker_file_name, e.strerror))
  finally:
    if tracker_file:
      tracker_file.close()


def WriteRewriteTrackerFile(tracker_file_name, rewrite_params_hash,
                            rewrite_token):
  """Writes a rewrite tracker file.

  Args:
    tracker_file_name: Tracker file path string.
    rewrite_params_hash: MD5 hex digest of rewrite call parameters constructed
        by HashRewriteParameters.
    rewrite_token: Rewrite token string returned by the service.
  """
  _WriteTrackerFile(tracker_file_name, '%s\n%s\n' % (rewrite_params_hash,
                                                     rewrite_token))


def ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger,
                                    api_selector, start_byte,
                                    existing_file_size, component_num=None):
  """Checks for a download tracker file and creates one if it does not exist.

  The methodology for determining the download start point differs between
  normal and sliced downloads. For normal downloads, the existing bytes in
  the file are presumed to be correct and have been previously downloaded from
  the server (if a tracker file exists). In this case, the existing file size
  is used to determine the download start point. For sliced downloads, the
  number of bytes previously retrieved from the server cannot be determined
  from the existing file size, and so the number of bytes known to have been
  previously downloaded is retrieved from the tracker file.

  Args:
    src_obj_metadata: Metadata for the source object. Must include etag and
                      generation.
    dst_url: Destination URL for tracker file.
    logger: For outputting log messages.
    api_selector: API to use for this operation.
    start_byte: The start byte of the byte range for this download.
    existing_file_size: Size of existing file for this download on disk.
    component_num: The component number, if this is a component of a parallel
                   download, else None.

  Returns:
    tracker_file_name: The name of the tracker file, if one was used.
    download_start_byte: The first byte that still needs to be downloaded.
  """
  assert src_obj_metadata.etag

  tracker_file_name = None
  if src_obj_metadata.size < ResumableThreshold():
    # Don't create a tracker file for a small downloads; cross-process resumes
    # won't work, but restarting a small download is inexpensive.
    return tracker_file_name, start_byte

  download_name = dst_url.object_name
  if component_num is None:
    tracker_file_type = TrackerFileType.DOWNLOAD
  else:
    tracker_file_type = TrackerFileType.DOWNLOAD_COMPONENT
    download_name += ' component %d' % component_num

  tracker_file_name = GetTrackerFilePath(dst_url, tracker_file_type,
                                         api_selector,
                                         component_num=component_num)
  tracker_file = None
  # Check to see if we already have a matching tracker file.
  try:
    tracker_file = open(tracker_file_name, 'r')
    if tracker_file_type is TrackerFileType.DOWNLOAD:
      etag_value = tracker_file.readline().rstrip('\n')
      if etag_value == src_obj_metadata.etag:
        return tracker_file_name, existing_file_size
    elif tracker_file_type is TrackerFileType.DOWNLOAD_COMPONENT:
      component_data = json.loads(tracker_file.read())
      if (component_data['etag'] == src_obj_metadata.etag and
          component_data['generation'] == src_obj_metadata.generation):
        return tracker_file_name, component_data['download_start_byte']

    logger.warn('Tracker file doesn\'t match for download of %s. Restarting '
                'download from scratch.' % download_name)

  except (IOError, ValueError) as e:
    # Ignore non-existent file (happens first time a download
    # is attempted on an object), but warn user for other errors.
    if isinstance(e, ValueError) or e.errno != errno.ENOENT:
      logger.warn('Couldn\'t read download tracker file (%s): %s. Restarting '
                  'download from scratch.' % (tracker_file_name, str(e)))
  finally:
    if tracker_file:
      tracker_file.close()

  # There wasn't a matching tracker file, so create one and then start the
  # download from scratch.
  if tracker_file_type is TrackerFileType.DOWNLOAD:
    _WriteTrackerFile(tracker_file_name, '%s\n' % src_obj_metadata.etag)
  elif tracker_file_type is TrackerFileType.DOWNLOAD_COMPONENT:
    WriteDownloadComponentTrackerFile(tracker_file_name, src_obj_metadata,
                                      start_byte)
  return tracker_file_name, start_byte


def WriteDownloadComponentTrackerFile(tracker_file_name, src_obj_metadata,
                                      current_file_pos):
  """Updates or creates a download component tracker file on disk.

  Args:
    tracker_file_name: The name of the tracker file.
    src_obj_metadata: Metadata for the source object. Must include etag.
    current_file_pos: The current position in the file.
  """
  component_data = {'etag': src_obj_metadata.etag,
                    'generation': src_obj_metadata.generation,
                    'download_start_byte': current_file_pos}

  _WriteTrackerFile(tracker_file_name, json.dumps(component_data))


def _WriteTrackerFile(tracker_file_name, data):
  """Creates a tracker file, storing the input data."""
  try:
    with os.fdopen(os.open(tracker_file_name,
                           os.O_WRONLY | os.O_CREAT, 0600), 'w') as tf:
      tf.write(data)
    return False
  except (IOError, OSError) as e:
    raise RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)


def RaiseUnwritableTrackerFileException(tracker_file_name, error_str):
  """Raises an exception when unable to write the tracker file."""
  raise CommandException(TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT %
                         (tracker_file_name, error_str))