action/kafkaProduce.py (135 lines of code) (raw):

"""Kafka message producer. /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ """ import base64 import logging import math import os import sys import time import traceback from kafka import KafkaProducer from kafka.errors import NoBrokersAvailable, KafkaTimeoutError, AuthenticationFailedError from kafka.version import __version__ from random import shuffle logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(levelname)-8s %(asctime)s %(message)s', datefmt='[%H:%M:%S]') max_cached_producers = 10 def main(params): producer = None logging.info("Using kafka-python %s", str(__version__)) logging.info("Validating parameters") validationResult = validateParams(params) if validationResult[0] != True: return {'error': validationResult[1]} else: validatedParams = validationResult[1] attempt = 0 max_attempts = 3 result = {"success": True} while attempt < max_attempts: attempt += 1 logging.info("Starting attempt {}".format(attempt)) try: logging.info("Getting producer") # set a client timeout that allows for 3 connection retries while still # reserving 10s for the actual send producer_timeout_ms = math.floor(getRemainingTime(reservedTime=10) / max_attempts * 1000) producer = getProducer(validatedParams, producer_timeout_ms) topic = validatedParams['topic'] logging.info("Finding topic {}".format(topic)) partition_info = producer.partitions_for(topic) logging.info("Found topic {} with partition(s) {}".format(topic, partition_info)) break except Exception as e: if attempt == max_attempts: producer = None logging.warning(e) traceback.print_exc(limit=5) result = getResultForException(e) # we successfully connected and found the topic metadata... let's send! if producer is not None: try: logging.info("Producing message") # only use the key parameter if it is present value = validatedParams['value'] if 'key' in validatedParams: messageKey = validatedParams['key'] future = producer.send( topic, bytes(value, 'utf-8'), key=bytes(messageKey, 'utf-8')) else: future = producer.send(topic, bytes(value, 'utf-8')) # future should wait all of the remaining time future_time_seconds = math.floor(getRemainingTime()) sent = future.get(timeout=future_time_seconds) msg = "Successfully sent message to {}:{} at offset {}".format( sent.topic, sent.partition, sent.offset) logging.info(msg) result = {"success": True, "message": msg} except Exception as e: logging.warning(e) traceback.print_exc(limit=5) result = getResultForException(e) return result def getResultForException(e): if isinstance(e, KafkaTimeoutError): return {'error': 'Timed out communicating with Message Hub'} elif isinstance(e, AuthenticationFailedError): return {'error': 'Authentication failed'} elif isinstance(e, NoBrokersAvailable): return {'error': 'No brokers available. Check that your supplied brokers are correct and available.'} else: return {'error': '{}'.format(e)} def validateParams(params): validatedParams = params.copy() requiredParams = ['brokers', 'topic', 'value'] missingParams = [] for requiredParam in requiredParams: if requiredParam not in params: missingParams.append(requiredParam) if len(missingParams) > 0: return (False, "You must supply all of the following parameters: {}".format(', '.join(missingParams))) if isinstance(params['brokers'], str): # turn it into a List validatedParams['brokers'] = params['brokers'].split(',') shuffle(validatedParams['brokers']) if 'base64DecodeValue' in params and params['base64DecodeValue'] == True: try: validatedParams['value'] = base64.b64decode(params['value']).decode('utf-8') except: return (False, "value parameter is not Base64 encoded") if len(validatedParams['value']) == 0: return (False, "value parameter is not Base64 encoded") if 'base64DecodeKey' in params and params['base64DecodeKey'] == True: try: validatedParams['key'] = base64.b64decode(params['key']).decode('utf-8') except: return (False, "key parameter is not Base64 encoded") if len(validatedParams['key']) == 0: return (False, "key parameter is not Base64 encoded") return (True, validatedParams) def getProducer(validatedParams, timeout_ms): connectionHash = getConnectionHash(validatedParams) if globals().get("cached_producers") is None: logging.info("dictionary was None") globals()["cached_producers"] = dict() # remove arbitrary connection to make room for new one if len(globals()["cached_producers"]) == max_cached_producers: poppedProducer = globals()["cached_producers"].popitem()[1] poppedProducer.close(timeout=1) logging.info("Removed cached producer") if connectionHash not in globals()["cached_producers"]: logging.info("cache miss") # create a new connection producer = KafkaProducer( api_version_auto_timeout_ms=15000, batch_size=0, bootstrap_servers=validatedParams['brokers'], max_block_ms=timeout_ms, request_timeout_ms=timeout_ms, ) logging.info("Created producer") # store the producer globally for subsequent invocations globals()["cached_producers"][connectionHash] = producer # return it return producer else: logging.info("Reusing existing producer") return globals()["cached_producers"][connectionHash] def getConnectionHash(params): # always use the sorted brokers to combat the effects of shuffle() brokers = params['brokers'] brokers.sort() brokersString = ",".join(brokers) return brokersString # return the remaining time (in seconds) until the action will expire, # optionally reserving some time (also in seconds). def getRemainingTime(reservedTime=0): deadlineSeconds = int(os.getenv('__OW_DEADLINE', 60000)) / 1000 remaining = deadlineSeconds - time.time() - reservedTime # ensure value is at least zero # yes, this is a little paranoid return max(remaining, 0)