Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

platform(terraform): Add eval keys #6929

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions checkov/terraform/checks/data/aws/IAMManagedAdminPolicy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ def __init__(self):
def scan_data_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if "name" in conf.keys():
if conf.get("name")[0] == ADMIN_POLICY_NAME:
self.evaluated_keys = ["name"]
return CheckResult.FAILED

if "arn" in conf.keys():
if conf.get("arn")[0] == ADMIN_POLICY_ARN:
self.evaluated_keys = ["arn"]
return CheckResult.FAILED

return CheckResult.PASSED
Expand Down
3 changes: 3 additions & 0 deletions checkov/terraform/checks/resource/alicloud/AbsRDSParameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ def scan_resource_conf(self, conf):
if param['name'][0] == self.parameter and (param['value'][0]).lower() == 'on':
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self):
return ["parameters"]
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
Expand All @@ -23,5 +23,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["encrypted"]


check = DiskEncryptedWithCMK()
9 changes: 7 additions & 2 deletions checkov/terraform/checks/resource/alicloud/DiskIsEncrypted.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
from typing import List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


class DiskIsEncrypted(BaseResourceCheck):
def __init__(self):
def __init__(self) -> None:
name = "Ensure disk is encrypted"
id = "CKV_ALI_7"
supported_resources = ['alicloud_disk']
categories = [CheckCategories.ENCRYPTION]
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
def scan_resource_conf(self, conf) -> CheckResult:
if conf.get("snapshot_id"):
return CheckResult.UNKNOWN
if conf.get("encrypted") and conf.get("encrypted") == [True]:
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ['encrypted']


check = DiskIsEncrypted()
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ def __init__(self):
def scan_resource_conf(self, conf):
data_disks = conf.get("data_disks")
if data_disks and isinstance(data_disks, list):
for disk in data_disks:
for idx, disk in enumerate(data_disks):
if disk.get('encrypted') != [True]:
self.evaluated_keys = [f'data_disks/[{idx}]/encrypted']
return CheckResult.FAILED
return CheckResult.PASSED

Expand Down
4 changes: 3 additions & 1 deletion checkov/terraform/checks/resource/aws/AMIEncryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ def __init__(self):
def scan_resource_conf(self, conf) -> CheckResult:
if conf.get('ebs_block_device'):
mappings = conf.get('ebs_block_device')
for mapping in mappings:
self.evaluated_keys = ["ebs_block_device"]
for mapping_idx, mapping in enumerate(mappings):
if not mapping.get("snapshot_id"):
if not mapping.get("encrypted"):
return CheckResult.FAILED
if mapping.get("encrypted")[0] is False:
self.evaluated_keys.append(f"ebs_block_device/[{mapping_idx}]/encrypted")
return CheckResult.FAILED
# pass thru
return CheckResult.PASSED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(self) -> None:
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def _is_policy_secure(self, policy: Dict[str, Any]) -> bool:
def _is_policy_secure(self, policy: Dict[str, Any]) -> CheckResult:
# Check that the policy doesn't allow for all principals to us action execute-api:Invoke
passed = True
if policy.get("Statement"):
Expand Down
3 changes: 3 additions & 0 deletions checkov/terraform/checks/resource/aws/AWSCodeGuruHasCMK.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ def scan_resource_conf(self, conf: Dict[str, List[Any]]) -> CheckResult:

return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ['kms_key_details']


check = AWSCodeGuruHasCMK()
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def __init__(self) -> None:
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
self.evaluated_keys = ["container_properties"]
container_properties = conf.get("container_properties")
if container_properties:
if isinstance(container_properties[0], str):
Expand All @@ -28,6 +29,7 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if not isinstance(container, dict):
return CheckResult.UNKNOWN
if container.get("privileged"):
self.evaluated_keys.append("container_properties/[0]/privileged")
return CheckResult.FAILED
return CheckResult.PASSED
return CheckResult.UNKNOWN
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import \
Expand Down Expand Up @@ -33,5 +33,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["retention_in_days"]


check = CloudWatchLogGroupRetentionYear()
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ def __init__(self) -> None:
def scan_resource_conf(self, conf):
groups = conf.get("origin_group")
if groups and isinstance(groups, list):
for group in groups:
self.evaluated_keys = ["origin_group"]
for group_idx, group in enumerate(groups):
if isinstance(group, dict) and group.get("failover_criteria"):
member = group.get("member")
if not member or len(member) < 2:
self.evaluated_keys.append(f"origin_group/[{group_idx}]/member")
return CheckResult.FAILED
else:
return CheckResult.FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if execute_command and isinstance(execute_command, list):
execute_command = execute_command[0]
if isinstance(execute_command, dict) and not execute_command.get("logging") == ["NONE"]:
self.evaluated_keys = ["configuration/[0]/execute_command_configuration"]
if execute_command.get("kms_key_id"):
log_conf = execute_command.get("log_configuration")
if log_conf and isinstance(log_conf, list):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
Expand Down Expand Up @@ -33,5 +33,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> List[str]:
return ["launch_type", "platform_version"]


