python-example-elasticsearch-extension/extensions/logs_api_elasticsearch_extension/extensions_api_client.py [12:62]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
LAMBDA_AGENT_NAME_HEADER_KEY = "Lambda-Extension-Name"
LAMBDA_AGENT_IDENTIFIER_HEADER_KEY = "Lambda-Extension-Identifier"

class ExtensionsAPIClient():
    def __init__(self):
        try:
            runtime_api_address = os.environ['AWS_LAMBDA_RUNTIME_API']
            self.runtime_api_base_url = f"http://{runtime_api_address}/2020-01-01/extension"
        except Exception as e:
            raise Exception(f"AWS_LAMBDA_RUNTIME_API is not set {e}") from e

    # Register as early as possible - the runtime initialization starts after all extensions have registered.
    def register(self, agent_unique_name, registration_body):
        try:
            print(f"Registering to ExtensionsAPIClient on {self.runtime_api_base_url}")
            req = urllib.request.Request(f"{self.runtime_api_base_url}/register")
            req.method = 'POST'
            req.add_header(LAMBDA_AGENT_NAME_HEADER_KEY, agent_unique_name)
            req.add_header("Content-Type", "application/json")
            data = json.dumps(registration_body).encode("utf-8")
            req.data = data
            resp = urllib.request.urlopen(req)
            if resp.status != 200:
                print(f"/register request to ExtensionsAPIClient failed. Status:  {resp.status}, Response: {resp.read()}")
                # Fail the extension
                sys.exit(1)
            agent_identifier = resp.headers.get(LAMBDA_AGENT_IDENTIFIER_HEADER_KEY)
            return agent_identifier
        except Exception as e:
            raise Exception(f"Failed to register to ExtensionsAPIClient: on {self.runtime_api_base_url}/register \
                with agent_unique_name:{agent_unique_name}  \
                and registration_body:{registration_body}\nError: {e}") from e

    # Call the following method when the extension is ready to receive the next invocation
    # and there is no job it needs to execute beforehand.
    def next(self, agent_id):
        try:
            req = urllib.request.Request(f"{self.runtime_api_base_url}/event/next")
            req.method = 'GET'
            req.add_header(LAMBDA_AGENT_IDENTIFIER_HEADER_KEY, agent_id)
            req.add_header("Content-Type", "application/json")
            resp = urllib.request.urlopen(req)
            if resp.status != 200:
                print(f"/event/next request to ExtensionsAPIClient failed. Status: {resp.status}, Response: {resp.read()} ")
                # Fail the extension
                sys.exit(1)
            data = resp.read()
            print(f"Received response from ExtensionsAPIClient: {data}")
            return data
        except Exception as e:
            raise Exception(f"Failed to get /event/next from ExtensionsAPIClient: {e}") from e
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



python-example-logs-api-extension/extensions/logs_api_http_extension/extensions_api_client.py [12:62]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
LAMBDA_AGENT_NAME_HEADER_KEY = "Lambda-Extension-Name"
LAMBDA_AGENT_IDENTIFIER_HEADER_KEY = "Lambda-Extension-Identifier"

class ExtensionsAPIClient():
    def __init__(self):
        try:
            runtime_api_address = os.environ['AWS_LAMBDA_RUNTIME_API']
            self.runtime_api_base_url = f"http://{runtime_api_address}/2020-01-01/extension"
        except Exception as e:
            raise Exception(f"AWS_LAMBDA_RUNTIME_API is not set {e}") from e

    # Register as early as possible - the runtime initialization starts after all extensions have registered.
    def register(self, agent_unique_name, registration_body):
        try:
            print(f"Registering to ExtensionsAPIClient on {self.runtime_api_base_url}")
            req = urllib.request.Request(f"{self.runtime_api_base_url}/register")
            req.method = 'POST'
            req.add_header(LAMBDA_AGENT_NAME_HEADER_KEY, agent_unique_name)
            req.add_header("Content-Type", "application/json")
            data = json.dumps(registration_body).encode("utf-8")
            req.data = data
            resp = urllib.request.urlopen(req)
            if resp.status != 200:
                print(f"/register request to ExtensionsAPIClient failed. Status:  {resp.status}, Response: {resp.read()}")
                # Fail the extension
                sys.exit(1)
            agent_identifier = resp.headers.get(LAMBDA_AGENT_IDENTIFIER_HEADER_KEY)
            return agent_identifier
        except Exception as e:
            raise Exception(f"Failed to register to ExtensionsAPIClient: on {self.runtime_api_base_url}/register \
                with agent_unique_name:{agent_unique_name}  \
                and registration_body:{registration_body}\nError: {e}") from e

    # Call the following method when the extension is ready to receive the next invocation
    # and there is no job it needs to execute beforehand.
    def next(self, agent_id):
        try:
            req = urllib.request.Request(f"{self.runtime_api_base_url}/event/next")
            req.method = 'GET'
            req.add_header(LAMBDA_AGENT_IDENTIFIER_HEADER_KEY, agent_id)
            req.add_header("Content-Type", "application/json")
            resp = urllib.request.urlopen(req)
            if resp.status != 200:
                print(f"/event/next request to ExtensionsAPIClient failed. Status: {resp.status}, Response: {resp.read()} ")
                # Fail the extension
                sys.exit(1)
            data = resp.read()
            print(f"Received response from ExtensionsAPIClient: {data}")
            return data
        except Exception as e:
            raise Exception(f"Failed to get /event/next from ExtensionsAPIClient: {e}") from e
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



