azext_edge/edge/commands_dataflow.py (29 lines of code) (raw):
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
from typing import Iterable
from .providers.orchestration.resources import DataFlowEndpoints, DataFlowProfiles
def show_dataflow_profile(cmd, profile_name: str, instance_name: str, resource_group_name: str) -> dict:
    # Command handler: fetch one dataflow profile.
    # `cmd` is handed straight to the DataFlowProfiles constructor; the
    # remaining arguments are forwarded to its `show` method by keyword
    # (`profile_name` is passed as `name`). The result of `show` is
    # returned unchanged.
    profiles = DataFlowProfiles(cmd)
    return profiles.show(
        name=profile_name,
        instance_name=instance_name,
        resource_group_name=resource_group_name,
    )
def list_dataflow_profiles(cmd, instance_name: str, resource_group_name: str) -> Iterable[dict]:
    # Command handler: list dataflow profiles for an instance.
    # Builds a DataFlowProfiles from `cmd` and returns whatever its `list`
    # method yields for the given instance and resource group.
    profiles = DataFlowProfiles(cmd)
    return profiles.list(
        instance_name=instance_name,
        resource_group_name=resource_group_name,
    )
def show_dataflow(
    cmd,
    dataflow_name: str,
    profile_name: str,
    instance_name: str,
    resource_group_name: str,
) -> dict:
    # Command handler: fetch one dataflow belonging to a dataflow profile.
    # The lookup goes through the `dataflows` attribute of a
    # DataFlowProfiles built from `cmd`; `dataflow_name` is passed as `name`
    # and `profile_name` as `dataflow_profile_name`.
    dataflows = DataFlowProfiles(cmd).dataflows
    return dataflows.show(
        name=dataflow_name,
        dataflow_profile_name=profile_name,
        instance_name=instance_name,
        resource_group_name=resource_group_name,
    )
def list_dataflows(cmd, profile_name: str, instance_name: str, resource_group_name: str) -> Iterable[dict]:
    # Command handler: list the dataflows under a dataflow profile.
    # Uses the `dataflows` attribute of a DataFlowProfiles built from `cmd`;
    # `profile_name` is forwarded as `dataflow_profile_name`.
    dataflows = DataFlowProfiles(cmd).dataflows
    return dataflows.list(
        dataflow_profile_name=profile_name,
        instance_name=instance_name,
        resource_group_name=resource_group_name,
    )
def show_dataflow_endpoint(cmd, endpoint_name: str, instance_name: str, resource_group_name: str) -> dict:
    # Command handler: fetch one dataflow endpoint.
    # `cmd` is handed to the DataFlowEndpoints constructor and the remaining
    # arguments are forwarded to its `show` method by keyword
    # (`endpoint_name` is passed as `name`).
    endpoints = DataFlowEndpoints(cmd)
    return endpoints.show(
        name=endpoint_name,
        instance_name=instance_name,
        resource_group_name=resource_group_name,
    )
def list_dataflow_endpoints(cmd, instance_name: str, resource_group_name: str) -> Iterable[dict]:
    # Command handler: list dataflow endpoints for an instance.
    # Builds a DataFlowEndpoints from `cmd` and returns whatever its `list`
    # method yields for the given instance and resource group.
    endpoints = DataFlowEndpoints(cmd)
    return endpoints.list(
        instance_name=instance_name,
        resource_group_name=resource_group_name,
    )