def main()

in publish-events/channel/client-libraries/python/publish.py [0:0]


def main():
  """Build a sample CloudEvent and publish it to the channel given on the command line.

  Reads ``args.log`` and ``args.channel`` from the arguments returned by the
  module-level ``parser``, sets the logger level accordingly, wraps a small
  JSON payload in a CloudEvent, and sends it in a single publish request via
  ``client.PublisherClient``. Progress is reported through the module-level
  ``logger``; nothing is returned.

  Any exception raised while parsing arguments, serializing the event, or
  calling the publisher client propagates to the caller unchanged.
  """
  args = parser.parse_args()
  logger.setLevel(args.log)

  # Build a valid CloudEvent.
  # The CloudEvent "id" and "time" are auto-generated if omitted and
  # "specversion" defaults to "1.0".
  attributes = {
      "type": "mycompany.myorg.myproject.v1.myevent",
      "source": "//event/from/python",
      "datacontenttype": "application/json",
      # Note: someattribute and somevalue have to match with the trigger!
      "someattribute": "somevalue",
  }
  data = {"message": "Hello World from Python"}
  event = CloudEvent(attributes, data)

  logger.debug('Prepared CloudEvent:')
  logger.debug(event)

  # The event travels as a JSON-serialized text event on the request.
  request = publisher.PublishEventsRequest()
  request.channel = args.channel
  request.text_events.append(to_json(event))

  logger.debug('Prepared publishing request:')
  logger.debug(request)

  publisher_client = client.PublisherClient()
  response = publisher_client.publish_events(request)
  logger.info('Event published')
  # Lazy %-style argument: the response is only stringified when DEBUG
  # logging is actually enabled.
  logger.debug('Result: %s', response)