Skip to content

Commit

Permalink
Refactor reminder and storage handling
Browse files Browse the repository at this point in the history
Refactored reminder service to use configurable batch size and updated storage operations to handle multiple file versions. Introduced new admin routes for Cosmos DB management. Improved parameter handling and logging for better maintainability and debugging.
  • Loading branch information
ScottArbeit committed Dec 29, 2024
1 parent 5c49ed0 commit 2e4675e
Show file tree
Hide file tree
Showing 23 changed files with 663 additions and 219 deletions.
2 changes: 1 addition & 1 deletion src/Grace.Actors/Constants.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ module Constants =
#if DEBUG
let DefaultPhysicalDeletionReminderDuration = Duration.FromSeconds(300.0)
#else
let DefaultPhysicalDeletionReminderTime = Duration.FromDays(7.0)
let DefaultPhysicalDeletionReminderDuration = Duration.FromDays(7.0)
#endif

/// The time to wait between logical and physical deletion of an actor's state, as a TimeSpan.
Expand Down
10 changes: 3 additions & 7 deletions src/Grace.Actors/Diff.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,10 @@ module Diff =
}

/// Gets a Stream from object storage for a specific FileVersion, using a generated Uri.
member private this.getFileStream (fileVersion: FileVersion) (url: UriWithSharedAccessSignature) correlationId =
member private this.getFileStream (repositoryDto: RepositoryDto) (fileVersion: FileVersion) (url: UriWithSharedAccessSignature) correlationId =
task {
this.correlationId <- correlationId
let repositoryActorId = Repository.GetActorId(fileVersion.RepositoryId)

let repositoryActorProxy =
actorProxyFactory.CreateActorProxyWithCorrelationId<IRepositoryActor>(repositoryActorId, ActorName.Repository, correlationId)

let repositoryActorProxy = Repository.CreateActorProxy repositoryDto.RepositoryId correlationId
let! objectStorageProvider = repositoryActorProxy.GetObjectStorageProvider correlationId

match objectStorageProvider with
Expand Down Expand Up @@ -312,7 +308,7 @@ module Diff =
let directory = graceIndex[relativeDirectoryPath]
let fileVersion = directory.Files.First(fun f -> f.RelativePath = relativePath)
let! uri = getReadSharedAccessSignature repositoryDto fileVersion correlationId
let! stream = this.getFileStream fileVersion (Result.get uri) correlationId
let! stream = this.getFileStream repositoryDto fileVersion (Result.get uri) correlationId
return (stream, fileVersion)
}

Expand Down
1 change: 0 additions & 1 deletion src/Grace.Actors/FileAppearance.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ open System.Threading.Tasks

module FileAppearance =

let GetActorId (fileVersion: FileVersion) = ActorId($"{fileVersion.RepositoryId}*{fileVersion.RelativePath}*{fileVersion.Sha256Hash}")
let actorName = ActorName.FileAppearance
let log = loggerFactory.CreateLogger("FileAppearance.Actor")

