Skip to content

Commit

Permalink
More MemoryCache extension work; cleaned up all uses of .CreateActorP…
Browse files Browse the repository at this point in the history
…roxy; enabled CorrelationId's to show up in Actor OnActivateAsync() by using MemoryCache.
  • Loading branch information
ScottArbeit committed Jul 31, 2024
1 parent 83567de commit 51895ed
Show file tree
Hide file tree
Showing 35 changed files with 822 additions and 856 deletions.
79 changes: 79 additions & 0 deletions src/Grace.Actors/ActorProxy.Extensions.Actor.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
namespace Grace.Actors.Extensions

open Dapr.Actors
open Grace.Actors.Extensions.MemoryCache
open Grace.Actors.Constants
open Grace.Actors.Context
open Grace.Actors.Interfaces
open Grace.Shared.Constants
open Grace.Shared.Types
open System

module ActorProxy =

type Dapr.Actors.Client.IActorProxyFactory with

member this.CreateActorProxyWithCorrelationId<'T when 'T :> IActor> (actorId: ActorId, actorType: string, correlationId: CorrelationId) =
let actorProxy = actorProxyFactory.CreateActorProxy<'T>(actorId, actorType)
memoryCache.CreateCorrelationIdEntry actorId correlationId
actorProxy

module Branch =
let GetActorId (branchId: BranchId) = ActorId($"{branchId}")
let CreateActorProxy (branchId: BranchId) correlationId = actorProxyFactory.CreateActorProxyWithCorrelationId<IBranchActor>(GetActorId branchId, ActorName.Branch, correlationId)

module BranchName =
let GetActorId (repositoryId: RepositoryId) (branchName: BranchName) = ActorId($"{branchName}|{repositoryId}")
let CreateActorProxy (repositoryId: RepositoryId) (branchName: BranchName) correlationId = actorProxyFactory.CreateActorProxyWithCorrelationId<IBranchNameActor>(GetActorId repositoryId branchName, ActorName.BranchName, correlationId)

module Diff =
/// Gets an ActorId for a Diff actor.
let GetActorId (directoryId1: DirectoryVersionId) (directoryId2: DirectoryVersionId) =
if directoryId1 < directoryId2 then
ActorId($"{directoryId1}*{directoryId2}")
else
ActorId($"{directoryId2}*{directoryId1}")

let CreateActorProxy (directoryId1: DirectoryVersionId) (directoryId2: DirectoryVersionId) correlationId =
actorProxyFactory.CreateActorProxyWithCorrelationId<IDiffActor>(GetActorId directoryId1 directoryId2, ActorName.Diff, correlationId)

module DirectoryVersion =
let GetActorId (directoryVersionId: DirectoryVersionId) = ActorId($"{directoryVersionId}")
let CreateActorProxy (directoryVersionId: DirectoryVersionId) correlationId =
actorProxyFactory.CreateActorProxyWithCorrelationId<IDirectoryVersionActor>(GetActorId directoryVersionId, ActorName.DirectoryVersion, correlationId)

module Organization =
let GetActorId (organizationId: OrganizationId) = ActorId($"{organizationId}")
let CreateActorProxy (organizationId: OrganizationId) correlationId =
actorProxyFactory.CreateActorProxyWithCorrelationId<IOrganizationActor>(GetActorId organizationId, ActorName.Organization, correlationId)

module OrganizationName =
let GetActorId (ownerId: OwnerId) (organizationName: OrganizationName) = ActorId($"{organizationName}|{ownerId}")
let CreateActorProxy (ownerId: OwnerId) (organizationName: OrganizationName) correlationId =
actorProxyFactory.CreateActorProxyWithCorrelationId<IOrganizationNameActor>(GetActorId ownerId organizationName, ActorName.OrganizationName, correlationId)

module Owner =
let GetActorId (ownerId: OwnerId) = ActorId($"{ownerId}")
let CreateActorProxy (ownerId: OwnerId) correlationId =
actorProxyFactory.CreateActorProxyWithCorrelationId<IOwnerActor>(GetActorId ownerId, ActorName.Owner, correlationId)

module OwnerName =
let GetActorId (ownerName: OwnerName) = ActorId(ownerName)
let CreateActorProxy (ownerName: OwnerName) correlationId =
actorProxyFactory.CreateActorProxyWithCorrelationId<IOwnerNameActor>(GetActorId ownerName, ActorName.OwnerName, correlationId)

