Skip to content

Commit

Permalink
Refactor storage methods and improve error handling
Browse files Browse the repository at this point in the history
Renamed `getFileStream` to `getUncompressedFileStream` and added decompression for non-binary files using `GZipStream`. Updated `getReadSharedAccessSignature` to return `UriWithSharedAccessSignature` type and added error handling for unsupported storage providers. Modified `getWriteSharedAccessSignature` to return a result type and added error handling. Refactored `SaveFileToObjectStorageWithMetadata` for performance and error handling. Removed `fileExists` method from `Storage.Server`. Updated `GetUploadUriParameters` to handle new result type. Changed `UriWithSharedAccessSignature` type from string to `Uri` in `Types.Shared`. Added error logging to `uploadFilesToObjectStorage` in `Services.CLI`. Improved file upload efficiency in `Storage.SDK`. Made minor documentation and comment updates for clarity.
  • Loading branch information
ScottArbeit committed Dec 30, 2024
1 parent 04a4140 commit 0272762
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 159 deletions.
123 changes: 69 additions & 54 deletions src/Grace.Actors/Diff.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ module Diff =
}

/// Gets a Stream from object storage for a specific FileVersion, using a generated Uri.
member private this.getFileStream (repositoryDto: RepositoryDto) (fileVersion: FileVersion) (url: UriWithSharedAccessSignature) correlationId =
member private this.getUncompressedFileStream
(repositoryDto: RepositoryDto)
(fileVersion: FileVersion)
(url: UriWithSharedAccessSignature)
correlationId
=
task {
this.correlationId <- correlationId
let repositoryActorProxy = Repository.CreateActorProxy repositoryDto.RepositoryId correlationId
Expand All @@ -138,12 +143,17 @@ module Diff =
match objectStorageProvider with
| AWSS3 -> return new MemoryStream() :> Stream
| AzureBlobStorage ->
let blobClient = BlockBlobClient(Uri(url))
let blobClient = BlockBlobClient(url)
logToConsole $"In DiffActor.getFileStream(): blobClient.Uri: {blobClient.Uri}."
let fileStream = blobClient.OpenRead(position = 0, bufferSize = (64 * 1024))
let uncompressedStream = if fileVersion.IsBinary then fileStream else fileStream
//let gzStream = new GZipStream(fileStream, CompressionMode.Decompress, leaveOpen = true)
//gzStream :> Stream
let! fileStream = blobClient.OpenReadAsync(position = 0, bufferSize = (64 * 1024))

let uncompressedStream =
if fileVersion.IsBinary then
fileStream
else
use gzStream = new GZipStream(fileStream, CompressionMode.Decompress, leaveOpen = true)
gzStream :> Stream

return uncompressedStream
| GoogleCloudStorage -> return new MemoryStream() :> Stream
| ObjectStorageProvider.Unknown -> return new MemoryStream() :> Stream
Expand Down Expand Up @@ -307,9 +317,12 @@ module Diff =
//logToConsole $"In DiffActor.getFileStream(); relativePath: {relativePath}; relativeDirectoryPath: {relativeDirectoryPath}; graceIndex.Count: {graceIndex.Count}."
let directory = graceIndex[relativeDirectoryPath]
let fileVersion = directory.Files.First(fun f -> f.RelativePath = relativePath)
let! uri = getReadSharedAccessSignature repositoryDto fileVersion correlationId
let! stream = this.getFileStream repositoryDto fileVersion (Result.get uri) correlationId
return (stream, fileVersion)

match! getReadSharedAccessSignature repositoryDto fileVersion correlationId with
| Ok uri ->
let! uncompressedFileStream = this.getUncompressedFileStream repositoryDto fileVersion uri correlationId
return Ok(uncompressedFileStream, fileVersion)
| Error ex -> return Error ex
}

// Process each difference.
Expand All @@ -329,51 +342,53 @@ module Diff =
| Directory -> () // Might have to revisit this.
| File ->
// Get streams for both file versions.
let! (fileStream1, fileVersion1) = getFileStream graceIndex1 difference.RelativePath repositoryDto

let! (fileStream2, fileVersion2) = getFileStream graceIndex2 difference.RelativePath repositoryDto

