Skip to content

Commit

Permalink
Check condition inside rules (#63)
Browse files Browse the repository at this point in the history
* Move resource_is_whitelisted to parent class

* move regex to class constant in SQSQueuePolicyPublicRule

* revert changes of resource_is_whitelisted

* remove is_resource from SQSQueuePolicyPublicRule

* Checks cross acount if id exists

* change regex inside IAMRoleWildcardActionOnPermissionsPolicyRule and fix tests

* add condition checker to KMSKeyWildcardPrincipal

* remove unnecessary blanck space

* add condition checker to S3BucketPolicyPrincipalRule

* remove unnecessary space

* bump cfripper version

* change confition

* Improve condition check

* Update condition

* add stack params as config

* remove version bump

* remove [arameters in config

* update main

* Add condition to S3CrossAccountTrustRule

* add check for getatt

* minot improvement

* checks for undefined

* fix black issue

* update changelog
  • Loading branch information
oscarbc96 authored and jsoucheiron committed Nov 8, 2019
1 parent a163bbb commit 9d0c064
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 38 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ All notable changes to this project will be documented in this file.
- New regex `REGEX_IS_STAR`, matches only a `*` character.

### Changed
- `GenericWildcardPrincipalRule` now trust the condition to reduce false positives.
- `GenericWildcardPrincipalRule`, `S3BucketPolicyPrincipalRule`, `S3CrossAccountTrustRule`, `SQSQueuePolicyPublicRule` and `KMSKeyWildcardPrincipal` now trust the condition to reduce false positives.
- Rules check the resource type using `isinstance` instead of comparing type to a string if pycfmodel implements the resource.
- Instance method `add_failure` now accepts `risk_value` and `risk_mode` as optional parameters.
- `CrossAccountTrustRule` only runs if config has defined `self._config.aws_account_id`.
- `IAMRoleWildcardActionOnPermissionsPolicyRule`now uses `REGEX_WILDCARD_POLICY_ACTION`.

### Fixed
- `IAMRolesOverprivilegedRule` now uses `REGEX_IS_STAR` for finding statements instead of `REGEX_CONTAINS_STAR`.
12 changes: 5 additions & 7 deletions cfripper/model/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,17 @@ def add_failure(
granularity: Optional[RuleGranularity] = None,
resource_ids: Optional[Set] = None,
actions: Optional[Set] = None,
risk_value: Optional[RuleRisk] = None,
rule_mode: Optional[RuleMode] = None,
):

if granularity is None:
granularity = self.GRANULARITY

self._result.add_failure(
rule=rule,
reason=reason,
rule_mode=self.RULE_MODE,
risk_value=self.RISK_VALUE,
rule_mode=rule_mode or self.RULE_MODE,
risk_value=risk_value or self.RISK_VALUE,
resource_ids=resource_ids,
actions=actions,
granularity=granularity,
granularity=granularity or self.GRANULARITY,
)

def add_warning(self, warning):
Expand Down
27 changes: 19 additions & 8 deletions cfripper/rules/CrossAccountTrustRule.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from pycfmodel.model.resources.iam_role import IAMRole

from ..config.regex import REGEX_CROSS_ACCOUNT_ROOT
from ..model.enums import RuleGranularity
from ..model.enums import RuleGranularity, RuleMode
from ..model.rule import Rule


Expand All @@ -38,10 +38,21 @@ def invoke(self, cfmodel):
type(self).__name__, self.REASON.format(logical_id, principal), resource_ids={logical_id}
)

for principal in resource.Properties.AssumeRolePolicyDocument.allowed_principals_with(
not_has_account_id
):
if not principal.endswith(".amazonaws.com"): # Checks if principal is an AWS service
self.add_failure(
type(self).__name__, self.REASON.format(logical_id, principal), resource_ids={logical_id}
)
if self._config.aws_account_id:
for principal in resource.Properties.AssumeRolePolicyDocument.allowed_principals_with(
not_has_account_id
):
if not principal.endswith(".amazonaws.com"): # Checks if principal is an AWS service
if "GETATT" in principal or "UNDEFINED_" in principal:
self.add_failure(
type(self).__name__,
self.REASON.format(logical_id, principal),
resource_ids={logical_id},
rule_mode=RuleMode.DEBUG,
)
else:
self.add_failure(
type(self).__name__,
self.REASON.format(logical_id, principal),
resource_ids={logical_id},
)
4 changes: 1 addition & 3 deletions cfripper/rules/GenericWildcardPrincipalRule.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@

logger = logging.getLogger(__file__)

POLICY_DOCUMENT_TYPES = ("policy_document", "key_policy", "assume_role_policy_document")


class GenericWildcardPrincipalRule(Rule):

Expand Down Expand Up @@ -64,7 +62,7 @@ def check_for_wildcards(self, logical_id: str, resource: PolicyDocument):
if account_id_match:
self.validate_account_id(logical_id, account_id_match.group(1))

if statement.Condition is not None:
if statement.Condition and statement.Condition.dict():
logger.warning(
f"Not adding {type(self).__name__} failure in {logical_id} because there are conditions: {statement.Condition}"
)
Expand Down
3 changes: 2 additions & 1 deletion cfripper/rules/HardcodedRDSPasswordRule.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def invoke(self, cfmodel):
for logical_id, resource in cfmodel.Resources.items():
if (
resource.Type == "AWS::RDS::DBInstance"
and resource.Properties.get("MasterUserPassword") != Parameter.NO_ECHO_NO_DEFAULT
and resource.Properties.get("MasterUserPassword", Parameter.NO_ECHO_NO_DEFAULT)
!= Parameter.NO_ECHO_NO_DEFAULT
):
self.add_failure(type(self).__name__, self.REASON.format(logical_id))
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""
from pycfmodel.model.resources.iam_role import IAMRole

from ..config.regex import REGEX_CONTAINS_STAR
from ..config.regex import REGEX_WILDCARD_POLICY_ACTION
from ..model.rule import Rule


Expand All @@ -26,5 +26,5 @@ def invoke(self, cfmodel):
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, IAMRole):
for policy in resource.Properties.Policies:
if policy.PolicyDocument.allowed_actions_with(REGEX_CONTAINS_STAR):
if policy.PolicyDocument.allowed_actions_with(REGEX_WILDCARD_POLICY_ACTION):
self.add_failure(type(self).__name__, self.REASON.format(logical_id, policy.PolicyName))
18 changes: 14 additions & 4 deletions cfripper/rules/KMSKeyWildcardPrincipal.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging
import re

