diff --git a/deployment/configs/config.json b/deployment/configs/config.json index de6db85a..38c0ead5 100755 --- a/deployment/configs/config.json +++ b/deployment/configs/config.json @@ -75,7 +75,8 @@ "reporting": false, "remediation": false, "remediation_accounts": ["210987654321", "654321210987"], - "remediation_retention_period": 21 + "remediation_retention_period": 21, + "known_ip_sources": ["205.203.150.72", "205.203.130.22"] }, "user_inactivekeys": { "enabled": true, diff --git a/hammer/library/aws/security_groups.py b/hammer/library/aws/security_groups.py index 814d88aa..bab672d7 100755 --- a/hammer/library/aws/security_groups.py +++ b/hammer/library/aws/security_groups.py @@ -8,12 +8,14 @@ from library.utility import jsonDumps from library.aws.s3 import S3Operations from library.aws.utility import convert_tags +from library.config import Config class RestrictionStatus(Enum): Restricted = "restricted" OpenCompletely = "open_completely" OpenPartly = "open_partly" + Safe = "safe" class SecurityGroupOperations: @@ -372,6 +374,28 @@ def __str__(self): perms = ", ".join([str(perm) for perm in self.permissions]) return f"{self.__class__.__name__}(Name={self.name}, Id={self.id}, Permissions=[{perms}])" + def validate_known_ip_soource(self, source_ip): + """ + + :param source_ip: ip address + :return: boolean + """ + config = Config() + known_ip_sources = config.sg.known_ip_sources + source_cidr = ipaddress.ip_network(source_ip) + + for known_ip in known_ip_sources: + known_ip_cidr = ipaddress.ip_network(known_ip) + if known_ip_cidr == source_cidr: + return True + elif source_ip.endswith("/32"): + if source_cidr[-1] in known_ip_cidr: + return True + # ipaddress.subnet_of() function new to Python 3.7. Not available in 3.6 + """elif source_cidr.subnet_of(known_ip_cidr): + return True""" + return False + def restriction_status(self, cidr): """ Check restriction status of cidr @@ -380,11 +404,17 @@ def restriction_status(self, cidr): :return: RestrictionStatus with check result """ + is_known_ip = self.validate_known_ip_soource(cidr) + status = RestrictionStatus.Restricted if cidr.endswith("/0"): status = RestrictionStatus.OpenCompletely elif ipaddress.ip_network(cidr).is_global: status = RestrictionStatus.OpenPartly + + if is_known_ip: + status = RestrictionStatus.Safe + logging.debug(f"Checked '{cidr}' - '{status.value}'") return status @@ -406,10 +436,11 @@ def check(self, restricted_ports): logging.debug(f"Checking '{perm.protocol}' '{perm.from_port}-{perm.to_port}' ports for {ip_range}") # first condition - CIDR is Global/Public status = self.restriction_status(ip_range.cidr) - if status == RestrictionStatus.Restricted: - logging.debug(f"Skipping restricted '{ip_range}'") + if status in (RestrictionStatus.Restricted, RestrictionStatus.Safe): + logging.debug(f"Skipping restricted/safe IP address '{ip_range}'") continue - # second - check if ports from `restricted_ports` list has intersection with ports from FromPort..ToPort range + # second - check if ports from `restricted_ports` list has intersection with ports from FromPort.. + # ToPort range if perm.from_port is None or perm.to_port is None: logging.debug(f"Marking world-wide open all ports from '{ip_range}'") ip_range.status = status