Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update blob_storage.py #15435

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 82 additions & 17 deletions src/integrations/prefect-azure/prefect_azure/blob_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
if TYPE_CHECKING:
from azure.storage.blob import BlobProperties


from pydantic import Field

from prefect import task
from prefect.blocks.abstract import ObjectStorageBlock
from prefect.filesystems import WritableDeploymentStorage, WritableFileSystem
Expand All @@ -29,18 +27,20 @@ async def blob_storage_download(
blob_storage_credentials: "AzureBlobStorageCredentials",
) -> bytes:
"""
Downloads a blob with a given key from a given Blob Storage container.
Downloads a blob with a given key from a specified Blob Storage container.
Args:
blob: Name of the blob within this container to retrieve.
container: Name of the Blob Storage container to retrieve from.
blob: Name of the blob within this container to retrieve.
blob_storage_credentials: Credentials to use for authentication with Azure.
Returns:
A `bytes` representation of the downloaded blob.
Example:
Download a file from a Blob Storage container
Download a file from a Blob Storage container:
```python
from prefect import flow
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_download
Expand All @@ -64,8 +64,12 @@ def example_blob_storage_download_flow():
logger.info("Downloading blob from container %s with key %s", container, blob)

async with blob_storage_credentials.get_blob_client(container, blob) as blob_client:
blob_obj = await blob_client.download_blob()
output = await blob_obj.content_as_bytes()
try:
blob_obj = await blob_client.download_blob()
output = await blob_obj.content_as_bytes()
except ResourceNotFoundError as exc:
logger.error("Blob %s not found in container %s: %s", blob, container, exc.reason)
raise RuntimeError(f"Download failed: {exc.reason}") from exc

return output

Expand All @@ -79,21 +83,22 @@ async def blob_storage_upload(
overwrite: bool = False,
) -> str:
"""
Uploads data to an Blob Storage container.
Uploads data to a specified Blob Storage container.
Args:
data: Bytes representation of data to upload to Blob Storage.
container: Name of the Blob Storage container to upload to.
blob_storage_credentials: Credentials to use for authentication with Azure.
blob: Name of the blob within this container to retrieve.
overwrite: If `True`, an existing blob with the same name will be overwritten.
Defaults to `False` and an error will be thrown if the blob already exists.
blob: Name of the blob within this container to upload.
overwrite: If `True`, overwrite an existing blob with the same name.
Returns:
The blob name of the uploaded object
The blob name of the uploaded object.
Example:
Read and upload a file to a Blob Storage container
Upload a file to a Blob Storage container:
```python
from prefect import flow
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_upload
Expand All @@ -119,16 +124,76 @@ def example_blob_storage_upload_flow():
logger = get_run_logger()
logger.info("Uploading blob to container %s with key %s", container, blob)

# create key if not provided
# Create key if not provided
if blob is None:
blob = str(uuid.uuid4())

async with blob_storage_credentials.get_blob_client(container, blob) as blob_client:
await blob_client.upload_blob(data, overwrite=overwrite)
try:
await blob_client.upload_blob(data, overwrite=overwrite)
except ResourceNotFoundError as exc:
logger.error("Error uploading blob %s to container %s: %s", blob, container, exc.reason)
raise RuntimeError(f"Upload failed: {exc.reason}") from exc

return blob


@task
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this appears to be a duplicate of the below function?

async def blob_storage_list(
container: str,
blob_storage_credentials: "AzureBlobStorageCredentials",
name_starts_with: Optional[str] = None,
include: Union[str, List[str], None] = None,
**kwargs,
) -> List["BlobProperties"]:
"""
Lists objects from a specified Blob Storage container.
Args:
container: Name of the Blob Storage container to list blobs from.
blob_storage_credentials: Credentials to use for authentication with Azure.
name_starts_with: Filters results to only include blobs whose names start with the specified prefix.
include: Specifies additional datasets to include in the response.
Returns:
A list of `BlobProperties` containing metadata about the blobs.
Example:
List blobs in a container:
```python
from prefect import flow
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_list
@flow
def example_blob_storage_list_flow():
connection_string = "connection_string"
blob_storage_credentials = AzureBlobStorageCredentials(
connection_string=connection_string,
)
data = blob_storage_list(
container="container",
blob_storage_credentials=blob_storage_credentials,
)
return data
example_blob_storage_list_flow()
```
"""
logger = get_run_logger()
logger.info("Listing blobs from container %s", container)

async with blob_storage_credentials.get_container_client(container) as container_client:
blobs = [
blob
async for blob in container_client.list_blobs(
name_starts_with=name_starts_with, include=include, **kwargs
)
]

return blobs


@task
async def blob_storage_list(
container: str,
Expand Down
Loading