from pycfmodel.model.cf_model import CFModel
from pycfmodel.model.resources.kms_key import KMSKey

from ..model.rule import Rule

logger = logging.getLogger(__file__)


class KMSKeyWildcardPrincipal(Rule):

Expand All @@ -27,7 +30,14 @@ class KMSKeyWildcardPrincipal(Rule):

def invoke(self, cfmodel: CFModel):
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, KMSKey) and resource.Properties.KeyPolicy.allowed_principals_with(
self.CONTAINS_WILDCARD_PATTERN
):
self.add_failure(type(self).__name__, self.REASON.format(logical_id))
if isinstance(resource, KMSKey):
for statement in resource.Properties.KeyPolicy._statement_as_list():
if statement.Effect == "Allow" and statement.principals_with(self.CONTAINS_WILDCARD_PATTERN):
for principal in statement.get_principal_list():
if self.CONTAINS_WILDCARD_PATTERN.match(principal):
if statement.Condition and statement.Condition.dict():
logger.warning(
f"Not adding {type(self).__name__} failure in {logical_id} because there are conditions: {statement.Condition}"
)
else:
self.add_failure(type(self).__name__, self.REASON.format(logical_id))
18 changes: 10 additions & 8 deletions cfripper/rules/S3BucketPolicyPrincipalRule.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ def invoke(self, cfmodel):
if isinstance(resource, S3BucketPolicy):
for statement in resource.Properties.PolicyDocument._statement_as_list():
for principal in statement.get_principal_list():
self.check_account_number(logical_id, principal)

