Crypto shredding is the practice of 'deleting' data through the destruction of the cryptographic keys protecting the data.
You can find the source on GitHub.
- Requires Python 3.6+
Note: If you have not already installed cryptography, you might need to install additional prerequisites as detailed in the cryptography installation guide for your operating system.
$ pip install cryptoshredding
import boto3
from cryptoshredding import DynamodbKeyStore
from dynamodb_encryption_sdk.material_providers.aws_kms import AwsKmsCryptographicMaterialsProvider
# Example: create a DynamoDB-backed key store and manage a main key in it.
# The ARN below is a placeholder; substitute the ARN of a real KMS key.
aws_cmk_id = "arn:aws:kms:YOUR_KEY"
# Materials provider built from the KMS key; it is handed to the key store below.
aws_kms_cmp = AwsKmsCryptographicMaterialsProvider(key_id=aws_cmk_id)
# DynamoDB table used by the key store itself (the data examples use a separate table).
table = boto3.resource("dynamodb").Table("key_store_table")
key_store = DynamodbKeyStore(table=table, materials_provider=aws_kms_cmp)
# Main keys are addressed by a caller-chosen identifier.
key_id = "key4711"
key_store.create_main_key(key_id)
main_key = key_store.get_main_key(key_id)
# Deleting the main key is the crypto-shredding step.
key_store.delete_main_key(key_id) # shredding
import boto3
from cryptoshredding import MainKey
# Example: data key round trip with a main key.
# `key_store` and `key_id` are not defined in this snippet; they are assumed to
# come from the key store setup example.
main_key = key_store.get_main_key(key_id)
# generate_data_key() returns the data key together with its encrypted form.
data_key, encrypted_data_key = main_key.generate_data_key()
# Decrypting the encrypted form with the same main key must give back the data key.
decrypted_data_key = main_key.decrypt(encrypted_data_key)
assert data_key == decrypted_data_key
import boto3
from cryptoshredding.dynamodb import CryptoTable
# Example: DynamoDB items written through CryptoTable, then shredded.
# `key_store`, `key_id` and `plaintext_item` are not defined in this snippet.
# NOTE(review): `plaintext_item` presumably is a dict whose "id" attribute is
# "foo" (see index_key below) - confirm before running.
table = boto3.resource("dynamodb").Table("data_table")
# Wrap the plain boto3 table; the wrapper is given the key store.
crypto_table = CryptoTable(
table=table,
key_store=key_store,
)
# CSEKeyId names the main key to use for this item.
crypto_table.put_item(
CSEKeyId=key_id,
Item=plaintext_item
)
index_key = {"id": "foo"}
# Same lookup through the plain table and through the wrapper.
encrypted_item = table.get_item(Key=index_key)["Item"]
decrypted_item = crypto_table.get_item(Key=index_key)["Item"]
encrypted_items = table.scan()["Items"]
decrypted_items = crypto_table.scan()["Items"]
# Before shredding, both scans are expected to return the single item.
assert len(encrypted_items) == 1
assert len(decrypted_items) == 1
# Destroy the main key; the stored item itself is left in place.
key_store.delete_main_key(key_id) # shredding
encrypted_items = table.scan()["Items"]
decrypted_items = crypto_table.scan()["Items"]
# The plain table still returns the item ...
assert len(encrypted_items) == 1
# ... but the wrapper is expected to return nothing once the key is gone.
assert len(decrypted_items) == 0 # !!!
import boto3
from cryptoshredding.s3 import CryptoClient
# Example: S3 objects written and read through the S3 CryptoClient.
# `key_store`, `key_id` and `bucket` are not defined in this snippet.
# NOTE(review): `bucket` presumably is an existing boto3 Bucket resource (only
# its `.name` is used) - confirm before running.
s3 = boto3.client("s3", region_name="us-east-1")

# Wrap the plain boto3 client. The wrapper is bound to `crypto_s3`, the name
# the calls below use.
crypto_s3 = CryptoClient(
    client=s3,
    key_store=key_store,
)

# CSEKeyId names the main key to use for this object.
crypto_s3.put_object(
    CSEKeyId=key_id,
    Bucket=bucket.name,
    Key="object",
    Body="foo bar",
)

# Same object fetched through the plain client ...
encrypted_obj = s3.get_object(
    Bucket=bucket.name,
    Key="object",
)

# ... and through the wrapper.
decrypted_obj = crypto_s3.get_object(
    Bucket=bucket.name,
    Key="object",
)
from cryptoshredding.raw import CryptoFile
# Example: encrypt and decrypt a file on disk with CryptoFile.
# `key_store` and `key_id` are not defined in this snippet.
crypto_file = CryptoFile(
key_store=key_store,
)
# Encrypt plain.txt into cipher.txt using the main key named by key_id.
crypto_file.encrypt(
key_id=key_id,
plaintext_filename="plain.txt",
ciphertext_filename="cipher.txt"
)
# Decrypt cipher.txt into decrypt.txt; note that no key_id is passed here.
# NOTE(review): presumably the key id is recovered from the ciphertext - confirm.
crypto_file.decrypt(
ciphertext_filename="cipher.txt",
plaintext_filename="decrypt.txt",
)
from cryptoshredding.raw import CryptoBytes
# Example: encrypt and decrypt in-memory data with CryptoBytes.
# `key_store`, `key_id` and `plain` are not defined in this snippet.
# NOTE(review): `plain` presumably is a bytes value - confirm before running.
crypto_bytes = CryptoBytes(
key_store=key_store,
)
# encrypt() returns the encrypted data together with a header object.
encrypted, encrypted_header = crypto_bytes.encrypt(
key_id=key_id,
data=plain,
)
# decrypt() takes only the encrypted data (no key_id) and likewise returns a
# (data, header) pair.
decrypted, decrypted_header = crypto_bytes.decrypt(
data=encrypted,
)
import boto3
from cryptoshredding.kinesis import CryptoClient
# Example: Kinesis records written and read through the Kinesis CryptoClient.
# `key_store`, `key_id` and `stream_name` are not defined in this snippet.
# NOTE(review): `stream_name` presumably names an existing stream with at
# least one shard - confirm before running.
kinesis = boto3.client("kinesis", region_name="us-east-1")

# Wrap the plain boto3 client; the wrapper is given the key store.
crypto_kinesis = CryptoClient(
    client=kinesis,
    key_store=key_store,
)

data = b"foo bar"

# CSEKeyId names the main key to use for this record.
crypto_kinesis.put_record(
    CSEKeyId=key_id,
    StreamName=stream_name,
    Data=data,
    PartitionKey="key1",
)

# Look up the first shard of the stream and open an iterator at its oldest
# available record (TRIM_HORIZON).
response = crypto_kinesis.describe_stream(
    StreamName=stream_name,
)
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]
response = crypto_kinesis.get_shard_iterator(
    StreamName=stream_name,
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)
shard_iterator = response["ShardIterator"]

# Read the same position once through the plain client and once through the
# wrapper.
encrypted_response = kinesis.get_records(ShardIterator=shard_iterator)
decrypted_response = crypto_kinesis.get_records(ShardIterator=shard_iterator)

# The plain client is expected to return one record whose payload differs
# from the plaintext ...
assert len(encrypted_response["Records"]) == 1
assert data != encrypted_response["Records"][0]["Data"]

# ... while the wrapper is expected to return the original bytes.
assert len(decrypted_response["Records"]) == 1
assert data == decrypted_response["Records"][0]["Data"]