in Deployment/RateRuleReload.py [0:0]
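def _method_match(method, positional_constraint):
    """Build a WAFv2 ByteMatchStatement that tests the HTTP method string.

    Args:
        method: method literal to search for, e.g. ``"POST"``.
        positional_constraint: WAFv2 positional constraint, e.g. ``"EXACTLY"``.

    Returns:
        A dict suitable as a (scope-down) statement in a WebACL rule.
    """
    return {
        "ByteMatchStatement": {
            "FieldToMatch": {"Method": {}},
            "PositionalConstraint": positional_constraint,
            "SearchString": method,
            "TextTransformations": [{"Type": "NONE", "Priority": 0}],
        }
    }


def _rate_rule(name, priority, limit, scope_down, metric_name):
    """Build a blocking, per-IP rate-based rule with the given scope-down statement.

    Rule priorities must be unique inside a WebACL: if an existing rule already
    uses ``priority`` the whole update_web_acl call is rejected by the service
    and only logged by the caller.

    Args:
        name: rule name (``"POSTRule"`` / ``"GetHeadRule"``).
        priority: evaluation priority; lower runs first.
        limit: request count per aggregation window that triggers the block.
        scope_down: statement restricting which requests are counted.
        metric_name: CloudWatch metric name for the rule.

    Returns:
        A rule dict in the shape expected by ``update_web_acl(Rules=...)``.
    """
    return {
        "Name": name,
        "Priority": priority,
        "Statement": {
            "RateBasedStatement": {
                "Limit": limit,
                "AggregateKeyType": "IP",
                "ScopeDownStatement": scope_down,
            }
        },
        "Action": {"Block": {}},
        "VisibilityConfig": {
            "SampledRequestsEnabled": True,
            "CloudWatchMetricsEnabled": True,
            "MetricName": metric_name,
        },
    }


def _set_limit(log, rule, limit):
    """Overwrite the rate limit of an existing rate-based rule in place.

    Args:
        log: logger-like object with an ``error`` method.
        rule: rule dict taken from the fetched WebACL; mutated on success.
        limit: new ``RateBasedStatement.Limit``.

    Returns:
        True when the limit was set; False (after logging) when the rule has no
        ``Statement.RateBasedStatement`` to edit.
    """
    try:
        rule['Statement']['RateBasedStatement']['Limit'] = limit
    except (KeyError, TypeError) as error:
        log.error(str(error))
        return False
    return True


# Optional WebACL fields that update_web_acl resets to their defaults when they
# are omitted; they are copied from the freshly fetched WebACL when present.
_PASSTHROUGH_FIELDS = (
    'Description',
    'CustomResponseBodies',
    'CaptchaConfig',
    'ChallengeConfig',
    'TokenDomains',
)


def _push_rules(log, wafv2_client, webacl_scope, webacl_name, webacl_id, rules, success_message):
    """Write ``rules`` to the WebACL, preserving its other settings.

    ``update_web_acl`` is a full replacement, so a fresh ``get_web_acl`` supplies
    both the ``LockToken`` needed for optimistic locking and the current
    ``DefaultAction`` / ``VisibilityConfig`` (plus the optional fields in
    ``_PASSTHROUGH_FIELDS``). Carrying them over means a rate-rule reload can
    never silently turn a Block-by-default WebACL into an Allow-by-default one.
    API errors are logged and swallowed (best-effort), like the other stages.

    Args:
        log: logger-like object with ``info`` and ``error`` methods.
        wafv2_client: boto3 ``wafv2`` client.
        webacl_scope, webacl_name, webacl_id: WebACL identity for the API calls.
        rules: complete rule list to store on the WebACL.
        success_message: log line emitted after a successful update.
    """
    current = wafv2_client.get_web_acl(
        Name=webacl_name,
        Scope=webacl_scope,
        Id=webacl_id
    )
    webacl = current['WebACL']
    params = {
        'Name': webacl_name,
        'Scope': webacl_scope,
        'Id': webacl_id,
        'DefaultAction': webacl['DefaultAction'],
        'Rules': rules,
        'VisibilityConfig': webacl['VisibilityConfig'],
        'LockToken': current['LockToken'],
    }
    for field in _PASSTHROUGH_FIELDS:
        if field in webacl:
            params[field] = webacl[field]
    try:
        wafv2_client.update_web_acl(**params)
        log.info(success_message)
    except Exception as error:
        log.error(str(error))


def update_raterule(log, assumed_session, webacl_scope, webacl_name, webacl_id, policy_rbpostvalue, policy_rbgetvalue):
    """Reconcile the ``POSTRule`` and ``GetHeadRule`` rate-based rules of a WebACL.

    The WebACL is fetched once. An existing ``POSTRule`` / ``GetHeadRule`` gets
    its ``RateBasedStatement.Limit`` replaced by the policy value; a missing one
    is appended (priority 0 for POST, 1 for GET/HEAD, both blocking per IP).
    Each stage pushes the full rule list with ``update_web_acl`` after
    re-reading the WebACL for a current ``LockToken``; the push keeps the
    WebACL's own ``DefaultAction`` and ``VisibilityConfig`` rather than forcing
    ``Allow``. API failures are logged through ``log`` and do not raise.

    Args:
        log: logger-like object with ``info`` and ``error`` methods.
        assumed_session: boto3 session (typically from an assumed role) used to
            create the ``wafv2`` client.
        webacl_scope: WAFv2 ``Scope`` of the WebACL.
        webacl_name: WebACL ``Name``.
        webacl_id: WebACL ``Id``.
        policy_rbpostvalue: desired limit for ``POSTRule``; coerced with ``int``.
        policy_rbgetvalue: desired limit for ``GetHeadRule``; coerced with ``int``.

    Returns:
        None.

    Raises:
        ValueError: if a policy value cannot be converted to ``int``.
    """
    # Coerce before any API call so a malformed policy value fails fast.
    post_limit = int(policy_rbpostvalue)
    get_head_limit = int(policy_rbgetvalue)

    wafv2_client = assumed_session.client('wafv2')
    webacl_response = wafv2_client.get_web_acl(
        Name=webacl_name,
        Scope=webacl_scope,
        Id=webacl_id
    )
    # Edited and appended rules go into this list, which is what gets pushed.
    rules = webacl_response['WebACL']['Rules']

    post_rule_exists = False
    get_head_rule_exists = False
    rule_edit_success = False
    for rule in rules:
        name = rule.get('Name')
        if name == 'POSTRule':
            post_rule_exists = True
            log.info('[RateBasedRule-Reload] POSTRule exists for WebACL: %s' % webacl_name)
            if _set_limit(log, rule, post_limit):
                log.info('[RateBasedRule-Reload] Successfully edited POSTRule for WebACL: %s to new value %s' % (webacl_name, policy_rbpostvalue))
                rule_edit_success = True
        elif name == 'GetHeadRule':
            get_head_rule_exists = True
            log.info('[RateBasedRule-Reload] GetHeadRule exists for WebACL: %s' % webacl_name)
            if _set_limit(log, rule, get_head_limit):
                log.info('[RateBasedRule-Reload] Successfully edited GetHeadRule for WebACL: %s to new value %s' % (webacl_name, policy_rbgetvalue))
                rule_edit_success = True

    if rule_edit_success:
        _push_rules(
            log, wafv2_client, webacl_scope, webacl_name, webacl_id, rules,
            '[RateBasedRule-Reload] Successfully updated POSTRule & GetHeadRule for WebACL: %s' % webacl_name,
        )

    if not post_rule_exists:
        log.info('[RateBasedRule-Reload] Adding PostRule for %s with value: %s' % (webacl_name, policy_rbpostvalue))
        rules.append(_rate_rule(
            'POSTRule', 0, post_limit,
            _method_match('POST', 'EXACTLY'),
            'RateRule-POST',
        ))
        _push_rules(
            log, wafv2_client, webacl_scope, webacl_name, webacl_id, rules,
            '[RateBasedRule-Reload] Successfully added PostRule for %s' % webacl_name,
        )

    if not get_head_rule_exists:
        log.info('[RateBasedRule-Reload] Adding GetHeadRule for %s with value: %s' % (webacl_name, policy_rbgetvalue))
        # GET or HEAD: an OrStatement of two method matches, as in the original rule.
        get_head_scope = {
            "OrStatement": {
                "Statements": [
                    _method_match('GET', 'CONTAINS'),
                    _method_match('HEAD', 'CONTAINS'),
                ]
            }
        }
        rules.append(_rate_rule('GetHeadRule', 1, get_head_limit, get_head_scope, 'RateRule'))
        _push_rules(
            log, wafv2_client, webacl_scope, webacl_name, webacl_id, rules,
            '[RateBasedRule-Reload] Successfully added GetHeadRule for %s' % webacl_name,
        )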