in pulsar-functions/instance/src/main/python/python_instance_main.py [0:0]
def _strip_enclosing_single_quotes(text):
    """Return text without one leading and one trailing single quote, if present."""
    # startswith/endswith (rather than indexing) keeps this safe for "" and "'".
    if text.startswith("'"):
        text = text[1:]
    if text.endswith("'"):
        text = text[:-1]
    return text


def _run_install_command(argv):
    """Run an installer command given as an argument list and return its exit status.

    The command is executed without a shell so that paths containing spaces or
    shell metacharacters are passed through verbatim. A failure to launch the
    executable at all is reported as a non-zero status instead of an exception.
    """
    import subprocess

    try:
        return subprocess.call(argv)
    except OSError:
        return 1


def main():
    """Entry point of the Python function instance process.

    Parses and validates the command-line arguments, decodes the function
    details from JSON, checks the processing-guarantee configuration, prepares
    the user code (``.whl`` or ``.zip``) and its dependencies, initializes
    logging, creates the Pulsar client and the secrets provider, starts the
    Python instance together with its control server and the metrics HTTP
    server, and then waits until the module-level ``to_run`` flag is cleared.

    The process exits with status 1 on an invalid processing-guarantee
    configuration or when installing user dependencies fails, and with
    status 0 after the instance thread has been joined.
    """
    global to_run

    # Setup signal handlers
    for signum in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT):
        signal.signal(signum, atexit_function)

    parser = generate_arguments_parser()
    args = parser.parse_args()
    merge_arguments(args, args.config_file)
    validate_arguments(args)

    # The JSON payload may arrive wrapped in single quotes; drop them before parsing.
    function_details = Function_pb2.FunctionDetails()
    args.function_details = _strip_enclosing_single_quotes(str(args.function_details))
    json_format.Parse(args.function_details, function_details)

    # processingGuarantees is an enum value, so it is compared against enum
    # values (as the EFFECTIVELY_ONCE check below does), not against strings.
    guarantees = Function_pb2.ProcessingGuarantees
    auto_ack_required = (guarantees.Value('ATMOST_ONCE'), guarantees.Value('ATLEAST_ONCE'))
    if function_details.autoAck is False and function_details.processingGuarantees in auto_ack_required:
        print("When Guarantees == " + guarantees.Name(function_details.processingGuarantees)
              + ", autoAck must be equal to true, "
              "This is a contradictory configuration, autoAck will be removed later, "
              "If you want not to automatically ack, please configure the processing guarantees as MANUAL. "
              "Please refer to PIP: https://github.com/apache/pulsar/issues/15560")
        sys.exit(1)

    if function_details.processingGuarantees == guarantees.Value('EFFECTIVELY_ONCE'):
        if len(function_details.source.inputSpecs.keys()) != 1 or function_details.sink.topic == "":
            print("When Guarantees == EFFECTIVELY_ONCE you need to ensure that the following pre-requisites have been met:"
                  "1. deduplication is enabled"
                  "2. set ProcessingGuarantees to EFFECTIVELY_ONCE"
                  "3. the function has only one source topic and one sink topic (both are non-partitioned)")
            sys.exit(1)

    user_code = str(args.py)
    code_dir = os.path.dirname(user_code)
    abs_code_dir = os.path.dirname(os.path.abspath(user_code))
    extension = os.path.splitext(user_code)[1]

    if extension == '.whl':
        if args.install_usercode_dependencies:
            cmd = ["pip", "install", "-t", abs_code_dir]
            if args.dependency_repository:
                cmd += ["-i", str(args.dependency_repository)]
            if args.extra_dependency_repository:
                cmd += ["--extra-index-url", str(args.extra_dependency_repository)]
            cmd.append(user_code)
            if _run_install_command(cmd) != 0:
                print("Could not install user depedencies")
                sys.exit(1)
        else:
            # No dependency installation requested: just unpack the wheel in place.
            with zipfile.ZipFile(user_code, 'r') as archive:
                archive.extractall(code_dir)
        sys.path.insert(0, abs_code_dir)
    elif extension == '.zip':
        # Assuming zip file with format func.zip
        # extract to folder function
        # internal dir format
        # "func/src"
        # "func/requirements.txt"
        # "func/deps"
        # run pip install to target folder deps folder
        with zipfile.ZipFile(user_code, 'r') as archive:
            archive.extractall(code_dir)
        basename = os.path.basename(os.path.splitext(user_code)[0])

        requirements_file = os.path.join(code_dir, basename, "requirements.txt")
        if os.path.isfile(requirements_file):
            cmd = ["pip", "install", "-r", requirements_file]
            Log.debug("Install python dependencies via cmd: %s" % " ".join(cmd))
            if _run_install_command(cmd) != 0:
                print("Could not install user depedencies specified by the requirements.txt file")
                sys.exit(1)

        deps_dir = os.path.join(code_dir, basename, "deps")
        if os.path.isdir(deps_dir):
            # get all wheel files from deps directory
            wheel_file_list = []
            for entry in os.listdir(deps_dir):
                candidate = os.path.join(deps_dir, entry)
                if os.path.isfile(candidate) and os.path.splitext(entry)[1] == '.whl':
                    wheel_file_list.append(candidate)
            # Calling pip with no requirement at all fails, so only install
            # when the deps directory actually contains wheels.
            if wheel_file_list:
                cmd = ["pip", "install", "-t", abs_code_dir, "--no-index", "--find-links", deps_dir]
                cmd += wheel_file_list
                Log.debug("Install python dependencies via cmd: %s" % " ".join(cmd))
                if _run_install_command(cmd) != 0:
                    print("Could not install user depedencies specified by the zip file")
                    sys.exit(1)

        # add python user src directory to path
        sys.path.insert(0, os.path.join(abs_code_dir, basename, "src"))

    log_file = os.path.join(
        args.logging_directory,
        util.getFullyQualifiedFunctionName(function_details.tenant, function_details.namespace,
                                           function_details.name),
        "%s-%s.log" % (args.logging_file, args.instance_id))
    # An unrecognized level name maps to None, which is passed on to init_logger as is.
    logging_level = {"notset": logging.NOTSET,
                     "debug": logging.DEBUG,
                     "info": logging.INFO,
                     "warn": logging.WARNING,
                     "warning": logging.WARNING,
                     "error": logging.ERROR,
                     "critical": logging.CRITICAL,
                     "fatal": logging.CRITICAL}.get(args.logging_level, None)
    log.init_logger(logging_level, log_file, args.logging_config_file)

    Log.info("Starting Python instance with %s" % str(args))

    authentication = None
    if args.client_auth_plugin and args.client_auth_params:
        authentication = pulsar.Authentication(args.client_auth_plugin, args.client_auth_params)

    # Boolean options arrive as strings; only the exact value "true" enables them.
    use_tls = args.use_tls == "true"
    tls_allow_insecure_connection = args.tls_allow_insecure_connection == "true"
    hostname_verification_enabled = args.hostname_verification_enabled == "true"
    tls_trust_cert_path = args.tls_trust_cert_path if args.tls_trust_cert_path else None

    pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication=authentication,
                                  operation_timeout_seconds=30,
                                  io_threads=1, message_listener_threads=1,
                                  concurrent_lookup_requests=50000,
                                  log_conf_file_path=None, use_tls=use_tls,
                                  tls_trust_certs_file_path=tls_trust_cert_path,
                                  tls_allow_insecure_connection=tls_allow_insecure_connection,
                                  tls_validate_hostname=hostname_verification_enabled)

    state_storage_serviceurl = None
    if args.state_storage_serviceurl is not None:
        state_storage_serviceurl = str(args.state_storage_serviceurl)

    # Secrets provider classes are looked up relative to the directory of this file.
    instance_dir = os.path.dirname(inspect.getfile(inspect.currentframe()))
    if args.secrets_provider is not None:
        provider_class_name = str(args.secrets_provider)
    else:
        provider_class_name = "secretsprovider.ClearTextSecretsProvider"
    secrets_provider = util.import_class(instance_dir, provider_class_name)()

    secrets_provider_config = None
    if args.secrets_provider_config is not None:
        # Same single-quote wrapping convention as the function details argument.
        args.secrets_provider_config = _strip_enclosing_single_quotes(str(args.secrets_provider_config))
        secrets_provider_config = json.loads(str(args.secrets_provider_config))
    secrets_provider.init(secrets_provider_config)

    pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
                                                str(args.function_version), function_details,
                                                int(args.max_buffered_tuples),
                                                int(args.expected_healthcheck_interval),
                                                user_code,
                                                pulsar_client,
                                                secrets_provider,
                                                args.cluster_name,
                                                state_storage_serviceurl,
                                                args.config_file)
    pyinstance.run()
    server_instance = server.serve(args.port, pyinstance)

    # Cannot use latest version of prometheus client because of thread leak
    # prometheus_client.start_http_server(args.metrics_port)
    # Use patched version of prometheus
    # Contains fix from https://github.com/prometheus/client_python/pull/356
    # This can be removed once the fix is in an official prometheus client release
    prometheus_client_fix.start_http_server(args.metrics_port)

    # Idle until the module-level to_run flag is cleared.
    while to_run:
        time.sleep(1)

    pyinstance.join()
    # make sure to close all non-daemon threads before this!
    sys.exit(0)