in publish-events/channel-connection/client-libraries/python/publish.py [0:0]
def main():
args = parser.parse_args()
logger.setLevel(args.log)
# Build a valid CloudEvent
# The CloudEvent "id" and "time" are auto-generated if omitted and "specversion" defaults to "1.0".
attributes = {
"type": "provider.v1.event",
"source": "//provider/source",
"datacontenttype": "application/json",
# Note: someattribute and somevalue have to match with the client trigger!
"someattribute": "somevalue"
}
data = {"message": "Hello World from Python"}
event = CloudEvent(attributes, data)
logger.debug('Prepared CloudEvent:')
logger.debug(event)
request = publisher.PublishChannelConnectionEventsRequest()
request.channel_connection = args.channel_connection
request.text_events.append(to_json(event))
logger.debug('Prepared publishing request:')
logger.debug(request)
publisher_client = client.PublisherClient()
response = publisher_client.publish_channel_connection_events(request)
logger.info('Event published')
logger.debug(f'Result: {response}')