# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import io
import json
import logging
import time
import uuid

from google.appengine.api import app_identity

from apiclient import http
from apiclient.discovery import build
from oauth2client import client

from base import exceptions


# Maximum number of rows sent to BigQuery in a single insert request; longer
# row lists are sliced into chunks of this size.
# urlfetch max size is 10 MB. Assume 1000 bytes per row and split the
# insert into chunks of 10,000 rows.
INSERTION_MAX_ROWS = 10000


class BigQuery(object):
  """Methods for interfacing with BigQuery.

  All requests are issued against a single project, chosen at construction
  time, through the API client returned by _Service().
  """

  def __init__(self, project_id=None):
    """Creates the API client and picks the project to operate on.

    Args:
      project_id: Project ID used for every request. If falsy, the App Engine
          application ID is used instead.
    """
    self._service = _Service()
    if project_id:
      self._project_id = project_id
    else:
      self._project_id = app_identity.get_application_id()

  @staticmethod
  def _EncodeRows(rows):
    """Serializes rows as newline-delimited JSON bytes, one row per line."""
    # json.dumps escapes non-ASCII characters by default, so the joined text
    # is plain ASCII and encodes the same way on Python 2 and Python 3.
    lines = [json.dumps(row, separators=(',', ':')) for row in rows]
    return ''.join(line + '\n' for line in lines).encode('utf-8')

  def InsertRowsAsync(self, dataset_id, table_id, rows,
                      truncate=False, num_retries=5):
    """Starts load jobs that insert rows into a table, without waiting.

    The rows are split into chunks of at most INSERTION_MAX_ROWS and one load
    job is started per chunk.

    Args:
      dataset_id: ID of the dataset containing the destination table.
      table_id: ID of the destination table.
      rows: Sequence of JSON-serializable rows.
      truncate: If True, the first chunk is loaded with WRITE_TRUNCATE. All
          other chunks are always loaded with WRITE_APPEND.
      num_retries: Passed to execute() for each jobs().insert request.

    Returns:
      A list with the jobs().insert response for each chunk, in chunk order.
      The list is empty if rows is empty.
    """
    responses = []
    for i in range(0, len(rows), INSERTION_MAX_ROWS):
      rows_chunk = rows[i:i+INSERTION_MAX_ROWS]
      logging.info('Inserting %d rows into %s.%s.',
                   len(rows_chunk), dataset_id, table_id)
      # 'jobReference' is a top-level field of the job resource, next to
      # 'configuration' (same layout as in QueryAsync), not a member of it.
      body = {
          'jobReference': {
              'projectId': self._project_id,
              'jobId': str(uuid.uuid4()),
          },
          'configuration': {
              'load': {
                  'destinationTable': {
                      'projectId': self._project_id,
                      'datasetId': dataset_id,
                      'tableId': table_id,
                  },
                  'sourceFormat': 'NEWLINE_DELIMITED_JSON',
                  'writeDisposition':
                      'WRITE_TRUNCATE' if truncate else 'WRITE_APPEND',
              }
          }
      }

      # Format rows as newline-delimited JSON.
      media_buffer = io.BytesIO(self._EncodeRows(rows_chunk))
      media_body = http.MediaIoBaseUpload(
          media_buffer, mimetype='application/octet-stream')

      responses.append(self._service.jobs().insert(
          projectId=self._project_id,
          body=body, media_body=media_body).execute(num_retries=num_retries))

      # Only truncate on the first insert!
      truncate = False

    # TODO(dtu): Return a Job object.
    return responses

  def InsertRowsSync(self, dataset_id, table_id, rows, num_retries=5):
    """Inserts rows into a table with tabledata().insertAll.

    The rows are sent in chunks of at most INSERTION_MAX_ROWS. Every row is
    given a freshly generated UUID as its insertId.

    Args:
      dataset_id: ID of the dataset containing the destination table.
      table_id: ID of the destination table.
      rows: Sequence of rows; each one is sent as the 'json' of a request row.
      num_retries: Passed to execute() for each insertAll request.

    Raises:
      exceptions.QueryError: If a response contains 'insertErrors'. Chunks
          before the failing one have already been sent at that point.
    """
    for i in range(0, len(rows), INSERTION_MAX_ROWS):
      rows_chunk = rows[i:i+INSERTION_MAX_ROWS]
      logging.info('Inserting %d rows into %s.%s.',
                   len(rows_chunk), dataset_id, table_id)
      rows_chunk = [{'insertId': str(uuid.uuid4()), 'json': row}
                    for row in rows_chunk]
      insert_data = {'rows': rows_chunk}
      response = self._service.tabledata().insertAll(
          projectId=self._project_id,
          datasetId=dataset_id,
          tableId=table_id,
          body=insert_data).execute(num_retries=num_retries)

      if 'insertErrors' in response:
        raise exceptions.QueryError(response['insertErrors'])

  def QueryAsync(self, query, num_retries=5):
    """Starts an INTERACTIVE-priority query job without waiting for it.

    Args:
      query: Query string.
      num_retries: Passed to execute() for the jobs().insert request.

    Returns:
      The jobs().insert response. IsJobDone() and PollJob() read its
      'jobReference' entry.
    """
    logging.debug(query)
    body = {
        'jobReference': {
            'projectId': self._project_id,
            'jobId': str(uuid.uuid4()),
        },
        'configuration': {
            'query': {
                'query': query,
                'priority': 'INTERACTIVE',
            }
        }
    }
    return self._service.jobs().insert(
        projectId=self._project_id,
        body=body).execute(num_retries=num_retries)

  def QuerySync(self, query, timeout=60, num_retries=5):
    """Query BigQuery and return the resulting rows.

    Args:
      query: Query string.
      timeout: Timeout in seconds, sent to the server as 'timeoutMs'.
      num_retries: Number of retries, passed to execute().

    Returns:
      Query results. The format is specified in the "rows" field here:
      https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/python/latest/bigquery_v2.jobs.html#getQueryResults
      An empty list is returned if the response has no "rows" field.

    Raises:
      exceptions.QueryError: If the response contains 'errors'.
      exceptions.TimeoutError: If the response reports 'jobComplete' as false,
          i.e. the query did not finish within the timeout.
    """
    logging.debug(query)
    query_data = {
        'query': query,
        # The API expects an integer number of milliseconds.
        'timeoutMs': int(timeout * 1000),
    }
    start_time = time.time()
    response = self._service.jobs().query(
        projectId=self._project_id,
        body=query_data).execute(num_retries=num_retries)

    if 'errors' in response:
      raise exceptions.QueryError(response['errors'])

    # An unfinished query carries no rows; without this check it would be
    # indistinguishable from a query that legitimately returned nothing.
    if not response.get('jobComplete', True):
      raise exceptions.TimeoutError()

    # TODO(dtu): Fetch subsequent pages of rows for big queries.
    # TODO(dtu): Reformat results as dicts.
    result = response.get('rows', [])
    logging.debug('Query fetched %d rows in %fs.',
                  len(result), time.time() - start_time)
    return result

  def IsJobDone(self, job, num_retries=0):
    """Fetches a job and reports whether it has finished.

    Args:
      job: Dict with a 'jobReference' entry whose items are passed as keyword
          arguments to jobs().get, e.g. a QueryAsync() response.
      num_retries: Passed to execute(). The default of 0 means one attempt.

    Returns:
      The jobs().get response if the job state is 'DONE', otherwise None.
      A DONE job may still have failed; see response['status'].
    """
    response = self._service.jobs().get(
        **job['jobReference']).execute(num_retries=num_retries)
    if response['status']['state'] == 'DONE':
      return response
    return None

  def PollJob(self, job, timeout, num_retries=0):
    """Blocks until a job is done or the timeout has elapsed.

    The job is always checked at least once. Between checks the method sleeps
    for 1.5 ** iteration seconds, capped at the time left before the timeout.

    Args:
      job: Dict with a 'jobReference' entry, as accepted by IsJobDone().
      timeout: Maximum time to keep polling, in seconds.
      num_retries: Passed to IsJobDone() for every poll.

    Returns:
      The jobs().get response of the finished job.

    Raises:
      exceptions.QueryError: If the finished job's status contains 'errors'.
      exceptions.TimeoutError: If the job is not done once timeout seconds
          have elapsed.
    """
    # TODO(dtu): Take multiple jobs as parameters.
    start_time = time.time()
    iteration = 0

    while True:
      elapsed_time = time.time() - start_time

      response = self.IsJobDone(job, num_retries=num_retries)
      if response:
        if 'errors' in response['status']:
          raise exceptions.QueryError(response['status']['errors'])
        logging.debug('Polled job for %d seconds.', int(elapsed_time))
        return response

      if elapsed_time >= timeout:
        break
      # Exponential backoff, never sleeping past the deadline.
      time.sleep(min(1.5 ** iteration, timeout - elapsed_time))
      iteration += 1

    raise exceptions.TimeoutError()

def _Service():
  """Builds a BigQuery v2 API client from application default credentials.

  The credentials are narrowed to the BigQuery OAuth scope only when they
  report that scoping is required.

  Returns:
    The client object produced by build('bigquery', 'v2', ...).
  """
  # pylint: disable=no-member
  bigquery_scope = 'https://www.googleapis.com/auth/bigquery'
  creds = client.GoogleCredentials.get_application_default()
  needs_scope = creds.create_scoped_required()
  if needs_scope:
    creds = creds.create_scoped(bigquery_scope)
  return build('bigquery', 'v2', credentials=creds)