def check_account_number(self, logical_id, principal):
account_id_match = re.match(self.PATTERN, principal)
if account_id_match:
account_id = account_id_match.group(1)
if self._config.aws_principals and account_id not in self._config.aws_principals:
self.add_failure(type(self).__name__, self.REASON.format(logical_id, account_id))
account_id_match = re.match(self.PATTERN, principal)
if account_id_match:
account_id = account_id_match.group(1)
if self._config.aws_principals and account_id not in self._config.aws_principals:
if statement.Condition and statement.Condition.dict():
logger.warning(
f"Not adding {type(self).__name__} failure in {logical_id} because there are conditions: {statement.Condition}"
)
else:
self.add_failure(type(self).__name__, self.REASON.format(logical_id, account_id))
20 changes: 19 additions & 1 deletion cfripper/rules/S3CrossAccountTrustRule.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging

from pycfmodel.model.resources.s3_bucket_policy import S3BucketPolicy

from cfripper.model.enums import RuleMode

from ..model.rule import Rule

logger = logging.getLogger(__file__)


class S3CrossAccountTrustRule(Rule):

Expand All @@ -28,4 +34,16 @@ def invoke(self, cfmodel):
if statement.Effect == "Allow":
for principal in statement.get_principal_list():
if self._config.aws_account_id and self._config.aws_account_id not in principal:
self.add_failure(type(self).__name__, self.REASON.format(logical_id, principal))
if statement.Condition and statement.Condition.dict():
logger.warning(
f"Not adding {type(self).__name__} failure in {logical_id} because there are conditions: {statement.Condition}"
)
else:
if "GETATT" in principal or "UNDEFINED_" in principal:
self.add_failure(
type(self).__name__,
self.REASON.format(logical_id, principal),
rule_mode=RuleMode.DEBUG,
)
else:
self.add_failure(type(self).__name__, self.REASON.format(logical_id, principal))
15 changes: 13 additions & 2 deletions cfripper/rules/SQSQueuePolicyPublicRule.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,33 @@
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging
import re

from pycfmodel.model.resources.sqs_queue_policy import SQSQueuePolicy

from ..model.enums import RuleRisk
from ..model.rule import Rule

logger = logging.getLogger(__file__)


class SQSQueuePolicyPublicRule(Rule):

REASON = "SQS Queue policy {} should not be public"
RISK_VALUE = RuleRisk.HIGH
REGEX_HAS_STAR_AFTER_COLON = re.compile(r"^(\w*:){0,1}\*$")

def invoke(self, cfmodel):
for logical_id, resource in cfmodel.Resources.items():
if isinstance(resource, SQSQueuePolicy) and resource.Properties.PolicyDocument.allowed_principals_with(
re.compile(r"^(\w*:){0,1}\*$")
self.REGEX_HAS_STAR_AFTER_COLON
):
self.add_failure(type(self).__name__, self.REASON.format(logical_id))
for statement in resource.Properties.PolicyDocument._statement_as_list():
if statement.Effect == "Allow" and statement.principals_with(self.REGEX_HAS_STAR_AFTER_COLON):
if statement.Condition and statement.Condition.dict():
logger.warning(
f"Not adding {type(self).__name__} failure in {logical_id} because there are conditions: {statement.Condition}"
)
else:
self.add_failure(type(self).__name__, self.REASON.format(logical_id))
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"Statement": [
{
"Effect": "Allow",
"Action": "*",
"Action": "sts:*",
"NotResource": "*"
}
]
Expand Down

0 comments on commit 9d0c064

Please sign in to comment.