def main()

in action/messageHubProduce.py [0:0]


def main(params):
    producer = None
    logging.info("Using kafka-python %s", str(__version__))

    logging.info("Validating parameters")
    validationResult = validateParams(params)
    if validationResult[0] != True:
        return {'error': validationResult[1]}
    else:
        validatedParams = validationResult[1]

    attempt = 0
    max_attempts = 3

    result = {"success": True}

    while attempt < max_attempts:
        attempt += 1
        logging.info("Starting attempt {}".format(attempt))

        try:
            logging.info("Getting producer")

            # set a client timeout that allows for 3 connection retries while still
            # reserving 10s for the actual send
            producer_timeout_ms = math.floor(getRemainingTime(reservedTime=10) / max_attempts * 1000)
            producer = getProducer(validatedParams, producer_timeout_ms)

            topic = validatedParams['topic']
            logging.info("Finding topic {}".format(topic))
            partition_info = producer.partitions_for(topic)
            logging.info("Found topic {} with partition(s) {}".format(topic, partition_info))

            break
        except Exception as e:
            if attempt == max_attempts:
                producer = None
                logging.warning(e)
                traceback.print_exc(limit=5)
                result = getResultForException(e)

    # we successfully connected and found the topic metadata... let's send!
    if producer is not None:
        try:
            logging.info("Producing message")

            # only use the key parameter if it is present
            value = validatedParams['value']
            if 'key' in validatedParams:
                messageKey = validatedParams['key']
                future = producer.send(
                    topic, bytes(value, 'utf-8'), key=bytes(messageKey, 'utf-8'))
            else:
                future = producer.send(topic, bytes(value, 'utf-8'))

            # future should wait all of the remaining time
            future_time_seconds = math.floor(getRemainingTime())
            sent = future.get(timeout=future_time_seconds)
            msg = "Successfully sent message to {}:{} at offset {}".format(
                sent.topic, sent.partition, sent.offset)
            logging.info(msg)
            result = {"success": True, "message": msg}
        except Exception as e:
            logging.warning(e)
            traceback.print_exc(limit=5)
            result = getResultForException(e)

    return result