#!/usr/bin/python
#
# Copyright (C) 2010 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
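"""Test script for the mock_ril server (tms).

Connects to mock_ril over TCP, sends an echo request and a
CTRL_CMD_GET_RADIO_STATE request, and prints each response. Every message
is a length-prefixed MsgHeader protobuf, optionally followed by a payload
protobuf.
"""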
__author__ = 'wink@google.com (Wink Saville)'
import socket
import struct
import sys
import time
import ctrl_pb2
import ril_pb2
import msgheader_pb2
def recvall(s, count):
    """Receive exactly count bytes from a socket.

    A single recv() may return fewer bytes than requested, so this keeps
    reading until count bytes have been gathered.

    Args:
      s: socket (any object with a recv(n) method)
      count: number of bytes to read

    Returns:
      The received data as one byte string (empty when count <= 0).
      None if recv() returns no data before count bytes have arrived;
      any partial data read so far is discarded.
    """
    chunks = []
    while count > 0:
        data = s.recv(count)
        if not data:
            # An empty read means no more data will arrive on this socket.
            return None
        count -= len(data)
        chunks.append(data)
    # Join as bytes: recv() yields byte strings, and b'' is a plain str on
    # Python 2, so this works on both Python 2 and Python 3.
    return b''.join(chunks)
def sendall(s, data):
    """Write all of data to the socket.

    Thin wrapper that delegates to the socket's own sendall() method.

    Args:
      s: socket
      data: the data to send

    Returns:
      Nothing
    """
    s.sendall(data)
class MsgHeader:
    """A message header; cmd, token, status and length of protobuf.

    On the wire the header is a 4-byte little-endian signed length followed
    by that many bytes of a serialized msgheader_pb2.MsgHeader.
    """

    def __init__(self):
        """Initialize every header field to zero."""
        self.cmd = 0
        self.token = 0
        self.status = 0
        self.length_protobuf = 0

    def sendHeader(self, s):
        """Send the header to the socket

        Args:
          s: socket

        Returns
          nothing
        """
        mh = msgheader_pb2.MsgHeader()
        mh.cmd = self.cmd
        mh.token = self.token
        mh.status = self.status
        # The protobuf field is named length_data; this class calls it
        # length_protobuf.
        mh.length_data = self.length_protobuf
        mhser = mh.SerializeToString()
        # Length prefix first, so the receiver knows how much to read.
        sendall(s, struct.pack('<i', len(mhser)))
        sendall(s, mhser)

    def recvHeader(self, s):
        """Receive the header from the socket

        Args:
          s: socket

        Returns
          nothing; cmd, token, status and length_protobuf are updated

        Raises:
          IOError: recvall() returned None (connection closed) before a
            complete header was read.
        """
        len_msg_header_raw = recvall(s, 4)
        if len_msg_header_raw is None:
            # Fail with a clear error rather than handing None to struct.unpack.
            raise IOError('connection closed while reading header length')
        (len_msg_hdr,) = struct.unpack('<i', len_msg_header_raw)
        mh_raw = recvall(s, len_msg_hdr)
        if mh_raw is None:
            raise IOError('connection closed while reading header')
        mh = msgheader_pb2.MsgHeader()
        mh.ParseFromString(mh_raw)
        self.cmd = mh.cmd
        self.token = mh.token
        self.status = mh.status
        self.length_protobuf = mh.length_data
class Msg:
    """A MsgHeader followed by an optional protobuf payload.

    Used for both directions: a sent message always carries status zero,
    and a received message with no payload has protobuf set to an empty
    string.
    """

    def __init__(self):
        """Start with no protobuf, a fresh header and zeroed fields."""
        self.protobuf = None
        self.header = MsgHeader()
        # Mirror of the header fields, kept on the message for convenience
        self.cmd = self.token = self.status = 0

    def sendMsg(self, s, cmd, token, protobuf=''):
        """Write the header, then the protobuf if there is one.

        Args:
          s: socket
          cmd: command to send
          token: token to send, will be returned unmodified
          protobuf: optional protobuf to send

        Returns
          nothing
        """
        self.cmd, self.token, self.status = cmd, token, 0
        self.protobuf = protobuf

        hdr = self.header
        hdr.cmd, hdr.token, hdr.status = self.cmd, self.token, self.status

        payload_len = len(protobuf)
        hdr.length_protobuf = payload_len
        if payload_len == 0:
            # Normalize any empty payload to the empty string
            self.protobuf = ''

        hdr.sendHeader(s)
        if hdr.length_protobuf > 0:
            sendall(s, self.protobuf)

    def recvMsg(self, s):
        """Read a header, then the protobuf it announces (if any).

        Args:
          s: socket

        Returns:
          nothing; cmd, token, status and protobuf are updated
        """
        hdr = self.header
        hdr.recvHeader(s)
        self.cmd, self.token, self.status = hdr.cmd, hdr.token, hdr.status

        payload_len = hdr.length_protobuf
        # Whatever recvall returns is stored as-is
        self.protobuf = recvall(s, payload_len) if payload_len > 0 else ''
def main(argv):
    """Connect to the server and run an echo test and a radio-state query.

    Before using you'll need to forward the port
    used by mock_ril, 54312 to a port on the PC.
    The following worked for me:

      adb forward tcp:11111 tcp:54312.

    Then you can execute this test using:

      tms.py 127.0.0.1 11111

    Args:
      argv: command line; argv[1] is the server address and argv[2] is
        the server port.

    Raises:
      IndexError: argv has fewer than three entries.
      ValueError: argv[2] is not an integer.
    """
    # print(...) with one parenthesized argument prints the same text on
    # Python 2 and also parses on Python 3.
    host = argv[1]  # server address
    print("host=%s" % host)
    port = int(argv[2])  # server port
    print("port=%d" % port)

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((host, port))

        # Create an object which is serializable to a protobuf
        rrs = ctrl_pb2.CtrlRspRadioState()
        rrs.state = ril_pb2.RADIOSTATE_UNAVAILABLE
        print("rrs.state=%d" % (rrs.state))

        # Serialize
        rrs_ser = rrs.SerializeToString()
        print("len(rrs_ser)=%d" % (len(rrs_ser)))

        # Deserialize
        rrs_new = ctrl_pb2.CtrlRspRadioState()
        rrs_new.ParseFromString(rrs_ser)
        print("rrs_new.state=%d" % (rrs_new.state))

        # Do an echo test
        req = Msg()
        req.sendMsg(s, 0, 1234567890123, rrs_ser)
        resp = Msg()
        resp.recvMsg(s)
        response = ctrl_pb2.CtrlRspRadioState()
        response.ParseFromString(resp.protobuf)

        print("cmd=%d" % (resp.cmd))
        print("token=%d" % (resp.token))
        print("status=%d" % (resp.status))
        print("len(protobuf)=%d" % (len(resp.protobuf)))
        print("response.state=%d" % (response.state))
        # NOTE(review): state == 1 presumably equals the
        # RADIOSTATE_UNAVAILABLE value sent above -- confirm against ril_pb2.
        if (resp.cmd == 0 and resp.token == 1234567890123 and
                resp.status == 0 and response.state == 1):
            print("SUCCESS: echo ok")
        else:
            print("ERROR: expecting cmd=0 token=1234567890123 status=0 state=1")

        # Test CTRL_GET_RADIO_STATE
        req.sendMsg(s, ctrl_pb2.CTRL_CMD_GET_RADIO_STATE, 4)
        resp = Msg()
        resp.recvMsg(s)

        print("cmd=%d" % (resp.cmd))
        print("token=%d" % (resp.token))
        print("status=%d" % (resp.status))
        print("length_protobuf=%d" % (len(resp.protobuf)))

        if resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE:
            response = ctrl_pb2.CtrlRspRadioState()
            response.ParseFromString(resp.protobuf)
            print("SUCCESS: response.state=%d" % (response.state))
        else:
            print("ERROR: expecting resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE")
    finally:
        # Close socket, even when a step above raised
        print("closing socket")
        s.close()
# Run the test only when this file is executed as a script; the full
# command line is handed to main().
if __name__ == '__main__':
    main(sys.argv)