in action/kafkaProduce.py [0:0]
def main(params):
    """Validate the action parameters and produce a single message to Kafka.

    The work happens in two phases:

    1. Connect: obtain a producer through ``getProducer`` and look up the
       partitions of the target topic, trying at most three times.
    2. Send: publish ``value`` (plus ``key`` when one was supplied) to the
       topic and wait on the returned future for the send result.

    Args:
        params: Raw action parameters, handed unchanged to ``validateParams``.
            ``validateParams`` is indexed as a two-item sequence
            ``(flag, payload)``: ``payload`` is used as the validated
            parameter mapping when ``flag`` equals ``True`` and as the error
            description otherwise.

    Returns:
        ``{'error': payload}`` when validation fails,
        ``{"success": True, "message": text}`` when the message was sent,
        and otherwise the value ``getResultForException`` produces for the
        last exception raised while connecting or sending.
    """
    logging.info("Using kafka-python %s", str(__version__))

    logging.info("Validating parameters")
    validationResult = validateParams(params)
    # Explicit equality test: only a flag equal to True counts as success,
    # any other value is reported as a validation failure.
    if validationResult[0] != True:  # noqa: E712
        return {'error': validationResult[1]}
    validatedParams = validationResult[1]

    max_attempts = 3
    result = None

    for attempt in range(1, max_attempts + 1):
        logging.info("Starting attempt %s", attempt)

        try:
            logging.info("Getting producer")

            # set a client timeout that allows for 3 connection retries while still
            # reserving 10s for the actual send
            producer_timeout_ms = math.floor(
                getRemainingTime(reservedTime=10) / max_attempts * 1000)
            producer = getProducer(validatedParams, producer_timeout_ms)

            topic = validatedParams['topic']
            logging.info("Finding topic %s", topic)
            partition_info = producer.partitions_for(topic)
            logging.info("Found topic %s with partition(s) %s", topic, partition_info)

            break
        except Exception as e:
            # Log every failed attempt and remember its error; the loop then
            # retries until the attempts are used up.
            logging.warning(e)
            traceback.print_exc(limit=5)
            result = getResultForException(e)
    else:
        # No attempt reached `break`: report the error of the last failure.
        # Everything below therefore runs only with a producer whose topic
        # lookup succeeded.
        return result

    # we successfully connected and found the topic metadata... let's send!
    try:
        logging.info("Producing message")

        payload = bytes(validatedParams['value'], 'utf-8')

        # only use the key parameter if it is present
        send_kwargs = {}
        if 'key' in validatedParams:
            send_kwargs['key'] = bytes(validatedParams['key'], 'utf-8')
        future = producer.send(topic, payload, **send_kwargs)

        # future should wait all of the remaining time
        future_time_seconds = math.floor(getRemainingTime())
        sent = future.get(timeout=future_time_seconds)
        msg = "Successfully sent message to {}:{} at offset {}".format(
            sent.topic, sent.partition, sent.offset)
        logging.info(msg)
        return {"success": True, "message": msg}
    except Exception as e:
        logging.warning(e)
        traceback.print_exc(limit=5)
        return getResultForException(e)