Skip to content

Commit

Permalink
Accept reason on commands
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Feb 12, 2025
1 parent 907c9a1 commit 2563d45
Show file tree
Hide file tree
Showing 28 changed files with 473 additions and 352 deletions.
4 changes: 3 additions & 1 deletion core/src/core_tests/child_workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async fn parent_cancels_child_wf(ctx: WfContext) -> WorkflowResult<()> {
.await
.into_started()
.expect("Child should get started");
start_res.cancel(&ctx);
start_res.cancel(&ctx, "cancel reason".to_string());
let stat = start_res
.result()
.await
Expand Down Expand Up @@ -157,6 +157,7 @@ async fn cancel_child_workflow_lang_thinks_not_started_but_is(
act.run_id,
CancelChildWorkflowExecution {
child_workflow_seq: 1,
reason: "dieee".to_string(),
}
.into(),
))
Expand Down Expand Up @@ -215,6 +216,7 @@ async fn cancel_already_complete_child_ignored() {
vec![
CancelChildWorkflowExecution {
child_workflow_seq: 1,
reason: "go away!".to_string(),
}
.into(),
CompleteWorkflowExecution { result: None }.into(),
Expand Down
84 changes: 41 additions & 43 deletions core/src/worker/workflow/machines/activity_state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#![allow(clippy::large_enum_variant)]

use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
workflow_machines::MachineResponse, EventInfo, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
};
use crate::{
abstractions::dbg_panic,
Expand Down Expand Up @@ -164,6 +164,45 @@ impl ActivityMachine {
is_local: false,
}
}

pub(super) fn cancel(&mut self) -> Result<Vec<MachineResponse>, MachineError<WFMachinesError>> {
if matches!(
self.state(),
ActivityMachineState::Completed(_)
| ActivityMachineState::Canceled(_)
| ActivityMachineState::Failed(_)
| ActivityMachineState::TimedOut(_)
) {
// Ignore attempted cancels in terminal states
debug!(
"Attempted to cancel already resolved activity (seq {})",
self.shared_state.attrs.seq
);
return Ok(vec![]);
}
let event = match self.shared_state.cancellation_type {
ActivityCancellationType::Abandon => ActivityMachineEvents::Abandon,
_ => ActivityMachineEvents::Cancel,
};
let vec = OnEventWrapper::on_event_mut(self, event)?;
let res = vec
.into_iter()
.flat_map(|amc| match amc {
ActivityMachineCommand::RequestCancellation(cmd) => {
self.machine_responses_from_cancel_request(cmd)
}
ActivityMachineCommand::Cancel(details) => {
vec![self.create_cancelation_resolve(details).into()]
}
x => panic!("Invalid cancel event response {x:?}"),
})
.collect();
Ok(res)
}

pub(super) fn was_cancelled_before_sent_to_server(&self) -> bool {
self.shared_state().cancelled_before_sent
}
}

impl TryFrom<HistEventData> for ActivityMachineEvents {
Expand Down Expand Up @@ -299,47 +338,6 @@ impl TryFrom<CommandType> for ActivityMachineEvents {
}
}

impl Cancellable for ActivityMachine {
fn cancel(&mut self) -> Result<Vec<MachineResponse>, MachineError<Self::Error>> {
if matches!(
self.state(),
ActivityMachineState::Completed(_)
| ActivityMachineState::Canceled(_)
| ActivityMachineState::Failed(_)
| ActivityMachineState::TimedOut(_)
) {
// Ignore attempted cancels in terminal states
debug!(
"Attempted to cancel already resolved activity (seq {})",
self.shared_state.attrs.seq
);
return Ok(vec![]);
}
let event = match self.shared_state.cancellation_type {
ActivityCancellationType::Abandon => ActivityMachineEvents::Abandon,
_ => ActivityMachineEvents::Cancel,
};
let vec = OnEventWrapper::on_event_mut(self, event)?;
let res = vec
.into_iter()
.flat_map(|amc| match amc {
ActivityMachineCommand::RequestCancellation(cmd) => {
self.machine_responses_from_cancel_request(cmd)
}
ActivityMachineCommand::Cancel(details) => {
vec![self.create_cancelation_resolve(details).into()]
}
x => panic!("Invalid cancel event response {x:?}"),
})
.collect();
Ok(res)
}

fn was_cancelled_before_sent_to_server(&self) -> bool {
self.shared_state().cancelled_before_sent
}
}

#[derive(Clone)]
pub(super) struct SharedState {
scheduled_event_id: i64,
Expand Down
19 changes: 10 additions & 9 deletions core/src/worker/workflow/machines/cancel_external_state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
workflow_machines::MachineResponse, EventInfo, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
};
use crate::worker::workflow::machines::HistEventData;
use rustfsm::{fsm, StateMachine, TransitionResult};
Expand Down Expand Up @@ -208,8 +208,6 @@ impl WFMachinesAdapter for CancelExternalMachine {
}
}