// Compare the streams using DiffPlex, and get the Inline and Side-by-Side diffs.
let! diffResults =
task {
let (fv1, fv2) = (fileVersion1, fileVersion2)

if createdAt1.CompareTo(createdAt2) < 0 then
return! diffTwoFiles fileStream1 fileStream2
else
return! diffTwoFiles fileStream2 fileStream1
}

if not <| isNull (fileStream1) then do! fileStream1.DisposeAsync()

if not <| isNull (fileStream2) then do! fileStream2.DisposeAsync()

// Create a FileDiff with the DiffPlex results and corresponding Sha256Hash values.
let fileDiff =
if createdAt1.CompareTo(createdAt2) < 0 then
FileDiff.Create
fileVersion1.RelativePath
fileVersion1.Sha256Hash
fileVersion1.CreatedAt
fileVersion2.Sha256Hash
fileVersion2.CreatedAt
(fileVersion1.IsBinary || fileVersion2.IsBinary)
diffResults.InlineDiff
diffResults.SideBySideOld
diffResults.SideBySideNew
else
FileDiff.Create
fileVersion1.RelativePath
fileVersion2.Sha256Hash
fileVersion1.CreatedAt
fileVersion1.Sha256Hash
fileVersion2.CreatedAt
(fileVersion1.IsBinary || fileVersion2.IsBinary)
diffResults.InlineDiff
diffResults.SideBySideOld
diffResults.SideBySideNew

fileDiffs.Add(fileDiff)
let! result1 = getFileStream graceIndex1 difference.RelativePath repositoryDto

let! result2 = getFileStream graceIndex2 difference.RelativePath repositoryDto

match (result1, result2) with
| (Ok(fileStream1, fileVersion1), Ok(fileStream2, fileVersion2)) ->
try
// Compare the streams using DiffPlex, and get the Inline and Side-by-Side diffs.
let! diffResults =
task {
if createdAt1.CompareTo(createdAt2) < 0 then
return! diffTwoFiles fileStream1 fileStream2
else
return! diffTwoFiles fileStream2 fileStream1
}

// Create a FileDiff with the DiffPlex results and corresponding Sha256Hash values.
let fileDiff =
if createdAt1.CompareTo(createdAt2) < 0 then
FileDiff.Create
fileVersion1.RelativePath
fileVersion1.Sha256Hash
fileVersion1.CreatedAt
fileVersion2.Sha256Hash
fileVersion2.CreatedAt
(fileVersion1.IsBinary || fileVersion2.IsBinary)
diffResults.InlineDiff
diffResults.SideBySideOld
diffResults.SideBySideNew
else
FileDiff.Create
fileVersion1.RelativePath
fileVersion2.Sha256Hash
fileVersion1.CreatedAt
fileVersion1.Sha256Hash
fileVersion2.CreatedAt
(fileVersion1.IsBinary || fileVersion2.IsBinary)
diffResults.InlineDiff
diffResults.SideBySideOld
diffResults.SideBySideNew

fileDiffs.Add(fileDiff)
finally
if not <| isNull fileStream1 then fileStream1.Dispose()
if not <| isNull fileStream2 then fileStream2.Dispose()
| (Error ex, _) -> raise ex
| (_, Error ex) -> raise ex
| Add -> ()
| Delete -> ()
}
Expand Down
29 changes: 10 additions & 19 deletions src/Grace.Actors/Services.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ module Services =
let blobSasBuilder =
BlobSasBuilder(
permissions = permission,
expiresOn = DateTimeOffset.UtcNow.Add(TimeSpan.FromMinutes(Constants.SharedAccessSignatureExpiration)),
expiresOn = DateTimeOffset.UtcNow.Add(TimeSpan.FromMinutes(SharedAccessSignatureExpiration)),
StartsOn = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(15.0)),
BlobContainerName = blobContainerClient.Name,
BlobName = Path.Combine($"{fileVersion.RelativePath}", fileVersion.GetObjectFileName),
Expand All @@ -175,7 +175,7 @@ module Services =

let sasUriParameters = blobSasBuilder.ToSasQueryParameters(sharedKeyCredential)

return Ok $"{blobContainerClient.Uri}/{fileVersion.RelativePath}/{fileVersion.GetObjectFileName}?{sasUriParameters}"
return UriWithSharedAccessSignature($"{blobContainerClient.Uri}/{fileVersion.RelativePath}/{fileVersion.GetObjectFileName}?{sasUriParameters}")
}

