in src/modules/snowflake/deploy-utils/create_snowflake_connector.py [0:0]
def main():
    """Create the Snowflake connector component type in a TwinMaker workspace.

    Prints the usage text, parses the command-line arguments, and builds an
    ``iottwinmaker`` client from them. If the requested component type id is
    already reported for the workspace, a message is printed and nothing is
    created; otherwise ``create_component_type`` is called and its response
    is printed.

    Defaults applied when the corresponding argument is ``None``:
    region ``us-east-1`` and component type id
    ``com.example.SnowflakeConnector``.
    """
    print_usage()
    args = parse_args()

    # Pull every argument out first, before any AWS client is created.
    aws_region = args.region if args.region is not None else 'us-east-1'
    endpoint = args.endpoint_url
    workspace_id = args.workspace_id
    if args.component_type_id is None:
        component_type_id = 'com.example.SnowflakeConnector'
    else:
        component_type_id = args.component_type_id
    entity_table = args.entity_property_table_name
    timeseries_table = args.timeseries_table_name
    initializer_arn = args.schema_initializer_arn
    reader_arn = args.data_reader_by_entity_arn

    twinmaker = boto3.session.Session().client(
        service_name='iottwinmaker',
        endpoint_url=endpoint,
        region_name=aws_region,
    )

    # Bail out early when the component type is already registered.
    # NOTE(review): assumes list_component_type_ids returns a collection of
    # id strings for the workspace - confirm against its definition.
    known_ids = list_component_type_ids(twinmaker, workspace_id)
    if component_type_id in known_ids:
        print(f"ComponentTypeId : {component_type_id} already exists in the workspace {workspace_id}")
        return

    # Property definitions, keyed by property name.
    elem_id_property = {
        "dataType": {"type": "STRING"},
        "isTimeSeries": False,
        "isRequiredInEntity": True,
        "isExternalId": True,
    }
    entity_table_property = {
        "dataType": {"type": "STRING"},
        "isTimeSeries": False,
        "defaultValue": {"stringValue": entity_table},
    }
    timeseries_table_property = {
        "dataType": {"type": "STRING"},
        "isTimeSeries": True,
        "isStoredExternally": False,
        "defaultValue": {"stringValue": timeseries_table},
    }

    # Lambda-backed functions attached to the component type.
    schema_initializer = {
        "scope": "ENTITY",
        "implementedBy": {"lambda": {"arn": initializer_arn}},
        "requiredProperties": ["elemId", "entityPropertyTableName"],
    }
    data_reader_by_entity = {
        "scope": "ENTITY",
        "implementedBy": {"lambda": {"arn": reader_arn}},
    }

    request = {
        "workspaceId": workspace_id,
        "componentTypeId": component_type_id,
        "isSingleton": True,
        "propertyDefinitions": {
            "elemId": elem_id_property,
            "entityPropertyTableName": entity_table_property,
            "timeseriesTableName": timeseries_table_property,
        },
        "functions": {
            "schemaInitializer": schema_initializer,
            "dataReaderByEntity": data_reader_by_entity,
        },
    }
    print(twinmaker.create_component_type(**request))