in src/modules/s3/deploy-utils/create_s3_document_connector.py [0:0]
def main():
args = parse_args()
if args.usage is not None:
print_usage()
region = args.region
workspace_id = args.workspace_id
component_type_id = args.component_type_id
attribute_reader_arn = args.attribute_property_value_reader_by_entity_arn
session = boto3.session.Session()
iottwinmaker = session.client(service_name='iottwinmaker', region_name=region)
# check if the component type id already exists
componentTypeIds = list_component_type_ids(iottwinmaker, workspace_id)
if component_type_id in componentTypeIds:
print(f"ComponentTypeId : {component_type_id} already exists in the workspace {workspace_id}")
return
response = iottwinmaker.create_component_type(
workspaceId = workspace_id,
componentTypeId = component_type_id,
isSingleton = True,
propertyDefinitions = {
"s3Url": {
"dataType": {
"type": "STRING"
},
"isTimeSeries": False,
"isStoredExternally": False
},
"operationStatus": {
"dataType": {
"type": "STRING"
},
"isTimeSeries": False,
"isStoredExternally": True
}
},
functions= {
"attributePropertyValueReaderByEntity": {
"scope": "ENTITY",
"implementedBy": {
"isNative": False,
"lambda": {
"arn": attribute_reader_arn
}
}
}
}
)
print(response)