# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import io
import json
import logging
import time
import uuid
from google.appengine.api import app_identity
from apiclient import http
from apiclient.discovery import build
from oauth2client import client
from base import exceptions
# A single urlfetch request may carry at most 10 MB. Budgeting about 1000
# bytes per row, inserts are therefore split into chunks of 10,000 rows.
INSERTION_MAX_ROWS = 10 * 1000
class BigQuery(object):
  """Methods for interfacing with BigQuery.

  Wraps a BigQuery v2 API client (built by _Service()) bound to a single
  project ID, which is used for every job and table reference.
  """

  def __init__(self, project_id=None):
    """Creates the wrapper and its API client.

    Args:
      project_id: Project that owns the jobs and tables. When not given (or
          empty), the current App Engine application ID is used.
    """
    self._service = _Service()
    if project_id:
      self._project_id = project_id
    else:
      self._project_id = app_identity.get_application_id()

  def InsertRowsAsync(self, dataset_id, table_id, rows,
                      truncate=False, num_retries=5):
    """Starts load jobs that write rows into a table, without waiting.

    Rows are uploaded as newline-delimited JSON, at most INSERTION_MAX_ROWS
    rows per load job. If rows is empty, no job is started (so nothing is
    truncated either).

    Args:
      dataset_id: Destination dataset ID.
      table_id: Destination table ID.
      rows: A sequence of JSON-serializable rows.
      truncate: If True, the first job uses WRITE_TRUNCATE, replacing the
          table's contents; every later job uses WRITE_APPEND.
      num_retries: Retry count passed to each request's execute().

    Returns:
      A list of the jobs().insert() responses, one per chunk.
    """
    responses = []
    for start in range(0, len(rows), INSERTION_MAX_ROWS):
      rows_chunk = rows[start:start + INSERTION_MAX_ROWS]
      logging.info('Inserting %d rows into %s.%s.',
                   len(rows_chunk), dataset_id, table_id)
      body = {
          # jobReference is a field of the Job resource itself (as in
          # QueryAsync); JobConfiguration has no jobReference field.
          'jobReference': {
              'projectId': self._project_id,
              'jobId': str(uuid.uuid4()),
          },
          'configuration': {
              'load': {
                  'destinationTable': {
                      'projectId': self._project_id,
                      'datasetId': dataset_id,
                      'tableId': table_id,
                  },
                  'sourceFormat': 'NEWLINE_DELIMITED_JSON',
                  'writeDisposition':
                      'WRITE_TRUNCATE' if truncate else 'WRITE_APPEND',
              },
          },
      }

      # Format rows as newline-delimited JSON. json.dumps returns text on
      # Python 3 and a byte str on Python 2; the BytesIO buffer needs bytes.
      media_buffer = io.BytesIO()
      for row in rows_chunk:
        line = json.dumps(row, separators=(',', ':'))
        if not isinstance(line, bytes):
          line = line.encode('utf-8')
        media_buffer.write(line)
        media_buffer.write(b'\n')
      media_body = http.MediaIoBaseUpload(
          media_buffer, mimetype='application/octet-stream')

      responses.append(self._service.jobs().insert(
          projectId=self._project_id,
          body=body, media_body=media_body).execute(num_retries=num_retries))

      # Only truncate on the first insert; later chunks append to it.
      truncate = False
    # TODO(dtu): Return a Job object.
    return responses

  def InsertRowsSync(self, dataset_id, table_id, rows, num_retries=5):
    """Streams rows into a table with tabledata().insertAll().

    Rows are sent at most INSERTION_MAX_ROWS at a time, each tagged with a
    random insertId. Chunks sent before a failing chunk are not rolled back.

    Args:
      dataset_id: Destination dataset ID.
      table_id: Destination table ID.
      rows: A sequence of JSON-serializable rows.
      num_retries: Retry count passed to each request's execute().

    Raises:
      exceptions.QueryError: If a response contains insertErrors.
    """
    for start in range(0, len(rows), INSERTION_MAX_ROWS):
      rows_chunk = rows[start:start + INSERTION_MAX_ROWS]
      logging.info('Inserting %d rows into %s.%s.',
                   len(rows_chunk), dataset_id, table_id)
      insert_data = {
          'rows': [{'insertId': str(uuid.uuid4()), 'json': row}
                   for row in rows_chunk],
      }
      response = self._service.tabledata().insertAll(
          projectId=self._project_id,
          datasetId=dataset_id,
          tableId=table_id,
          body=insert_data).execute(num_retries=num_retries)
      if 'insertErrors' in response:
        raise exceptions.QueryError(response['insertErrors'])

  def QueryAsync(self, query, num_retries=5):
    """Starts an INTERACTIVE-priority query job without waiting for it.

    Args:
      query: Query string.
      num_retries: Retry count passed to execute().

    Returns:
      The jobs().insert() response; its 'jobReference' can be handed to
      IsJobDone() or PollJob().
    """
    logging.debug(query)
    body = {
        'jobReference': {
            'projectId': self._project_id,
            'jobId': str(uuid.uuid4()),
        },
        'configuration': {
            'query': {
                'query': query,
                'priority': 'INTERACTIVE',
            },
        },
    }
    return self._service.jobs().insert(
        projectId=self._project_id,
        body=body).execute(num_retries=num_retries)

  def QuerySync(self, query, timeout=60, num_retries=5):
    """Queries BigQuery and returns the result rows as a list.

    Only the first page of results is returned.

    Args:
      query: Query string.
      timeout: Seconds the server may wait for the query to finish.
      num_retries: Retry count passed to execute().

    Returns:
      Query results. The format is specified in the "rows" field here:
      https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/python/latest/bigquery_v2.jobs.html#getQueryResults

    Raises:
      exceptions.QueryError: If the response reports errors.
      exceptions.TimeoutError: If the query did not complete within timeout.
    """
    logging.debug(query)
    query_data = {
        'query': query,
        # timeoutMs is an integer field; int() guards against float timeouts.
        'timeoutMs': int(timeout * 1000),
    }
    start_time = time.time()
    response = self._service.jobs().query(
        projectId=self._project_id,
        body=query_data).execute(num_retries=num_retries)
    if 'errors' in response:
      raise exceptions.QueryError(response['errors'])
    # If the query outlives timeoutMs the response has jobComplete=false and
    # no rows; returning [] would be indistinguishable from an empty result.
    if not response.get('jobComplete', True):
      raise exceptions.TimeoutError()
    # TODO(dtu): Fetch subsequent pages of rows for big queries.
    # TODO(dtu): Reformat results as dicts.
    result = response.get('rows', [])
    logging.debug('Query fetched %d rows in %fs.',
                  len(result), time.time() - start_time)
    return result

  def IsJobDone(self, job):
    """Fetches a job and reports whether it has finished.

    Args:
      job: A job dict with a 'jobReference' entry whose keys are passed as
          keyword arguments to jobs().get().

    Returns:
      The jobs().get() response if its status state is 'DONE', else None.
    """
    response = self._service.jobs().get(**job['jobReference']).execute()
    if response['status']['state'] == 'DONE':
      return response
    return None

  def PollJob(self, job, timeout):
    """Polls a job with exponential backoff until it is done.

    Args:
      job: A job dict with a 'jobReference' entry (see IsJobDone).
      timeout: Seconds to keep polling before giving up.

    Returns:
      The finished job's jobs().get() response.

    Raises:
      exceptions.QueryError: If the finished job's status contains errors.
      exceptions.TimeoutError: If the job is not done after timeout seconds.
    """
    # TODO(dtu): Take multiple jobs as parameters.
    start_time = time.time()
    iteration = 0
    while True:
      elapsed_time = time.time() - start_time
      response = self.IsJobDone(job)
      if response:
        if 'errors' in response['status']:
          raise exceptions.QueryError(response['status']['errors'])
        logging.debug('Polled job for %d seconds.', int(elapsed_time))
        return response
      if elapsed_time >= timeout:
        break
      # Back off 1, 1.5, 2.25, ... seconds, capped at the time remaining as
      # measured before this poll.
      time.sleep(min(1.5 ** iteration, timeout - elapsed_time))
      iteration += 1
    raise exceptions.TimeoutError()
def _Service():
  """Builds a BigQuery v2 API client from application-default credentials.

  The credentials are narrowed to the BigQuery OAuth scope whenever the
  credential type reports that explicit scoping is required.
  """
  bigquery_scope = 'https://www.googleapis.com/auth/bigquery'
  # pylint: disable=no-member
  creds = client.GoogleCredentials.get_application_default()
  needs_scoping = creds.create_scoped_required()
  if needs_scoping:
    creds = creds.create_scoped(bigquery_scope)
  return build('bigquery', 'v2', credentials=creds)