check = ECSServiceFargateLatest()
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ def __init__(self) -> None:

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if 'listener' in conf:
for listener in conf.get('listener'):
self.evaluated_keys = ['listener']
for listener_idx, listener in enumerate(conf.get('listener')):
if 'instance_protocol' in listener:
self.evaluated_keys.append(f'listener/[{listener_idx}]/instance_protocol')
if listener.get('instance_protocol')[0].lower() in ('http', 'tcp'):
return CheckResult.FAILED
if listener.get('instance_protocol')[0].lower() in ('https', 'ssl') and \
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.common.util.data_structures_utils import find_in_dict
Expand Down Expand Up @@ -31,5 +31,11 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> List[str]:
return [
"configuration",
"configuration/[0]/EncryptionConfiguration/AtRestEncryptionConfiguration/LocalDiskEncryptionConfiguration/EnableEbsEncryption"
]


check = EMRClusterConfEncryptsEBS()
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> list[str]:
return ["configuration"]


check = EMRClusterConfEncryptsInTransit()
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.common.util.data_structures_utils import find_in_dict
Expand Down Expand Up @@ -31,5 +31,11 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.UNKNOWN

def get_evaluated_keys(self) -> List[str]:
return [
"configuration",
"configuration/[0]/EncryptionConfiguration/AtRestEncryptionConfiguration/LocalDiskEncryptionConfiguration/EnableAtRestEncryption"
]


check = EMRClusterConfEncryptsLocalDisk()
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck

Expand Down Expand Up @@ -30,5 +32,8 @@ def scan_resource_conf(self, conf):
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["setting"]


check = ElasticBeanstalkUseEnhancedHealthChecks()
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,8 @@ def scan_resource_conf(self, conf):
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self):
return ["setting"]


check = ElasticBeanstalkUseManagedUpdates()
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.FAILED

def get_evaluated_keys(self) -> list[str]:
return ["log_publishing_options"]


check = ElasticsearchDomainAuditLogging()
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if self.entity_type == "aws_iam_role":
if "managed_policy_arns" in conf.keys():
if ADMIN_POLICY_ARN in conf["managed_policy_arns"][0]:
self.evaluated_keys = ["managed_policy_arns"]
return CheckResult.FAILED

elif self.entity_type in (
Expand All @@ -46,13 +47,15 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
):
policy_arn = conf.get("policy_arn")
if policy_arn and isinstance(policy_arn, list) and policy_arn[0] == ADMIN_POLICY_ARN:
self.evaluated_keys = ["policy_arn"]
return CheckResult.FAILED

elif self.entity_type in (
"aws_ssoadmin_managed_policy_attachment"
):
managed_policy_arn = conf.get("managed_policy_arn")
if managed_policy_arn and isinstance(managed_policy_arn, list) and managed_policy_arn[0] == ADMIN_POLICY_ARN:
self.evaluated_keys = ["managed_policy_arn"]
return CheckResult.FAILED

return CheckResult.PASSED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ def __init__(self) -> None:
def scan_resource_conf(self, conf: Dict[str, List[Any]]) -> CheckResult:
mappings = conf.get("block_device_mapping")
if mappings and isinstance(mappings, list):
self.evaluated_keys = ["block_device_mapping"]
for mapping in mappings:
if mapping.get("ebs"):
self.evaluated_keys.append("block_device_mapping/[0]/ebs")
ebs = mapping["ebs"][0]
if not ebs.get("encrypted"):
return CheckResult.FAILED
Expand Down
5 changes: 5 additions & 0 deletions checkov/terraform/checks/resource/aws/MQBrokerVersion.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import re
from typing import List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck

Expand Down Expand Up @@ -37,5 +39,8 @@ def scan_resource_conf(self, conf) -> CheckResult:

return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["engine_type", "engine_version"]


check = MQBrokerVersion()
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def scan_resource_conf(self, conf):
return CheckResult.FAILED

def get_evaluated_keys(self):
return 'backup_retention_period'
return ['backup_retention_period']


check = NeptuneClusterBackupRetention()
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
Expand Down Expand Up @@ -44,5 +44,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["enabled_cloudwatch_logs_exports"]


check = RDSClusterAuditLogging()
5 changes: 3 additions & 2 deletions checkov/terraform/checks/resource/aws/S3GlobalViewACL.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ def __init__(self) -> None:

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if 'access_control_policy' in conf:
for policy in conf.get('access_control_policy'):
for policy_idx, policy in enumerate(conf["access_control_policy"]):
if 'grant' in policy:
for grant in policy.get('grant'):
for grant_idx, grant in enumerate(policy["grant"]):
self.evaluated_keys = [f"access_control_policy/[{policy_idx}]/grant/[{grant_idx}]/permission"]
if (isinstance(grant, dict) and 'permission' in grant and
('FULL_CONTROL' in grant.get('permission') or 'READ_ACP' in grant.get('permission'))):
if 'grantee' in grant:
Expand Down
5 changes: 4 additions & 1 deletion checkov/terraform/checks/resource/aws/SQSQueueEncryption.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Any
from typing import Any, List

from checkov.common.models.enums import CheckCategories, CheckResult
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
Expand Down Expand Up @@ -29,5 +29,8 @@ def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:

return CheckResult.FAILED

def get_evaluated_keys(self) -> List[str]:
return ["sqs_managed_sse_enabled", "kms_master_key_id"]


check = SQSQueueEncryption()
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ def __init__(self) -> None:
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
self.evaluated_keys = ["rotation_rules"]
rules = conf.get("rotation_rules")
if rules and isinstance(rules, list):
days = rules[0].get('automatically_after_days')
if days and isinstance(days, list):
self.evaluated_keys = ["rotation_rules/[0]/automatically_after_days"]
days = force_int(days[0])
if days is not None and days < 90:
return CheckResult.PASSED
Expand Down
Loading
Loading