Expand Down
38 changes: 30 additions & 8 deletions src/Grace.Actors/Services.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ module Services =
/// Deletes all documents from CosmosDb.
///
/// **** This method is implemented only in Debug configuration. It is a no-op in Release configuration. ****
let deleteAllFromCosmosDB () =
let deleteAllFromCosmosDBThatMatch (queryDefinition: QueryDefinition) =
task {
#if DEBUG
let failed = List<string>()
Expand All @@ -1130,10 +1130,7 @@ module Services =
let parallelOptions = ParallelOptions(MaxDegreeOfParallelism = 3)

let itemRequestOptions =
ItemRequestOptions(AddRequestHeaders = fun headers -> headers.Add(Constants.CorrelationIdHeaderKey, "Deleting all records from CosmosDB"))

let queryDefinition = QueryDefinition("""SELECT c.id, c.partitionKey FROM c ORDER BY c.partitionKey""")
//let queryDefinition = QueryDefinition("""SELECT c.id, c.partitionKey FROM c WHERE ENDSWITH(c.id, "||Rmd") ORDER BY c.partitionKey""")
ItemRequestOptions(AddRequestHeaders = fun headers -> headers.Add(Constants.CorrelationIdHeaderKey, "deleteAllFromCosmosDBThatMatch"))

let mutable totalRecordsDeleted = 0
let overallStartTime = getCurrentInstant ()
Expand Down Expand Up @@ -1178,7 +1175,7 @@ module Services =
let overallRps = float totalRecordsDeleted / overall_duration_s

logToConsole
$"In Services.deleteAllFromCosmosDB(): batch duration (s): {duration_s:F3}; batch requests/second: {rps:F3}; failed.Count: {failed.Count}; totalRequestCharge: {float totalRequestCharge / 1000.0:F2}; totalRecordsDeleted: {totalRecordsDeleted}; overall duration (m): {overall_duration_s / 60.0:F3}; overall requests/second: {overallRps:F3}."
$"In Services.deleteAllFromCosmosDBThatMatch(): batch duration (s): {duration_s:F3}; batch requests/second: {rps:F3}; failed.Count: {failed.Count}; totalRequestCharge: {float totalRequestCharge / 1000.0:F2}; totalRecordsDeleted: {totalRecordsDeleted}; overall duration (m): {overall_duration_s / 60.0:F3}; overall requests/second: {overallRps:F3}."

return failed
with ex ->
Expand All @@ -1189,6 +1186,32 @@ module Services =
#endif
}

/// Deletes all documents from CosmosDB.
///
/// **** This method is implemented only in Debug configuration. It is a no-op in Release configuration. ****
let deleteAllFromCosmosDb () =
task {
#if DEBUG
let queryDefinition = QueryDefinition("""SELECT c.id, c.partitionKey FROM c ORDER BY c.partitionKey""")
return! deleteAllFromCosmosDBThatMatch queryDefinition
#else
return List<string>([ "Not implemented" ])
#endif
}

/// Deletes all Reminders from CosmosDB.
///
/// **** This method is implemented only in Debug configuration. It is a no-op in Release configuration. ****
let deleteAllRemindersFromCosmosDb () =
task {
#if DEBUG
let queryDefinition = QueryDefinition("""SELECT c.id, c.partitionKey FROM c WHERE ENDSWITH(c.id, "||Rmd") ORDER BY c.partitionKey""")
return! deleteAllFromCosmosDBThatMatch queryDefinition
#else
return List<string>([ "Not implemented" ])
#endif
}

/// Gets a list of references of a given ReferenceType for a branch.
let getReferencesByType (referenceType: ReferenceType) (branchId: BranchId) (maxCount: int) (correlationId: CorrelationId) =
task {
Expand Down Expand Up @@ -1372,7 +1395,7 @@ module Services =
}