module Reference =
let GetActorId (referenceId: ReferenceId) = ActorId($"{referenceId}")
let CreateActorProxy (referenceId: ReferenceId) correlationId =
actorProxyFactory.CreateActorProxyWithCorrelationId<IReferenceActor>(GetActorId referenceId, ActorName.Reference, correlationId)

module Repository =
let GetActorId (repositoryId: RepositoryId) = ActorId($"{repositoryId}")
let CreateActorProxy (repositoryId: RepositoryId) correlationId =
actorProxyFactory.CreateActorProxyWithCorrelationId<IRepositoryActor>(GetActorId repositoryId, ActorName.Repository, correlationId)

module RepositoryName =
let GetActorId (ownerId: OwnerId) (organizationId: OrganizationId) (repositoryName: RepositoryName) = ActorId($"{repositoryName}|{ownerId}|{organizationId}")
let CreateActorProxy (ownerId: OwnerId) (organizationId: OrganizationId) (repositoryName: RepositoryName) correlationId =
actorProxyFactory.CreateActorProxyWithCorrelationId<IRepositoryNameActor>(GetActorId ownerId organizationId repositoryName, ActorName.RepositoryName, correlationId)

48 changes: 25 additions & 23 deletions src/Grace.Actors/Branch.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ open Dapr.Actors.Runtime
open Grace.Actors.Commands
open Grace.Actors.Commands.Branch
open Grace.Actors.Constants
open Grace.Actors.Context
open Grace.Actors.Events.Branch
open Grace.Actors.Extensions.ActorProxy
open Grace.Actors.Extensions.MemoryCache
open Grace.Actors.Interfaces
open Grace.Actors.Services
open Grace.Shared
Expand All @@ -28,8 +31,6 @@ open FSharpPlus.Data.MultiMap

module Branch =

let GetActorId (branchId: BranchId) = ActorId($"{branchId}")

// Branch should support logical deletes with physical deletes set using a Dapr Timer based on a repository-level setting.
// Branch Deletion should enumerate and delete each reference in the branch.

Expand All @@ -50,7 +51,7 @@ module Branch =
/// Indicates that the actor is in an undefined state, and should be reset.
let mutable isDisposed = false

let updateDto branchEvent currentBranchDto =
let updateDto branchEvent correlationId currentBranchDto =
task {
let branchEventType = branchEvent.Event
let! newBranchDto =
Expand All @@ -60,8 +61,8 @@ module Branch =
let! referenceDto =
if basedOn <> ReferenceId.Empty then
task {
let referenceActorProxy = actorProxyFactory.CreateActorProxy<IReferenceActor>(Reference.GetActorId basedOn, ActorName.Reference)
return! referenceActorProxy.Get branchEvent.Metadata.CorrelationId
let referenceActorProxy = Reference.CreateActorProxy basedOn correlationId
return! referenceActorProxy.Get correlationId
}
else
ReferenceDto.Default |> returnTask
Expand Down Expand Up @@ -91,7 +92,7 @@ module Branch =
}
| Rebased referenceId ->
task {
let referenceActorProxy = actorProxyFactory.CreateActorProxy<IReferenceActor>(Reference.GetActorId referenceId, ActorName.Reference)
let referenceActorProxy = Reference.CreateActorProxy referenceId correlationId
let! referenceDto = referenceActorProxy.Get branchEvent.Metadata.CorrelationId
return { currentBranchDto with BasedOn = referenceDto }
}
Expand Down Expand Up @@ -132,14 +133,19 @@ module Branch =
let mutable message = String.Empty
let! retrievedEvents = Storage.RetrieveState<List<BranchEvent>> stateManager eventsStateName

let correlationId =
match memoryCache.GetCorrelationIdEntry this.Id with
| Some correlationId -> correlationId
| None -> String.Empty

match retrievedEvents with
| Some retrievedEvents ->
// Load the branchEvents from the retrieved events.
branchEvents.AddRange(retrievedEvents)

// Apply all events to the branchDto.
for branchEvent in retrievedEvents do
let! updatedBranchDto = branchDto |> updateDto branchEvent
let! updatedBranchDto = branchDto |> updateDto branchEvent correlationId
branchDto <- updatedBranchDto

