callouts/python/extproc/example/e2e_tests/metadata_server.py (48 lines of code) (raw):

# Copyright 2024 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from grpc import ServicerContext
from envoy.service.ext_proc.v3 import external_processor_pb2 as service_pb2
from envoy.service.ext_proc.v3.external_processor_pb2 import ProcessingRequest, ProcessingResponse
from extproc.service import callout_server
from extproc.service import callout_tools
from google.protobuf.wrappers_pb2 import StringValue
from google.protobuf import any_pb2


def unpack_string(value: any_pb2.Any) -> str:
  """Unpacks a string value from a protobuf Any object.

  Args:
    value (google.protobuf.any_pb2.Any): The Any object containing the
      string value.

  Returns:
    str: The unpacked string value. If ``value`` does not hold a
      ``StringValue`` the result is the default (empty) string and a
      warning is logged.
  """
  unpacked_value = StringValue()
  # Any.Unpack reports a type mismatch through its boolean return value
  # instead of raising; surface that in the log rather than dropping it.
  if not value.Unpack(unpacked_value):
    logging.warning('Any value of type "%s" is not a StringValue.',
                    value.type_url)
  return unpacked_value.value


def check_metadata(request: service_pb2.ProcessingRequest) -> bool:
  """Check if the request contains 'fr' metadata.

  Every entry of ``request.metadata_context.filter_metadata`` is scanned and
  the first one that has an 'fr' field is used.

  Args:
    request (service_pb2.ProcessingRequest): The processing request to check.

  Returns:
    bool: True if the 'fr' metadata is present and has a non-empty string
      value, False otherwise.
  """
  if not request.HasField('metadata_context'):
    logging.info('No metadata context.')
    return False

  fr_data = None
  # Only the per-filter structs matter here; the filter names are not used.
  for field_data in request.metadata_context.filter_metadata.values():
    if 'fr' in field_data.fields:
      fr_data = field_data.fields['fr']
      break

  if fr_data is None:
    logging.info('No "fr" metadata.')
    return False

  logging.info('Contains "fr" key: %s', fr_data)
  # The key must carry a string value, and that string must not be empty.
  return fr_data.HasField('string_value') and fr_data.string_value != ''


# Headers added to every header response produced by CalloutServerExample.
default_headers = [('service-callout-response-intercept', 'intercepted'),
                   ('x-used-deltas-gfe3', ''), ('x-used-staging-gfe3', ''),
                   ('x-ext-proc', '')]


class CalloutServerExample(callout_server.CalloutServer):
  """Example callout server for use in e2e metadata testing."""

  def process(self, request: ProcessingRequest,
              context: ServicerContext) -> ProcessingResponse:
    """Process the incoming request.

    A request carrying ``response_body`` is answered with the body mutation
    built by ``callout_tools.add_body_mutation('e2e-test')``. Any other
    request is answered with a ``response_headers`` mutation that adds
    ``default_headers``, preceded by ``('metadata', 'found')`` when
    ``check_metadata`` accepts the request.

    Args:
      request (ProcessingRequest): The processing request received.
      context (ServicerContext): The context object for the gRPC service.

    Returns:
      ProcessingResponse: The processing response to be sent back.
    """
    logging.info('Received request %s.', request)

    if request.HasField('response_body'):
      return ProcessingResponse(
          response_body=callout_tools.add_body_mutation('e2e-test'))

    if not check_metadata(request):
      return ProcessingResponse(
          response_headers=callout_tools.add_header_mutation(
              default_headers),)

    return ProcessingResponse(
        response_headers=callout_tools.add_header_mutation(
            add=[('metadata', 'found')] + default_headers),)


if __name__ == '__main__':
  # Setup command line args.
  args = callout_tools.add_command_line_args().parse_args()
  # Set the debug level.
  logging.basicConfig(level=logging.DEBUG)
  logging.info('Starting e2e_test server v7.')
  # Run the gRPC service.
  CalloutServerExample(**vars(args)).run()