func queryPublishCatalog()

in internal/task/task.go [78:101]


func queryPublishCatalog(operationID string) (*flow.WorkflowEventCatalog, error) {
	grpcConn, err := getGRPCConn()
	if err != nil {
		return nil, err
	}
	defer closeGRPCConn(grpcConn)

	catalogClient := catalog.NewCatalogClient(grpcConn)
	rsp, err := catalogClient.QueryOperations(context.Background(), &catalog.QueryOperationsRequest{
		OperationId: operationID,
	})
	if err != nil {
		return nil, err
	}
	if len(rsp.Operations) == 0 {
		return nil, fmt.Errorf("operationID %s invalid, please check it", operationID)
	}
	operation := rsp.Operations[0]
	if operation.Type != constants.EventTypePublish {
		return nil, fmt.Errorf("operationID %s invalid, please check it", operationID)
	}
	return &flow.WorkflowEventCatalog{Topic: operation.ChannelName, Schema: operation.Schema,
		OperationID: operationID}, nil
}