#!/usr/bin/python
#
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classes for generic netlink."""
import collections
from socket import * # pylint: disable=wildcard-import
import struct
import cstruct
import netlink
### Generic netlink constants. See include/uapi/linux/genetlink.h.
# The generic netlink control family.
GENL_ID_CTRL = 16
# Commands.
CTRL_CMD_GETFAMILY = 3
# Attributes.
CTRL_ATTR_FAMILY_ID = 1
CTRL_ATTR_FAMILY_NAME = 2
CTRL_ATTR_VERSION = 3
CTRL_ATTR_HDRSIZE = 4
CTRL_ATTR_MAXATTR = 5
CTRL_ATTR_OPS = 6
CTRL_ATTR_MCAST_GROUPS = 7
# Attributes nested inside CTRL_ATTR_OPS.
# NOTE: these reuse the numeric values 1 and 2 of the top-level attributes
# above; they are only meaningful inside a CTRL_ATTR_OPS payload.
CTRL_ATTR_OP_ID = 1
CTRL_ATTR_OP_FLAGS = 2
# Data structure formats.
# These aren't constants, they're classes. So, pylint: disable=invalid-name
# Generic netlink message header: one byte each for cmd and version; the "xx"
# skips two further bytes (presumably the header's reserved field).
Genlmsghdr = cstruct.Struct("genlmsghdr", "BBxx", "cmd version")
class GenericNetlink(netlink.NetlinkSocket):
  """Base class for all generic netlink classes.

  Initializes the parent NetlinkSocket with the NETLINK_GENERIC family and
  provides helpers that put a Genlmsghdr in front of each request.
  """

  NL_DEBUG = []

  def __init__(self):
    super(GenericNetlink, self).__init__(netlink.NETLINK_GENERIC)

  def _SendCommand(self, family, command, version, data, flags):
    """Sends one generic netlink request.

    Args:
      family: Generic netlink family ID, passed through to _SendNlRequest.
      command: Value for the cmd field of the Genlmsghdr.
      version: Value for the version field of the Genlmsghdr.
      data: Already-packed attribute bytes appended after the header.
      flags: Netlink message flags, passed through to _SendNlRequest.
    """
    payload = Genlmsghdr((command, version)).Pack() + data
    self._SendNlRequest(family, payload, flags)

  def _Dump(self, family, command, version):
    """Runs the parent class's _Dump with a Genlmsghdr for this command.

    Args:
      family: Generic netlink family ID to dump from.
      command: Value for the cmd field of the Genlmsghdr.
      version: Value for the version field of the Genlmsghdr.

    Returns:
      Whatever the parent class's _Dump returns for this request.
    """
    header = Genlmsghdr((command, version))
    return super(GenericNetlink, self)._Dump(family, header, Genlmsghdr, "")
class GenericNetlinkControl(GenericNetlink):
  """Generic netlink control class.

  This interface is used to manage other generic netlink families. We currently
  use it only to find the family ID for address families of interest.
  """

  def _DecodeOps(self, data):
    """Decodes the payload of a CTRL_ATTR_OPS attribute.

    Args:
      data: Raw payload bytes. Each entry is a 4-byte nest marker followed by
        a CTRL_ATTR_OP_ID attribute and then a CTRL_ATTR_OP_FLAGS attribute.

    Returns:
      A list of Op(id, flags) namedtuples, one per entry, in payload order.

    Raises:
      ValueError: An entry's attributes are not CTRL_ATTR_OP_ID followed by
        CTRL_ATTR_OP_FLAGS.
    """
    ops = []
    Op = collections.namedtuple("Op", ["id", "flags"])
    while data:
      # Skip the nest marker (2-byte length, 2-byte index); neither value is
      # needed because each entry is parsed attribute by attribute below.
      data = data[4:]

      nla, nla_data, data = self._ReadNlAttr(data)
      if nla.nla_type != CTRL_ATTR_OP_ID:
        raise ValueError("Expected CTRL_ATTR_OP_ID, got %d" % nla.nla_type)
      op_id = struct.unpack("=I", nla_data)[0]

      nla, nla_data, data = self._ReadNlAttr(data)
      if nla.nla_type != CTRL_ATTR_OP_FLAGS:
        # The attribute type field is nla_type (as used everywhere else);
        # reading a different name here would mask this error with an
        # AttributeError.
        raise ValueError("Expected CTRL_ATTR_OP_FLAGS, got %d" % nla.nla_type)
      op_flags = struct.unpack("=I", nla_data)[0]

      ops.append(Op(op_id, op_flags))
    return ops

  def _Decode(self, command, msg, nla_type, nla_data):
    """Decodes generic netlink control attributes to human-readable format.

    Args:
      command: Unused here; part of the decoder signature.
      msg: Unused here; part of the decoder signature.
      nla_type: Numeric attribute type.
      nla_data: Raw attribute payload.

    Returns:
      A (name, data) tuple. name is whatever _GetConstantName returns for
      nla_type; data is the decoded value for the attribute types handled
      below and the raw payload for everything else.
    """
    # NOTE(review): CTRL_ATTR_OP_ID/CTRL_ATTR_OP_FLAGS share both the
    # "CTRL_ATTR_" prefix and the values 1 and 2 with CTRL_ATTR_FAMILY_ID and
    # CTRL_ATTR_FAMILY_NAME; confirm _GetConstantName resolves to the latter.
    name = self._GetConstantName(__name__, nla_type, "CTRL_ATTR_")
    if name == "CTRL_ATTR_FAMILY_ID":
      data = struct.unpack("=H", nla_data)[0]
    elif name == "CTRL_ATTR_FAMILY_NAME":
      data = nla_data.strip("\x00")
    elif name in ("CTRL_ATTR_VERSION", "CTRL_ATTR_HDRSIZE",
                  "CTRL_ATTR_MAXATTR"):
      data = struct.unpack("=I", nla_data)[0]
    elif name == "CTRL_ATTR_OPS":
      data = self._DecodeOps(nla_data)
    else:
      data = nla_data
    return name, data

  def GetFamily(self, name):
    """Returns the family ID for the specified family name.

    Sends a CTRL_CMD_GETFAMILY request carrying the name to the control
    family and reads CTRL_ATTR_FAMILY_ID from the attributes of the reply.

    Args:
      name: The generic netlink family name to look up, e.g. "tcp_metrics".
    """
    data = self._NlAttrStr(CTRL_ATTR_FAMILY_NAME, name)
    self._SendCommand(GENL_ID_CTRL, CTRL_CMD_GETFAMILY, 0, data,
                      netlink.NLM_F_REQUEST)
    # Only the attributes are needed; the reply header is discarded.
    _, attrs = self._GetMsg(Genlmsghdr)
    return attrs["CTRL_ATTR_FAMILY_ID"]
if __name__ == "__main__":
  # Manual smoke test: look up and print the ID of the tcp_metrics family.
  g = GenericNetlinkControl()
  # A parenthesized single argument prints the same under the Python 2 print
  # statement and the Python 3 print function, so the module still parses on
  # either interpreter.
  print(g.GetFamily("tcp_metrics"))