#!/bin/env python
#
# Copyright (C) 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import re
import sys
import tempfile
import textwrap
import uuid
from xml.etree import ElementTree
from xml.dom import minidom
# We are dealing with unicode data. It is extremely important to choose between
# the |unicode| type and the |str| type with unicode encoding as the default
# storage type for strings, and stick to it.
# - All strings except filenames and such are of type |unicode|
# - Note that the xml.etree.ElementTree.parse function actually returns
# strings in the |str| type. These will be implicitly coerced to |unicode|
# as needed. If you don't like this, add a phase to explicitly cast these
# strings.
# - Whenever using the |str| type, use the suffix |_str|
# - Moreover, whenever using the |str| type with |ascii| encoding, use the
# |_str_ascii| suffix
# Encoding used both to parse the input XML and to encode everything this
# script writes out (protobuf text, debug XML dumps).
FILE_ENCODING = "utf-8"
class ConverterError(Exception):
    """ Error raised by the converter when the conversion must be aborted. """
class ServiceProvidersConverter(object):
    """
    Convert the ServiceProviders XML into protobuf (text) format.

    Construct the converter with the input path (and optionally an output
    path) and call |Convert|. The conversion proceeds in phases:
      - parse the XML,
      - copy country / provider / name information down into every |gsm| and
        |cdma| node (|_TransformXML|),
      - group |gsm| nodes by MCCMNC (|_GroupGSMNodesByMCCMNC|),
      - decide which nodes are MNOs and which are MVNOs (|_FindPrimaryNodes|),
      - write the protobuf text (|_SpewProtobuf|),
      - cross-check the book-keeping counters (|_RunStatsDiagnostics|).
    """

    # ElementTree expands the reserved |xml| prefix while parsing, so an
    # attribute spelled xml:lang in the input file is stored under this key.
    _XML_LANG_ATTR = u'{http://www.w3.org/XML/1998/namespace}lang'
    # Literal spelling, accepted as a fallback for nodes that were built by
    # hand with the un-expanded attribute name.
    _XML_LANG_ATTR_LITERAL = u'xml:lang'

    def __init__(self, file_path, out_file_path=None):
        """
        @param file_path: Absolute path to the XML file to read
        @param out_file_path: Absolute path to the file to which the output
                should be written. When None, output goes to stdout.
        """
        self._file_path = file_path
        self._out_file_path = out_file_path

        # Filled in by |Convert|.
        self._root = None
        self._out_file = None

        self._gsm_nodes_no_mccmnc = set()
        self._gsm_nodes_by_mccmnc = {}
        self._mcc_mnc_by_mccmnc = {}

        # Filled in by |_FindPrimaryNodes|.
        self._mvnos = {}
        self._primary_cdma_nodes = set()
        self._primary_gsm_nodes = set()
        self._primary_nodes = set()

        # Book-keeping to sanity check the total number of providers converted,
        # and detailed information about the conversion.
        self._xml_cdma_nodes = 0
        self._xml_gsm_nodes = 0
        self._protobuf_mnos_dumped = 0
        self._protobuf_mvnos_dumped = 0
        self._protobuf_gsm_mnos = 0
        self._protobuf_cdma_mnos = 0
        self._protobuf_gsm_mvnos = 0
        self._protobuf_gsm_unique_mvnos = 0
        # Turns out some MVNOs are MNOs using a different MCCMNC.
        self._protobuf_gsm_mvnos_mnos = 0

        # Remember nodes that we decide to drop at any point.
        self._dropped_nodes = set()

        # Related to the actual protobuf output:
        self._indent = 0

    def Convert(self):
        """
        Top level function for the conversion.

        @raises ConverterError if the input is inconsistent or the final
                stats diagnostics fail.
        """
        parser = ElementTree.XMLParser(encoding=FILE_ENCODING)
        element_tree = ElementTree.parse(self._file_path, parser=parser)
        self._root = element_tree.getroot()
        logging.info('Dumping parsed XML')
        self._DumpXMLToTempFile()

        self._xml_cdma_nodes = len(self._root.findall(u'.//cdma'))
        self._xml_gsm_nodes = len(self._root.findall(u'.//gsm'))

        self._TransformXML()
        logging.info('Dumping transformed XML.')
        self._DumpXMLToTempFile()

        self._GroupGSMNodesByMCCMNC()
        self._FindPrimaryNodes()

        # |_SpewLine| writes encoded bytes, so the sink must be a binary one.
        if self._out_file_path is not None:
            with open(self._out_file_path, 'wb') as out_file:
                self._out_file = out_file
                self._SpewProtobuf()
        else:
            # Use the underlying byte stream of stdout where there is one.
            self._out_file = getattr(sys.stdout, 'buffer', sys.stdout)
            self._SpewProtobuf()

        self._RunStatsDiagnostics()

    def _CheckStatsEqual(self, lhs, lhs_name, rhs, rhs_name):
        """
        Test that |lhs| == |rhs| and log appropriate message.

        @param lhs: One value to compare.
        @param lhs_name: str name to be used for |lhs| for logging.
        @param rhs: Other value to compare.
        @param rhs_name: str name to be used for |rhs| for logging.
        @return True if check passes, False otherwise.
        """
        result = (lhs == rhs)
        logger = logging.info if result else logging.error
        message = 'PASS' if result else 'FAIL'
        logger('Sanity check: (%s) == (%s) (%d == %d) **%s**',
               lhs_name, rhs_name, lhs, rhs, message)
        return result

    def _RunStatsDiagnostics(self):
        """
        Checks that the stats about nodes found / dumped tally.

        @raises ConverterError if any of the sanity checks fails.
        """
        # First dump dropped nodes.
        if self._dropped_nodes:
            logging.warning('Following nodes were dropped:')
            for node in self._dropped_nodes:
                self._PPrintLogXML(logging.info, node)

        logging.info('######################')
        logging.info('Conversion diagnostics')
        logging.info('######################')

        logging.info('Total number of XML CDMA nodes read [xml_cdma_nodes]: %d',
                     self._xml_cdma_nodes)
        logging.info('Total number of XML GSM nodes read [xml_gsm_nodes]: %d',
                     self._xml_gsm_nodes)
        logging.info('Total number of XML nodes read '
                     '[xml_nodes = xml_cdma_nodes + xml_gsm_nodes]: %d',
                     self._xml_cdma_nodes + self._xml_gsm_nodes)

        logging.info('Total number of protobuf MNOs dumped '
                     '[protobuf_mnos_dumped]: %d',
                     self._protobuf_mnos_dumped)
        logging.info('Total number of protobuf MVNOs dumped '
                     '[protobuf_mvnos_dumped]: %d',
                     self._protobuf_mvnos_dumped)
        logging.info('Total number of protobuf nodes dropped '
                     '[protobuf_dropped_nodes]: %d',
                     len(self._dropped_nodes))
        logging.info(' (See above for the exact nodes dropped)')

        logging.info('Total number of protobuf CDMA MNOs '
                     '[protobuf_cdma_mnos]: %d',
                     self._protobuf_cdma_mnos)
        logging.info('Total number of protobuf GSM MNOs '
                     '[protobuf_gsm_mnos]: %d',
                     self._protobuf_gsm_mnos)
        logging.info('Total number of protobuf GSM MVNOs '
                     '[protobuf_gsm_mvnos]: %d',
                     self._protobuf_gsm_mvnos)
        logging.info('Total number of protobuf unique GSM MVNOs. '
                     '[protobuf_gsm_unique_mvnos]: %d',
                     self._protobuf_gsm_unique_mvnos)
        logging.info(' (Some MVNOs may appear in multiple MNOs)')
        logging.info('Total number of protobuf GSM MVNOs that are also MNOs. '
                     '[protobuf_gsm_mvnos_mnos]: %d',
                     self._protobuf_gsm_mvnos_mnos)

        # Run every check (each one logs its own result) before deciding
        # whether to fail, so that the log shows all mismatches at once.
        check_results = []
        check_results.append(self._CheckStatsEqual(
                self._protobuf_mnos_dumped,
                'protobuf_mnos_dumped',
                self._protobuf_cdma_mnos + self._protobuf_gsm_mnos,
                'protobuf_cdma_mnos + protobuf_gsm_mnos'))

        check_results.append(self._CheckStatsEqual(
                self._protobuf_mnos_dumped + self._protobuf_mvnos_dumped,
                'protobuf_mnos_dumped + protobuf_mvnos_dumped',
                (self._protobuf_cdma_mnos +
                 self._protobuf_gsm_mnos +
                 self._protobuf_gsm_mvnos),
                'protobuf_cdma_mnos + protobuf_gsm_mnos + protobuf_gsm_mvnos'))

        check_results.append(self._CheckStatsEqual(
                self._xml_cdma_nodes + self._xml_gsm_nodes,
                'xml_cdma_nodes + xml_gsm_nodes',
                (len(self._dropped_nodes) +
                 self._protobuf_gsm_mnos +
                 self._protobuf_cdma_mnos +
                 self._protobuf_gsm_unique_mvnos -
                 self._protobuf_gsm_mvnos_mnos),
                ('protobuf_dropped_nodes + '
                 'protobuf_gsm_mnos + protobuf_cdma_mnos + '
                 'protobuf_gsm_unique_mvnos - protobuf_gsm_mvnos_mnos')))

        if not all(check_results):
            self._LogAndRaise('StatsDiagnostics failed.')

    def _DumpXMLToTempFile(self):
        """ Dumps the parsed XML to a temp file for debugging. """
        fd, fname = tempfile.mkstemp(prefix='converter_')
        logging.info('Dumping XML to file %s', fname)
        # Binary mode: the payload is already encoded to bytes.
        with os.fdopen(fd, 'wb') as fout:
            fout.write(self._PPrintXML(self._root).encode(FILE_ENCODING))

    def _GetLang(self, node):
        """
        Returns the language attribute of |node|, or None if there is none.

        Looks up the namespace-expanded attribute name first (what the parser
        produces for xml:lang) and falls back to the literal 'xml:lang'.

        @param node: The XML node to inspect.
        """
        lang = node.get(self._XML_LANG_ATTR)
        if lang is None:
            lang = node.get(self._XML_LANG_ATTR_LITERAL)
        return lang

    def _EnrichNode(self, node, country_code, primary, roaming_required, names,
                    provider_type):
        """
        Adds the information passed in as children of |node|.

        @param node: The XML node to enrich.
        @param country_code: The country code for node. Type: str.
        @param primary: Is this node a primary provider. Type: str
        @param roaming_required: Does this provider requires roaming. Type: str.
        @param names: List of names for this provider. Type: [(str, str)].
        @param provider_type: Is this node 'gsm'/'cdma'. Type: str.
        """
        ElementTree.SubElement(node, u'country', {u'code': country_code})

        provider_map = {u'type': provider_type}
        if primary is not None:
            provider_map[u'primary'] = primary
        if roaming_required is not None:
            provider_map[u'roaming-required'] = roaming_required
        ElementTree.SubElement(node, u'provider', provider_map)

        for name, lang in names:
            name_map = {}
            if lang is not None:
                # Store under the namespace-expanded key, the same form the
                # parser uses, so that parsed and synthesized <name> nodes
                # look alike and serialize back to xml:lang.
                name_map[self._XML_LANG_ATTR] = lang
            name_node = ElementTree.SubElement(node, u'name', name_map)
            name_node.text = name

    def _TransformXML(self):
        """
        Store the country, provider, name, type (gsm/cdma) under the
        |gsm|/|cdma| nodes. This allows us to directly deal with these nodes
        instead of going down the tree.
        """
        # First find all nodes to be modified, since we can't iterate the tree
        # while modifying it.
        pending = []
        for country_node in self._root.findall(u'country'):
            cur_country = country_node.get(u'code')
            for provider_node in country_node.findall(u'provider'):
                primary = provider_node.get(u'primary')
                roaming_required = provider_node.get(u'roaming-required')
                names = [(name_node.text, self._GetLang(name_node))
                         for name_node in provider_node.findall(u'name')]

                for gsm_node in provider_node.findall(u'gsm'):
                    pending.append((gsm_node, (cur_country,
                                               primary,
                                               roaming_required,
                                               names,
                                               u'gsm')))

                for cdma_node in provider_node.findall(u'cdma'):
                    # Some CDMA providers have a special name under the <cdma>
                    # node. This name should *override* the names given
                    # outside. The override is per node: it must not leak to
                    # sibling <cdma> nodes of the same provider.
                    if cdma_node.find(u'name') is not None:
                        cdma_names = []
                    else:
                        cdma_names = names
                    pending.append((cdma_node, (cur_country,
                                                primary,
                                                roaming_required,
                                                cdma_names,
                                                u'cdma')))

        # Now, iterate through all those nodes and update the tree.
        for node, args in pending:
            self._EnrichNode(node, *args)

    def _CheckAmbiguousMCCMNC(self, mcc, mnc):
        """
        Ensure that no two mcc, mnc pairs concat to the same MCCMNC.

        Records the (mcc, mnc) pair under its concatenation so that later
        calls can be checked against it.

        @param mcc: The mcc to check.
        @param mnc: The mnc to check.
        @raises ConverterError if a different pair with the same
                concatenation was seen before.
        """
        mccmnc = mcc + mnc
        if mccmnc in self._mcc_mnc_by_mccmnc:
            old_mcc, old_mnc = self._mcc_mnc_by_mccmnc[mccmnc]
            if old_mcc != mcc or old_mnc != mnc:
                self._LogAndRaise(u'Ambiguous MCCMNC pairs detected: '
                                  u'(%s, %s) vs. (%s, %s)',
                                  old_mcc, old_mnc, mcc, mnc)

        self._mcc_mnc_by_mccmnc[mccmnc] = (mcc, mnc)

    def _GroupGSMNodesByMCCMNC(self):
        """
        Map all GSM nodes with same MCCMNC together.

        GSM nodes without any |network-id| child are collected separately.

        @raises ConverterError if a |network-id| lacks |mcc| or |mnc|, or if
                two different (mcc, mnc) pairs concatenate to the same value.
        """
        for gsm_node in self._root.findall(u'.//gsm'):
            network_id_nodes = gsm_node.findall(u'network-id')
            if not network_id_nodes:
                logging.warning('Found a GSM node with no MCCMNC. ')
                self._gsm_nodes_no_mccmnc.add(gsm_node)
                continue

            for network_id_node in network_id_nodes:
                mcc = network_id_node.get(u'mcc')
                mnc = network_id_node.get(u'mnc')
                if mcc is None or mnc is None:
                    self._LogAndRaise(
                            u'Found a network-id node without mcc/mnc: \n%s',
                            self._PPrintXML(gsm_node))
                self._CheckAmbiguousMCCMNC(mcc, mnc)
                group = self._gsm_nodes_by_mccmnc.setdefault(mcc + mnc, [])
                # A node that lists the same MCCMNC twice must not be grouped
                # with itself.
                if gsm_node not in group:
                    group.append(gsm_node)

    def _FindPrimaryNodes(self):
        """
        Finds nodes that correspond to MNOs as opposed to MVNOs.

        All CDMA nodes are primary, all GSM nodes that have a unique MCCMNC are
        primary, GSM nodes with non-unique MCCMNC that explicitly claim to be
        primary are primary.

        @raises ConverterError if two nodes sharing an MCCMNC both claim to be
                primary.
        """
        unique_mvnos = set()
        self._mvnos = {}

        # All cdma nodes are primary.
        self._primary_cdma_nodes = set(self._root.findall(u'.//cdma'))
        self._protobuf_cdma_mnos = len(self._primary_cdma_nodes)

        # Start by marking all nodes with no MCCMNC primary. Work on a copy so
        # that the set of MCCMNC-less nodes is not modified below.
        self._primary_gsm_nodes = set(self._gsm_nodes_no_mccmnc)
        for mccmnc, nodes in self._gsm_nodes_by_mccmnc.items():
            if len(nodes) == 1:
                self._primary_gsm_nodes.add(nodes[0])
                continue

            # Exactly one node in the list should claim to be primary.
            primary = None
            mvnos = set()
            for node in nodes:
                provider_node = node.find(u'provider')
                if provider_node.get(u'primary') == u'true':
                    if primary is not None:
                        self._LogAndRaise(
                                u'Found two primary gsm nodes with MCCMNC['
                                u'%s]: \n%s\n%s',
                                mccmnc, self._PPrintXML(primary),
                                self._PPrintXML(node))

                    primary = node
                    self._primary_gsm_nodes.add(node)
                else:
                    mvnos.add(node)

            if primary is None:
                logging.warning('Failed to find primary node with '
                                'MCCMNC[%s]. Will make all of them '
                                'distinct MNOs', mccmnc)
                logging.info('Nodes found:')
                for node in nodes:
                    self._PPrintLogXML(logging.info, node)
                self._primary_gsm_nodes.update(nodes)
                continue

            # This primary may already have MVNOs due to another MCCMNC.
            existing_mvnos = self._mvnos.get(primary, set())
            self._mvnos[primary] = existing_mvnos | mvnos
            # Only add to the MVNO count the *new* MVNOs added.
            self._protobuf_gsm_mvnos += (len(self._mvnos[primary]) -
                                         len(existing_mvnos))
            unique_mvnos |= mvnos

        self._primary_nodes = (self._primary_cdma_nodes |
                               self._primary_gsm_nodes)
        self._protobuf_gsm_mnos = len(self._primary_gsm_nodes)
        self._protobuf_gsm_unique_mvnos = len(unique_mvnos)
        self._protobuf_gsm_mvnos_mnos = len(
                self._primary_gsm_nodes & unique_mvnos)

    def _SortOperators(self, node_list):
        """
        Sort operators by country and name, in place.

        @param node_list: List of enriched |gsm|/|cdma| nodes to sort.
        """
        def _SortKey(node):
            # Country first, then the sorted list of names. Missing values
            # are treated as empty strings so that keys stay comparable.
            country = node.find(u'country').get(u'code') or u''
            names = sorted(name_node.text or u''
                           for name_node in node.findall(u'name'))
            return (country, names)

        node_list.sort(key=_SortKey)

    @staticmethod
    def _SortedNodes(nodes):
        """
        Returns |nodes| sorted by their content.

        XML elements have no useful ordering of their own, so sort by the
        stripped text and the attributes. Nodes with equal content keep their
        relative (document) order.

        @param nodes: Iterable of XML nodes.
        """
        def _ContentKey(node):
            return ((node.text or u'').strip(), sorted(node.attrib.items()))

        return sorted(nodes, key=_ContentKey)

    def _SpewProtobuf(self):
        """ Entry function for dumping to prototext format. """
        fname = os.path.basename(__file__)
        self._SpewComment("!!! DO NOT EDIT THIS FILE BY HAND !!!")
        self._SpewComment("This file is generated by the script %s" % fname)
        self._SpewComment("This file was generated from serviceproviders.xml, "
                          "a public domain database of cellular network "
                          "operators around the globe.")

        primaries = list(self._primary_nodes)
        self._SortOperators(primaries)
        for node in primaries:
            self._protobuf_mnos_dumped += 1
            self._SpewMessageBegin(u'mno')
            self._SpewData(node)
            if node in self._mvnos:
                mvnos = list(self._mvnos[node])
                self._SortOperators(mvnos)
                for mvno_node in mvnos:
                    self._protobuf_mvnos_dumped += 1
                    self._SpewMessageBegin(u'mvno')
                    self._SpewNameFilter(mvno_node)
                    self._SpewData(mvno_node)
                    self._SpewMessageEnd(u'mvno')
            self._SpewMessageEnd(u'mno')
            self._SpewLine()

    def _SpewNameFilter(self, node):
        """
        Dumps an |mvno_filter| message matching any of the names of |node|.

        @param node: The MVNO node to create the filter for.
        @raises ConverterError if |node| has no non-empty name.
        """
        name_list = [name_node.text
                     for name_node in node.findall(u'name')
                     if name_node.text]
        if not name_list:
            self._LogAndRaise(
                    u'Did not find any name for MVNO. Can not create filter.\n'
                    u'%s', self._PPrintXML(node))

        # NOTE(review): the names are joined verbatim; regex metacharacters in
        # a name are not escaped. Confirm what the consumer of |regex| expects
        # before changing this.
        name = u'|'.join(name_list)
        self._SpewMessageBegin(u'mvno_filter')
        self._SpewEnum(u'type', u'OPERATOR_NAME')
        self._SpewString(u'regex', name)
        self._SpewMessageEnd(u'mvno_filter')

    def _SpewData(self, node):
        """
        Dumps the |data| message for an enriched |gsm|/|cdma| node.

        @param node: The enriched XML node to dump.
        """
        self._SpewMessageBegin(u'data')

        # A fresh random UUID is generated on every run.
        self._SpewString(u'uuid', str(uuid.uuid4()))
        country_node = node.find(u'country')
        self._SpewString(u'country', country_node.get(u'code'))

        provider_node = node.find(u'provider')
        provider_type = provider_node.get(u'type')
        self._SpewEnum(u'provider_type', provider_type.upper())
        roaming_required = provider_node.get(u'roaming-required')
        if roaming_required is not None:
            self._SpewBool(u'requires_roaming', roaming_required)
        for name_node in self._SortedNodes(node.findall(u'name')):
            self._SpewLocalizedNameNode(name_node)

        # GSM specific fields.
        for network_id_node in self._SortedNodes(node.findall(u'network-id')):
            self._SpewString(u'mccmnc',
                             network_id_node.get(u'mcc') +
                             network_id_node.get(u'mnc'))

        for apn_node in self._SortedNodes(node.findall(u'apn')):
            self._SpewMobileAPNNode(apn_node)

        # CDMA specific fields.
        for sid_node in self._SortedNodes(node.findall(u'sid')):
            self._SpewString(u'sid', sid_node.get(u'value'))

        # CDMA networks have some extra username/password/dns information that
        # corresponds very well with the APN concept of 3GPP, so we map it to an
        # MobileAPN instead of storing it specially.
        if (node.find(u'username') is not None or
            node.find(u'password') is not None or
            node.find(u'dns') is not None):
            self._SpewMobileAPNNode(node)

        self._SpewMessageEnd(u'data')

    def _SpewMobileAPNNode(self, apn_node):
        """
        Dumps a |mobile_apn| message for |apn_node|.

        @param apn_node: An |apn| node, or a |cdma| node carrying
                username/password/dns children.
        """
        self._SpewMessageBegin(u'mobile_apn')
        apn = apn_node.get(u'value')
        # This may be None when converting a <cdma> node to MobileAPN node.
        if apn is None:
            apn = u''
        self._SpewString(u'apn', apn)

        for plan_node in self._SortedNodes(apn_node.findall(u'plan')):
            self._SpewEnum(u'plan', plan_node.get(u'type').upper())
        for name_node in self._SortedNodes(apn_node.findall(u'name')):
            self._SpewLocalizedNameNode(name_node)
        for gateway_node in apn_node.findall(u'gateway'):
            self._SpewString(u'gateway', gateway_node.text)
        for username_node in apn_node.findall(u'username'):
            self._SpewString(u'username', username_node.text)
        for password_node in apn_node.findall(u'password'):
            self._SpewString(u'password', password_node.text)
        for dns_node in self._SortedNodes(apn_node.findall(u'dns')):
            self._SpewString(u'dns', dns_node.text)

        self._SpewMessageEnd(u'mobile_apn')

    def _SpewLocalizedNameNode(self, name_node):
        """
        Dumps a |localized_name| message for |name_node|.

        @param name_node: A |name| XML node.
        """
        self._SpewMessageBegin(u'localized_name')
        self._SpewString(u'name', name_node.text)
        lang = self._GetLang(name_node)
        if lang is not None:
            self._SpewString(u'language', lang)
        self._SpewMessageEnd(u'localized_name')

    def _SpewMessageBegin(self, message_name):
        """ Opens a nested message called |message_name| and indents. """
        self._SpewLine(message_name, u'{')
        self._indent += 1

    def _SpewMessageEnd(self, _):
        """ Closes the innermost message. The argument is for readers only. """
        self._indent -= 1
        self._SpewLine(u'}')

    def _SpewString(self, key, value):
        """
        Dumps |key| with a double-quoted string |value|.

        @param key: The field name.
        @param value: The string value. None is treated as empty string.
        """
        # Treat None |value| as empty string.
        if value is None:
            value = u''
        # Escape the characters that would otherwise terminate or break the
        # quoted literal. Backslash must be handled first.
        escaped = (value.replace(u'\\', u'\\\\')
                        .replace(u'"', u'\\"')
                        .replace(u'\n', u'\\n'))
        self._SpewLine(key, u':', u'"' + escaped + u'"')

    def _SpewBool(self, key, value):
        """ Dumps |key| with |value| written verbatim (no quoting). """
        self._SpewLine(key, u':', value)

    def _SpewEnum(self, key, value):
        """ Dumps |key| with the enum name |value| written verbatim. """
        self._SpewLine(key, u':', value)

    def _SpewComment(self, comment):
        """ Dumps |comment| as '#' lines wrapped to fit the current indent. """
        line_length = 78 - (2 * self._indent)
        for line in textwrap.wrap(comment, line_length):
            self._SpewLine(u'# ' + line)

    def _SpewLine(self, *args):
        """
        Writes one indented line made of |args| joined by single spaces.

        The line is encoded with FILE_ENCODING before being written.
        """
        indent = (2 * self._indent) * u' '
        line = indent + u' '.join(args) + u'\n'
        self._out_file.write(line.encode(FILE_ENCODING))

    def _PPrintXML(self, node):
        """ Returns a pretty-printed |unicode| string for the xml |node|. """
        rough_string_str = ElementTree.tostring(node, encoding=FILE_ENCODING)
        reparsed = minidom.parseString(rough_string_str)
        xml_data_str = reparsed.toprettyxml(indent=u' ',
                                            encoding=FILE_ENCODING)
        xml_data = xml_data_str.decode(FILE_ENCODING)
        # Drop the whitespace-only lines that pretty-printing leaves behind.
        lines = [line for line in xml_data.split(u'\n') if line.strip()]
        return u'\n'.join(lines)

    @staticmethod
    def _ToAscii(text):
        """
        Returns |text| with every non-ASCII character replaced by '?'.

        @param text: The |unicode| string to marshall.
        """
        return text.encode('ascii', 'replace').decode('ascii')

    def _PPrintLogXML(self, logger, node):
        """ Logs a given xml |node| to |logger| encoded in 'ascii' format. """
        to_print = self._PPrintXML(node)
        logger('NODE:')
        # Marshall, as best as we can to ASCII.
        for line in self._ToAscii(to_print).split(u'\n'):
            logger(line)

    def _LogAndRaise(self, fmt, *args):
        """
        Logs the error encoded in 'ascii' format and raises an error.

        @param fmt: The base formatted string for the error, using
                %-style conversion specifiers.
        @param *args: Arguments to format the string |fmt|.
        @raises ConverterError
        """
        # Callers pass %-style format strings, so interpolate with '%'.
        error_string = fmt % args
        # Marshall, as best as we can to ASCII.
        error_string_ascii = self._ToAscii(error_string)
        logging.error(error_string_ascii)
        raise ConverterError(error_string_ascii)
def main(prog_name, args):
    """
    Script entry point: validate the command line and run the conversion.

    Prints a usage line and exits with status 1 unless one or two arguments
    are given.

    @param prog_name: Name of the program to display.
    @param args: Command line arguments: the input file path, optionally
            followed by the output file path.
    """
    logging.basicConfig(level=logging.DEBUG)

    arg_count = len(args)
    if arg_count < 1 or arg_count > 2:
        print("Usage: %s <in_file> [<out_file>]" % prog_name)
        sys.exit(1)

    in_file_path = args[0]
    out_file_path = None
    if arg_count == 2:
        out_file_path = args[1]

    ServiceProvidersConverter(in_file_path, out_file_path).Convert()


if __name__ == '__main__':
    main(sys.argv[0], sys.argv[1:])