Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parwinder/bcda-8627-archive cleanup job migration #1042

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions bcda/bcdacli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func setUpApp() *cli.App {
},
Action: func(c *cli.Context) error {
cutoff := time.Now().Add(-time.Hour * time.Duration(thresholdHr))
return archiveExpiring(cutoff)
return ArchiveExpiring(cutoff)
bhagatparwinder marked this conversation as resolved.
Show resolved Hide resolved
},
},
{
Expand All @@ -317,7 +317,7 @@ func setUpApp() *cli.App {
},
Action: func(c *cli.Context) error {
cutoff := time.Now().Add(-time.Hour * time.Duration(thresholdHr))
return cleanupJob(cutoff, models.JobStatusArchived, models.JobStatusExpired,
return CleanupJob(cutoff, models.JobStatusArchived, models.JobStatusExpired,
conf.GetEnv("FHIR_ARCHIVE_DIR"), conf.GetEnv("FHIR_STAGING_DIR"))
},
},
Expand All @@ -334,7 +334,7 @@ func setUpApp() *cli.App {
},
Action: func(c *cli.Context) error {
cutoff := time.Now().Add(-(time.Hour * time.Duration(thresholdHr)))
return cleanupJob(cutoff, models.JobStatusFailed, models.JobStatusFailedExpired,
return CleanupJob(cutoff, models.JobStatusFailed, models.JobStatusFailedExpired,
conf.GetEnv("FHIR_STAGING_DIR"), conf.GetEnv("FHIR_PAYLOAD_DIR"))
},
},
Expand All @@ -351,7 +351,7 @@ func setUpApp() *cli.App {
},
Action: func(c *cli.Context) error {
cutoff := time.Now().Add(-(time.Hour * time.Duration(thresholdHr)))
return cleanupJob(cutoff, models.JobStatusCancelled, models.JobStatusCancelledExpired,
return CleanupJob(cutoff, models.JobStatusCancelled, models.JobStatusCancelledExpired,
conf.GetEnv("FHIR_STAGING_DIR"), conf.GetEnv("FHIR_PAYLOAD_DIR"))
},
},
Expand Down Expand Up @@ -748,7 +748,7 @@ func revokeAccessToken(accessToken string) error {
return auth.GetProvider().RevokeAccessToken(accessToken)
}

func archiveExpiring(maxDate time.Time) error {
func ArchiveExpiring(maxDate time.Time) error {
log.API.Info("Archiving expiring job files...")

jobs, err := r.GetJobsByUpdateTimeAndStatus(context.Background(),
Expand Down Expand Up @@ -786,7 +786,7 @@ func archiveExpiring(maxDate time.Time) error {
return lastJobError
}

func cleanupJob(maxDate time.Time, currentStatus, newStatus models.JobStatus, rootDirsToClean ...string) error {
func CleanupJob(maxDate time.Time, currentStatus, newStatus models.JobStatus, rootDirsToClean ...string) error {
jobs, err := r.GetJobsByUpdateTimeAndStatus(context.Background(),
time.Time{}, maxDate, currentStatus)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions bcda/constants/test_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const MockClient = "mock-client"
const CMSIDArg = "--cms-id"
const ThresholdArg = "--threshold"
const CleanupArchArg = "cleanup-archive"
const CleanupFailedArg = "cleanup-failed"
const CleanupCancelledArg = "cleanup-cancelled"
const ArchiveJobFiles = "archive-job-files"
const CreateGroupArg = "create-group"
const NameArg = "--name"
const ACOIDArg = "--aco-id"
Expand Down
87 changes: 84 additions & 3 deletions bcdaworker/queueing/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@ import (
"path/filepath"
"time"

"github.com/CMSgov/bcda-app/bcda/bcdacli"
"github.com/CMSgov/bcda-app/bcda/constants"
"github.com/CMSgov/bcda-app/bcda/database"
"github.com/CMSgov/bcda-app/bcda/metrics"
"github.com/CMSgov/bcda-app/bcda/models"
"github.com/CMSgov/bcda-app/bcda/utils"
"github.com/CMSgov/bcda-app/bcdaworker/repository/postgres"
"github.com/CMSgov/bcda-app/bcdaworker/worker"
"github.com/CMSgov/bcda-app/conf"
"github.com/CMSgov/bcda-app/log"
"github.com/ccoveille/go-safecast"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/robfig/cron/v3"
sloglogrus "github.com/samber/slog-logrus"
"github.com/sirupsen/logrus"
)
Expand All @@ -29,15 +33,37 @@ import (
func StartRiver(numWorkers int) *queue {
workers := river.NewWorkers()
river.AddWorker(workers, &JobWorker{})
river.AddWorker(workers, &CleanupJobWorker{})

schedule, err := cron.ParseStandard("0 11,23 * * *")
bhagatparwinder marked this conversation as resolved.
Show resolved Hide resolved
bhagatparwinder marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
panic("Invalid cron schedule")
}

periodicJobs := []*river.PeriodicJob{
river.NewPeriodicJob(
schedule,
func() (river.JobArgs, *river.InsertOpts) {
return CleanupJobArgs{}, &river.InsertOpts{
UniqueOpts: river.UniqueOpts{
ByArgs: true,
},
}
},
&river.PeriodicJobOpts{RunOnStart: true},
),
}

riverClient, err := river.NewClient(riverpgxv5.New(database.Pgxv5Pool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: numWorkers},
},
// TODO: whats an appropriate timeout?
JobTimeout: -1, // default for river is 1m, que-go had no timeout, mimicking que-go for now
Logger: getSlogLogger(),
Workers: workers,
JobTimeout: -1, // default for river is 1m, que-go had no timeout, mimicking que-go for now
bhagatparwinder marked this conversation as resolved.
Show resolved Hide resolved
Logger: getSlogLogger(),
Workers: workers,
PeriodicJobs: periodicJobs,
})
if err != nil {
panic(err)
Expand Down Expand Up @@ -99,6 +125,61 @@ type JobWorker struct {
river.WorkerDefaults[models.JobEnqueueArgs]
}

type CleanupJobArgs struct {
}

type CleanupJobWorker struct {
river.WorkerDefaults[CleanupJobArgs]
}

func (args CleanupJobArgs) Kind() string {
bhagatparwinder marked this conversation as resolved.
Show resolved Hide resolved
return "CleanupJob"
}
bhagatparwinder marked this conversation as resolved.
Show resolved Hide resolved

func (w *CleanupJobWorker) Work(ctx context.Context, rjob *river.Job[CleanupJobArgs]) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

ctx = log.NewStructuredLoggerEntry(log.Worker, ctx)
ctx, logger := log.SetCtxLogger(ctx, "transaction_id", uuid.New())

cutoff := getCutOffTime()
archiveDir := conf.GetEnv("FHIR_ARCHIVE_DIR")
stagingDir := conf.GetEnv("FHIR_STAGING_DIR")
payloadDir := conf.GetEnv("PAYLOAD_DIR")

// Cleanup archived jobs: remove job directory and files from archive and update job status to Expired
if err := bcdacli.CleanupJob(cutoff, models.JobStatusArchived, models.JobStatusExpired, archiveDir, stagingDir); err != nil {
bhagatparwinder marked this conversation as resolved.
Show resolved Hide resolved
logger.Error(errors.Wrap(err, fmt.Sprintf("failed to process job: %s", constants.CleanupArchArg)))
return err
}

// Cleanup failed jobs: remove job directory and files from failed jobs and update job status to FailedExpired
if err := bcdacli.CleanupJob(cutoff, models.JobStatusFailed, models.JobStatusFailedExpired, stagingDir, payloadDir); err != nil {
logger.Error(errors.Wrap(err, fmt.Sprintf("failed to process job: %s", constants.CleanupFailedArg)))
return err
}

// Cleanup cancelled jobs: remove job directory and files from cancelled jobs and update job status to CancelledExpired
if err := bcdacli.CleanupJob(cutoff, models.JobStatusCancelled, models.JobStatusCancelledExpired, stagingDir, payloadDir); err != nil {
logger.Error(errors.Wrap(err, fmt.Sprintf("failed to process job: %s", constants.CleanupCancelledArg)))
return err
}

// Archive expiring jobs: update job statuses and move files to an inaccessible location
if err := bcdacli.ArchiveExpiring(cutoff); err != nil {
logger.Error(errors.Wrap(err, fmt.Sprintf("failed to process job: %s", constants.ArchiveJobFiles)))
return err
}

return nil
}

func getCutOffTime() time.Time {
cutoff := time.Now().Add(-time.Hour * time.Duration(utils.GetEnvInt("ARCHIVE_THRESHOLD_HR", 24)))
return cutoff
}

func (w *JobWorker) Work(ctx context.Context, rjob *river.Job[models.JobEnqueueArgs]) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ require (
github.com/riverqueue/river/riverdriver v0.14.1 // indirect
github.com/riverqueue/river/rivershared v0.14.1 // indirect
github.com/riverqueue/river/rivertype v0.14.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/samber/lo v1.44.0 // indirect
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
Expand Down
Loading