in azure/functions/decorators/function_app.py [0:0]
def kafka_output(self,
                 arg_name: str,
                 topic: str,
                 broker_list: str,
                 avro_schema: Optional[str] = None,
                 username: Optional[str] = None,
                 password: Optional[str] = None,
                 ssl_key_location: Optional[str] = None,
                 ssl_ca_location: Optional[str] = None,
                 ssl_certificate_location: Optional[str] = None,
                 ssl_key_password: Optional[str] = None,
                 schema_registry_url: Optional[str] = None,
                 schema_registry_username: Optional[str] = None,
                 schema_registry_password: Optional[str] = None,
                 o_auth_bearer_method: Optional[Union[OAuthBearerMethod, str]] = None,  # noqa E501
                 o_auth_bearer_client_id: Optional[str] = None,
                 o_auth_bearer_client_secret: Optional[str] = None,
                 o_auth_bearer_scope: Optional[str] = None,
                 o_auth_bearer_token_endpoint_url: Optional[str] = None,
                 o_auth_bearer_extensions: Optional[str] = None,
                 max_message_bytes: int = 1_000_000,
                 batch_size: int = 10_000,
                 enable_idempotence: bool = False,
                 message_timeout_ms: int = 300_000,
                 request_timeout_ms: int = 5_000,
                 max_retries: int = 2_147_483_647,
                 authentication_mode: Optional[Union[BrokerAuthenticationMode, str]] = "NOTSET",  # noqa E501
                 protocol: Optional[Union[BrokerProtocol, str]] = "NOTSET",
                 linger_ms: int = 5,
                 data_type: Optional[Union[DataType, str]] = None,
                 **kwargs) -> Callable[..., Any]:
    """
    Attach a :class:`KafkaOutput` binding to the :class:`FunctionBuilder`
    that builds the :class:`Function` object used in the worker function
    indexing model.

    This is equivalent to declaring a Kafka output binding in
    function.json, which lets the function write events to a Kafka topic.
    Optional fields that are left unset are given their default values by
    the function host when it parses them.

    Ref: https://aka.ms/kafkaoutput

    :param arg_name: The variable name used in function code that
        represents the event.
    :param topic: The Kafka topic the events are written to.
    :param broker_list: The list of Kafka brokers the events are sent to.
    :param avro_schema: Use only if a generic record should be generated.
    :param username: SASL username for use with the PLAIN and
        SASL-SCRAM-.. mechanisms. Default is empty string. Equivalent to
        'sasl.username' in librdkafka.
    :param password: SASL password for use with the PLAIN and
        SASL-SCRAM-.. mechanisms. Default is empty string. Equivalent to
        'sasl.password' in librdkafka.
    :param ssl_key_location: Path to the client's private key (PEM) used
        for authentication. Default is empty string. Equivalent to
        'ssl.key.location' in librdkafka.
    :param ssl_ca_location: Path to the CA certificate file used to verify
        the broker's certificate. Equivalent to 'ssl.ca.location' in
        librdkafka.
    :param ssl_certificate_location: Path to the client's certificate.
        Equivalent to 'ssl.certificate.location' in librdkafka.
    :param ssl_key_password: Password for the client's certificate.
        Equivalent to 'ssl.key.password' in librdkafka.
    :param schema_registry_url: URL for the Avro Schema Registry.
    :param schema_registry_username: Username for the Avro Schema Registry.
    :param schema_registry_password: Password for the Avro Schema Registry.
    :param o_auth_bearer_method: Either 'default' or 'oidc'.
        sasl.oauthbearer in librdkafka.
    :param o_auth_bearer_client_id: Specify only when o_auth_bearer_method
        is 'oidc'. sasl.oauthbearer.client.id in librdkafka.
    :param o_auth_bearer_client_secret: Specify only when
        o_auth_bearer_method is 'oidc'. sasl.oauthbearer.client.secret in
        librdkafka.
    :param o_auth_bearer_scope: Specify only when o_auth_bearer_method is
        'oidc'. The client uses this to specify the scope of the access
        request to the broker. sasl.oauthbearer.scope in librdkafka.
    :param o_auth_bearer_token_endpoint_url: Specify only when
        o_auth_bearer_method is 'oidc'.
        sasl.oauthbearer.token.endpoint.url in librdkafka.
    :param o_auth_bearer_extensions: Additional information to be provided
        to the broker, as a comma-separated list of key=value pairs, e.g.
        "supportFeatureX=true,organizationId=sales-emea".
        sasl.oauthbearer.extensions in librdkafka.
    :param max_message_bytes: Maximum transmit message size. Default is
        1MB.
    :param batch_size: Maximum number of messages batched in one
        MessageSet. Default is 10000.
    :param enable_idempotence: When set to `true`, the producer will
        ensure that messages are successfully produced exactly once and in
        the original produce order. Default is false.
    :param message_timeout_ms: Local message timeout. This value is only
        enforced locally and limits the time a produced message waits for
        successful delivery. A time of 0 is infinite. This is the maximum
        time used to deliver a message (including retries). A delivery
        error occurs when either the retry count or the message timeout is
        exceeded. Default is 300000.
    :param request_timeout_ms: The ack timeout of the producer request in
        milliseconds. Default is 5000.
    :param max_retries: How many times to retry sending a failing message.
        Default is 2147483647. Retrying may cause reordering unless
        'EnableIdempotence' is set to 'True'.
    :param authentication_mode: SASL mechanism to use for authentication.
        Allowed values: Gssapi, Plain, ScramSha256, ScramSha512.
        Equivalent to 'sasl.mechanism' in librdkafka. When omitted, this
        decorator passes "NOTSET".
    :param protocol: The security protocol used to communicate with
        brokers. Equivalent to 'security.protocol' in librdkafka. When
        omitted, this decorator passes "NOTSET".
    :param linger_ms: Linger.MS property: the time between batches of
        messages being sent to the cluster. A larger value allows more
        batching, which results in higher throughput. Default is 5.
    :param data_type: Defines how the Functions runtime should treat the
        parameter value.
    :param kwargs: Keyword arguments for specifying additional binding
        fields to include in the binding json.

    :return: Decorator function.
    """
    @self._configure_function_builder
    def wrap(fb):
        # These four arguments accept either an enum member or a string,
        # so each one is passed through parse_singular_param_to_enum
        # before the binding is constructed.
        bearer_method = parse_singular_param_to_enum(
            o_auth_bearer_method, OAuthBearerMethod)
        auth_mode = parse_singular_param_to_enum(
            authentication_mode, BrokerAuthenticationMode)
        broker_protocol = parse_singular_param_to_enum(
            protocol, BrokerProtocol)
        binding_data_type = parse_singular_param_to_enum(
            data_type, DataType)

        kafka_binding = KafkaOutput(
            name=arg_name,
            topic=topic,
            broker_list=broker_list,
            avro_schema=avro_schema,
            username=username,
            password=password,
            ssl_key_location=ssl_key_location,
            ssl_ca_location=ssl_ca_location,
            ssl_certificate_location=ssl_certificate_location,
            ssl_key_password=ssl_key_password,
            schema_registry_url=schema_registry_url,
            schema_registry_username=schema_registry_username,
            schema_registry_password=schema_registry_password,
            o_auth_bearer_method=bearer_method,
            o_auth_bearer_client_id=o_auth_bearer_client_id,
            o_auth_bearer_client_secret=o_auth_bearer_client_secret,
            o_auth_bearer_scope=o_auth_bearer_scope,
            o_auth_bearer_token_endpoint_url=o_auth_bearer_token_endpoint_url,
            o_auth_bearer_extensions=o_auth_bearer_extensions,
            max_message_bytes=max_message_bytes,
            batch_size=batch_size,
            enable_idempotence=enable_idempotence,
            message_timeout_ms=message_timeout_ms,
            request_timeout_ms=request_timeout_ms,
            max_retries=max_retries,
            authentication_mode=auth_mode,
            protocol=broker_protocol,
            linger_ms=linger_ms,
            data_type=binding_data_type,
            **kwargs)

        # Register the binding and hand the same builder back so further
        # decorators can keep configuring it.
        fb.add_binding(binding=kafka_binding)
        return fb

    return wrap