def main()

in publish-events/channel-connection/client-libraries/python/publish.py [0:0]


def main():
    """Publish a sample CloudEvent to an Eventarc channel connection.

    Reads the command-line arguments from the module-level ``parser``,
    applies the requested log level, builds a CloudEvent with a small JSON
    payload, and sends it to the channel connection given by
    ``args.channel_connection``.
    """
    args = parser.parse_args()
    logger.setLevel(args.log)

    # Assemble a valid CloudEvent. "id" and "time" are auto-generated when
    # omitted, and "specversion" defaults to "1.0".
    event_attributes = {
        "type": "provider.v1.event",
        "source": "//provider/source",
        "datacontenttype": "application/json",
        # Note: someattribute and somevalue have to match with the client trigger!
        "someattribute": "somevalue",
    }
    payload = {"message": "Hello World from Python"}
    cloud_event = CloudEvent(event_attributes, payload)

    logger.debug('Prepared CloudEvent:')
    logger.debug(cloud_event)

    # The request carries the target channel connection plus the event
    # serialized to its JSON form.
    publish_request = publisher.PublishChannelConnectionEventsRequest()
    publish_request.channel_connection = args.channel_connection
    serialized_event = to_json(cloud_event)
    publish_request.text_events.append(serialized_event)

    logger.debug('Prepared publishing request:')
    logger.debug(publish_request)

    result = client.PublisherClient().publish_channel_connection_events(
        publish_request
    )
    logger.info('Event published')
    logger.debug(f'Result: {result}')