Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/code refactor #114

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
boto3==1.9.42
requests
3 changes: 1 addition & 2 deletions hammer/identification/lambdas/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
boto3==1.9.42
requests
requests
4 changes: 3 additions & 1 deletion hammer/library/aws/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from library.utility import jsonDumps
from library.aws.utility import convert_tags
from library.aws.s3 import S3Operations
from library.aws.policy import PolicyOperations


# structure which describes Elastic search domains
ElasticSearchDomain_Details = namedtuple('ElasticSearchDomain', [
Expand Down Expand Up @@ -150,7 +152,7 @@ def validate_access_policy(cls, policy_details):
"""
public_policy = False
for statement in policy_details.get("Statement", []):
public_policy = S3Operations.public_statement(statement)
public_policy = PolicyOperations.public_statement(statement)

return public_policy

Expand Down
104 changes: 104 additions & 0 deletions hammer/library/aws/policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import logging

from copy import deepcopy


class PolicyOperations(object):
@staticmethod
def public_statement(statement):
"""
Check if supplied policy statement allows public access.

:param statement: dict with policy statement (as AWS returns)

:return: boolean, True - if statement allows access from '*' `Principal`, not restricted by `IpAddress` condition
False - otherwise
"""
effect = statement['Effect']
principal = statement.get('Principal', {})
not_principal = statement.get('NotPrincipal', None)
condition = statement.get('Condition', None)
suffix = "/0"
# check both `Principal` - `{"AWS": "*"}` and `"*"`
# and condition (if exists) to be restricted (not "0.0.0.0/0")
if effect == "Allow" and \
(principal == "*" or principal.get("AWS") == "*"):
if condition is not None:
if suffix in str(condition.get("IpAddress")):
return True
else:
return True
if effect == "Allow" and \
not_principal is not None:
# TODO: it is not recommended to use `Allow` with `NotPrincipal`, need to write proper check for such case
# https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_notprincipal.html
logging.error(f"TODO: is this statement public???\n{statement}")
return False

@classmethod
def public_policy(cls, policy):
"""
Check if supplied policy allows public access by checking policy statements

:param policy: dict with policy (as AWS returns)

:return: boolean, True - if any policy statement has public access allowed
False - otherwise
"""
for statement in policy.get("Statement", []):
if cls.public_statement(statement):
return True
return False

@classmethod
def restrict_policy(cls, policy):
"""
Walk through policy and restrict all public statements.
It does not restrict supplied policy dict, but creates an copy and works with that copy.

:param policy: dict with policy (as AWS returns)

:return: new dict with policy based on old one, but with restricted public statements
"""
# make a copy of supplied policy to restrict it
new_policy = deepcopy(policy)
# iterate over policy copy and restrict statements
for statement in new_policy.get("Statement", []):
cls.restrict_statement(statement)
return new_policy

@classmethod
def restrict_statement(cls, statement):
"""
Restricts provided policy statement with RFC1918 condition.
It performs in-place restriction of supplied statement.

:param statement: dict with policy statement to restrict (as AWS returns)

:return: nothing
"""

suffix = "/0"
ip_ranges_rfc1918 = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16']
if cls.public_statement(statement):
# get current condition, if no condition - return condition with source ip from rfc1918
condition = statement.get('Condition', { "IpAddress": {"aws:SourceIp": ip_ranges_rfc1918}})
# get current ip addresses from condition, if no ip addresses - return source ip from rfc1918
ipaddress = condition.get("IpAddress", {"aws:SourceIp": ip_ranges_rfc1918})
# get source ips, if no ips return rfc1918 range
sourceip = ipaddress.get("aws:SourceIp", ip_ranges_rfc1918)
# make list from source ip if it is a single string value
if isinstance(sourceip, str):
sourceip = [sourceip]
# replace cidr with "/0" from source ips with ip ranges from rfc1918
ip_ranges = []
for cidr in sourceip:
if suffix not in cidr:
ip_ranges.append(cidr)
else:
ip_ranges += ip_ranges_rfc1918
# remove dublicates
ip_ranges = list(set(ip_ranges))
ipaddress['aws:SourceIp'] = ip_ranges
condition['IpAddress'] = ipaddress
statement['Condition'] = condition
104 changes: 3 additions & 101 deletions hammer/library/aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from botocore.exceptions import ClientError
from library.utility import jsonDumps
from library.aws.utility import convert_tags
from library.aws.policy import PolicyOperations


class S3Operations(object):
Expand Down Expand Up @@ -50,105 +51,6 @@ def public_acl(cls, acl):
"""
return bool(cls.get_public_acls(acl))

@classmethod
def public_policy(cls, policy):
"""
Check if S3 supplied bucket policy allows public access by checking S3 bucket policy statements

:param policy: dict with S3 bucket policy (as AWS returns)

:return: boolean, True - if any policy statement has public access allowed
False - otherwise
"""
for statement in policy.get("Statement", []):
if cls.public_statement(statement):
return True
return False

@staticmethod
def public_statement(statement):
"""
Check if S3 supplied bucket policy statement allows public access.

:param statement: dict with S3 bucket policy statement (as AWS returns)

:return: boolean, True - if statement allows access from '*' `Principal`, not restricted by `IpAddress` condition
False - otherwise
"""
effect = statement['Effect']
principal = statement.get('Principal', {})
not_principal = statement.get('NotPrincipal', None)
condition = statement.get('Condition', None)
suffix = "/0"
# check both `Principal` - `{"AWS": "*"}` and `"*"`
# and condition (if exists) to be restricted (not "0.0.0.0/0")
if effect == "Allow" and \
(principal == "*" or principal.get("AWS") == "*"):
if condition is not None:
if suffix in str(condition.get("IpAddress")):
return True
else:
return True
if effect == "Allow" and \
not_principal is not None:
# TODO: it is not recommended to use `Allow` with `NotPrincipal`, need to write proper check for such case
# https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_notprincipal.html
logging.error(f"TODO: is this statement public???\n{statement}")
return False

@classmethod
def restrict_policy(cls, policy):
"""
Walk through S3 bucket policy and restrict all public statements.
It does not restrict supplied policy dict, but creates an copy and works with that copy.

:param policy: dict with S3 bucket policy (as AWS returns)

:return: new dict with S3 bucket policy based on old one, but with restricted public statements
"""
# make a copy of supplied policy to restrict it
new_policy = deepcopy(policy)
# iterate over policy copy and restrict statements
for statement in new_policy.get("Statement", []):
cls.restrict_statement(statement)
return new_policy

@classmethod
def restrict_statement(cls, statement):
"""
Restricts provided S3 bucket policy statement with RFC1918 condition.
It performs in-place restriction of supplied statement.

:param statement: dict with S3 bucket policy statement to restrict (as AWS returns)

:return: nothing
"""

suffix = "/0"
ip_ranges_rfc1918 = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16']
if cls.public_statement(statement):
# get current condition, if no condition - return condition with source ip from rfc1918
condition = statement.get('Condition', { "IpAddress": {"aws:SourceIp": ip_ranges_rfc1918}})
# get current ip addresses from condition, if no ip addresses - return source ip from rfc1918
ipaddress = condition.get("IpAddress", {"aws:SourceIp": ip_ranges_rfc1918})
# get source ips, if no ips return rfc1918 range
sourceip = ipaddress.get("aws:SourceIp", ip_ranges_rfc1918)
# make list from source ip if it is a single string value
if isinstance(sourceip, str):
sourceip = [sourceip]
# replace cidr with "/0" from source ips with ip ranges from rfc1918
ip_ranges = []
for cidr in sourceip:
if suffix not in cidr:
ip_ranges.append(cidr)
else:
ip_ranges += ip_ranges_rfc1918
# remove dublicates
ip_ranges = list(set(ip_ranges))
ipaddress['aws:SourceIp'] = ip_ranges
condition['IpAddress'] = ipaddress
statement['Condition'] = condition

@staticmethod
def object_exists(s3_client, bucket, path):
"""
Expand Down Expand Up @@ -333,7 +235,7 @@ def public_by_policy(self):
:return: boolean, True - if S3 bucket policy allows public access
False - otherwise
"""
return S3Operations.public_policy(self._policy)
return PolicyOperations.public_policy(self._policy)

@property
def public_by_acl(self):
Expand Down Expand Up @@ -384,7 +286,7 @@ def restrict_policy(self):
.. note:: This keeps self._policy unchanged.
You need to recheck S3 bucket policy to ensure that it was really restricted.
"""
restricted_policy = S3Operations.restrict_policy(self._policy)
restricted_policy = PolicyOperations.restrict_policy(self._policy)
try:
S3Operations.put_bucket_policy(self.account.client("s3"), self.name, restricted_policy)
except Exception:
Expand Down
5 changes: 3 additions & 2 deletions hammer/library/aws/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from botocore.exceptions import ClientError
from library.utility import jsonDumps
from library.aws.s3 import S3Operations
from library.aws.policy import PolicyOperations


class SQSOperations(object):
Expand Down Expand Up @@ -66,7 +67,7 @@ def public(self):
"""
:return: boolean, True - if policy allows public access to SQS
"""
return S3Operations.public_policy(self._policy)
return PolicyOperations.public_policy(self._policy)

def backup_policy_s3(self, s3_client, bucket):
"""
Expand Down Expand Up @@ -96,7 +97,7 @@ def restrict_policy(self):
.. note:: This keeps self._policy unchanged.
You need to recheck SQS Queue policy to ensure that it was really restricted.
"""
restricted_policy = S3Operations.restrict_policy(self._policy)
restricted_policy = PolicyOperations.restrict_policy(self._policy)
try:
SQSOperations.put_queue_policy(self.account.client("sqs"), self.url, restricted_policy)
except Exception:
Expand Down