func Communicate()

in sharedlibraries/communication/communication.go [128:184]


func Communicate(ctx context.Context, endpoint string, channel string, messageHandler MsgHandlerFunc, cloudProperties *metadataserver.CloudProperties) error {
	eBackoff := setupBackoff()
	conn := establishConnection(ctx, endpoint, channel)
	for conn == nil {
		logMsg := fmt.Sprintf("Establishing connection failed. Will backoff and retry.")
		logAndBackoff(ctx, eBackoff, logMsg)
		conn = establishConnection(ctx, endpoint, channel)
	}
	// Reset backoff once we successfully connected.
	eBackoff.Reset()

	var lastErr error
	for {
		// Return most recent error if context is cancelled. Useful for unit testing purposes.
		select {
		case <-ctx.Done():
			log.CtxLogger(ctx).Info("Context is done. Returning.")
			return lastErr
		default:
		}
		// listen for messages
		msg := listenForMessages(ctx, conn, endpoint, channel)
		log.CtxLogger(ctx).Infow("ListenForMessages complete.", "msg", prototext.Format(msg))
		// parse message
		if msg.GetLabels() == nil {
			logMsg := fmt.Sprintf("Nil labels in message from listenForMessages. Will backoff and retry with a new connection.")
			logAndBackoff(ctx, eBackoff, logMsg)
			conn = establishConnection(ctx, endpoint, channel)
			lastErr = fmt.Errorf("nil labels in message from listenForMessages")
			continue
		}
		operationID, ok := msg.GetLabels()["operation_id"]
		if !ok {
			logMsg := fmt.Sprintf("No operation_id label in message. Will backoff and retry.")
			logAndBackoff(ctx, eBackoff, logMsg)
			lastErr = fmt.Errorf("no operation_id label in message")
			continue
		}
		log.CtxLogger(ctx).Debugw("Parsed operation_id from label.", "operation_id", operationID)
		// Reset backoff if we successfully parsed the message.
		eBackoff.Reset()
		// handle the message
		res, err := messageHandler(ctx, msg.GetBody(), cloudProperties)
		statusMsg := succeeded
		if err != nil {
			log.CtxLogger(ctx).Warnw("Encountered error during ACS message handling.", "err", err)
			statusMsg = failed
		}
		log.CtxLogger(ctx).Debugw("Message handling complete.", "responseMsg", prototext.Format(res), "statusMsg", statusMsg)
		// Send operation status message.
		err = sendStatusMessage(ctx, operationID, res, statusMsg, conn)
		if err != nil {
			log.CtxLogger(ctx).Warnw("Encountered error during sendStatusMessage.", "err", err)
			lastErr = err
		}
	}
}