azext_edge/edge/providers/orchestration/resources/dataflows.py (56 lines of code) (raw):
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
from typing import TYPE_CHECKING, Iterable
from knack.log import get_logger
from ....util.az_client import get_iotops_mgmt_client
from ....util.queryable import Queryable
# Module-level logger, named after this module.
logger = get_logger(__name__)
if TYPE_CHECKING:
from ....vendor.clients.iotopsmgmt.operations import (
DataflowEndpointOperations,
DataflowOperations,
DataflowProfileOperations,
)
class DataFlowProfiles(Queryable):
    """Lookup helper for dataflow profile resources.

    Builds a management client with ``get_iotops_mgmt_client`` and delegates
    to its ``dataflow_profile`` operations group. Dataflows are reachable
    through ``self.dataflows``, which wraps the same client's ``dataflow``
    operations group.
    """

    def __init__(self, cmd):
        """Initialise the base class and bind the operations groups.

        Args:
            cmd: Passed through unchanged to the ``Queryable`` initialiser.
        """
        super().__init__(cmd=cmd)
        # The client is scoped to this instance's default subscription.
        mgmt_client = get_iotops_mgmt_client(subscription_id=self.default_subscription_id)
        self.iotops_mgmt_client = mgmt_client
        self.ops: "DataflowProfileOperations" = mgmt_client.dataflow_profile
        self.dataflows = DataFlows(mgmt_client.dataflow)

    def show(self, name: str, instance_name: str, resource_group_name: str) -> dict:
        """Fetch a single dataflow profile.

        Args:
            name: Forwarded as ``dataflow_profile_name``.
            instance_name: Forwarded as ``instance_name``.
            resource_group_name: Forwarded as ``resource_group_name``.

        Returns:
            The result of the operations group's ``get`` call.
        """
        lookup = {
            "resource_group_name": resource_group_name,
            "instance_name": instance_name,
            "dataflow_profile_name": name,
        }
        return self.ops.get(**lookup)

    def list(self, instance_name: str, resource_group_name: str) -> Iterable[dict]:
        """Enumerate the dataflow profiles for an instance.

        Args:
            instance_name: Forwarded as ``instance_name``.
            resource_group_name: Forwarded as ``resource_group_name``.

        Returns:
            The result of the operations group's ``list_by_resource_group`` call.
        """
        scope = {
            "resource_group_name": resource_group_name,
            "instance_name": instance_name,
        }
        return self.ops.list_by_resource_group(**scope)
class DataFlows:
    """Lookup helper for dataflows, backed by an injected operations group."""

    def __init__(self, ops: "DataflowOperations"):
        """Store the operations group that every call is delegated to.

        Args:
            ops: Object exposing ``get`` and ``list_by_profile_resource``.
        """
        self.ops = ops

    def show(self, name: str, dataflow_profile_name: str, instance_name: str, resource_group_name: str) -> dict:
        """Fetch a single dataflow.

        Args:
            name: Forwarded as ``dataflow_name``.
            dataflow_profile_name: Forwarded as ``dataflow_profile_name``.
            instance_name: Forwarded as ``instance_name``.
            resource_group_name: Forwarded as ``resource_group_name``.

        Returns:
            The result of the operations group's ``get`` call.
        """
        lookup = {
            "resource_group_name": resource_group_name,
            "instance_name": instance_name,
            "dataflow_profile_name": dataflow_profile_name,
            "dataflow_name": name,
        }
        return self.ops.get(**lookup)

    def list(self, dataflow_profile_name: str, instance_name: str, resource_group_name: str) -> Iterable[dict]:
        """Enumerate the dataflows under a dataflow profile.

        Args:
            dataflow_profile_name: Forwarded as ``dataflow_profile_name``.
            instance_name: Forwarded as ``instance_name``.
            resource_group_name: Forwarded as ``resource_group_name``.

        Returns:
            The result of the operations group's ``list_by_profile_resource`` call.
        """
        scope = {
            "resource_group_name": resource_group_name,
            "instance_name": instance_name,
            "dataflow_profile_name": dataflow_profile_name,
        }
        return self.ops.list_by_profile_resource(**scope)
class DataFlowEndpoints(Queryable):
    """Lookup helper for dataflow endpoint resources.

    Builds a management client with ``get_iotops_mgmt_client`` and delegates
    to its ``dataflow_endpoint`` operations group.
    """

    def __init__(self, cmd):
        """Initialise the base class and bind the operations group.

        Args:
            cmd: Passed through unchanged to the ``Queryable`` initialiser.
        """
        super().__init__(cmd=cmd)
        # The client is scoped to this instance's default subscription.
        mgmt_client = get_iotops_mgmt_client(subscription_id=self.default_subscription_id)
        self.iotops_mgmt_client = mgmt_client
        self.ops: "DataflowEndpointOperations" = mgmt_client.dataflow_endpoint

    def show(self, name: str, instance_name: str, resource_group_name: str) -> dict:
        """Fetch a single dataflow endpoint.

        Args:
            name: Forwarded as ``dataflow_endpoint_name``.
            instance_name: Forwarded as ``instance_name``.
            resource_group_name: Forwarded as ``resource_group_name``.

        Returns:
            The result of the operations group's ``get`` call.
        """
        lookup = {
            "resource_group_name": resource_group_name,
            "instance_name": instance_name,
            "dataflow_endpoint_name": name,
        }
        return self.ops.get(**lookup)

    def list(self, instance_name: str, resource_group_name: str) -> Iterable[dict]:
        """Enumerate the dataflow endpoints for an instance.

        Args:
            instance_name: Forwarded as ``instance_name``.
            resource_group_name: Forwarded as ``resource_group_name``.

        Returns:
            The result of the operations group's ``list_by_resource_group`` call.
        """
        scope = {
            "resource_group_name": resource_group_name,
            "instance_name": instance_name,
        }
        return self.ops.list_by_resource_group(**scope)