diff --git a/src/integrations/prefect-azure/prefect_azure/blob_storage.py b/src/integrations/prefect-azure/prefect_azure/blob_storage.py index 7a6edb5af999..1ac3444ee12f 100644 --- a/src/integrations/prefect-azure/prefect_azure/blob_storage.py +++ b/src/integrations/prefect-azure/prefect_azure/blob_storage.py @@ -10,9 +10,7 @@ if TYPE_CHECKING: from azure.storage.blob import BlobProperties - from pydantic import Field - from prefect import task from prefect.blocks.abstract import ObjectStorageBlock from prefect.filesystems import WritableDeploymentStorage, WritableFileSystem @@ -29,18 +27,20 @@ async def blob_storage_download( blob_storage_credentials: "AzureBlobStorageCredentials", ) -> bytes: """ - Downloads a blob with a given key from a given Blob Storage container. + Downloads a blob with a given key from a specified Blob Storage container. + Args: - blob: Name of the blob within this container to retrieve. container: Name of the Blob Storage container to retrieve from. + blob: Name of the blob within this container to retrieve. blob_storage_credentials: Credentials to use for authentication with Azure. + Returns: A `bytes` representation of the downloaded blob. + Example: - Download a file from a Blob Storage container + Download a file from a Blob Storage container: ```python from prefect import flow - from prefect_azure import AzureBlobStorageCredentials from prefect_azure.blob_storage import blob_storage_download @@ -64,8 +64,12 @@ def example_blob_storage_download_flow(): logger.info("Downloading blob from container %s with key %s", container, blob) async with blob_storage_credentials.get_blob_client(container, blob) as blob_client: - blob_obj = await blob_client.download_blob() - output = await blob_obj.content_as_bytes() + try: + blob_obj = await blob_client.download_blob() + output = await blob_obj.content_as_bytes() + except ResourceNotFoundError as exc: + logger.error("Blob %s not found in container %s: %s", blob, container, exc.reason) + raise RuntimeError(f"Download failed: {exc.reason}") from exc return output @@ -79,21 +83,22 @@ async def blob_storage_upload( overwrite: bool = False, ) -> str: """ - Uploads data to an Blob Storage container. + Uploads data to a specified Blob Storage container. + Args: data: Bytes representation of data to upload to Blob Storage. container: Name of the Blob Storage container to upload to. blob_storage_credentials: Credentials to use for authentication with Azure. - blob: Name of the blob within this container to retrieve. - overwrite: If `True`, an existing blob with the same name will be overwritten. - Defaults to `False` and an error will be thrown if the blob already exists. + blob: Name of the blob within this container to upload. + overwrite: If `True`, overwrite an existing blob with the same name. + Returns: - The blob name of the uploaded object + The blob name of the uploaded object. + Example: - Read and upload a file to a Blob Storage container + Upload a file to a Blob Storage container: ```python from prefect import flow - from prefect_azure import AzureBlobStorageCredentials from prefect_azure.blob_storage import blob_storage_upload @@ -119,16 +124,76 @@ def example_blob_storage_upload_flow(): logger = get_run_logger() logger.info("Uploading blob to container %s with key %s", container, blob) - # create key if not provided + # Create key if not provided if blob is None: blob = str(uuid.uuid4()) async with blob_storage_credentials.get_blob_client(container, blob) as blob_client: - await blob_client.upload_blob(data, overwrite=overwrite) + try: + await blob_client.upload_blob(data, overwrite=overwrite) + except ResourceNotFoundError as exc: + logger.error("Error uploading blob %s to container %s: %s", blob, container, exc.reason) + raise RuntimeError(f"Upload failed: {exc.reason}") from exc return blob +@task +async def blob_storage_list( + container: str, + blob_storage_credentials: "AzureBlobStorageCredentials", + name_starts_with: Optional[str] = None, + include: Union[str, List[str], None] = None, + **kwargs, +) -> List["BlobProperties"]: + """ + Lists objects from a specified Blob Storage container. + + Args: + container: Name of the Blob Storage container to list blobs from. + blob_storage_credentials: Credentials to use for authentication with Azure. + name_starts_with: Filters results to only include blobs whose names start with the specified prefix. + include: Specifies additional datasets to include in the response. + + Returns: + A list of `BlobProperties` containing metadata about the blobs. + + Example: + List blobs in a container: + ```python + from prefect import flow + from prefect_azure import AzureBlobStorageCredentials + from prefect_azure.blob_storage import blob_storage_list + + @flow + def example_blob_storage_list_flow(): + connection_string = "connection_string" + blob_storage_credentials = AzureBlobStorageCredentials( + connection_string=connection_string, + ) + data = blob_storage_list( + container="container", + blob_storage_credentials=blob_storage_credentials, + ) + return data + + example_blob_storage_list_flow() + ``` + """ + logger = get_run_logger() + logger.info("Listing blobs from container %s", container) + + async with blob_storage_credentials.get_container_client(container) as container_client: + blobs = [ + blob + async for blob in container_client.list_blobs( + name_starts_with=name_starts_with, include=include, **kwargs + ) + ] + + return blobs + + @task async def blob_storage_list( container: str,