in sharedlibraries/communication/communication.go [128:184]
func Communicate(ctx context.Context, endpoint string, channel string, messageHandler MsgHandlerFunc, cloudProperties *metadataserver.CloudProperties) error {
eBackoff := setupBackoff()
conn := establishConnection(ctx, endpoint, channel)
for conn == nil {
logMsg := fmt.Sprintf("Establishing connection failed. Will backoff and retry.")
logAndBackoff(ctx, eBackoff, logMsg)
conn = establishConnection(ctx, endpoint, channel)
}
// Reset backoff once we successfully connected.
eBackoff.Reset()
var lastErr error
for {
// Return most recent error if context is cancelled. Useful for unit testing purposes.
select {
case <-ctx.Done():
log.CtxLogger(ctx).Info("Context is done. Returning.")
return lastErr
default:
}
// listen for messages
msg := listenForMessages(ctx, conn, endpoint, channel)
log.CtxLogger(ctx).Infow("ListenForMessages complete.", "msg", prototext.Format(msg))
// parse message
if msg.GetLabels() == nil {
logMsg := fmt.Sprintf("Nil labels in message from listenForMessages. Will backoff and retry with a new connection.")
logAndBackoff(ctx, eBackoff, logMsg)
conn = establishConnection(ctx, endpoint, channel)
lastErr = fmt.Errorf("nil labels in message from listenForMessages")
continue
}
operationID, ok := msg.GetLabels()["operation_id"]
if !ok {
logMsg := fmt.Sprintf("No operation_id label in message. Will backoff and retry.")
logAndBackoff(ctx, eBackoff, logMsg)
lastErr = fmt.Errorf("no operation_id label in message")
continue
}
log.CtxLogger(ctx).Debugw("Parsed operation_id from label.", "operation_id", operationID)
// Reset backoff if we successfully parsed the message.
eBackoff.Reset()
// handle the message
res, err := messageHandler(ctx, msg.GetBody(), cloudProperties)
statusMsg := succeeded
if err != nil {
log.CtxLogger(ctx).Warnw("Encountered error during ACS message handling.", "err", err)
statusMsg = failed
}
log.CtxLogger(ctx).Debugw("Message handling complete.", "responseMsg", prototext.Format(res), "statusMsg", statusMsg)
// Send operation status message.
err = sendStatusMessage(ctx, operationID, res, statusMsg, conn)
if err != nil {
log.CtxLogger(ctx).Warnw("Encountered error during sendStatusMessage.", "err", err)
lastErr = err
}
}
}