6
6
from pydantic import Field
7
7
8
8
from prefect import task
9
+ from prefect ._internal .compatibility .async_dispatch import async_dispatch
9
10
from prefect .blocks .abstract import SecretBlock
10
11
from prefect .logging import get_run_logger
11
- from prefect .utilities .asyncutils import run_sync_in_worker_thread , sync_compatible
12
+ from prefect .utilities .asyncutils import run_sync_in_worker_thread
12
13
from prefect_aws import AwsCredentials
13
14
14
15
@@ -365,22 +366,21 @@ class AwsSecret(SecretBlock):
365
366
secret_name: The name of the secret.
366
367
"""
367
368
368
- _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d74b16fe84ce626345adf235a47008fea2869a60-225x225.png" # noqa
369
+ _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d74b16fe84ce626345adf235a47008fea2869a60-225x225.png" # type: ignore
369
370
_block_type_name = "AWS Secret"
370
- _documentation_url = "https://docs.prefect.io/integrations/prefect-aws" # noqa
371
+ _documentation_url = "https://docs.prefect.io/integrations/prefect-aws" # type: ignore
371
372
372
373
aws_credentials : AwsCredentials
373
374
secret_name : str = Field (default = ..., description = "The name of the secret." )
374
375
375
- @sync_compatible
376
- async def read_secret (
376
+ async def aread_secret (
377
377
self ,
378
378
version_id : Optional [str ] = None ,
379
379
version_stage : Optional [str ] = None ,
380
- ** read_kwargs : Dict [ str , Any ] ,
380
+ ** read_kwargs : Any ,
381
381
) -> bytes :
382
382
"""
383
- Reads the secret from the secret storage service.
383
+ Asynchronously reads the secret from the secret storage service.
384
384
385
385
Args:
386
386
version_id: The version of the secret to read. If not provided, the latest
@@ -397,7 +397,7 @@ async def read_secret(
397
397
Reads a secret.
398
398
```python
399
399
secrets_manager = SecretsManager.load("MY_BLOCK")
400
- secrets_manager.read_secret ()
400
+ await secrets_manager.aread_secret ()
401
401
```
402
402
"""
403
403
client = self .aws_credentials .get_secrets_manager_client ()
@@ -416,12 +416,53 @@ async def read_secret(
416
416
self .logger .info (f"The secret { arn !r} data was successfully read." )
417
417
return secret
418
418
419
- @sync_compatible
420
- async def write_secret (
419
+ @async_dispatch (aread_secret )
420
+ def read_secret (
421
+ self ,
422
+ version_id : Optional [str ] = None ,
423
+ version_stage : Optional [str ] = None ,
424
+ ** read_kwargs : Any ,
425
+ ) -> bytes :
426
+ """
427
+ Reads the secret from the secret storage service.
428
+
429
+ Args:
430
+ version_id: The version of the secret to read. If not provided, the latest
431
+ version will be read.
432
+ version_stage: The version stage of the secret to read. If not provided,
433
+ the latest version will be read.
434
+ read_kwargs: Additional keyword arguments to pass to the
435
+ `get_secret_value` method of the boto3 client.
436
+
437
+ Returns:
438
+ The secret data.
439
+
440
+ Examples:
441
+ Reads a secret.
442
+ ```python
443
+ secrets_manager = SecretsManager.load("MY_BLOCK")
444
+ secrets_manager.read_secret()
445
+ ```
446
+ """
447
+ client = self .aws_credentials .get_secrets_manager_client ()
448
+ if version_id is not None :
449
+ read_kwargs ["VersionId" ] = version_id
450
+ if version_stage is not None :
451
+ read_kwargs ["VersionStage" ] = version_stage
452
+ response = client .get_secret_value (SecretId = self .secret_name , ** read_kwargs )
453
+ if "SecretBinary" in response :
454
+ secret = response ["SecretBinary" ]
455
+ elif "SecretString" in response :
456
+ secret = response ["SecretString" ]
457
+ arn = response ["ARN" ]
458
+ self .logger .info (f"The secret { arn !r} data was successfully read." )
459
+ return secret
460
+
461
+ async def awrite_secret (
421
462
self , secret_data : bytes , ** put_or_create_secret_kwargs : Dict [str , Any ]
422
463
) -> str :
423
464
"""
424
- Writes the secret to the secret storage service as a SecretBinary;
465
+ Asynchronously writes the secret to the secret storage service as a SecretBinary;
425
466
if it doesn't exist, it will be created.
426
467
427
468
Args:
@@ -436,7 +477,7 @@ async def write_secret(
436
477
Write some secret data.
437
478
```python
438
479
secrets_manager = SecretsManager.load("MY_BLOCK")
439
- secrets_manager.write_secret (b"my_secret_data")
480
+ await secrets_manager.awrite_secret (b"my_secret_data")
440
481
```
441
482
"""
442
483
client = self .aws_credentials .get_secrets_manager_client ()
@@ -461,15 +502,57 @@ async def write_secret(
461
502
self .logger .info (f"The secret data was written successfully to { arn !r} ." )
462
503
return arn
463
504
464
- @sync_compatible
465
- async def delete_secret (
505
+ @async_dispatch (awrite_secret )
506
+ def write_secret (
507
+ self , secret_data : bytes , ** put_or_create_secret_kwargs : Dict [str , Any ]
508
+ ) -> str :
509
+ """
510
+ Writes the secret to the secret storage service as a SecretBinary;
511
+ if it doesn't exist, it will be created.
512
+
513
+ Args:
514
+ secret_data: The secret data to write.
515
+ **put_or_create_secret_kwargs: Additional keyword arguments to pass to
516
+ put_secret_value or create_secret method of the boto3 client.
517
+
518
+ Returns:
519
+ The path that the secret was written to.
520
+
521
+ Examples:
522
+ Write some secret data.
523
+ ```python
524
+ secrets_manager = SecretsManager.load("MY_BLOCK")
525
+ secrets_manager.write_secret(b"my_secret_data")
526
+ ```
527
+ """
528
+ client = self .aws_credentials .get_secrets_manager_client ()
529
+ try :
530
+ response = client .put_secret_value (
531
+ SecretId = self .secret_name ,
532
+ SecretBinary = secret_data ,
533
+ ** put_or_create_secret_kwargs ,
534
+ )
535
+ except client .exceptions .ResourceNotFoundException :
536
+ self .logger .info (
537
+ f"The secret { self .secret_name !r} does not exist yet, creating it now."
538
+ )
539
+ response = client .create_secret (
540
+ Name = self .secret_name ,
541
+ SecretBinary = secret_data ,
542
+ ** put_or_create_secret_kwargs ,
543
+ )
544
+ arn = response ["ARN" ]
545
+ self .logger .info (f"The secret data was written successfully to { arn !r} ." )
546
+ return arn
547
+
548
+ async def adelete_secret (
466
549
self ,
467
550
recovery_window_in_days : int = 30 ,
468
551
force_delete_without_recovery : bool = False ,
469
552
** delete_kwargs : Dict [str , Any ],
470
553
) -> str :
471
554
"""
472
- Deletes the secret from the secret storage service.
555
+ Asynchronously deletes the secret from the secret storage service.
473
556
474
557
Args:
475
558
recovery_window_in_days: The number of days to wait before permanently
@@ -486,7 +569,7 @@ async def delete_secret(
486
569
Deletes the secret with a recovery window of 15 days.
487
570
```python
488
571
secrets_manager = SecretsManager.load("MY_BLOCK")
489
- secrets_manager.delete_secret (recovery_window_in_days=15)
572
+ await secrets_manager.adelete_secret (recovery_window_in_days=15)
490
573
```
491
574
"""
492
575
if force_delete_without_recovery and recovery_window_in_days :
@@ -510,3 +593,52 @@ async def delete_secret(
510
593
arn = response ["ARN" ]
511
594
self .logger .info (f"The secret { arn } was deleted successfully." )
512
595
return arn
596
+
597
+ @async_dispatch (adelete_secret )
598
+ def delete_secret (
599
+ self ,
600
+ recovery_window_in_days : int = 30 ,
601
+ force_delete_without_recovery : bool = False ,
602
+ ** delete_kwargs : Dict [str , Any ],
603
+ ) -> str :
604
+ """
605
+ Deletes the secret from the secret storage service.
606
+
607
+ Args:
608
+ recovery_window_in_days: The number of days to wait before permanently
609
+ deleting the secret. Must be between 7 and 30 days.
610
+ force_delete_without_recovery: If True, the secret will be deleted
611
+ immediately without a recovery window.
612
+ **delete_kwargs: Additional keyword arguments to pass to the
613
+ delete_secret method of the boto3 client.
614
+
615
+ Returns:
616
+ The path that the secret was deleted from.
617
+
618
+ Examples:
619
+ Deletes the secret with a recovery window of 15 days.
620
+ ```python
621
+ secrets_manager = SecretsManager.load("MY_BLOCK")
622
+ secrets_manager.delete_secret(recovery_window_in_days=15)
623
+ ```
624
+ """
625
+ if force_delete_without_recovery and recovery_window_in_days :
626
+ raise ValueError (
627
+ "Cannot specify recovery window and force delete without recovery."
628
+ )
629
+ elif not (7 <= recovery_window_in_days <= 30 ):
630
+ raise ValueError (
631
+ "Recovery window must be between 7 and 30 days, got "
632
+ f"{ recovery_window_in_days } ."
633
+ )
634
+
635
+ client = self .aws_credentials .get_secrets_manager_client ()
636
+ response = client .delete_secret (
637
+ SecretId = self .secret_name ,
638
+ RecoveryWindowInDays = recovery_window_in_days ,
639
+ ForceDeleteWithoutRecovery = force_delete_without_recovery ,
640
+ ** delete_kwargs ,
641
+ )
642
+ arn = response ["ARN" ]
643
+ self .logger .info (f"The secret { arn } was deleted successfully." )
644
+ return arn
0 commit comments