in aws-frauddetector-detector/src/aws_frauddetector_detector/helpers/model_helpers.py [0:0]
def get_model_for_detector(frauddetector_client, detector, model: models.ResourceModel):
    """Build a fresh ResourceModel for a detector and one of its versions.

    The detector version described is ``model.DetectorVersionId`` when that is
    set; otherwise it is the numerically highest version id listed by the
    describe-detector call.

    Args:
        frauddetector_client: client object handed through to every API helper.
        detector: dict describing the detector. The keys ``detectorId``,
            ``arn``, ``createdTime``, ``lastUpdatedTime`` and ``description``
            are read, each defaulting to ``""`` when absent.
        model: the input resource model. Its ``EventType``, ``DetectorId`` and
            ``DetectorVersionId`` attributes are read, and it is passed to
            ``get_referenced_resources_for_detector``.

    Returns:
        A new ``models.ResourceModel`` populated with the detector attributes,
        event type, detector version details, associated models, rules and
        tags.

    Raises:
        exceptions.NotFound: if no detector version id was requested and the
            detector has no version summaries, if an external model endpoint
            resolves to no external models, or if a model version entry lacks
            ``modelId``, ``modelType`` or ``modelVersionNumber``.
    """
    # build model from detector
    detector_id = detector.get("detectorId", "")
    detector_arn = detector.get("arn", "")
    referenced_resources = get_referenced_resources_for_detector(model)
    model_to_return = models.ResourceModel(
        DetectorId=detector_id,
        Arn=detector_arn,
        CreatedTime=detector.get("createdTime", ""),
        LastUpdatedTime=detector.get("lastUpdatedTime", ""),
        Description=detector.get("description", ""),
        EventType=None,
        DetectorVersionId=None,
        DetectorVersionStatus=None,
        RuleExecutionMode=None,
        Rules=[],
        Tags=None,
        AssociatedModels=None,
    )

    # get event type model
    event_type_model = get_event_type_and_return_event_type_model(frauddetector_client, model.EventType)
    model_to_return.EventType = event_type_model

    # get latest detector version info to attach to model
    if model.DetectorVersionId:
        desired_detector_version = api_helpers.call_get_detector_version(
            frauddetector_client, model.DetectorId, model.DetectorVersionId
        )
    else:
        describe_detectors_response = api_helpers.call_describe_detector(frauddetector_client, detector_id)
        detector_version_summaries = describe_detectors_response.get("detectorVersionSummaries", [])
        if not detector_version_summaries:
            # max() raises a bare ValueError on an empty iterable; report the
            # missing detector version the same way the other "resource is
            # missing" branches in this function do.
            raise exceptions.NotFound("detectorVersion", detector_id)
        # Version ids arrive as strings; compare them numerically so that
        # e.g. "10" ranks above "9".
        max_version_id = max(
            int(dv_summary.get("detectorVersionId", "-1")) for dv_summary in detector_version_summaries
        )
        desired_detector_version = api_helpers.call_get_detector_version(
            frauddetector_client, model.DetectorId, str(max_version_id)
        )
    model_to_return.DetectorVersionId = desired_detector_version.get("detectorVersionId", "-1")
    model_to_return.DetectorVersionStatus = desired_detector_version.get("status", "")
    model_to_return.RuleExecutionMode = desired_detector_version.get("ruleExecutionMode", "")

    # collect associated models: external model endpoints first, then model versions
    associated_models: List[models.Model] = []
    model_endpoints: List[str] = desired_detector_version.get("externalModelEndpoints", [])
    for model_endpoint in model_endpoints:
        get_external_models_response = api_helpers.call_get_external_models(frauddetector_client, model_endpoint)
        external_models = get_external_models_response.get("externalModels", [])
        if not external_models:
            # we should never see this block get executed
            raise exceptions.NotFound("associatedModel", model_endpoint)
        # only the first external model returned for the endpoint is used
        associated_models.append(models.Model(Arn=external_models[0].get("arn", "not/found")))

    required_attributes = {"modelId", "modelType", "modelVersionNumber"}
    model_versions: List[dict] = desired_detector_version.get("modelVersions", [])
    for model_version in model_versions:
        if not required_attributes.issubset(model_version.keys()):
            # we should never see this block get executed
            LOG.error(f"get DV did not include enough information in model versions: {desired_detector_version}")
            raise exceptions.NotFound("associatedModel", model_version)
        model_version_arn = get_model_version_arn_from_model_version(frauddetector_client, model_version)
        associated_models.append(models.Model(Arn=model_version_arn))
    model_to_return.AssociatedModels = associated_models

    # get rule models to attach
    referenced_outcome_names = referenced_resources.get("rule_outcomes")
    for rule in desired_detector_version.get("rules", []):
        rule_detector_id = rule.get("detectorId", "")
        rule_id = rule.get("ruleId", "")
        rule_version = rule.get("ruleVersion", "-1")
        rule_to_append = get_rule_and_return_rule_model(
            frauddetector_client,
            rule_detector_id,
            rule_id,
            rule_version,
            referenced_outcome_names,
        )
        model_to_return.Rules.append(rule_to_append)

    # get tags
    detector_tags = _get_tags_for_given_arn(frauddetector_client, detector_arn)
    # TODO: reorder tags to the same order as the input model to work around contract test bug?
    model_to_return.Tags = get_tag_models_from_tags(detector_tags)
    return model_to_return