// Get the latest references and update the dto.
Expand All @@ -159,8 +165,8 @@ module Branch =
match basedOnLink with
| ReferenceLinkType.BasedOn referenceId -> referenceId

let basedOnReferenceActorProxy = actorProxyFactory.CreateActorProxy<IReferenceActor>(Reference.GetActorId basedOnReferenceId, ActorName.Reference)
let! basedOnReferenceDto = basedOnReferenceActorProxy.Get $"OnActivateAsync-{generateCorrelationId()}"
let basedOnReferenceActorProxy = Reference.CreateActorProxy basedOnReferenceId correlationId
let! basedOnReferenceDto = basedOnReferenceActorProxy.Get correlationId

branchDto <- { branchDto with BasedOn = basedOnReferenceDto }
| External -> ()
Expand All @@ -172,10 +178,11 @@ module Branch =
let duration_ms = getPaddedDuration_ms activateStartTime

log.LogInformation(
"{currentInstant}: Node: {hostName}; Duration: {duration_ms}ms; CorrelationId: ; Activated {ActorType} {ActorId}. BranchName: {BranchName}; {message}.",
"{currentInstant}: Node: {hostName}; Duration: {duration_ms}ms; CorrelationId: {correlationId}; Activated {ActorType} {ActorId}. BranchName: {BranchName}; {message}.",
getCurrentInstantExtended (),
getMachineName,
duration_ms,
correlationId,
actorName,
host.Id,
branchDto.BranchName,
Expand Down Expand Up @@ -262,7 +269,7 @@ module Branch =
if branchEvents.Count = 0 then do! this.OnFirstWrite()

// Update the branchDto with the event.
let! updatedBranchDto = branchDto |> updateDto branchEvent
let! updatedBranchDto = branchDto |> updateDto branchEvent this.correlationId
branchDto <- updatedBranchDto

match branchEvent.Event with
Expand Down Expand Up @@ -371,9 +378,7 @@ module Branch =
let addReference repositoryId branchId directoryId sha256Hash referenceText referenceType links =
task {
let referenceId: ReferenceId = ReferenceId.NewGuid()
let actorId = Reference.GetActorId referenceId

let referenceActor = actorProxyFactory.CreateActorProxy<IReferenceActor>(actorId, Constants.ActorName.Reference)
let referenceActor = Reference.CreateActorProxy referenceId this.correlationId

let referenceDto =
{ReferenceDto.Default with
Expand Down Expand Up @@ -407,8 +412,8 @@ module Branch =
// Add an initial Rebase reference to this branch that points to the BasedOn reference, unless we're creating `main`.
if branchName <> InitialBranchName then
// We need to get the reference that we're rebasing on, so we can get the directoryId and sha256Hash.
let referenceActorProxy = actorProxyFactory.CreateActorProxy<IReferenceActor>(Reference.GetActorId basedOn, ActorName.Reference)
let! promotionDto = referenceActorProxy.Get metadata.CorrelationId
let referenceActorProxy = Reference.CreateActorProxy basedOn this.correlationId
let! promotionDto = referenceActorProxy.Get this.correlationId

match! addReference repositoryId branchId promotionDto.DirectoryId promotionDto.Sha256Hash promotionDto.ReferenceText ReferenceType.Rebase [| ReferenceLinkType.BasedOn promotionDto.ReferenceId |] with
| Ok rebaseReferenceId ->
Expand All @@ -432,7 +437,7 @@ module Branch =
metadata.Properties[nameof (BranchName)] <- $"{branchDto.BranchName}"

// We need to get the reference that we're rebasing on, so we can get the directoryId and sha256Hash.
let referenceActorProxy = actorProxyFactory.CreateActorProxy<IReferenceActor>(ActorId($"{referenceId}"), ActorName.Reference)
let referenceActorProxy = Reference.CreateActorProxy referenceId this.correlationId
let! promotionDto = referenceActorProxy.Get metadata.CorrelationId

// Add the Rebase reference to this branch.
Expand Down Expand Up @@ -482,9 +487,8 @@ module Branch =
| Error error -> return Error error
| RemoveReference referenceId -> return Ok (ReferenceRemoved referenceId)
| DeleteLogical(force, deleteReason) ->
let repositoryActorProxy =
actorProxyFactory.CreateActorProxy<IRepositoryActor>(ActorId($"{branchDto.RepositoryId}"), ActorName.Repository)
let! repositoryDto = repositoryActorProxy.Get(metadata.CorrelationId)
let repositoryActorProxy = Repository.CreateActorProxy branchDto.RepositoryId metadata.CorrelationId
let! repositoryDto = repositoryActorProxy.Get (metadata.CorrelationId)
this.SchedulePhysicalDeletion(deleteReason, TimeSpan.FromDays(repositoryDto.LogicalDeleteDays), metadata.CorrelationId)
return Ok (LogicalDeleted(force, deleteReason))
| DeletePhysical ->
Expand Down Expand Up @@ -516,9 +520,7 @@ module Branch =
member this.GetParentBranch correlationId =
task {
this.correlationId <- correlationId
let actorId = ActorId($"{branchDto.ParentBranchId}")

let branchActorProxy = this.Host.ProxyFactory.CreateActorProxy<IBranchActor>(actorId, ActorName.Branch)
let branchActorProxy = Branch.CreateActorProxy branchDto.ParentBranchId correlationId

return! branchActorProxy.Get correlationId
}
Expand Down
10 changes: 9 additions & 1 deletion src/Grace.Actors/BranchName.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ namespace Grace.Actors
open Dapr.Actors
open Dapr.Actors.Runtime
open Grace.Actors.Constants
open Grace.Actors.Context
open Grace.Actors.Extensions.MemoryCache
open Grace.Actors.Interfaces
open Grace.Actors.Services
open Grace.Shared.Types
Expand Down Expand Up @@ -33,10 +35,16 @@ module BranchName =
member val private correlationId: CorrelationId = String.Empty with get, set

override this.OnActivateAsync() =
let correlationId =
match memoryCache.GetCorrelationIdEntry this.Id with
| Some correlationId -> correlationId
| None -> String.Empty

log.LogInformation(
"{CurrentInstant}: Node: {hostName}; Duration: 0.100ms; CorrelationId: ; Activated {ActorType} {ActorId}.",
"{CurrentInstant}: Node: {hostName}; Duration: 0.100ms; CorrelationId: {correlationId}; Activated {ActorType} {ActorId}.",
getCurrentInstantExtended (),
getMachineName,
correlationId,
this.GetType().Name,
host.Id
)
Expand Down
52 changes: 52 additions & 0 deletions src/Grace.Actors/Context.Actor.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
namespace Grace.Actors

open Dapr.Actors.Client
open Grace.Shared.Types
open Microsoft.Azure.Cosmos
open Microsoft.Extensions.Caching.Memory
open Microsoft.Extensions.Logging
open System

module Context =

/// Actor proxy factory instance
let mutable internal actorProxyFactory: IActorProxyFactory = null

/// Setter for actor proxy factory
let setActorProxyFactory proxyFactory = actorProxyFactory <- proxyFactory

/// Actor state storage provider instance
let mutable internal actorStateStorageProvider: ActorStateStorageProvider = ActorStateStorageProvider.Unknown

/// Setter for actor state storage provider
let setActorStateStorageProvider storageProvider = actorStateStorageProvider <- storageProvider

/// Cosmos client instance
let mutable internal cosmosClient: CosmosClient = null

/// Setter for Cosmos client
let setCosmosClient (client: CosmosClient) = cosmosClient <- client

/// Cosmos container instance
let mutable internal cosmosContainer: Container = null

/// Setter for Cosmos container
let setCosmosContainer (container: Container) = cosmosContainer <- container

/// Host services collection
let mutable internal hostServiceProvider: IServiceProvider = null

/// Setter for services collection
let setHostServiceProvider (hostServices: IServiceProvider) = hostServiceProvider <- hostServices

/// Logger factory instance
let mutable internal loggerFactory : ILoggerFactory = null //hostServiceProvider.GetService(typeof<ILoggerFactory>) :?> ILoggerFactory

/// Setter for logger factory
let setLoggerFactory (factory: ILoggerFactory) = loggerFactory <- factory

/// Grace Server's universal .NET memory cache
let mutable internal memoryCache: IMemoryCache = null

/// Setter for memory cache
let setMemoryCache (cache: IMemoryCache) = memoryCache <- cache
Loading

0 comments on commit 51895ed

Please sign in to comment.