普通文本  |  149行  |  5.95 KB

# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Reference implementation for reflection in gRPC Python."""

import grpc
from google.protobuf import descriptor_pb2
from google.protobuf import descriptor_pool

from grpc_reflection.v1alpha import reflection_pb2 as _reflection_pb2
from grpc_reflection.v1alpha import reflection_pb2_grpc as _reflection_pb2_grpc

_POOL = descriptor_pool.Default()
SERVICE_NAME = _reflection_pb2.DESCRIPTOR.services_by_name[
    'ServerReflection'].full_name


def _not_found_error():
    return _reflection_pb2.ServerReflectionResponse(
        error_response=_reflection_pb2.ErrorResponse(
            error_code=grpc.StatusCode.NOT_FOUND.value[0],
            error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
        ))


def _file_descriptor_response(descriptor):
    proto = descriptor_pb2.FileDescriptorProto()
    descriptor.CopyToProto(proto)
    serialized_proto = proto.SerializeToString()
    return _reflection_pb2.ServerReflectionResponse(
        file_descriptor_response=_reflection_pb2.FileDescriptorResponse(
            file_descriptor_proto=(serialized_proto,)),)


class ReflectionServicer(_reflection_pb2_grpc.ServerReflectionServicer):
    """Servicer handling RPCs for service statuses."""

    def __init__(self, service_names, pool=None):
        """Constructor.

    Args:
      service_names: Iterable of fully-qualified service names available.
    """
        self._service_names = tuple(sorted(service_names))
        self._pool = _POOL if pool is None else pool

    def _file_by_filename(self, filename):
        try:
            descriptor = self._pool.FindFileByName(filename)
        except KeyError:
            return _not_found_error()
        else:
            return _file_descriptor_response(descriptor)

    def _file_containing_symbol(self, fully_qualified_name):
        try:
            descriptor = self._pool.FindFileContainingSymbol(
                fully_qualified_name)
        except KeyError:
            return _not_found_error()
        else:
            return _file_descriptor_response(descriptor)

    def _file_containing_extension(self, containing_type, extension_number):
        try:
            message_descriptor = self._pool.FindMessageTypeByName(
                containing_type)
            extension_descriptor = self._pool.FindExtensionByNumber(
                message_descriptor, extension_number)
            descriptor = self._pool.FindFileContainingSymbol(
                extension_descriptor.full_name)
        except KeyError:
            return _not_found_error()
        else:
            return _file_descriptor_response(descriptor)

    def _all_extension_numbers_of_type(self, containing_type):
        try:
            message_descriptor = self._pool.FindMessageTypeByName(
                containing_type)
            extension_numbers = tuple(
                sorted(
                    extension.number
                    for extension in self._pool.FindAllExtensions(
                        message_descriptor)))
        except KeyError:
            return _not_found_error()
        else:
            return _reflection_pb2.ServerReflectionResponse(
                all_extension_numbers_response=_reflection_pb2.
                ExtensionNumberResponse(
                    base_type_name=message_descriptor.full_name,
                    extension_number=extension_numbers))

    def _list_services(self):
        return _reflection_pb2.ServerReflectionResponse(
            list_services_response=_reflection_pb2.ListServiceResponse(
                service=[
                    _reflection_pb2.ServiceResponse(name=service_name)
                    for service_name in self._service_names
                ]))

    def ServerReflectionInfo(self, request_iterator, context):
        # pylint: disable=unused-argument
        for request in request_iterator:
            if request.HasField('file_by_filename'):
                yield self._file_by_filename(request.file_by_filename)
            elif request.HasField('file_containing_symbol'):
                yield self._file_containing_symbol(
                    request.file_containing_symbol)
            elif request.HasField('file_containing_extension'):
                yield self._file_containing_extension(
                    request.file_containing_extension.containing_type,
                    request.file_containing_extension.extension_number)
            elif request.HasField('all_extension_numbers_of_type'):
                yield self._all_extension_numbers_of_type(
                    request.all_extension_numbers_of_type)
            elif request.HasField('list_services'):
                yield self._list_services()
            else:
                yield _reflection_pb2.ServerReflectionResponse(
                    error_response=_reflection_pb2.ErrorResponse(
                        error_code=grpc.StatusCode.INVALID_ARGUMENT.value[0],
                        error_message=grpc.StatusCode.INVALID_ARGUMENT.value[1]
                        .encode(),
                    ))


def enable_server_reflection(service_names, server, pool=None):
    """Enables server reflection on a server.

    Args:
      service_names: Iterable of fully-qualified service names available.
      server: grpc.Server to which reflection service will be added.
      pool: DescriptorPool object to use (descriptor_pool.Default() if None).
    """
    _reflection_pb2_grpc.add_ServerReflectionServicer_to_server(
        ReflectionServicer(service_names, pool=pool), server)