feat(waf-acl): calculate WCU

This commit is contained in:
Ming Di Leom 2021-07-23 07:02:02 +00:00
parent f546c33f67
commit 4298af53d6
No known key found for this signature in database
GPG Key ID: 32D3E28E96A695E8
1 changed files with 99 additions and 5 deletions

View File

@ -23,11 +23,19 @@ parser.add_argument('--directory', '-d',
parser.add_argument('--original', '-a', parser.add_argument('--original', '-a',
help = 'Also download raw ACLs.', help = 'Also download raw ACLs.',
action = 'store_true') action = 'store_true')
parser.add_argument('--wcu', '-w',
help = 'Calculate WCU value of each rule.',
action = 'store_true')
parser.add_argument('--total-wcu', '-t',
help = 'Shows the total WCU of each web ACL.',
action = 'store_true')
args = parser.parse_args() args = parser.parse_args()
profile = args.profile profile = args.profile
original = args.original original = args.original
dirPath = args.directory dirPath = args.directory
Path(dirPath).mkdir(parents = True, exist_ok = True) Path(dirPath).mkdir(parents = True, exist_ok = True)
show_wcu = args.wcu
show_total_wcu = args.total_wcu
today = date.today().strftime('%Y%m%d') today = date.today().strftime('%Y%m%d')
@ -47,9 +55,15 @@ def byteToString(bts):
def first_key(obj): def first_key(obj):
return list(obj.keys())[0] return list(obj.keys())[0]
def field_to_match(obj):
return first_key(obj).lower() \
if 'SingleHeader' not in obj \
else obj['SingleHeader']['Name'].lower()
def parse_statement(obj): def parse_statement(obj):
first_key_obj = first_key(obj) first_key_obj = first_key(obj)
text = first_key_obj text = first_key_obj
wcu_statement = {}
if first_key_obj == 'NotStatement' or first_key_obj == 'ByteMatchStatement': if first_key_obj == 'NotStatement' or first_key_obj == 'ByteMatchStatement':
not_prefix = '' not_prefix = ''
@ -68,21 +82,46 @@ def parse_statement(obj):
if first_key(obj['ByteMatchStatement']['FieldToMatch']) != 'SingleHeader' \ if first_key(obj['ByteMatchStatement']['FieldToMatch']) != 'SingleHeader' \
else obj['ByteMatchStatement']['FieldToMatch']['SingleHeader']['Name'] else obj['ByteMatchStatement']['FieldToMatch']['SingleHeader']['Name']
positional_constraint = obj['ByteMatchStatement']['PositionalConstraint'] positional_constraint = obj['ByteMatchStatement']['PositionalConstraint']
# WCU Calculation
wcu_statement['statement'] = 'string_match'
# assume single-element TextTransformations
wcu_statement['text_transform'] = obj['ByteMatchStatement']['TextTransformations'][0]['Type'].lower()
wcu_statement['base'] = positional_constraint.lower()
if 'TextTransformations' in obj['ByteMatchStatement']:
wcu_statement['field'] = field_to_match(obj['ByteMatchStatement']['FieldToMatch'])
elif first_key_obj == 'IPSetReferenceStatement': elif first_key_obj == 'IPSetReferenceStatement':
search_string = obj['IPSetReferenceStatement']['ARN'] search_string = obj['IPSetReferenceStatement']['ARN']
positional_constraint = 'IPSet' positional_constraint = 'IPSet'
wcu_statement['statement'] = 'ipset'
separator = '=' if len(field_match) >= 1 else '' separator = '=' if len(field_match) >= 1 else ''
text = f'{not_prefix}{field_match}{separator}{positional_constraint}({search_string})' text = f'{not_prefix}{field_match}{separator}{positional_constraint}({search_string})'
elif first_key_obj == 'SqliMatchStatement' or first_key_obj == 'XssMatchStatement': elif first_key_obj == 'SqliMatchStatement' or first_key_obj == 'XssMatchStatement':
text = f'{first_key_obj}({first_key(obj[first_key_obj]["FieldToMatch"])})' text = f'{first_key_obj}({first_key(obj[first_key_obj]["FieldToMatch"])})'
if first_key_obj == 'SqliMatchStatement':
wcu_statement['statement'] = 'sql'
wcu_statement['text_transform'] = obj['SqliMatchStatement']['TextTransformations'][0]['Type'].lower()
return text if 'TextTransformations' in obj['SqliMatchStatement']:
wcu_statement['field'] = field_to_match(obj['SqliMatchStatement']['FieldToMatch'])
else:
wcu_statement['statement'] = 'xss'
wcu_statement['text_transform'] = obj['XssMatchStatement']['TextTransformations'][0]['Type'].lower()
if 'TextTransformations' in obj['XssMatchStatement']:
wcu_statement['field'] = field_to_match(obj['XssMatchStatement']['FieldToMatch'])
return {
'text': text,
'wcu_statement': wcu_statement
}
# Make rule statements human-readable # Make rule statements human-readable
def parse_rule(obj): def parse_rule(obj):
text = '' text = ''
first_key_obj = first_key(obj) first_key_obj = first_key(obj)
wcu_rule = []
if first_key_obj == 'AndStatement' or first_key_obj == 'OrStatement': if first_key_obj == 'AndStatement' or first_key_obj == 'OrStatement':
and_or = 'AND' if first_key_obj == 'AndStatement' else 'OR' and_or = 'AND' if first_key_obj == 'AndStatement' else 'OR'
@ -92,10 +131,13 @@ def parse_rule(obj):
for i in range(0, len(statements)): for i in range(0, len(statements)):
statement = statements[i] statement = statements[i]
rule = parse_statement(statement) rule = parse_statement(statement)['text']
if first_key(statement) == 'AndStatement' or first_key(statement) == 'OrStatement': if first_key(statement) == 'AndStatement' or first_key(statement) == 'OrStatement':
rule = parse_rule(statement) rule = parse_rule(statement)['text']
wcu_rule.extend(parse_rule(statement)['wcu_rule'])
else:
wcu_rule.append(parse_statement(statement)['wcu_statement'])
if i == 0: if i == 0:
text = f'{open_paren}{rule}' text = f'{open_paren}{rule}'
@ -106,9 +148,25 @@ def parse_rule(obj):
elif first_key_obj == 'IPSetReferenceStatement': elif first_key_obj == 'IPSetReferenceStatement':
text = f'IPSet({obj["IPSetReferenceStatement"]["ARN"]})' text = f'IPSet({obj["IPSetReferenceStatement"]["ARN"]})'
else: else:
text = first_key_obj # unknown rule statement
print(first_key_obj)
return text.strip() return {
'text': text.strip(),
'wcu_rule': wcu_rule
}
wcu_dict = {
'ipset': 1,
'exactly': 2,
'starts_with': 2,
'ends_with': 2,
'contains': 10,
'contains_word': 10,
'sql': 20,
'xss': 40,
'text_transform': 10
}
for web_acl in web_acls: for web_acl in web_acls:
web_acl_rule = client.get_web_acl( web_acl_rule = client.get_web_acl(
@ -123,6 +181,8 @@ for web_acl in web_acls:
dump(web_acl_rule['WebACL'], f, indent = 2, default = byteToString) dump(web_acl_rule['WebACL'], f, indent = 2, default = byteToString)
rules = [] rules = []
web_acl_wcu = 0
web_acl_text_transform = set()
for rule in web_acl_rule['WebACL']['Rules']: for rule in web_acl_rule['WebACL']['Rules']:
out_rule = { out_rule = {
# Name: "Rule" # Name: "Rule"
@ -132,7 +192,41 @@ for web_acl in web_acls:
out_rule[rule['Name']] = parse_rule(rule['Statement']).strip() out_rule[rule['Name']] = parse_rule(rule['Statement']).strip()
out_rule['Action'] = 'Allow' if first_key(rule['Action']) == 'Allow' else 'Block' out_rule['Action'] = 'Allow' if first_key(rule['Action']) == 'Allow' else 'Block'
# WCU calculation
if show_wcu or show_total_wcu:
statements = parse_rule(rule['Statement'])['wcu_rule']
if len(statements) >= 1:
rule_wcu = 0
rule_text_transform = set()
for statement in statements:
rule_statement = statement['statement']
if rule_statement == 'ipset' or rule_statement == 'sql' or rule_statement == 'xss':
web_acl_wcu = web_acl_wcu + wcu_dict[rule_statement]
rule_wcu = rule_wcu + wcu_dict[rule_statement]
elif rule_statement == 'string_match':
web_acl_wcu = web_acl_wcu + wcu_dict[statement['base']]
rule_wcu = rule_wcu + wcu_dict[statement['base']]
if 'text_transform' in statement:
text_transform_key = statement['text_transform'] + statement['field']
if text_transform_key not in web_acl_text_transform:
web_acl_wcu = web_acl_wcu + wcu_dict['text_transform']
web_acl_text_transform.add(text_transform_key)
if text_transform_key not in rule_text_transform:
rule_wcu = rule_wcu + wcu_dict['text_transform']
rule_text_transform.add(text_transform_key)
if show_wcu:
out_rule['WCU'] = rule_wcu
rules.append(out_rule) rules.append(out_rule)
with open(path.join(dirPath, f'{web_acl["Name"]}-{today}.json'), mode = 'w') as f: with open(path.join(dirPath, f'{web_acl["Name"]}-{today}.json'), mode = 'w') as f:
dump(rules, f, indent = 2) dump(rules, f, indent = 2)
if show_total_wcu:
print(f'{web_acl["Name"]} consumes {web_acl_wcu} WCU.')