pulsar/functions/context.py (86 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
Context defines context information available during processing of a request.
"""
from abc import abstractmethod
class Context(object):
"""Interface defining information available at process time"""
@abstractmethod
def get_message_id(self):
"""Return the messageid of the current message that we are processing"""
pass
@abstractmethod
def get_message_key(self):
"""Return the key of the current message that we are processing"""
pass
@abstractmethod
def get_message_eventtime(self):
"""Return the event time of the current message that we are processing"""
pass
@abstractmethod
def get_message_properties(self):
"""Return the message properties kv map of the current message that we are processing"""
pass
@abstractmethod
def get_current_message_topic_name(self):
"""Returns the topic name of the message that we are processing"""
pass
@abstractmethod
def get_function_tenant(self):
"""Returns the tenant of the message that's being processed"""
pass
@abstractmethod
def get_function_namespace(self):
"""Returns the namespace of the message that's being processed"""
@abstractmethod
def get_function_name(self):
"""Returns the function name that we are a part of"""
pass
@abstractmethod
def get_function_id(self):
"""Returns the function id that we are a part of"""
pass
@abstractmethod
def get_instance_id(self):
"""Returns the instance id that is executing the function"""
pass
@abstractmethod
def get_function_version(self):
"""Returns the version of function that we are executing"""
pass
@abstractmethod
def get_logger(self):
"""Returns the logger object that can be used to do logging"""
pass
@abstractmethod
def get_user_config_value(self, key):
"""Returns the value of the user-defined config. If the key doesn't exist, None is returned"""
pass
@abstractmethod
def get_user_config_map(self):
"""Returns the entire user-defined config as a dict
(the dict will be empty if no user-defined config is supplied)"""
pass
@abstractmethod
def get_secret(self, secret_name):
"""Returns the secret value associated with the name. None if nothing was found"""
pass
@abstractmethod
def get_partition_key(self):
"""Returns partition key of the input message is one exists"""
pass
@abstractmethod
def get_ordering_key(self):
"""Returns ordering key of the input message, if one exists"""
pass
@abstractmethod
def record_metric(self, metric_name, metric_value):
"""Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
pass
@abstractmethod
def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None,
compression_type=None, callback=None, message_conf=None):
"""Publishes message to topic_name by first serializing the message using serde_class_name serde
The message will have properties specified if any
The available options for message_conf:
properties,
partition_key,
ordering_key,
sequence_id,
replication_clusters,
disable_replication,
event_timestamp
"""
pass
@abstractmethod
def get_input_topics(self):
"""Returns the input topics of function"""
pass
@abstractmethod
def get_output_topic(self):
"""Returns the output topic of function"""
pass
@abstractmethod
def get_output_serde_class_name(self):
"""return output Serde class"""
pass
@abstractmethod
def ack(self, msgid, topic):
"""ack this message id"""
pass
@abstractmethod
def incr_counter(self, key, amount):
"""incr the counter of a given key in the managed state"""
pass
@abstractmethod
def get_counter(self, key):
"""get the counter of a given key in the managed state"""
pass
@abstractmethod
def del_counter(self, key):
"""delete the counter of a given key in the managed state"""
pass
@abstractmethod
def put_state(self, key, value):
"""update the value of a given key in the managed state"""
pass
@abstractmethod
def get_state(self, key):
"""get the value of a given key in the managed state"""
pass