impl Cancellable for CancelExternalMachine {}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -222,11 +220,14 @@ mod tests {

async fn cancel_sender(ctx: WfContext) -> WorkflowResult<()> {
let res = ctx
.cancel_external(NamespacedWorkflowExecution {
namespace: "some_namespace".to_string(),
workflow_id: "fake_wid".to_string(),
run_id: "fake_rid".to_string(),
})
.cancel_external(
NamespacedWorkflowExecution {
namespace: "some_namespace".to_string(),
workflow_id: "fake_wid".to_string(),
run_id: "fake_rid".to_string(),
},
"cancel reason".to_string(),
)
.await;
if res.is_err() {
Err(anyhow::anyhow!("Cancel fail!"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
workflow_machines::MachineResponse, EventInfo, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
};
use crate::worker::workflow::machines::HistEventData;
use rustfsm::{fsm, StateMachine, TransitionResult};
Expand Down Expand Up @@ -97,8 +97,6 @@ impl WFMachinesAdapter for CancelWorkflowMachine {
}
}

impl Cancellable for CancelWorkflowMachine {}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
80 changes: 42 additions & 38 deletions core/src/worker/workflow/machines/child_workflow_state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
workflow_machines::MachineResponse, EventInfo, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
};
use crate::{
internal_flags::CoreInternalFlags,
Expand Down Expand Up @@ -54,7 +54,7 @@ fsm! {
StartCommandCreated --(CommandStartChildWorkflowExecution) --> StartCommandCreated;
StartCommandCreated --(StartChildWorkflowExecutionInitiated(ChildWorkflowInitiatedData),
shared on_start_child_workflow_execution_initiated) --> StartEventRecorded;
StartCommandCreated --(Cancel, shared on_cancelled) --> Cancelled;
StartCommandCreated --(Cancel(String), shared on_cancelled) --> Cancelled;

StartEventRecorded --(ChildWorkflowExecutionStarted(ChildWorkflowExecutionStartedEvent),
shared on_child_workflow_execution_started) --> Started;
Expand All @@ -74,13 +74,13 @@ fsm! {
// If cancelled after started, we need to issue a cancel external workflow command, and then
// the child workflow will resolve somehow, so we want to go back to started and wait for that
// resolution.
Started --(Cancel, shared on_cancelled) --> Started;
Started --(Cancel(String), shared on_cancelled) --> Started;
// Abandon & try cancel modes may immediately move to cancelled
Started --(Cancel, shared on_cancelled) --> Cancelled;
Started --(Cancel(String), shared on_cancelled) --> Cancelled;
Started --(CommandRequestCancelExternalWorkflowExecution) --> Started;

// Ignore any spurious cancellations after resolution
Cancelled --(Cancel) --> Cancelled;
Cancelled --(Cancel(String)) --> Cancelled;
Cancelled --(ChildWorkflowExecutionCancelled,
on_child_workflow_execution_cancelled) --> Cancelled;
// Completions of any kind after cancellation are acceptable for abandoned children
Expand All @@ -92,10 +92,10 @@ fsm! {
shared on_child_workflow_execution_timed_out) --> Cancelled;
Cancelled --(ChildWorkflowExecutionTerminated,
shared on_child_workflow_execution_terminated) --> Cancelled;
Failed --(Cancel) --> Failed;
StartFailed --(Cancel) --> StartFailed;
TimedOut --(Cancel) --> TimedOut;
Completed --(Cancel) --> Completed;
Failed --(Cancel(String)) --> Failed;
StartFailed --(Cancel(String)) --> StartFailed;
TimedOut --(Cancel(String)) --> TimedOut;
Completed --(Cancel(String)) --> Completed;
}

pub(super) struct ChildWorkflowExecutionStartedEvent {
Expand Down Expand Up @@ -255,10 +255,14 @@ impl StartCommandCreated {
pub(super) fn on_cancelled(
self,
state: &mut SharedState,
reason: String,
) -> ChildWorkflowMachineTransition<Cancelled> {
state.cancelled_before_sent = true;
ChildWorkflowMachineTransition::commands(vec![ChildWorkflowCommand::StartCancel(Failure {
message: "Child Workflow execution cancelled before scheduled".to_owned(),
message: format!(
"Child Workflow Execution cancelled before scheduled: {}",
reason
),
cause: Some(Box::new(Failure {
failure_info: Some(FailureInfo::CanceledFailureInfo(
failure::CanceledFailureInfo {
Expand Down Expand Up @@ -385,6 +389,7 @@ impl Started {
fn on_cancelled(
self,
state: &mut SharedState,
reason: String,
) -> ChildWorkflowMachineTransition<StartedOrCancelled> {
let dest = match state.cancel_type {
ChildWorkflowCancellationType::Abandon | ChildWorkflowCancellationType::TryCancel => {
Expand All @@ -393,9 +398,7 @@ impl Started {
_ => StartedOrCancelled::Started(Default::default()),
};
TransitionResult::ok(
[ChildWorkflowCommand::IssueCancelAfterStarted {
reason: "Parent workflow requested cancel".to_string(),
}],
[ChildWorkflowCommand::IssueCancelAfterStarted { reason }],
dest,
)
}
Expand Down Expand Up @@ -483,6 +486,30 @@ impl ChildWorkflowMachine {
}),
}
}

pub(crate) fn cancel(
&mut self,
reason: String,
) -> Result<Vec<MachineResponse>, MachineError<WFMachinesError>> {
let event = ChildWorkflowMachineEvents::Cancel(reason);
let vec = OnEventWrapper::on_event_mut(self, event)?;
let res = vec
.into_iter()
.map(|mc| match mc {
c @ ChildWorkflowCommand::StartCancel(_)
| c @ ChildWorkflowCommand::IssueCancelAfterStarted { .. } => {
self.adapt_response(c, None)
}
x => panic!("Invalid cancel event response {x:?}"),
})
.flatten_ok()
.try_collect()?;
Ok(res)
}

pub(crate) fn was_cancelled_before_sent_to_server(&self) -> bool {
self.shared_state.cancelled_before_sent
}
}

impl TryFrom<HistEventData> for ChildWorkflowMachineEvents {
Expand Down Expand Up @@ -713,29 +740,6 @@ impl TryFrom<CommandType> for ChildWorkflowMachineEvents {
}
}

impl Cancellable for ChildWorkflowMachine {
fn cancel(&mut self) -> Result<Vec<MachineResponse>, MachineError<Self::Error>> {
let event = ChildWorkflowMachineEvents::Cancel;
let vec = OnEventWrapper::on_event_mut(self, event)?;
let res = vec
.into_iter()
.map(|mc| match mc {
c @ ChildWorkflowCommand::StartCancel(_)
| c @ ChildWorkflowCommand::IssueCancelAfterStarted { .. } => {
self.adapt_response(c, None)
}
x => panic!("Invalid cancel event response {x:?}"),
})
.flatten_ok()
.try_collect()?;
Ok(res)
}

fn was_cancelled_before_sent_to_server(&self) -> bool {
self.shared_state.cancelled_before_sent
}
}

fn failure_info_from_state(state: &SharedState, retry_state: RetryState) -> Option<FailureInfo> {
Some(FailureInfo::ChildWorkflowExecutionFailureInfo(
failure::ChildWorkflowExecutionFailureInfo {
Expand Down Expand Up @@ -988,7 +992,7 @@ mod test {
internal_flags: Rc::new(RefCell::new(InternalFlags::default())),
},
);
let cmds = s.cancel().unwrap();
let cmds = s.cancel("cancel reason".to_string()).unwrap();
assert_eq!(cmds.len(), 0);
assert_eq!(discriminant(&state), discriminant(s.state()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
workflow_machines::MachineResponse, EventInfo, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
};
use crate::worker::workflow::machines::HistEventData;
use rustfsm::{fsm, StateMachine, TransitionResult};
Expand Down Expand Up @@ -111,5 +111,3 @@ impl WFMachinesAdapter for CompleteWorkflowMachine {
Ok(vec![])
}
}

impl Cancellable for CompleteWorkflowMachine {}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
Cancellable, EventInfo, MachineResponse, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
EventInfo, MachineResponse, NewMachineWithCommand, OnEventWrapper, WFMachinesAdapter,
WFMachinesError,
};
use crate::worker::workflow::machines::HistEventData;
use rustfsm::{fsm, StateMachine, TransitionResult};
Expand Down Expand Up @@ -105,8 +105,6 @@ impl WFMachinesAdapter for ContinueAsNewWorkflowMachine {
}
}

impl Cancellable for ContinueAsNewWorkflowMachine {}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
workflow_machines::MachineResponse, Cancellable, EventInfo, NewMachineWithCommand,
OnEventWrapper, WFMachinesAdapter, WFMachinesError,
workflow_machines::MachineResponse, EventInfo, NewMachineWithCommand, OnEventWrapper,
WFMachinesAdapter, WFMachinesError,
};
use crate::worker::workflow::machines::HistEventData;
use rustfsm::{fsm, StateMachine, TransitionResult};
Expand Down Expand Up @@ -113,5 +113,3 @@ impl WFMachinesAdapter for FailWorkflowMachine {
Ok(vec![])
}
}

impl Cancellable for FailWorkflowMachine {}
Loading

0 comments on commit 2563d45

Please sign in to comment.