"""
Interfaces to the QEMU monitor.
@copyright: 2008-2010 Red Hat Inc.
"""
import socket, time, threading, logging, select
import virt_utils
try:
import json
except ImportError:
logging.warning("Could not import json module. "
"QMP monitor functionality disabled.")
class MonitorError(Exception):
pass
class MonitorConnectError(MonitorError):
pass
class MonitorSocketError(MonitorError):
def __init__(self, msg, e):
Exception.__init__(self, msg, e)
self.msg = msg
self.e = e
def __str__(self):
return "%s (%s)" % (self.msg, self.e)
class MonitorLockError(MonitorError):
pass
class MonitorProtocolError(MonitorError):
pass
class MonitorNotSupportedError(MonitorError):
pass
class QMPCmdError(MonitorError):
def __init__(self, cmd, qmp_args, data):
MonitorError.__init__(self, cmd, qmp_args, data)
self.cmd = cmd
self.qmp_args = qmp_args
self.data = data
def __str__(self):
return ("QMP command %r failed (arguments: %r, "
"error message: %r)" % (self.cmd, self.qmp_args, self.data))
class Monitor:
"""
Common code for monitor classes.
"""
def __init__(self, name, filename):
"""
Initialize the instance.
@param name: Monitor identifier (a string)
@param filename: Monitor socket filename
@raise MonitorConnectError: Raised if the connection fails
"""
self.name = name
self.filename = filename
self._lock = threading.RLock()
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
self._socket.connect(filename)
except socket.error:
raise MonitorConnectError("Could not connect to monitor socket")
def __del__(self):
# Automatically close the connection when the instance is garbage
# collected
self._close_sock()
# The following two functions are defined to make sure the state is set
# exclusively by the constructor call as specified in __getinitargs__().
def __getstate__(self):
pass
def __setstate__(self, state):
pass
def __getinitargs__(self):
# Save some information when pickling -- will be passed to the
# constructor upon unpickling
return self.name, self.filename, True
def _close_sock(self):
try:
self._socket.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
self._socket.close()
def _acquire_lock(self, timeout=20):
end_time = time.time() + timeout
while time.time() < end_time:
if self._lock.acquire(False):
return True
time.sleep(0.05)
return False
def _data_available(self, timeout=0):
timeout = max(0, timeout)
try:
return bool(select.select([self._socket], [], [], timeout)[0])
except socket.error, e:
raise MonitorSocketError("Verifying data on monitor socket", e)
def _recvall(self):
s = ""
while self._data_available():
try:
data = self._socket.recv(1024)
except socket.error, e:
raise MonitorSocketError("Could not receive data from monitor",
e)
if not data:
break
s += data
return s
def is_responsive(self):
"""
Return True iff the monitor is responsive.
"""
try:
self.verify_responsive()
return True
except MonitorError:
return False
class HumanMonitor(Monitor):
"""
Wraps "human monitor" commands.
"""
def __init__(self, name, filename, suppress_exceptions=False):
"""
Connect to the monitor socket and find the (qemu) prompt.
@param name: Monitor identifier (a string)
@param filename: Monitor socket filename
@raise MonitorConnectError: Raised if the connection fails and
suppress_exceptions is False
@raise MonitorProtocolError: Raised if the initial (qemu) prompt isn't
found and suppress_exceptions is False
@note: Other exceptions may be raised. See cmd()'s
docstring.
"""
try:
Monitor.__init__(self, name, filename)
self.protocol = "human"
# Find the initial (qemu) prompt
s, o = self._read_up_to_qemu_prompt(20)
if not s:
raise MonitorProtocolError("Could not find (qemu) prompt "
"after connecting to monitor. "
"Output so far: %r" % o)
# Save the output of 'help' for future use
self._help_str = self.cmd("help", debug=False)
except MonitorError, e:
self._close_sock()
if suppress_exceptions:
logging.warning(e)
else:
raise
# Private methods
def _read_up_to_qemu_prompt(self, timeout=20):
s = ""
end_time = time.time() + timeout
while self._data_available(end_time - time.time()):
data = self._recvall()
if not data:
break
s += data
try:
if s.splitlines()[-1].split()[-1] == "(qemu)":
return True, "\n".join(s.splitlines()[:-1])
except IndexError:
continue
return False, "\n".join(s.splitlines())
def _send(self, cmd):
"""
Send a command without waiting for output.
@param cmd: Command to send
@raise MonitorLockError: Raised if the lock cannot be acquired
@raise MonitorSocketError: Raised if a socket error occurs
"""
if not self._acquire_lock(20):
raise MonitorLockError("Could not acquire exclusive lock to send "
"monitor command '%s'" % cmd)
try:
try:
self._socket.sendall(cmd + "\n")
except socket.error, e:
raise MonitorSocketError("Could not send monitor command %r" %
cmd, e)
finally:
self._lock.release()
# Public methods
def cmd(self, command, timeout=20, debug=True):
"""
Send command to the monitor.
@param command: Command to send to the monitor
@param timeout: Time duration to wait for the (qemu) prompt to return
@param debug: Whether to print the commands being sent and responses
@return: Output received from the monitor
@raise MonitorLockError: Raised if the lock cannot be acquired
@raise MonitorSocketError: Raised if a socket error occurs
@raise MonitorProtocolError: Raised if the (qemu) prompt cannot be
found after sending the command
"""
if debug:
logging.debug("(monitor %s) Sending command '%s'",
self.name, command)
if not self._acquire_lock(20):
raise MonitorLockError("Could not acquire exclusive lock to send "
"monitor command '%s'" % command)
try:
# Read any data that might be available
self._recvall()
# Send command
self._send(command)
# Read output
s, o = self._read_up_to_qemu_prompt(timeout)
# Remove command echo from output
o = "\n".join(o.splitlines()[1:])
# Report success/failure
if s:
if debug and o:
logging.debug("(monitor %s) "
"Response to '%s'", self.name,
command)
for l in o.splitlines():
logging.debug("(monitor %s) %s", self.name, l)
return o
else:
msg = ("Could not find (qemu) prompt after command '%s'. "
"Output so far: %r" % (command, o))
raise MonitorProtocolError(msg)
finally:
self._lock.release()
def verify_responsive(self):
"""
Make sure the monitor is responsive by sending a command.
"""
self.cmd("info status", debug=False)
def verify_status(self, status):
"""
Verify VM status
@param status: Optional VM status, 'running' or 'paused'
@return: return True if VM status is same as we expected
"""
o = self.cmd("info status", debug=False)
if status=='paused' or status=='running':
return (status in o)
# Command wrappers
# Notes:
# - All of the following commands raise exceptions in a similar manner to
# cmd().
# - A command wrapper should use self._help_str if it requires information
# about the monitor's capabilities.
def quit(self):
"""
Send "quit" without waiting for output.
"""
self._send("quit")
def info(self, what):
"""
Request info about something and return the output.
"""
return self.cmd("info %s" % what)
def query(self, what):
"""
Alias for info.
"""
return self.info(what)
def screendump(self, filename, debug=True):
"""
Request a screendump.
@param filename: Location for the screendump
@return: The command's output
"""
return self.cmd(command="screendump %s" % filename, debug=debug)
def migrate(self, uri, full_copy=False, incremental_copy=False, wait=False):
"""
Migrate.
@param uri: destination URI
@param full_copy: If true, migrate with full disk copy
@param incremental_copy: If true, migrate with incremental disk copy
@param wait: If true, wait for completion
@return: The command's output
"""
cmd = "migrate"
if not wait:
cmd += " -d"
if full_copy:
cmd += " -b"
if incremental_copy:
cmd += " -i"
cmd += " %s" % uri
return self.cmd(cmd)
def migrate_set_speed(self, value):
"""
Set maximum speed (in bytes/sec) for migrations.
@param value: Speed in bytes/sec
@return: The command's output
"""
return self.cmd("migrate_set_speed %s" % value)
def sendkey(self, keystr, hold_time=1):
"""
Send key combination to VM.
@param keystr: Key combination string
@param hold_time: Hold time in ms (should normally stay 1 ms)
@return: The command's output
"""
return self.cmd("sendkey %s %s" % (keystr, hold_time))
def mouse_move(self, dx, dy):
"""
Move mouse.
@param dx: X amount
@param dy: Y amount
@return: The command's output
"""
return self.cmd("mouse_move %d %d" % (dx, dy))
def mouse_button(self, state):
"""
Set mouse button state.
@param state: Button state (1=L, 2=M, 4=R)
@return: The command's output
"""
return self.cmd("mouse_button %d" % state)
class QMPMonitor(Monitor):
"""
Wraps QMP monitor commands.
"""
def __init__(self, name, filename, suppress_exceptions=False):
"""
Connect to the monitor socket, read the greeting message and issue the
qmp_capabilities command. Also make sure the json module is available.
@param name: Monitor identifier (a string)
@param filename: Monitor socket filename
@raise MonitorConnectError: Raised if the connection fails and
suppress_exceptions is False
@raise MonitorProtocolError: Raised if the no QMP greeting message is
received and suppress_exceptions is False
@raise MonitorNotSupportedError: Raised if json isn't available and
suppress_exceptions is False
@note: Other exceptions may be raised if the qmp_capabilities command
fails. See cmd()'s docstring.
"""
try:
Monitor.__init__(self, name, filename)
self.protocol = "qmp"
self._greeting = None
self._events = []
# Make sure json is available
try:
json
except NameError:
raise MonitorNotSupportedError("QMP requires the json module "
"(Python 2.6 and up)")
# Read greeting message
end_time = time.time() + 20
while time.time() < end_time:
for obj in self._read_objects():
if "QMP" in obj:
self._greeting = obj
break
if self._greeting:
break
time.sleep(0.1)
else:
raise MonitorProtocolError("No QMP greeting message received")
# Issue qmp_capabilities
self.cmd("qmp_capabilities")
except MonitorError, e:
self._close_sock()
if suppress_exceptions:
logging.warning(e)
else:
raise
# Private methods
def _build_cmd(self, cmd, args=None, id=None):
obj = {"execute": cmd}
if args is not None:
obj["arguments"] = args
if id is not None:
obj["id"] = id
return obj
def _read_objects(self, timeout=5):
"""
Read lines from the monitor and try to decode them.
Stop when all available lines have been successfully decoded, or when
timeout expires. If any decoded objects are asynchronous events, store
them in self._events. Return all decoded objects.
@param timeout: Time to wait for all lines to decode successfully
@return: A list of objects
"""
if not self._data_available():
return []
s = ""
end_time = time.time() + timeout
while self._data_available(end_time - time.time()):
s += self._recvall()
# Make sure all lines are decodable
for line in s.splitlines():
if line:
try:
json.loads(line)
except:
# Found an incomplete or broken line -- keep reading
break
else:
# All lines are OK -- stop reading
break
# Decode all decodable lines
objs = []
for line in s.splitlines():
try:
objs += [json.loads(line)]
except:
pass
# Keep track of asynchronous events
self._events += [obj for obj in objs if "event" in obj]
return objs
def _send(self, data):
"""
Send raw data without waiting for response.
@param data: Data to send
@raise MonitorSocketError: Raised if a socket error occurs
"""
try:
self._socket.sendall(data)
except socket.error, e:
raise MonitorSocketError("Could not send data: %r" % data, e)
def _get_response(self, id=None, timeout=20):
"""
Read a response from the QMP monitor.
@param id: If not None, look for a response with this id
@param timeout: Time duration to wait for response
@return: The response dict, or None if none was found
"""
end_time = time.time() + timeout
while self._data_available(end_time - time.time()):
for obj in self._read_objects():
if isinstance(obj, dict):
if id is not None and obj.get("id") != id:
continue
if "return" in obj or "error" in obj:
return obj
# Public methods
def cmd(self, cmd, args=None, timeout=20, debug=True):
"""
Send a QMP monitor command and return the response.
Note: an id is automatically assigned to the command and the response
is checked for the presence of the same id.
@param cmd: Command to send
@param args: A dict containing command arguments, or None
@param timeout: Time duration to wait for response
@return: The response received
@raise MonitorLockError: Raised if the lock cannot be acquired
@raise MonitorSocketError: Raised if a socket error occurs
@raise MonitorProtocolError: Raised if no response is received
@raise QMPCmdError: Raised if the response is an error message
(the exception's args are (cmd, args, data) where data is the
error data)
"""
if debug:
logging.debug("(monitor %s) Sending command '%s'",
self.name, cmd)
if not self._acquire_lock(20):
raise MonitorLockError("Could not acquire exclusive lock to send "
"QMP command '%s'" % cmd)
try:
# Read any data that might be available
self._read_objects()
# Send command
id = virt_utils.generate_random_string(8)
self._send(json.dumps(self._build_cmd(cmd, args, id)) + "\n")
# Read response
r = self._get_response(id, timeout)
if r is None:
raise MonitorProtocolError("Received no response to QMP "
"command '%s', or received a "
"response with an incorrect id"
% cmd)
if "return" in r:
if debug and r["return"]:
logging.debug("(monitor %s) "
"Response to '%s'", self.name, cmd)
o = str(r["return"])
for l in o.splitlines():
logging.debug("(monitor %s) %s", self.name, l)
return r["return"]
if "error" in r:
raise QMPCmdError(cmd, args, r["error"])
finally:
self._lock.release()
def cmd_raw(self, data, timeout=20):
"""
Send a raw string to the QMP monitor and return the response.
Unlike cmd(), return the raw response dict without performing any
checks on it.
@param data: The data to send
@param timeout: Time duration to wait for response
@return: The response received
@raise MonitorLockError: Raised if the lock cannot be acquired
@raise MonitorSocketError: Raised if a socket error occurs
@raise MonitorProtocolError: Raised if no response is received
"""
if not self._acquire_lock(20):
raise MonitorLockError("Could not acquire exclusive lock to send "
"data: %r" % data)
try:
self._read_objects()
self._send(data)
r = self._get_response(None, timeout)
if r is None:
raise MonitorProtocolError("Received no response to data: %r" %
data)
return r
finally:
self._lock.release()
def cmd_obj(self, obj, timeout=20):
"""
Transform a Python object to JSON, send the resulting string to the QMP
monitor, and return the response.
Unlike cmd(), return the raw response dict without performing any
checks on it.
@param obj: The object to send
@param timeout: Time duration to wait for response
@return: The response received
@raise MonitorLockError: Raised if the lock cannot be acquired
@raise MonitorSocketError: Raised if a socket error occurs
@raise MonitorProtocolError: Raised if no response is received
"""
return self.cmd_raw(json.dumps(obj) + "\n")
def cmd_qmp(self, cmd, args=None, id=None, timeout=20):
"""
Build a QMP command from the passed arguments, send it to the monitor
and return the response.
Unlike cmd(), return the raw response dict without performing any
checks on it.
@param cmd: Command to send
@param args: A dict containing command arguments, or None
@param id: An id for the command, or None
@param timeout: Time duration to wait for response
@return: The response received
@raise MonitorLockError: Raised if the lock cannot be acquired
@raise MonitorSocketError: Raised if a socket error occurs
@raise MonitorProtocolError: Raised if no response is received
"""
return self.cmd_obj(self._build_cmd(cmd, args, id), timeout)
def verify_responsive(self):
"""
Make sure the monitor is responsive by sending a command.
"""
self.cmd(cmd="query-status", debug=False)
def verify_status(self, status):
"""
Verify VM status
@param status: Optional VM status, 'running' or 'paused'
@return: return True if VM status is same as we expected
"""
o = str(self.cmd(cmd="query-status", debug=False))
if (status=='paused' and "u'running': False" in o):
return True
if (status=='running' and "u'running': True" in o):
return True
def get_events(self):
"""
Return a list of the asynchronous events received since the last
clear_events() call.
@return: A list of events (the objects returned have an "event" key)
@raise MonitorLockError: Raised if the lock cannot be acquired
"""
if not self._acquire_lock(20):
raise MonitorLockError("Could not acquire exclusive lock to read "
"QMP events")
try:
self._read_objects()
return self._events[:]
finally:
self._lock.release()
def get_event(self, name):
"""
Look for an event with the given name in the list of events.
@param name: The name of the event to look for (e.g. 'RESET')
@return: An event object or None if none is found
"""
for e in self.get_events():
if e.get("event") == name:
return e
def clear_events(self):
"""
Clear the list of asynchronous events.
@raise MonitorLockError: Raised if the lock cannot be acquired
"""
if not self._acquire_lock(20):
raise MonitorLockError("Could not acquire exclusive lock to clear "
"QMP event list")
self._events = []
self._lock.release()
def get_greeting(self):
"""
Return QMP greeting message.
"""
return self._greeting
# Command wrappers
# Note: all of the following functions raise exceptions in a similar manner
# to cmd().
def quit(self):
"""
Send "quit" and return the response.
"""
return self.cmd("quit")
def info(self, what):
"""
Request info about something and return the response.
"""
return self.cmd("query-%s" % what)
def query(self, what):
"""
Alias for info.
"""
return self.info(what)
def screendump(self, filename, debug=True):
"""
Request a screendump.
@param filename: Location for the screendump
@return: The response to the command
"""
args = {"filename": filename}
return self.cmd(cmd="screendump", args=args, debug=debug)
def migrate(self, uri, full_copy=False, incremental_copy=False, wait=False):
"""
Migrate.
@param uri: destination URI
@param full_copy: If true, migrate with full disk copy
@param incremental_copy: If true, migrate with incremental disk copy
@param wait: If true, wait for completion
@return: The response to the command
"""
args = {"uri": uri,
"blk": full_copy,
"inc": incremental_copy}
return self.cmd("migrate", args)
def migrate_set_speed(self, value):
"""
Set maximum speed (in bytes/sec) for migrations.
@param value: Speed in bytes/sec
@return: The response to the command
"""
args = {"value": value}
return self.cmd("migrate_set_speed", args)