[flytepropeller][flyteadmin] Streaming Decks V2 #6053

Merged
merged 34 commits into from
Jan 24, 2025
Changes from 1 commit
54aa165
add tests from Yi-Cheng
Future-Outlier Nov 27, 2024
9ed6b6e
helped by Kevin and Yi-Cheng
Future-Outlier Nov 27, 2024
4b4f6bd
lint
Future-Outlier Nov 27, 2024
dd774cb
nit
Future-Outlier Nov 28, 2024
0bb8e91
add comments
Future-Outlier Dec 13, 2024
25fea29
add comments and better solution for backward compativle
Future-Outlier Dec 17, 2024
4e24e91
better comments
Future-Outlier Dec 17, 2024
8d1d0e4
DeckStatus
Future-Outlier Dec 18, 2024
31853bb
rename GetDeckStatus
Future-Outlier Dec 18, 2024
4068043
comments
Future-Outlier Dec 18, 2024
65b6efe
lint
Future-Outlier Jan 2, 2025
137579f
fix
Future-Outlier Jan 9, 2025
04f7fbc
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 9, 2025
aa56d64
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 13, 2025
a16851f
use BoolValue as IDL, suggested by Eduardo
Future-Outlier Jan 13, 2025
7314455
change commennts
Future-Outlier Jan 13, 2025
19498f5
update
Future-Outlier Jan 13, 2025
74f595f
fix
Future-Outlier Jan 13, 2025
3bd3336
fix
Future-Outlier Jan 14, 2025
f6d8493
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 14, 2025
4b56e52
fix
Future-Outlier Jan 14, 2025
db4b19e
remove unused ogic
Future-Outlier Jan 14, 2025
2737251
Update flyteidl/protos/flyteidl/core/tasks.proto
Future-Outlier Jan 16, 2025
564dc5f
Update flyteidl/protos/flyteidl/core/tasks.proto
Future-Outlier Jan 16, 2025
69ba94e
Merge remote-tracking branch 'origin' into streaming-deck-v2
eapolinario Jan 16, 2025
c992eae
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 17, 2025
0b91b5c
Update by Kevin's advice
Future-Outlier Jan 17, 2025
1d18265
update
Future-Outlier Jan 17, 2025
96500c1
update
Future-Outlier Jan 22, 2025
dd9dbaa
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 23, 2025
f51ff8c
RemoveDeckURIIfDeckNotExists
Future-Outlier Jan 23, 2025
bd5e682
update
Future-Outlier Jan 23, 2025
561a43c
nit suggestion by Eduardo
Future-Outlier Jan 24, 2025
a33ba09
update
Future-Outlier Jan 24, 2025
add comments and better solution for backward compativle
Signed-off-by: Future-Outlier <[email protected]>
Future-Outlier committed Dec 17, 2024
commit 25fea2956a0108c9b1937c6a4f5bb7c1210898e1
59 changes: 52 additions & 7 deletions flytepropeller/pkg/controller/nodes/task/handler.go
@@ -4,6 +4,7 @@
"context"
"fmt"
"runtime/debug"
"strings"
"time"

regErrors "github.com/pkg/errors"
@@ -40,6 +41,7 @@
)

const pluginContextKey = contextutils.Key("plugin")
const FLYTE_ENABLE_DECK = string("FLYTE_ENABLE_DECK")

type metrics struct {
pluginPanics labeled.Counter
@@ -71,43 +73,47 @@
return taskType + "_" + pluginID
}

func (p *pluginRequestedTransition) AddDeckURI(tCtx *taskExecutionContext) {
var deckURI *storage.DataReference
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue

if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{}
}

p.execInfo.OutputInfo.DeckURI = deckURI

}

// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage.
func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error {
func (p *pluginRequestedTransition) AddDeckURIIfDeckExists(ctx context.Context, tCtx *taskExecutionContext) error {
reader := tCtx.ow.GetReader()
if reader == nil && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
return nil
}

exists, err := reader.DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return regErrors.Wrapf(err, "failed to check existence of deck file")
}

if !exists && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{}
}

if exists {
deckURIValue := tCtx.ow.GetDeckPath()
p.execInfo.OutputInfo.DeckURI = &deckURIValue
}

return nil
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) {

p.ttype = handler.TransitionTypeEphemeral
p.pInfo = pluginCore.PhaseInfoSuccess(nil)
p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})

}

func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) {
@@ -179,9 +185,9 @@

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
}

} else {
p.execInfo.OutputInfo.OutputURI = outputPath
}
@@ -417,6 +423,21 @@
return t.taskMetricsMap[metricNameKey], nil
}

func IsDeckEnabled(ctx context.Context, tCtx *taskExecutionContext) (bool, error) {
template, err := tCtx.tr.Read(ctx)
if err != nil {
return false, regErrors.Wrapf(err, "failed to read task template")
}

templateConfig := template.GetConfig()
if templateConfig == nil {
return false, nil
}

deckEnabled := strings.ToLower(templateConfig[FLYTE_ENABLE_DECK])
return deckEnabled == "1" || deckEnabled == "t" || deckEnabled == "true", nil

}

func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *taskExecutionContext, ts handler.TaskNodeState) (*pluginRequestedTransition, error) {
pluginTrns := &pluginRequestedTransition{}

@@ -505,15 +526,37 @@
// The deck should be accessible even if the task is still running or has failed.
// It's possible that the deck URI may not exist in remote storage yet or will never exist.
// So, it is console's responsibility to handle the case when the deck URI actually does not exist.
pluginTrns.AddDeckURI(tCtx)
deckEnabled, err := IsDeckEnabled(ctx, tCtx)
if err != nil {
return nil, err
}

if deckEnabled {
pluginTrns.AddDeckURI(tCtx)
}

// Handle backward compatibility for Flyte deck display behavior.
//
// Before (legacy behavior):
// - Deck URI was only shown if the deck file existed in the terminal state.
// - We relied on a HEAD request to check if the deck file exists, then added the URI to the event.
//
// After (new behavior):
// - If `FLYTE_ENABLE_DECK = true` is set in the task template config (requires Flytekit > 1.14.0),
Contributor:

This comment is no longer correct, right?

Member Author:

Yes, super nice catch.
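
For readers following this thread, here is a minimal stand-alone sketch of how the `FLYTE_ENABLE_DECK` value is interpreted in this commit. It assumes the task template config is a plain `map[string]string`; the helper name `deckEnabledFromConfig` is hypothetical, and the sketch leaves out the task template reader that `IsDeckEnabled` uses.

```go
package main

import (
	"fmt"
	"strings"
)

// flyteEnableDeck mirrors the config key introduced in this commit.
const flyteEnableDeck = "FLYTE_ENABLE_DECK"

// deckEnabledFromConfig is a hypothetical helper that reproduces the string
// comparison in IsDeckEnabled without reading a task template.
func deckEnabledFromConfig(config map[string]string) bool {
	// Indexing a nil or empty map yields "", which counts as disabled.
	v := strings.ToLower(config[flyteEnableDeck])
	return v == "1" || v == "t" || v == "true"
}

func main() {
	fmt.Println(deckEnabledFromConfig(map[string]string{flyteEnableDeck: "True"}))
	fmt.Println(deckEnabledFromConfig(map[string]string{flyteEnableDeck: "0"}))
	fmt.Println(deckEnabledFromConfig(nil))
}
```

Only `1`, `t`, and `true` (case-insensitive) enable the deck; any other value, including a missing key, leaves it disabled.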

// we display the deck URI from the beginning rather than waiting until the terminal state.
//
// For backward compatibility with older Flytekit versions (which don't support `FLYTE_ENABLE_DECK`),
// we still need to check deck file existence in the terminal state. This ensures that when the deck
// isn't enabled via config or doesn't exist, we only show the URI in terminal states if the deck file
// is actually present.
switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
// This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess).
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
if !deckEnabled {
err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx)
Member:

Shouldn't we always check whether the file exists in the terminal state? If flytekit fails to generate a deck for some reason, we should not add deck_uri to the output info, right?

Contributor:

What are the concerns with having a deck_uri set in the event? flyteconsole will still make the call to ensure that the file exists before showing the final deck, no?

Member Author:

Hi, @eapolinario

Yes, but FlyteConsole currently needs to make an additional call to check if the task is in a terminal phase.

I think it's better to handle all the logic in Propeller, as this would make maintenance easier. It would also simplify FlyteConsole's implementation.

In summary:
We should keep as much backend logic in the backend as possible. This approach reduces the maintenance burden on FlyteConsole and improves the readability of the backend code.
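
To make the behavior under discussion concrete, the rule this commit implements in Propeller can be sketched as a small decision function. This is an illustration under stated assumptions, not the handler code: the `phase` type, its constants, and `shouldReportDeckURI` are hypothetical stand-ins, and the nil-reader branch of `AddDeckURIIfDeckExists` is omitted.

```go
package main

import "fmt"

// phase is a simplified, hypothetical stand-in for pluginCore.Phase.
type phase int

const (
	phaseRunning phase = iota
	phaseSuccess
	phaseRetryableFailure
	phasePermanentFailure
)

// shouldReportDeckURI sketches the rule in this commit:
//   - deck enabled in the task config: report the deck URI in every phase;
//   - deck not enabled: report it only in success or failure phases, and
//     only if the deck file exists in remote storage.
//
// deckExists is passed as a function so the storage check runs only on the
// backward-compatible path, as in the handler.
func shouldReportDeckURI(deckEnabled bool, p phase, deckExists func() (bool, error)) (bool, error) {
	if deckEnabled {
		return true, nil
	}
	switch p {
	case phaseSuccess, phaseRetryableFailure, phasePermanentFailure:
		return deckExists()
	default:
		return false, nil
	}
}

func main() {
	present := func() (bool, error) { return true, nil }
	missing := func() (bool, error) { return false, nil }

	for _, c := range []struct {
		enabled bool
		p       phase
		exists  func() (bool, error)
	}{
		{true, phaseRunning, missing},
		{false, phaseRunning, present},
		{false, phaseSuccess, present},
		{false, phasePermanentFailure, missing},
	} {
		report, err := shouldReportDeckURI(c.enabled, c.p, c.exists)
		fmt.Println(c.enabled, c.p, report, err)
	}
}
```

With the flag enabled, the sketch never consults storage, which is the case the reviewers are debating: a terminal-state event can then carry a deck URI for a file that was never written.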

}
if err != nil {
return pluginTrns, err
}

// -------------------------------------
// TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes
// This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute
@@ -557,12 +600,14 @@
case pluginCore.PhaseRetryableFailure:
fallthrough
case pluginCore.PhasePermanentFailure:
// This is to prevent the console from potentially checking the deck URI that does not exist if in final
// phase(PhaseFailure).
err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx)
if !deckEnabled {
err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx)
}
if err != nil {
return pluginTrns, err
}

pluginTrns.ObservedFailure(
&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),