/// Gets a shared access signature for reading from the object storage provider.
Expand All @@ -185,20 +185,17 @@ module Services =
| AzureBlobStorage ->
let permissions = (BlobSasPermissions.Read ||| BlobSasPermissions.List) // These are the minimum permissions needed to read a file.
let! sas = createAzureBlobSasUri repositoryDto fileVersion permissions correlationId

match sas with
| Ok sas -> return Ok(sas.ToString())
| Error error -> return Error error
| AWSS3 -> return Error "Not implemented"
| GoogleCloudStorage -> return Error "Not implemented"
| ObjectStorageProvider.Unknown -> return Error "Not implemented"
return Ok sas
| AWSS3 -> return Error(NotImplementedException("AWS S3 storage type is not implemented."))
| GoogleCloudStorage -> return Error(NotImplementedException("Google Cloud storage type is not implemented."))
| ObjectStorageProvider.Unknown -> return Error(NotImplementedException("Unknown storage type."))
}

/// Gets a shared access signature for writing to the object storage provider.
let getWriteSharedAccessSignature (repositoryDto: RepositoryDto) (fileVersion: FileVersion) (correlationId: CorrelationId) =
task {
match repositoryDto.ObjectStorageProvider with
| AWSS3 -> return Uri("http://localhost:3500")
| AWSS3 -> return Error(NotImplementedException("AWS S3 storage type is not implemented."))
| AzureBlobStorage ->
// Adding read permission to allow for calls to .ExistsAsync().
let! sas =
Expand All @@ -212,15 +209,9 @@ module Services =
||| BlobSasPermissions.Read)
correlationId

match sas with
| Ok sas ->
//logToConsole $"In Actor.Services.getWriteSharedAccessSignature; {sas}"
return Uri(sas)
| Error error ->
//logToConsole $"In Actor.Services.getWriteSharedAccessSignature; {error}"
return Uri("http://localhost")
| GoogleCloudStorage -> return Uri("http://localhost:3500")
| ObjectStorageProvider.Unknown -> return Uri("http://localhost:3500")
return Ok sas
| GoogleCloudStorage -> return Error(NotImplementedException("Google Cloud storage type is not implemented."))
| ObjectStorageProvider.Unknown -> return Error(NotImplementedException("Unknown storage type."))
}

/// Checks whether an owner name exists in the system.
Expand Down
7 changes: 6 additions & 1 deletion src/Grace.CLI/Command/Services.CLI.fs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,12 @@ module Services =
parameters.CorrelationId
with
| Ok result -> () //logToAnsiConsole Colors.Verbose $"In Services.uploadFilesToObjectStorage(): Uploaded {fileVersion.GetObjectFileName} to object storage."
| Error error -> errors.Enqueue(error)
| Error error ->
logToAnsiConsole
Colors.Error
$"Error uploading {fileVersion.GetObjectFileName} to object storage: {error.Error}"

errors.Enqueue(error)
}
))
)
Expand Down
4 changes: 2 additions & 2 deletions src/Grace.CLI/Command/Watch.CLI.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ module Watch =

/// Holds a list of the created or changed files that we need to process, as determined by the FileSystemWatcher.
///
/// NOTE: We're using ConcurrentDictionary because it's safe for multithreading, doesn't allow us to insert the same key twice, and for its algorithms. We're not using the values of the ConcurrentDictionary here, only the keys.
/// Note: We're using ConcurrentDictionary because it's safe for multithreading, doesn't allow us to insert the same key twice, and for its algorithms. We're not using the values of the ConcurrentDictionary here, only the keys.
let private filesToProcess = ConcurrentDictionary<string, unit>()

/// Holds a list of the created or changed directories that we need to process, as determined by the FileSystemWatcher.
///
/// NOTE: We're using ConcurrentDictionary because it's safe for multithreading, doesn't allow us to insert the same key twice, and for its algorithms. We're not using the values of the ConcurrentDictionary here, only the keys.
/// Note: We're using ConcurrentDictionary because it's safe for multithreading, doesn't allow us to insert the same key twice, and for its algorithms. We're not using the values of the ConcurrentDictionary here, only the keys.
let private directoriesToProcess = ConcurrentDictionary<string, unit>()

type WatchParameters() =
Expand Down
Loading

0 comments on commit 0272762

Please sign in to comment.