kafka-connector/copy_tool.py (76 lines of code) (raw):

#!/usr/bin/env python3
"""Pub/Sub <-> Kafka Simple Copy Tool

A python script for downloading, installing and running the Kafka connector in
a single machine configuration. More complex set-ups should look at the kafka
connect documentation (https://docs.confluent.io/home/connect/userguide.html).
"""

import argparse
import io
import os
import platform
import subprocess
import sys
import tarfile
import tempfile

import requests

KAFKA_RELEASE = "2.6.0"
KAFKA_FOLDER = f"kafka_2.13-{KAFKA_RELEASE}"
KAFKA_LINK = f"https://archive.apache.org/dist/kafka/{KAFKA_RELEASE}/{KAFKA_FOLDER}.tgz"
CONNECTOR_RELEASE = "v0.10-alpha"
PUBSUB_CONNECTOR_LINK = f"https://github.com/GoogleCloudPlatform/pubsub/releases/download/{CONNECTOR_RELEASE}/pubsub-kafka-connector.jar"

# Passed to requests as the connect/read timeout so that a stalled server
# cannot hang the tool forever. This is not a cap on total download time.
DOWNLOAD_TIMEOUT_SECONDS = 60


def _fetch(url):
    """Download `url` and return its body as bytes.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status, so
            that an error page is never mistaken for the requested artifact.
    """
    response = requests.get(url, timeout=DOWNLOAD_TIMEOUT_SECONDS)
    response.raise_for_status()
    return response.content


def _escape_properties_value(value):
    """Escape backslashes so `value` survives Java .properties parsing.

    Backslash is the escape character in .properties files, so an unescaped
    Windows path such as `C:\\Temp\\tmpabc` would be read with `\\t` turned
    into a tab. Doubling each backslash makes the reader see the original.
    """
    return value.replace("\\", "\\\\")


def extract_kafka_to(tempdir):
    """Download the Kafka release tarball and unpack it into `tempdir`.

    The archive unpacks to a `KAFKA_FOLDER` sub-directory of `tempdir`.
    """
    payload = _fetch(KAFKA_LINK)
    with tarfile.open(fileobj=io.BytesIO(payload), mode="r:gz") as archive:
        if hasattr(tarfile, "data_filter"):
            # Extraction filters are available: refuse members that would
            # escape `tempdir` or create special files.
            archive.extractall(path=tempdir, filter="data")
        else:
            archive.extractall(path=tempdir)


def get_connector(tempdir):
    """Download the Pub/Sub connector jar into `tempdir`."""
    payload = _fetch(PUBSUB_CONNECTOR_LINK)
    with open(os.path.join(tempdir, "pubsub-kafka-connector.jar"), "wb") as f:
        f.write(payload)


def download(tempdir):
    """Fetch Kafka into `<tempdir>/kafka` and the jar into `<tempdir>/connector`."""
    print("Downloading kafka...")
    kafka_path = os.path.join(tempdir, "kafka")
    os.mkdir(kafka_path)
    extract_kafka_to(kafka_path)
    print("Downloading connector...")
    connector_path = os.path.join(tempdir, "connector")
    os.mkdir(connector_path)
    get_connector(connector_path)


def make_connect_config(tempdir, bootstrap_servers):
    """Write `<tempdir>/connect_config.properties` for the standalone worker.

    Args:
        tempdir: Working directory; expected to already contain the
            `connector` folder created by `download`.
        bootstrap_servers: Comma-separated Kafka bootstrap servers, written
            verbatim as `bootstrap.servers`.
    """
    print("Building connect config...")
    connector_dir = os.path.join(tempdir, "connector")
    offset_file = os.path.join(tempdir, "connect.offset")
    lines = [
        "key.converter=org.apache.kafka.connect.converters.ByteArrayConverter\n",
        "value.converter=org.apache.kafka.connect.converters.ByteArrayConverter\n",
        f"bootstrap.servers={bootstrap_servers}\n",
        # Filesystem paths may contain backslashes (Windows), which must be
        # escaped to be read back intact from a .properties file.
        f"plugin.path={_escape_properties_value(connector_dir)}\n",
        f"offset.storage.file.filename={_escape_properties_value(offset_file)}\n",
    ]
    with open(os.path.join(tempdir, "connect_config.properties"), "w") as output:
        output.writelines(lines)


def run_connector(tempdir, connector_properties):
    """Run Kafka Connect standalone and block until it exits.

    Args:
        tempdir: Working directory populated by `download` and
            `make_connect_config`.
        connector_properties: Path to the user's connector properties file;
            may be relative to the caller's current directory.

    Returns:
        The exit status of the connect-standalone process.
    """
    if platform.system() == "Windows":
        kafka_script = os.path.join(
            tempdir, "kafka", KAFKA_FOLDER, "bin", "windows",
            "connect-standalone.bat")
    else:
        kafka_script = os.path.join(
            tempdir, "kafka", KAFKA_FOLDER, "bin", "connect-standalone.sh")
    connect_properties = os.path.join(tempdir, "connect_config.properties")
    # The child runs with cwd=tempdir, so a relative path supplied by the user
    # must be resolved against the caller's directory before launching.
    connector_properties = os.path.abspath(connector_properties)
    print("Running")
    completed = subprocess.run(
        args=[kafka_script, connect_properties, connector_properties],
        cwd=tempdir)
    return completed.returncode


def main():
    """Parse arguments, set up a temporary Kafka install and run the connector.

    Returns:
        The connector process's exit status, for use as this script's own.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--bootstrap_servers", type=str, required=True,
        help="A comma-separated list of kafka servers to use for bootstrapping."
    )
    parser.add_argument(
        "--connector_properties_file", type=str, required=True,
        help="""The Pub/Sub connector configuration file.

        `connector.class` specifies which connector to use.

        Other arguments to this configuration may be found at
        https://github.com/GoogleCloudPlatform/pubsub/tree/master/kafka-connector#cloudpubsubconnector-configs
        """)
    args = parser.parse_args()
    # Everything downloaded or generated lives in the temporary directory,
    # which is removed when the with-block exits.
    with tempfile.TemporaryDirectory() as tempdir:
        download(tempdir)
        make_connect_config(tempdir, args.bootstrap_servers)
        return run_connector(tempdir, args.connector_properties_file)


if __name__ == "__main__":
    sys.exit(main())