in azure/functions/decorators/function_app.py [0:0]
def kafka_trigger(self,
arg_name: str,
topic: str,
broker_list: str,
event_hub_connection_string: Optional[str] = None,
consumer_group: Optional[str] = None,
avro_schema: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
o_auth_bearer_method: Optional[Union[OAuthBearerMethod, str]] = None, # noqa E501
o_auth_bearer_client_id: Optional[str] = None,
o_auth_bearer_client_secret: Optional[str] = None,
o_auth_bearer_scope: Optional[str] = None,
o_auth_bearer_token_endpoint_url: Optional[str] = None,
o_auth_bearer_extensions: Optional[str] = None,
authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NotSet", # noqa E501
protocol: Optional[Union[BrokerProtocol, str]] = "NotSet", # noqa E501
cardinality: Optional[Union[Cardinality, str]] = "One",
lag_threshold: int = 1000,
data_type: Optional[Union[DataType, str]] = None,
**kwargs) -> Callable[..., Any]:
"""
The kafka_trigger decorator adds
:class:`KafkaTrigger`
to the :class:`FunctionBuilder` object
for building :class:`Function` object used in worker function
indexing model. This is equivalent to defining kafka trigger
in the function.json which enables function to be triggered to
respond to an event sent to a kafka topic.
All optional fields will be given default value by function host when
they are parsed by function host.
Ref: https://aka.ms/kafkatrigger
:param arg_name: the variable name used in function code for the
parameter that has the kafka event data.
:param topic: The topic monitored by the trigger.
:param broker_list: The list of Kafka brokers monitored by the trigger.
:param event_hub_connection_string: The name of an app setting that
contains the connection string for the eventhub when using Kafka
protocol header feature of Azure EventHubs.
:param consumer_group: Kafka consumer group used by the trigger.
:param avro_schema: This should be used only if a generic record
should be generated.
:param username: SASL username for use with the PLAIN and SASL-SCRAM-..
mechanisms. Default is empty string. This is equivalent to
'sasl.username' in librdkafka.
:param password: SASL password for use with the PLAIN and SASL-SCRAM-..
mechanisms. Default is empty string. This is equivalent to
'sasl.password' in librdkafka.
:param ssl_key_location: Path to client's private key (PEM) used for
authentication. Default is empty string. This is equivalent to
'ssl.key.location' in librdkafka.
:param ssl_ca_location: Path to CA certificate file for verifying the
broker's certificate. This is equivalent to 'ssl.ca.location' in
librdkafka.
:param ssl_certificate_location: Path to client's certificate. This is
equivalent to 'ssl.certificate.location' in librdkafka.
:param ssl_key_password: Password for client's certificate. This is
equivalent to 'ssl.key.password' in librdkafka.
:param schema_registry_url: URL for the Avro Schema Registry.
:param schema_registry_username: Username for the Avro Schema Registry.
:param schema_registry_password: Password for the Avro Schema Registry.
:param o_auth_bearer_method: Either 'default' or 'oidc'.
sasl.oauthbearer in librdkafka.
:param o_auth_bearer_client_id: Specify only when o_auth_bearer_method
is 'oidc'. sasl.oauthbearer.client.id in librdkafka.
:param o_auth_bearer_client_secret: Specify only when
o_auth_bearer_method is 'oidc'. sasl.oauthbearer.client.secret in
librdkafka.
:param o_auth_bearer_scope: Specify only when o_auth_bearer_method
is 'oidc'. Client use this to specify the scope of the access request
to the broker. sasl.oauthbearer.scope in librdkafka.
:param o_auth_bearer_token_endpoint_url: Specify only when
o_auth_bearer_method is 'oidc'. sasl.oauthbearer.token.endpoint.url
in librdkafka.
:param o_auth_bearer_extensions: Allow additional information to be
provided to the broker. Comma-separated list of key=value pairs. E.g.,
"supportFeatureX=true,organizationId=sales-emea".
sasl.oauthbearer.extensions in librdkafka
:param authentication_mode: SASL mechanism to use for authentication.
Allowed values: Gssapi, Plain, ScramSha256, ScramSha512. Default is
Plain. This is equivalent to 'sasl.mechanism' in librdkafka.
:param protocol: Gets or sets the security protocol used to communicate
with brokers. Default is plain text. This is equivalent to
'security.protocol' in librdkafka. TODO
:param lag_threshold: Maximum number of unprocessed messages a worker
is expected to have at an instance. When target-based scaling is not
disabled, this is used to divide total unprocessed event count to
determine the number of worker instances, which will then be rounded
up to a worker instance count that creates a balanced partition
distribution. Default is 1000.
:param data_type: Defines how Functions runtime should treat the
parameter value.
:param kwargs: Keyword arguments for specifying additional binding
fields to include in the binding json
:return: Decorator function.
"""
@self._configure_function_builder
def wrap(fb):
def decorator():
fb.add_trigger(
trigger=KafkaTrigger(
name=arg_name,
topic=topic,
broker_list=broker_list,
event_hub_connection_string=event_hub_connection_string, # noqa: E501
consumer_group=consumer_group,
avro_schema=avro_schema,
username=username,
password=password,
ssl_key_location=ssl_key_location,
ssl_ca_location=ssl_ca_location,
ssl_certificate_location=ssl_certificate_location,
ssl_key_password=ssl_key_password,
schema_registry_url=schema_registry_url,
schema_registry_username=schema_registry_username,
schema_registry_password=schema_registry_password,
o_auth_bearer_method=parse_singular_param_to_enum(
o_auth_bearer_method, OAuthBearerMethod),
o_auth_bearer_client_id=o_auth_bearer_client_id,
o_auth_bearer_client_secret=o_auth_bearer_client_secret, # noqa: E501
o_auth_bearer_scope=o_auth_bearer_scope,
o_auth_bearer_token_endpoint_url=o_auth_bearer_token_endpoint_url, # noqa: E501
o_auth_bearer_extensions=o_auth_bearer_extensions,
authentication_mode=parse_singular_param_to_enum(
authentication_mode, BrokerAuthenticationMode),
protocol=parse_singular_param_to_enum(protocol,
BrokerProtocol),
cardinality=parse_singular_param_to_enum(cardinality,
Cardinality),
lag_threshold=lag_threshold,
data_type=parse_singular_param_to_enum(data_type,
DataType),
**kwargs))
return fb
return decorator()
return wrap