#!/usr/bin/python
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Queries a MySQL database and emits status metrics to Monarch.
Note: confusingly, 'Innodb_buffer_pool_reads' is actually the cache-misses, not
the number of reads to the buffer pool. 'Innodb_buffer_pool_read_requests'
corresponds to the number of reads the the buffer pool.
"""
import logging
import sys
import MySQLdb
import time
import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib.cros import retry
from chromite.lib import metrics
from chromite.lib import ts_mon_config
AT_DIR='/usr/local/autotest'
DEFAULT_USER = global_config.global_config.get_config_value(
'CROS', 'db_backup_user', type=str, default='')
DEFAULT_PASSWD = global_config.global_config.get_config_value(
'CROS', 'db_backup_password', type=str, default='')
LOOP_INTERVAL = 60
EMITTED_STATUSES_COUNTERS = [
'bytes_received',
'bytes_sent',
'connections',
'Innodb_buffer_pool_read_requests',
'Innodb_buffer_pool_reads',
'Innodb_row_lock_waits',
'questions',
'slow_queries',
'threads_created',
]
EMITTED_STATUS_GAUGES = [
'Innodb_row_lock_time_avg',
'Innodb_row_lock_current_waits',
'threads_running',
'threads_connected',
]
class RetryingConnection(object):
"""Maintains a db connection and a cursor."""
INITIAL_SLEEP_SECONDS = 20
MAX_TIMEOUT_SECONDS = 60 * 60
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
self.db = None
self.cursor = None
def Connect(self):
"""Establishes a MySQL connection and creates a cursor."""
self.db = MySQLdb.connect(*self.args, **self.kwargs)
self.cursor = self.db.cursor()
def Reconnect(self):
"""Attempts to close the connection, then reconnects."""
try:
self.cursor.close()
self.db.close()
except MySQLdb.Error:
pass
self.Connect()
def RetryWith(self, func):
"""Run a function, retrying on OperationalError."""
return retry.retry(
MySQLdb.OperationalError,
delay_sec=self.INITIAL_SLEEP_SECONDS,
timeout_min=self.MAX_TIMEOUT_SECONDS,
callback=self.Reconnect
)(func)()
def Execute(self, *args, **kwargs):
"""Runs .execute on the cursor, reconnecting on failure."""
def _Execute():
return self.cursor.execute(*args, **kwargs)
return self.RetryWith(_Execute)
def Fetchall(self):
"""Runs .fetchall on the cursor."""
return self.cursor.fetchall()
def GetStatus(connection, status):
"""Get the status variable from the database, retrying on failure.
@param connection: MySQLdb cursor to query with.
@param status: Name of the status variable.
@returns The mysql query result.
"""
connection.Execute('SHOW GLOBAL STATUS LIKE "%s";' % status)
output = connection.Fetchall()[0][1]
if not output:
logging.error('Cannot find any global status like %s', status)
return int(output)
def QueryAndEmit(baselines, conn):
"""Queries MySQL for important stats and emits Monarch metrics
@param baselines: A dict containing the initial values for the cumulative
metrics.
@param conn: The mysql connection object.
"""
for status in EMITTED_STATUSES_COUNTERS:
metric_name = 'chromeos/autotest/afe_db/%s' % status.lower()
delta = GetStatus(conn, status) - baselines[status]
metrics.Counter(metric_name).set(delta)
for status in EMITTED_STATUS_GAUGES:
metric_name = 'chromeos/autotest/afe_db/%s' % status.lower()
metrics.Gauge(metric_name).set(GetStatus(conn, status))
pages_free = GetStatus(conn, 'Innodb_buffer_pool_pages_free')
pages_total = GetStatus(conn, 'Innodb_buffer_pool_pages_total')
metrics.Gauge('chromeos/autotest/afe_db/buffer_pool_pages').set(
pages_free, fields={'used': False})
metrics.Gauge('chromeos/autotest/afe_db/buffer_pool_pages').set(
pages_total - pages_free, fields={'used': True})
def main():
"""Sets up ts_mon and repeatedly queries MySQL stats"""
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
conn = RetryingConnection('localhost', DEFAULT_USER, DEFAULT_PASSWD)
conn.Connect()
# TODO(crbug.com/803566) Use indirect=False to mitigate orphan mysql_stats
# processes overwhelming shards.
with ts_mon_config.SetupTsMonGlobalState('mysql_stats', indirect=False):
QueryLoop(conn)
def QueryLoop(conn):
"""Queries and emits metrics every LOOP_INTERVAL seconds.
@param conn: The mysql connection object.
"""
# Get the baselines for cumulative metrics. Otherwise the windowed rate at
# the very beginning will be extremely high as it shoots up from 0 to its
# current value.
baselines = dict((s, GetStatus(conn, s))
for s in EMITTED_STATUSES_COUNTERS)
while True:
now = time.time()
QueryAndEmit(baselines, conn)
time_spent = time.time() - now
sleep_duration = LOOP_INTERVAL - time_spent
time.sleep(max(0, sleep_duration))
if __name__ == '__main__':
main()