in internal/task/task.go [78:101]
func queryPublishCatalog(operationID string) (*flow.WorkflowEventCatalog, error) {
grpcConn, err := getGRPCConn()
if err != nil {
return nil, err
}
defer closeGRPCConn(grpcConn)
catalogClient := catalog.NewCatalogClient(grpcConn)
rsp, err := catalogClient.QueryOperations(context.Background(), &catalog.QueryOperationsRequest{
OperationId: operationID,
})
if err != nil {
return nil, err
}
if len(rsp.Operations) == 0 {
return nil, fmt.Errorf("operationID %s invalid, please check it", operationID)
}
operation := rsp.Operations[0]
if operation.Type != constants.EventTypePublish {
return nil, fmt.Errorf("operationID %s invalid, please check it", operationID)
}
return &flow.WorkflowEventCatalog{Topic: operation.ChannelName, Schema: operation.Schema,
OperationID: operationID}, nil
}