in create_task.py [0:0]
def _build_table_mappings(task_sheet, schema_name):
    """Return the DMS table-mapping document ``{"rules": [...]}`` for one task.

    Rules are emitted in this order, with consecutive integer ``rule-id`` /
    ``rule-name`` values starting at 1:

    * for every sheet row after row 0: a selection rule (deep copy of
      ``JSON_TABLE_MAPPINGS["selection"]``) for the table named in column 1,
      followed by one ``remove-column`` transformation (deep copy of
      ``JSON_TABLE_MAPPINGS["transformation_default"]``) per non-blank entry
      of the comma-separated list in column 2;
    * then a copy of every rule in ``JSON_TABLE_MAPPINGS["transformation"]``
      with its object-locator schema set to ``schema_name``.

    All templates are deep-copied, so the module-level ``JSON_TABLE_MAPPINGS``
    is never modified.
    """
    selection_json = JSON_TABLE_MAPPINGS["selection"]
    transformation_default_json = JSON_TABLE_MAPPINGS["transformation_default"]
    rules = []
    rule_id = 1

    # Row 0 is skipped -- presumably a header row; confirm against the sheet.
    for row_idx in range(1, task_sheet.nrows):
        table_name = task_sheet.cell(row_idx, 1).value
        # Skip blank rows instead of emitting a rule with an empty table-name.
        if not str(table_name).strip():
            continue
        # str() so a non-text cell (e.g. a number) does not break .split().
        exclude_cell = str(task_sheet.cell(row_idx, 2).value)

        table_rule = copy.deepcopy(selection_json)
        table_rule["rule-id"] = rule_id
        table_rule["rule-name"] = rule_id
        table_rule["object-locator"]["schema-name"] = schema_name
        table_rule["object-locator"]["table-name"] = table_name
        rules.append(table_rule)
        rule_id += 1

        # One remove-column rule per excluded column of this table.
        for exclude_column in (col.strip() for col in exclude_cell.split(',')):
            if not exclude_column:  # empty cell or stray comma
                continue
            exclude_rule = copy.deepcopy(transformation_default_json)
            exclude_rule["rule-type"] = "transformation"
            exclude_rule["rule-id"] = rule_id
            exclude_rule["rule-name"] = rule_id
            exclude_rule["rule-target"] = "column"
            exclude_rule["object-locator"]["schema-name"] = schema_name
            exclude_rule["object-locator"]["table-name"] = table_name
            exclude_rule["object-locator"]["column-name"] = exclude_column
            exclude_rule["rule-action"] = "remove-column"
            rules.append(exclude_rule)
            rule_id += 1

    # Task-wide transformation rules. Deep copy so the shared module-level
    # templates are not mutated (and never appended twice by reference).
    for template_rule in JSON_TABLE_MAPPINGS["transformation"]:
        trans_rule = copy.deepcopy(template_rule)
        trans_rule["rule-id"] = rule_id
        trans_rule["rule-name"] = rule_id
        trans_rule["object-locator"]["schema-name"] = schema_name
        rules.append(trans_rule)
        rule_id += 1

    return {"rules": rules}


def create_task(name,
                schema_name,
                description,
                task_sheet,
                migration_type="full-load",
                source_arn=None,
                target_arn=None,
                replica_arn=None,
                tags=None,
                cdc_start_time=None):
    """Write a CloudFormation template for one DMS replication task.

    The template starts from a deep copy of ``JSON_DEFAULT_TEMPLATE_FILE``.
    Its placeholder resource ``TaskNameFromConfig`` is renamed and filled in,
    table mappings are generated from ``task_sheet``, and the result is
    written as indented JSON to ``BASE_DIR/output/<name>.template``.

    :param name: Task name. Used verbatim as ``ReplicationTaskIdentifier`` and
        as the output file name. A copy with non-word characters removed is
        used as the resource's logical ID.
    :param schema_name: Schema placed in every table-mapping rule.
    :param description: Template-level ``Description``.
    :param task_sheet: Worksheet read via ``nrows`` and
        ``cell(row, col).value`` (xlrd-style). Column 1 holds the table name;
        column 2 holds a comma-separated list of columns to exclude. Row 0 is
        skipped.
    :param migration_type: Value for the task's ``MigrationType``.
    :param source_arn: If truthy, overrides the ``SourceEndpoint`` parameter
        default.
    :param target_arn: If truthy, overrides the ``TargetEndpoint`` parameter
        default.
    :param replica_arn: If truthy, overrides the ``ReplicationServerARN``
        parameter default.
    :param tags: If non-empty, set as the task's ``Tags``.
    :param cdc_start_time: If truthy, set as the task's ``CdcStartTime``.
    """
    task_name = re.sub(r"[^\w]", '', name)
    op_template = copy.deepcopy(JSON_DEFAULT_TEMPLATE_FILE)

    # Override endpoint / replication-instance parameter defaults when given.
    parameters = op_template["Parameters"]
    if source_arn:
        parameters["SourceEndpoint"]["Default"] = source_arn
    if target_arn:
        parameters["TargetEndpoint"]["Default"] = target_arn
    if replica_arn:
        parameters["ReplicationServerARN"]["Default"] = replica_arn

    # Rename the placeholder task resource to the sanitized task name.
    resources = op_template["Resources"]
    resources[task_name] = resources.pop("TaskNameFromConfig")
    task_props = resources[task_name]["Properties"]
    task_props["ReplicationTaskIdentifier"] = name
    task_props["MigrationType"] = migration_type
    op_template["Description"] = description

    # TableMappings is stored as a JSON string. json.dump below escapes its
    # embedded quotes when the template is written, so no manual escaping.
    task_props["TableMappings"] = json.dumps(
        _build_table_mappings(task_sheet, schema_name))

    if tags:  # common tags, only when present
        task_props["Tags"] = tags
    if cdc_start_time:  # only for CDC-capable tasks
        task_props["CdcStartTime"] = cdc_start_time

    output_dir = os.path.join(BASE_DIR, "output")
    os.makedirs(output_dir, exist_ok=True)  # no exists/create race
    with open(os.path.join(output_dir, name + ".template"), "w") as writefile:
        json.dump(op_template, writefile, indent=4)
    print("Created Task Template for %s" % name)