/// Gets the latest reference for a given ReferenceType in a branch.
let getLatestReferenceByType referenceType (branchId: BranchId) =
let getLatestReferenceByType (referenceType: ReferenceType) (branchId: BranchId) =
task {
match actorStateStorageProvider with
| Unknown -> return None
Expand Down Expand Up @@ -1464,7 +1487,6 @@ module Services =
AND event.Event.created.RepositoryId = @repositoryId
AND event.Event.created.Class = @class"""
)
//let queryDefinition = QueryDefinition("""SELECT TOP 1 c["value"] FROM c WHERE c["value"].RepositoryId = @repositoryId AND STARTSWITH(c["value"].Sha256Hash, @sha256Hash, true) AND c["value"].Class = @class""")
.WithParameter("@sha256Hash", sha256Hash)
.WithParameter("@repositoryId", repositoryId)
.WithParameter("@class", nameof (DirectoryVersion))
Expand Down
2 changes: 1 addition & 1 deletion src/Grace.Aspire.AppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ string daprConfigurationPath()

// Add local containers as services for Dapr.
var zipkin = builder.AddContainer("zipkin", "openzipkin/zipkin", "latest");
zipkin.WithEndpoint(hostPort: 9411, name: "zipkin-http", scheme: "http");
zipkin.WithEndpoint(port: 9411, name: "zipkin-http", scheme: "http");

//var prometheus = builder.AddContainer("prometheus", "prom/prometheus", "latest")
// .WithBindMount("../prometheus", "/etc/prometheus", isReadOnly: true);
Expand Down
89 changes: 84 additions & 5 deletions src/Grace.CLI/Command/Branch.CLI.fs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ open System.Security.Cryptography
open System.Threading.Tasks
open System.Text
open System.Text.Json
open Grace.Shared.Parameters

module Branch =
open Grace.Shared.Validation.Common.Input
Expand Down Expand Up @@ -798,6 +799,7 @@ module Branch =
if parseResult |> verbose then printParseResult parseResult

let validateIncomingParameters = CommonValidations parseResult parameters
let repositoryId = RepositoryId.Parse(parameters.RepositoryId)

match validateIncomingParameters with
| Ok _ ->
Expand Down Expand Up @@ -884,7 +886,22 @@ module Branch =
let mutable lastFileUploadInstant = newGraceStatus.LastSuccessfulFileUpload

if newFileVersions.Count() > 0 then
match! uploadFilesToObjectStorage newFileVersions (getCorrelationId parseResult) with
let getUploadMetadataForFilesParameters =
Storage.GetUploadMetadataForFilesParameters(
OwnerId = parameters.OwnerId,
OwnerName = parameters.OwnerName,
OrganizationId = parameters.OrganizationId,
OrganizationName = parameters.OrganizationName,
RepositoryId = parameters.RepositoryId,
RepositoryName = parameters.RepositoryName,
CorrelationId = getCorrelationId parseResult,
FileVersions =
(newFileVersions
|> Seq.map (fun localFileVersion -> localFileVersion.ToFileVersion)
|> Seq.toArray)
)

match! uploadFilesToObjectStorage getUploadMetadataForFilesParameters with
| Ok returnValue -> () //logToAnsiConsole Colors.Verbose $"Uploaded all files to object storage."
| Error error -> logToAnsiConsole Colors.Error $"Error uploading files to object storage: {error.Error}"

Expand Down Expand Up @@ -970,7 +987,22 @@ module Branch =
.First(fun dv -> dv.Files.Exists(fun file -> file.RelativePath = relativePath))
.Files.First(fun file -> file.RelativePath = relativePath))

let! uploadResult = uploadFilesToObjectStorage newFileVersions (getCorrelationId parseResult)
let getUploadMetadataForFilesParameters =
Storage.GetUploadMetadataForFilesParameters(
OwnerId = parameters.OwnerId,
OwnerName = parameters.OwnerName,
OrganizationId = parameters.OrganizationId,
OrganizationName = parameters.OrganizationName,
RepositoryId = parameters.RepositoryId,
RepositoryName = parameters.RepositoryName,
CorrelationId = getCorrelationId parseResult,
FileVersions =
(newFileVersions
|> Seq.map (fun localFileVersion -> localFileVersion.ToFileVersion)
|> Seq.toArray)
)

let! uploadResult = uploadFilesToObjectStorage getUploadMetadataForFilesParameters
let saveParameters = SaveDirectoryVersionsParameters()

saveParameters.DirectoryVersions <- newDirectoryVersions.Select(fun dv -> dv.ToDirectoryVersion).ToList()
Expand Down Expand Up @@ -1926,6 +1958,7 @@ module Branch =
) =
task {
t |> startProgressTask showOutput
let repositoryId = RepositoryId.Parse(parameters.RepositoryId)

if currentBranch.SaveEnabled && newDirectoryVersions.Any() then
let updatedRelativePaths =
Expand Down Expand Up @@ -1954,7 +1987,22 @@ module Branch =
Colors.Verbose
$"Uploading {newFileVersions.Count()} file(s) from {newDirectoryVersions.Count} new directory version(s) to object storage."

match! uploadFilesToObjectStorage newFileVersions (getCorrelationId parseResult) with
let getUploadMetadataForFilesParameters =
Storage.GetUploadMetadataForFilesParameters(
OwnerId = parameters.OwnerId,
OwnerName = parameters.OwnerName,
OrganizationId = parameters.OrganizationId,
OrganizationName = parameters.OrganizationName,
RepositoryId = parameters.RepositoryId,
RepositoryName = parameters.RepositoryName,
CorrelationId = getCorrelationId parseResult,
FileVersions =
(newFileVersions
|> Seq.map (fun localFileVersion -> localFileVersion.ToFileVersion)
|> Seq.toArray)
)

match! uploadFilesToObjectStorage getUploadMetadataForFilesParameters with
| Ok returnValue ->
t |> setProgressTaskValue showOutput 100.0
return Ok(showOutput, parseResult, parameters, currentBranch, newDirectoryVersions)
Expand Down Expand Up @@ -2369,6 +2417,7 @@ module Branch =
let mutable rootDirectoryId = DirectoryVersionId.Empty
let mutable rootDirectorySha256Hash = Sha256Hash String.Empty
let mutable previousDirectoryIds: HashSet<DirectoryVersionId> = null
let repositoryId = RepositoryId.Parse(parameters.RepositoryId)

if parseResult |> hasOutput then
return!
Expand Down Expand Up @@ -2461,7 +2510,22 @@ module Branch =
.First(fun dv -> dv.Files.Exists(fun file -> file.RelativePath = relativePath))
.Files.First(fun file -> file.RelativePath = relativePath))

let! uploadResult = uploadFilesToObjectStorage newFileVersions (getCorrelationId parseResult)
let getUploadMetadataForFilesParameters =
Storage.GetUploadMetadataForFilesParameters(
OwnerId = parameters.OwnerId,
OwnerName = parameters.OwnerName,
OrganizationId = parameters.OrganizationId,
OrganizationName = parameters.OrganizationName,
RepositoryId = parameters.RepositoryId,
RepositoryName = parameters.RepositoryName,
CorrelationId = getCorrelationId parseResult,
FileVersions =
(newFileVersions
|> Seq.map (fun localFileVersion -> localFileVersion.ToFileVersion)
|> Seq.toArray)
)

let! uploadResult = uploadFilesToObjectStorage getUploadMetadataForFilesParameters

t3.Value <- 100.0

Expand Down Expand Up @@ -2688,7 +2752,22 @@ module Branch =
.First(fun dv -> dv.Files.Exists(fun file -> file.RelativePath = relativePath))
.Files.First(fun file -> file.RelativePath = relativePath))

let! uploadResult = uploadFilesToObjectStorage newFileVersions (getCorrelationId parseResult)
let getUploadMetadataForFilesParameters =
Storage.GetUploadMetadataForFilesParameters(
OwnerId = parameters.OwnerId,
OwnerName = parameters.OwnerName,
OrganizationId = parameters.OrganizationId,
OrganizationName = parameters.OrganizationName,
RepositoryId = parameters.RepositoryId,
RepositoryName = parameters.RepositoryName,
CorrelationId = getCorrelationId parseResult,
FileVersions =
(newFileVersions
|> Seq.map (fun localFileVersion -> localFileVersion.ToFileVersion)
|> Seq.toArray)
)

let! uploadResult = uploadFilesToObjectStorage getUploadMetadataForFilesParameters

let saveParameters = SaveDirectoryVersionsParameters()

Expand Down
Loading

0 comments on commit 2e4675e

Please sign in to comment.