in publish-events/channel/client-libraries/python/publish.py [0:0]
def main():
    """Build a sample CloudEvent and publish it to the requested channel.

    Reads the parsed command-line arguments from the module-level ``parser``
    (``args.log`` sets the logger level, ``args.channel`` is used as the
    publishing target), wraps a small JSON payload in a CloudEvent, and sends
    it with a single ``publish_events`` call. Progress is reported through
    the module-level ``logger``; nothing is returned.
    """
    args = parser.parse_args()
    logger.setLevel(args.log)

    # Build a valid CloudEvent.
    # The CloudEvent "id" and "time" are auto-generated if omitted and
    # "specversion" defaults to "1.0".
    attributes = {
        "type": "mycompany.myorg.myproject.v1.myevent",
        "source": "//event/from/python",
        "datacontenttype": "application/json",
        # Note: someattribute and somevalue have to match with the trigger!
        "someattribute": "somevalue",
    }
    data = {"message": "Hello World from Python"}
    event = CloudEvent(attributes, data)
    logger.debug('Prepared CloudEvent:')
    logger.debug(event)

    # The event is attached in its JSON-serialized form as a text event.
    request = publisher.PublishEventsRequest()
    request.channel = args.channel
    request.text_events.append(to_json(event))
    logger.debug('Prepared publishing request:')
    logger.debug(request)

    publisher_client = client.PublisherClient()
    response = publisher_client.publish_events(request)
    logger.info('Event published')
    # Deferred %-style formatting: the response is only rendered to text
    # when the DEBUG level is actually enabled.
    logger.debug('Result: %s', response)