diff --git a/bcda/bcdacli/cli.go b/bcda/bcdacli/cli.go
index b4854a217..1af2fee28 100644
--- a/bcda/bcdacli/cli.go
+++ b/bcda/bcdacli/cli.go
@@ -13,7 +13,6 @@ import (
     "path"
     "path/filepath"
     "regexp"
-    "strconv"
     "strings"
     "syscall"
     "time"
@@ -39,7 +38,6 @@ import (
     "github.com/CMSgov/bcda-app/conf"
     "github.com/CMSgov/bcda-app/log"
     "github.com/CMSgov/bcda-app/optout"
-    "github.com/sirupsen/logrus"

     "github.com/pborman/uuid"
     "github.com/pkg/errors"
@@ -74,7 +72,6 @@ func setUpApp() *cli.App {
         fmt.Println("Error converting FILE_ARCHIVE_THRESHOLD_HR to uint", err)
     }
     var acoName, acoCMSID, acoID, accessToken, acoSize, filePath, fileSource, s3Endpoint, assumeRoleArn, environment, groupID, groupName, ips, fileType, alrFile string
-    var thresholdHr int
     var httpPort, httpsPort int
     app.Commands = []cli.Command{
         {
@@ -286,75 +283,6 @@ func setUpApp() *cli.App {
                 return nil
             },
         },
-        {
-            Name:     "archive-job-files",
-            Category: "Cleanup",
-            Usage:    "Update job statuses and move files to an inaccessible location",
-            Flags: []cli.Flag{
-                cli.IntFlag{
-                    Name:        "threshold",
-                    Value:       24,
-                    Usage:       constants.CliArchDesc,
-                    EnvVar:      "ARCHIVE_THRESHOLD_HR",
-                    Destination: &thresholdHr,
-                },
-            },
-            Action: func(c *cli.Context) error {
-                cutoff := time.Now().Add(-time.Hour * time.Duration(thresholdHr))
-                return archiveExpiring(cutoff)
-            },
-        },
-        {
-            Name:     constants.CleanupArchArg,
-            Category: "Cleanup",
-            Usage:    constants.CliRemoveArchDesc,
-            Flags: []cli.Flag{
-                cli.IntFlag{
-                    Name:        "threshold",
-                    Usage:       constants.CliArchDesc,
-                    Destination: &thresholdHr,
-                },
-            },
-            Action: func(c *cli.Context) error {
-                cutoff := time.Now().Add(-time.Hour * time.Duration(thresholdHr))
-                return cleanupJob(cutoff, models.JobStatusArchived, models.JobStatusExpired,
-                    conf.GetEnv("FHIR_ARCHIVE_DIR"), conf.GetEnv("FHIR_STAGING_DIR"))
-            },
-        },
-        {
-            Name:     "cleanup-failed",
-            Category: "Cleanup",
-            Usage:    constants.CliRemoveArchDesc,
-            Flags: []cli.Flag{
-                cli.IntFlag{
-                    Name:        "threshold",
-                    Usage:       constants.CliArchDesc,
-                    Destination: &thresholdHr,
-                },
-            },
-            Action: func(c *cli.Context) error {
-                cutoff := time.Now().Add(-(time.Hour * time.Duration(thresholdHr)))
-                return cleanupJob(cutoff, models.JobStatusFailed, models.JobStatusFailedExpired,
-                    conf.GetEnv("FHIR_STAGING_DIR"), conf.GetEnv("FHIR_PAYLOAD_DIR"))
-            },
-        },
-        {
-            Name:     "cleanup-cancelled",
-            Category: "Cleanup",
-            Usage:    constants.CliRemoveArchDesc,
-            Flags: []cli.Flag{
-                cli.IntFlag{
-                    Name:        "threshold",
-                    Usage:       constants.CliRemoveArchDesc,
-                    Destination: &thresholdHr,
-                },
-            },
-            Action: func(c *cli.Context) error {
-                cutoff := time.Now().Add(-(time.Hour * time.Duration(thresholdHr)))
-                return cleanupJob(cutoff, models.JobStatusCancelled, models.JobStatusCancelledExpired,
-                    conf.GetEnv("FHIR_STAGING_DIR"), conf.GetEnv("FHIR_PAYLOAD_DIR"))
-            },
-        },
         {
             Name:     "import-cclf-directory",
             Category: constants.CliDataImpCategory,
@@ -748,90 +676,6 @@ func revokeAccessToken(accessToken string) error {
     return auth.GetProvider().RevokeAccessToken(accessToken)
 }

-func archiveExpiring(maxDate time.Time) error {
-    log.API.Info("Archiving expiring job files...")
-
-    jobs, err := r.GetJobsByUpdateTimeAndStatus(context.Background(),
-        time.Time{}, maxDate, models.JobStatusCompleted)
-    if err != nil {
-        log.API.Error(err)
-        return err
-    }
-
-    var lastJobError error
-    for _, j := range jobs {
-        id := j.ID
-        jobPayloadDir := fmt.Sprintf("%s/%d", conf.GetEnv("FHIR_PAYLOAD_DIR"), id)
-        _, err = os.Stat(jobPayloadDir)
-        jobPayloadDirExist := err == nil
-        jobArchiveDir := fmt.Sprintf("%s/%d", conf.GetEnv("FHIR_ARCHIVE_DIR"), id)
-
-        if jobPayloadDirExist {
-            err = os.Rename(jobPayloadDir, jobArchiveDir)
-            if err != nil {
-                log.API.Error(err)
-                lastJobError = err
-                continue
-            }
-        }
-
-        j.Status = models.JobStatusArchived
-        err = r.UpdateJob(context.Background(), *j)
-        if err != nil {
-            log.API.Error(err)
-            lastJobError = err
-        }
-    }
-
-    return lastJobError
-}
-
-func cleanupJob(maxDate time.Time, currentStatus, newStatus models.JobStatus, rootDirsToClean ...string) error {
-    jobs, err := r.GetJobsByUpdateTimeAndStatus(context.Background(),
-        time.Time{}, maxDate, currentStatus)
-    if err != nil {
-        return err
-    }
-
-    if len(jobs) == 0 {
-        log.API.Infof("No %s job files to clean", currentStatus)
-        return nil
-    }
-
-    for _, job := range jobs {
-        if err := cleanupJobData(job.ID, rootDirsToClean...); err != nil {
-            log.API.Errorf("Unable to cleanup directories %s", err)
-            continue
-        }
-
-        job.Status = newStatus
-        err = r.UpdateJob(context.Background(), *job)
-        if err != nil {
-            log.API.Errorf("Failed to update job status to %s %s", newStatus, err)
-            continue
-        }
-
-        log.API.WithFields(logrus.Fields{
-            "job_began":     job.CreatedAt,
-            "files_removed": time.Now(),
-            "job_id":        job.ID,
-        }).Infof("Files cleaned from %s and job status set to %s", rootDirsToClean, newStatus)
-    }
-
-    return nil
-}
-
-func cleanupJobData(jobID uint, rootDirs ...string) error {
-    for _, rootDirToClean := range rootDirs {
-        dir := filepath.Join(rootDirToClean, strconv.FormatUint(uint64(jobID), 10))
-        if err := os.RemoveAll(dir); err != nil {
-            return fmt.Errorf("unable to remove %s because %s", dir, err)
-        }
-    }
-
-    return nil
-}
-
 func setDenylistState(cmsID string, td *models.Termination) error {
     aco, err := r.GetACOByCMSID(context.Background(), cmsID)
     if err != nil {
diff --git a/bcda/bcdacli/cli_test.go b/bcda/bcdacli/cli_test.go
index cf7f2e0ce..3eaf1182f 100644
--- a/bcda/bcdacli/cli_test.go
+++ b/bcda/bcdacli/cli_test.go
@@ -10,7 +10,6 @@ import (
     "net/http/httptest"
     "os"
     "os/signal"
-    "path"
     "path/filepath"
     "regexp"
     "strconv"
@@ -239,320 +238,6 @@ func (s *CLITestSuite) TestRevokeToken() {
     mock.AssertExpectations(s.T())
 }

-func (s *CLITestSuite) TestArchiveExpiring() {
-    assert := assert.New(s.T())
-
-    // condition: no jobs exist
-    args := []string{"bcda", constants.ArchJobFiles}
-    err := s.testApp.Run(args)
-    assert.Nil(err)
-
-    // timestamp to ensure that the job gets archived (older than the default 24h window)
-    t := time.Now().Add(-48 * time.Hour)
-    j := models.Job{
-        ACOID:      uuid.Parse(constants.TestACOID),
-        RequestURL: constants.V1Path + constants.EOBExportPath,
-        Status:     models.JobStatusCompleted,
-        CreatedAt:  t,
-        UpdatedAt:  t,
-    }
-    postgrestest.CreateJobs(s.T(), s.db, &j)
-
-    conf.SetEnv(s.T(), "FHIR_PAYLOAD_DIR", "../bcdaworker/data/test")
-    conf.SetEnv(s.T(), "FHIR_ARCHIVE_DIR", constants.TestArchivePath)
-
-    path := fmt.Sprintf("%s/%d/", conf.GetEnv("FHIR_PAYLOAD_DIR"), j.ID)
-    newpath := conf.GetEnv("FHIR_ARCHIVE_DIR")
-
-    if _, err = os.Stat(path); os.IsNotExist(err) {
-        err = os.MkdirAll(path, os.ModePerm)
-        if err != nil {
-            s.T().Error(err)
-        }
-    }
-
-    if _, err = os.Stat(newpath); os.IsNotExist(err) {
-        err = os.MkdirAll(newpath, os.ModePerm)
-        if err != nil {
-            s.T().Error(err)
-        }
-    }
-
-    f, err := os.Create(fmt.Sprintf("%s/fake.ndjson", path))
-    if err != nil {
-        s.T().Error(err)
-    }
-    defer f.Close()
-
-    // condition: normal execution
-    // execute the test case from CLI
-    args = []string{"bcda", constants.ArchJobFiles}
-    err = s.testApp.Run(args)
-    assert.Nil(err)
-
-    // check that the file has moved to the archive location
-    expPath := fmt.Sprintf("%s/%d/fake.ndjson", conf.GetEnv("FHIR_ARCHIVE_DIR"), j.ID)
-    _, err = os.ReadFile(expPath)
-    if err != nil {
-        s.T().Error(err)
-    }
-    assert.FileExists(expPath, "File not Found")
-
-    testJob := postgrestest.GetJobByID(s.T(), s.db, j.ID)
-
-    // check the status of the job
-    assert.Equal(models.JobStatusArchived, testJob.Status)
-
-    // clean up
-    os.RemoveAll(conf.GetEnv("FHIR_ARCHIVE_DIR"))
-}
-
-func (s *CLITestSuite) TestArchiveExpiringWithoutPayloadDir() {
-    assert := assert.New(s.T())
-
-    // condition: no jobs exist
-    args := []string{"bcda", constants.ArchJobFiles}
-    err := s.testApp.Run(args)
-    assert.Nil(err)
-
-    // timestamp to ensure that the job gets archived (older than the default 24h window)
-    t := time.Now().Add(-48 * time.Hour)
-    j := models.Job{
-        ACOID:      uuid.Parse(constants.TestACOID),
-        RequestURL: constants.V1Path + constants.EOBExportPath,
-        Status:     models.JobStatusCompleted,
-        CreatedAt:  t,
-        UpdatedAt:  t,
-    }
-    postgrestest.CreateJobs(s.T(), s.db, &j)
-
-    // condition: normal execution
-    // execute the test case from CLI
-    args = []string{"bcda", constants.ArchJobFiles}
-    err = s.testApp.Run(args)
-    assert.Nil(err)
-
-    testJob := postgrestest.GetJobByID(s.T(), s.db, j.ID)
-
-    // check the status of the job
-    assert.Equal(models.JobStatusArchived, testJob.Status)
-
-    // clean up
-    os.RemoveAll(conf.GetEnv("FHIR_ARCHIVE_DIR"))
-}
-
-func (s *CLITestSuite) TestArchiveExpiringWithThreshold() {
-    // save a job to our db
-    j := models.Job{
-        ACOID:      uuid.Parse(constants.TestACOID),
-        RequestURL: constants.V1Path + constants.EOBExportPath,
-        Status:     models.JobStatusCompleted,
-    }
-    postgrestest.CreateJobs(s.T(), s.db, &j)
-
-    conf.SetEnv(s.T(), "FHIR_PAYLOAD_DIR", "../bcdaworker/data/test")
-    conf.SetEnv(s.T(), "FHIR_ARCHIVE_DIR", constants.TestArchivePath)
-
-    path := fmt.Sprintf("%s/%d/", conf.GetEnv("FHIR_PAYLOAD_DIR"), j.ID)
-    if _, err := os.Stat(path); os.IsNotExist(err) {
-        err = os.MkdirAll(path, os.ModePerm)
-        if err != nil {
-            s.T().Error(err)
-        }
-    }
-
-    f, err := os.Create(fmt.Sprintf("%s/fake.ndjson", path))
-    if err != nil {
-        s.T().Error(err)
-    }
-    defer f.Close()
-
-    // execute the test case from CLI
-    args := []string{"bcda", constants.ArchJobFiles, constants.ThresholdArg, "1"}
-    err = s.testApp.Run(args)
-    assert.Nil(s.T(), err)
-
-    // check that the file has not moved to the archive location
-    dataPath := fmt.Sprintf("%s/%d/fake.ndjson", conf.GetEnv("FHIR_PAYLOAD_DIR"), j.ID)
-    _, err = os.ReadFile(dataPath)
-    if err != nil {
-        s.T().Error(err)
-    }
-    assert.FileExists(s.T(), dataPath, "File not Found")
-
-    testJob := postgrestest.GetJobByID(s.T(), s.db, j.ID)
-    // check the status of the job
-    assert.Equal(s.T(), models.JobStatusCompleted, testJob.Status)
-
-    // clean up
-    os.Remove(dataPath)
-}
-
-func (s *CLITestSuite) TestCleanArchive() {
-    // init
-    const Threshold = 30
-    now := time.Now()
-
-    assert := assert.New(s.T())
-
-    // condition: FHIR_ARCHIVE_DIR doesn't exist
-    conf.UnsetEnv(s.T(), "FHIR_ARCHIVE_DIR")
-    args := []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, strconv.Itoa(Threshold)}
-    err := s.testApp.Run(args)
-    assert.Nil(err)
-    conf.SetEnv(s.T(), "FHIR_ARCHIVE_DIR", constants.TestArchivePath)
-
-    // condition: FHIR_STAGING_DIR doesn't exist
-    conf.UnsetEnv(s.T(), "FHIR_STAGING_DIR")
-    args = []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, strconv.Itoa(Threshold)}
-    err = s.testApp.Run(args)
-    assert.Nil(err)
-    conf.SetEnv(s.T(), "FHIR_STAGING_DIR", constants.TestStagingPath)
-
-    // condition: no jobs exist
-    args = []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, strconv.Itoa(Threshold)}
-    err = s.testApp.Run(args)
-    assert.Nil(err)
-
-    // create a file that was last modified before the Threshold, but accessed after it
-    modified := now.Add(-(time.Hour * (Threshold + 1)))
-    beforeJobID, before := s.setupJobFile(modified, models.JobStatusArchived, conf.GetEnv("FHIR_ARCHIVE_DIR"))
-    defer before.Close()
-
-    // create a file that is clearly after the threshold (unless the threshold is 0)
-    afterJobID, after := s.setupJobFile(now, models.JobStatusArchived, conf.GetEnv("FHIR_ARCHIVE_DIR"))
-    defer after.Close()
-
-    // condition: bad threshold value
-    args = []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, "abcde"}
-    err = s.testApp.Run(args)
-    assert.EqualError(err, "invalid value \"abcde\" for flag -threshold: parse error")
-
-    // condition: before < Threshold < after <= now
-    // a file created before the Threshold should be deleted; one created after should not
-    // we use last modified as a proxy for created, because these files should not be changed after creation
-    args = []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, strconv.Itoa(Threshold)}
-    err = s.testApp.Run(args)
-    assert.Nil(err)
-
-    assertFileNotExists(s.T(), before.Name())
-
-    beforeJob := postgrestest.GetJobByID(s.T(), s.db, beforeJobID)
-    assert.Equal(models.JobStatusExpired, beforeJob.Status)
-
-    assert.FileExists(after.Name(), "%s not found; it should have been", after.Name())
-
-    afterJob := postgrestest.GetJobByID(s.T(), s.db, afterJobID)
-    assert.Equal(models.JobStatusArchived, afterJob.Status)
-
-    // I think this is an application directory and should always exist, but that doesn't seem to be the norm
-    os.RemoveAll(conf.GetEnv("FHIR_ARCHIVE_DIR"))
-}
-
-func (s *CLITestSuite) TestCleanupFailed() {
-    const threshold = 30
-    modified := time.Now().Add(-(time.Hour * (threshold + 1)))
-    beforePayloadJobID, beforePayload := s.setupJobFile(modified, models.JobStatusFailed, conf.GetEnv("FHIR_PAYLOAD_DIR"))
-    beforeStagingJobID, beforeStaging := s.setupJobFile(modified, models.JobStatusFailed, conf.GetEnv("FHIR_STAGING_DIR"))
-    // Job is old enough, but does not match the status
-    completedJobID, completed := s.setupJobFile(modified, models.JobStatusCompleted, conf.GetEnv("FHIR_PAYLOAD_DIR"))
-
-    afterPayloadJobID, afterPayload := s.setupJobFile(time.Now(), models.JobStatusFailed, conf.GetEnv("FHIR_PAYLOAD_DIR"))
-    afterStagingJobID, afterStaging := s.setupJobFile(time.Now(), models.JobStatusFailed, conf.GetEnv("FHIR_STAGING_DIR"))
-
-    // Check that we can clean up jobs that do not have data
-    noDataID, noData := s.setupJobFile(modified, models.JobStatusFailed, conf.GetEnv("FHIR_STAGING_DIR"))
-    dir, _ := path.Split(noData.Name())
-    os.RemoveAll(dir)
-    assertFileNotExists(s.T(), noData.Name())
-
-    shouldExist := []*os.File{afterPayload, afterStaging, completed}
-    shouldNotExist := []*os.File{beforePayload, beforeStaging, noData}
-
-    defer func() {
-        for _, f := range append(shouldExist, shouldNotExist...) {
-            os.Remove(f.Name())
-            f.Close()
-        }
-    }()
-
-    err := s.testApp.Run([]string{"bcda", "cleanup-failed", constants.ThresholdArg, strconv.Itoa(threshold)})
-    assert.NoError(s.T(), err)
-
-    assert.Equal(s.T(), models.JobStatusFailedExpired,
-        postgrestest.GetJobByID(s.T(), s.db, beforePayloadJobID).Status)
-    assert.Equal(s.T(), models.JobStatusFailedExpired,
-        postgrestest.GetJobByID(s.T(), s.db, noDataID).Status)
-    assert.Equal(s.T(), models.JobStatusFailedExpired,
-        postgrestest.GetJobByID(s.T(), s.db, beforeStagingJobID).Status)
-
-    assert.Equal(s.T(), models.JobStatusFailed,
-        postgrestest.GetJobByID(s.T(), s.db, afterPayloadJobID).Status)
-    assert.Equal(s.T(), models.JobStatusFailed,
-        postgrestest.GetJobByID(s.T(), s.db, afterStagingJobID).Status)
-    assert.Equal(s.T(), models.JobStatusCompleted,
-        postgrestest.GetJobByID(s.T(), s.db, completedJobID).Status)
-
-    for _, f := range shouldExist {
-        assert.FileExists(s.T(), f.Name())
-    }
-    for _, f := range shouldNotExist {
-        assertFileNotExists(s.T(), f.Name())
-    }
-}
-
-func (s *CLITestSuite) TestCleanupCancelled() {
-    const threshold = 30
-    modified := time.Now().Add(-(time.Hour * (threshold + 1)))
-    beforePayloadJobID, beforePayload := s.setupJobFile(modified, models.JobStatusCancelled, conf.GetEnv("FHIR_PAYLOAD_DIR"))
-    beforeStagingJobID, beforeStaging := s.setupJobFile(modified, models.JobStatusCancelled, conf.GetEnv("FHIR_STAGING_DIR"))
-    // Job is old enough, but does not match the status
-    completedJobID, completed := s.setupJobFile(modified, models.JobStatusCompleted, conf.GetEnv("FHIR_PAYLOAD_DIR"))
-
-    afterPayloadJobID, afterPayload := s.setupJobFile(time.Now(), models.JobStatusCancelled, conf.GetEnv("FHIR_PAYLOAD_DIR"))
-    afterStagingJobID, afterStaging := s.setupJobFile(time.Now(), models.JobStatusCancelled, conf.GetEnv("FHIR_STAGING_DIR"))
-
-    // Check that we can clean up jobs that do not have data
-    noDataID, noData := s.setupJobFile(modified, models.JobStatusCancelled, conf.GetEnv("FHIR_STAGING_DIR"))
-    dir, _ := path.Split(noData.Name())
-    os.RemoveAll(dir)
-    assertFileNotExists(s.T(), noData.Name())
-
-    shouldExist := []*os.File{afterPayload, afterStaging, completed}
-    shouldNotExist := []*os.File{beforePayload, beforeStaging, noData}
-
-    defer func() {
-        for _, f := range append(shouldExist, shouldNotExist...) {
-            os.Remove(f.Name())
-            f.Close()
-        }
-    }()
-
-    err := s.testApp.Run([]string{"bcda", "cleanup-cancelled", constants.ThresholdArg, strconv.Itoa(threshold)})
-    assert.NoError(s.T(), err)
-
-    assert.Equal(s.T(), models.JobStatusCancelledExpired,
-        postgrestest.GetJobByID(s.T(), s.db, beforePayloadJobID).Status)
-    assert.Equal(s.T(), models.JobStatusCancelledExpired,
-        postgrestest.GetJobByID(s.T(), s.db, noDataID).Status)
-    assert.Equal(s.T(), models.JobStatusCancelledExpired,
-        postgrestest.GetJobByID(s.T(), s.db, beforeStagingJobID).Status)
-
-    assert.Equal(s.T(), models.JobStatusCancelled,
-        postgrestest.GetJobByID(s.T(), s.db, afterPayloadJobID).Status)
-    assert.Equal(s.T(), models.JobStatusCancelled,
-        postgrestest.GetJobByID(s.T(), s.db, afterStagingJobID).Status)
-    assert.Equal(s.T(), models.JobStatusCompleted,
-        postgrestest.GetJobByID(s.T(), s.db, completedJobID).Status)
-
-    for _, f := range shouldExist {
-        assert.FileExists(s.T(), f.Name())
-    }
-    for _, f := range shouldNotExist {
-        assertFileNotExists(s.T(), f.Name())
-    }
-}
-
 func (s *CLITestSuite) TestStartAPI() {
     httpsPort, httpPort := strconv.Itoa(getRandomPort(s.T())), strconv.Itoa(getRandomPort(s.T()))
     args := []string{"bcda", "start-api", "--https-port", httpsPort, "--http-port", httpPort}
@@ -990,30 +675,6 @@ func (s *CLITestSuite) TestGenerateAlrData() {
     assert.EqualError(s.T(), err, "no CCLF8 file found for CMS ID UNKNOWN_ACO")
 }

-func (s *CLITestSuite) setupJobFile(modified time.Time, status models.JobStatus, rootPath string) (uint, *os.File) {
-    j := models.Job{
-        ACOID:      s.testACO.UUID,
-        RequestURL: constants.V1Path + constants.EOBExportPath,
-        Status:     status,
-        UpdatedAt:  modified,
-    }
-
-    postgrestest.CreateJobs(s.T(), s.db, &j)
-
-    path := fmt.Sprintf("%s/%d", rootPath, j.ID)
-
-    if err := os.MkdirAll(path, os.ModePerm); err != nil {
-        s.T().Error(err)
-    }
-    jobFile, err := os.Create(fmt.Sprintf("%s/%s", path, "fake.ndjson"))
-    if err != nil {
-        s.T().Error(err)
-    }
-    defer jobFile.Close()
-
-    return j.ID, jobFile
-}
-
 func createTestZipFile(zFile string, cclfFiles ...string) error {
     zf, err := os.Create(zFile)
     if err != nil {
@@ -1042,8 +703,3 @@ func getFileCount(t *testing.T, path string) int {
     assert.NoError(t, err)
     return len(f)
 }
-
-func assertFileNotExists(t *testing.T, path string) {
-    _, err := os.Stat(path)
-    assert.True(t, os.IsNotExist(err), "file %s should not be found", path)
-}
diff --git a/bcda/constants/test_constants.go b/bcda/constants/test_constants.go
index bcfaaab19..5dc625aa0 100644
--- a/bcda/constants/test_constants.go
+++ b/bcda/constants/test_constants.go
@@ -51,6 +51,9 @@ const MockClient = "mock-client"
 const CMSIDArg = "--cms-id"
 const ThresholdArg = "--threshold"
 const CleanupArchArg = "cleanup-archive"
+const CleanupFailedArg = "cleanup-failed"
+const CleanupCancelledArg = "cleanup-cancelled"
+const ArchiveJobFiles = "archive-job-files"
 const CreateGroupArg = "create-group"
 const NameArg = "--name"
 const ACOIDArg = "--aco-id"
diff --git a/bcdaworker/cleanup/cleanup.go b/bcdaworker/cleanup/cleanup.go
new file mode 100644
index 000000000..b9cbcffad
--- /dev/null
+++ b/bcdaworker/cleanup/cleanup.go
@@ -0,0 +1,105 @@
+package cleanup
+
+import (
+    "context"
+    "fmt"
+    "os"
+    "path/filepath"
+    "strconv"
+    "time"
+
+    "github.com/CMSgov/bcda-app/bcda/database"
+    "github.com/CMSgov/bcda-app/bcda/models"
+    "github.com/CMSgov/bcda-app/bcda/models/postgres"
+    "github.com/CMSgov/bcda-app/conf"
+    "github.com/CMSgov/bcda-app/log"
+    "github.com/sirupsen/logrus"
+)
+
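+// ArchiveExpiring finds all Completed jobs last updated before maxDate, moves
+// each job's payload directory from FHIR_PAYLOAD_DIR to FHIR_ARCHIVE_DIR, and
+// marks the job Archived. Per-job failures are logged and skipped; the last
+// error encountered is returned.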
+func ArchiveExpiring(maxDate time.Time) error {
+    log.API.Info("Archiving expiring job files...")
+
+    db := database.Connection
+    r := postgres.NewRepository(db)
+    jobs, err := r.GetJobsByUpdateTimeAndStatus(context.Background(),
+        time.Time{}, maxDate, models.JobStatusCompleted)
+    if err != nil {
+        log.API.Error(err)
+        return err
+    }
+
+    var lastJobError error
+    for _, j := range jobs {
+        id := j.ID
+        jobPayloadDir := fmt.Sprintf("%s/%d", conf.GetEnv("FHIR_PAYLOAD_DIR"), id)
+        _, err = os.Stat(jobPayloadDir)
+        jobPayloadDirExist := err == nil
+        jobArchiveDir := fmt.Sprintf("%s/%d", conf.GetEnv("FHIR_ARCHIVE_DIR"), id)
+
+        if jobPayloadDirExist {
+            err = os.Rename(jobPayloadDir, jobArchiveDir)
+            if err != nil {
+                log.API.Error(err)
+                lastJobError = err
+                continue
+            }
+        }
+
+        j.Status = models.JobStatusArchived
+        err = r.UpdateJob(context.Background(), *j)
+        if err != nil {
+            log.API.Error(err)
+            lastJobError = err
+        }
+    }
+
+    return lastJobError
+}
+
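+// CleanupJob finds all jobs in currentStatus last updated before maxDate,
+// removes each job's files from every directory in rootDirsToClean, and
+// updates the job to newStatus. Jobs whose files cannot be removed are
+// logged and left in their current status.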
+func CleanupJob(maxDate time.Time, currentStatus, newStatus models.JobStatus, rootDirsToClean ...string) error {
+    db := database.Connection
+    r := postgres.NewRepository(db)
+    jobs, err := r.GetJobsByUpdateTimeAndStatus(context.Background(),
+        time.Time{}, maxDate, currentStatus)
+    if err != nil {
+        return err
+    }
+
+    if len(jobs) == 0 {
+        log.API.Infof("No %s job files to clean", currentStatus)
+        return nil
+    }
+
+    for _, job := range jobs {
+        if err := cleanupJobData(job.ID, rootDirsToClean...); err != nil {
+            log.API.Errorf("Unable to cleanup directories %s", err)
+            continue
+        }
+
+        job.Status = newStatus
+        err = r.UpdateJob(context.Background(), *job)
+        if err != nil {
+            log.API.Errorf("Failed to update job status to %s %s", newStatus, err)
+            continue
+        }
+
+        log.API.WithFields(logrus.Fields{
+            "job_began":     job.CreatedAt,
+            "files_removed": time.Now(),
+            "job_id":        job.ID,
+        }).Infof("Files cleaned from %s and job status set to %s", rootDirsToClean, newStatus)
+    }
+
+    return nil
+}
+
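+// cleanupJobData removes the job's directory (named for the job ID) from each
+// of the given root directories.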
conf.GetEnv("FHIR_PAYLOAD_DIR"), j.ID) + if _, err := os.Stat(path); os.IsNotExist(err) { + err = os.MkdirAll(path, os.ModePerm) + if err != nil { + s.T().Error(err) + } + } + + f, err := os.Create(fmt.Sprintf("%s/fake.ndjson", path)) + if err != nil { + s.T().Error(err) + } + defer f.Close() + + // execute the test case from CLI + args := []string{"bcda", constants.ArchJobFiles, constants.ThresholdArg, "1"} + err = s.testApp.Run(args) + assert.Nil(s.T(), err) + + // check that the file has not moved to the archive location + dataPath := fmt.Sprintf("%s/%d/fake.ndjson", conf.GetEnv("FHIR_PAYLOAD_DIR"), j.ID) + _, err = os.ReadFile(dataPath) + if err != nil { + s.T().Error(err) + } + assert.FileExists(s.T(), dataPath, "File not Found") + + testJob := postgrestest.GetJobByID(s.T(), s.db, j.ID) + // check the status of the job + assert.Equal(s.T(), models.JobStatusCompleted, testJob.Status) + + // clean up + os.Remove(dataPath) +} + +func (s *CleanupTestSuite) TestCleanArchive() { + // init + const Threshold = 30 + now := time.Now() + + assert := assert.New(s.T()) + + // condition: FHIR_ARCHIVE_DIR doesn't exist + conf.UnsetEnv(s.T(), "FHIR_ARCHIVE_DIR") + args := []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, strconv.Itoa(Threshold)} + err := s.testApp.Run(args) + assert.Nil(err) + conf.SetEnv(s.T(), "FHIR_ARCHIVE_DIR", constants.TestArchivePath) + + // condition: FHIR_STAGING_DIR doesn't exist + conf.UnsetEnv(s.T(), "FHIR_STAGING_DIR") + args = []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, strconv.Itoa(Threshold)} + err = s.testApp.Run(args) + assert.Nil(err) + conf.SetEnv(s.T(), "FHIR_STAGING_DIR", constants.TestStagingPath) + + // condition: no jobs exist + args = []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, strconv.Itoa(Threshold)} + err = s.testApp.Run(args) + assert.Nil(err) + + // create a file that was last modified before the Threshold, but accessed after it + modified := now.Add(-(time.Hour * (Threshold + 1))) + beforeJobID, before := s.setupJobFile(modified, models.JobStatusArchived, conf.GetEnv("FHIR_ARCHIVE_DIR")) + defer before.Close() + + // create a file that is clearly after the threshold (unless the threshold is 0) + afterJobID, after := s.setupJobFile(now, models.JobStatusArchived, conf.GetEnv("FHIR_ARCHIVE_DIR")) + defer after.Close() + + // condition: bad threshold value + args = []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, "abcde"} + err = s.testApp.Run(args) + assert.EqualError(err, "invalid value \"abcde\" for flag -threshold: parse error") + + // condition: before < Threshold < after <= now + // a file created before the Threshold should be deleted; one created after should not + // we use last modified as a proxy for created, because these files should not be changed after creation + args = []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, strconv.Itoa(Threshold)} + err = s.testApp.Run(args) + assert.Nil(err) + + assertFileNotExists(s.T(), before.Name()) + + beforeJob := postgrestest.GetJobByID(s.T(), s.db, beforeJobID) + assert.Equal(models.JobStatusExpired, beforeJob.Status) + + assert.FileExists(after.Name(), "%s not found; it should have been", after.Name()) + + afterJob := postgrestest.GetJobByID(s.T(), s.db, afterJobID) + assert.Equal(models.JobStatusArchived, afterJob.Status) + + // I think this is an application directory and should always exist, but that doesn't seem to be the norm + os.RemoveAll(conf.GetEnv("FHIR_ARCHIVE_DIR")) +} + +func (s 
+func (s *CleanupTestSuite) setupJobFile(modified time.Time, status models.JobStatus, rootPath string) (uint, *os.File) {
+    j := models.Job{
+        ACOID:      s.testACO.UUID,
+        RequestURL: constants.V1Path + constants.EOBExportPath,
+        Status:     status,
+        UpdatedAt:  modified,
+    }
+
+    postgrestest.CreateJobs(s.T(), s.db, &j)
+
+    path := fmt.Sprintf("%s/%d", rootPath, j.ID)
+
+    if err := os.MkdirAll(path, os.ModePerm); err != nil {
+        s.T().Error(err)
+    }
+    jobFile, err := os.Create(fmt.Sprintf("%s/%s", path, "fake.ndjson"))
+    if err != nil {
+        s.T().Error(err)
+    }
+    defer jobFile.Close()
+
+    return j.ID, jobFile
+}
+
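+// assertFileNotExists fails the test if the file at path still exists.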
+func assertFileNotExists(t *testing.T, path string) {
+    _, err := os.Stat(path)
+    assert.True(t, os.IsNotExist(err), "file %s should not be found", path)
+}
+
+func (s *CleanupTestSuite) TestArchiveExpiring() {
+    assert := assert.New(s.T())
+
+    // condition: no jobs exist
+    args := []string{"bcda", constants.ArchJobFiles}
+    err := s.testApp.Run(args)
+    assert.Nil(err)
+
+    // timestamp to ensure that the job gets archived (older than the default 24h window)
+    t := time.Now().Add(-48 * time.Hour)
+    j := models.Job{
+        ACOID:      uuid.Parse(constants.TestACOID),
+        RequestURL: constants.V1Path + constants.EOBExportPath,
+        Status:     models.JobStatusCompleted,
+        CreatedAt:  t,
+        UpdatedAt:  t,
+    }
+    postgrestest.CreateJobs(s.T(), s.db, &j)
+
+    conf.SetEnv(s.T(), "FHIR_PAYLOAD_DIR", "../bcdaworker/data/test")
+    conf.SetEnv(s.T(), "FHIR_ARCHIVE_DIR", constants.TestArchivePath)
+
+    path := fmt.Sprintf("%s/%d/", conf.GetEnv("FHIR_PAYLOAD_DIR"), j.ID)
+    newpath := conf.GetEnv("FHIR_ARCHIVE_DIR")
+
+    if _, err = os.Stat(path); os.IsNotExist(err) {
+        err = os.MkdirAll(path, os.ModePerm)
+        if err != nil {
+            s.T().Error(err)
+        }
+    }
+
+    if _, err = os.Stat(newpath); os.IsNotExist(err) {
+        err = os.MkdirAll(newpath, os.ModePerm)
+        if err != nil {
+            s.T().Error(err)
+        }
+    }
+
+    f, err := os.Create(fmt.Sprintf("%s/fake.ndjson", path))
+    if err != nil {
+        s.T().Error(err)
+    }
+    defer f.Close()
+
+    // condition: normal execution
+    // execute the test case from CLI
+    args = []string{"bcda", constants.ArchJobFiles}
+    err = s.testApp.Run(args)
+    assert.Nil(err)
+
+    // check that the file has moved to the archive location
+    expPath := fmt.Sprintf("%s/%d/fake.ndjson", conf.GetEnv("FHIR_ARCHIVE_DIR"), j.ID)
+    _, err = os.ReadFile(expPath)
+    if err != nil {
+        s.T().Error(err)
+    }
+    assert.FileExists(expPath, "File not Found")
+
+    testJob := postgrestest.GetJobByID(s.T(), s.db, j.ID)
+
+    // check the status of the job
+    assert.Equal(models.JobStatusArchived, testJob.Status)
+
+    // clean up
+    os.RemoveAll(conf.GetEnv("FHIR_ARCHIVE_DIR"))
+}
+
+func (s *CleanupTestSuite) TestArchiveExpiringWithoutPayloadDir() {
+    assert := assert.New(s.T())
+
+    // condition: no jobs exist
+    args := []string{"bcda", constants.ArchJobFiles}
+    err := s.testApp.Run(args)
+    assert.Nil(err)
+
+    // timestamp to ensure that the job gets archived (older than the default 24h window)
+    t := time.Now().Add(-48 * time.Hour)
+    j := models.Job{
+        ACOID:      uuid.Parse(constants.TestACOID),
+        RequestURL: constants.V1Path + constants.EOBExportPath,
+        Status:     models.JobStatusCompleted,
+        CreatedAt:  t,
+        UpdatedAt:  t,
+    }
+    postgrestest.CreateJobs(s.T(), s.db, &j)
+
+    // condition: normal execution
+    // execute the test case from CLI
+    args = []string{"bcda", constants.ArchJobFiles}
+    err = s.testApp.Run(args)
+    assert.Nil(err)
+
+    testJob := postgrestest.GetJobByID(s.T(), s.db, j.ID)
+
+    // check the status of the job
+    assert.Equal(models.JobStatusArchived, testJob.Status)
+
+    // clean up
+    os.RemoveAll(conf.GetEnv("FHIR_ARCHIVE_DIR"))
+}
+
+func (s *CleanupTestSuite) TestArchiveExpiringWithThreshold() {
+    // save a job to our db
+    j := models.Job{
+        ACOID:      uuid.Parse(constants.TestACOID),
+        RequestURL: constants.V1Path + constants.EOBExportPath,
+        Status:     models.JobStatusCompleted,
+    }
+    postgrestest.CreateJobs(s.T(), s.db, &j)
+
+    conf.SetEnv(s.T(), "FHIR_PAYLOAD_DIR", "../bcdaworker/data/test")
+    conf.SetEnv(s.T(), "FHIR_ARCHIVE_DIR", constants.TestArchivePath)
+
+    path := fmt.Sprintf("%s/%d/", conf.GetEnv("FHIR_PAYLOAD_DIR"), j.ID)
+    if _, err := os.Stat(path); os.IsNotExist(err) {
+        err = os.MkdirAll(path, os.ModePerm)
+        if err != nil {
+            s.T().Error(err)
+        }
+    }
+
+    f, err := os.Create(fmt.Sprintf("%s/fake.ndjson", path))
+    if err != nil {
+        s.T().Error(err)
+    }
+    defer f.Close()
+
+    // execute the test case from CLI
+    args := []string{"bcda", constants.ArchJobFiles, constants.ThresholdArg, "1"}
+    err = s.testApp.Run(args)
+    assert.Nil(s.T(), err)
+
+    // check that the file has not moved to the archive location
+    dataPath := fmt.Sprintf("%s/%d/fake.ndjson", conf.GetEnv("FHIR_PAYLOAD_DIR"), j.ID)
+    _, err = os.ReadFile(dataPath)
+    if err != nil {
+        s.T().Error(err)
+    }
+    assert.FileExists(s.T(), dataPath, "File not Found")
+
+    testJob := postgrestest.GetJobByID(s.T(), s.db, j.ID)
+    // check the status of the job
+    assert.Equal(s.T(), models.JobStatusCompleted, testJob.Status)
+
+    // clean up
+    os.Remove(dataPath)
+}
+
+func (s *CleanupTestSuite) TestCleanArchive() {
+    // init
+    const Threshold = 30
+    now := time.Now()
+
+    assert := assert.New(s.T())
+
+    // condition: FHIR_ARCHIVE_DIR doesn't exist
+    conf.UnsetEnv(s.T(), "FHIR_ARCHIVE_DIR")
+    args := []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, strconv.Itoa(Threshold)}
+    err := s.testApp.Run(args)
+    assert.Nil(err)
+    conf.SetEnv(s.T(), "FHIR_ARCHIVE_DIR", constants.TestArchivePath)
+
+    // condition: FHIR_STAGING_DIR doesn't exist
+    conf.UnsetEnv(s.T(), "FHIR_STAGING_DIR")
+    args = []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, strconv.Itoa(Threshold)}
+    err = s.testApp.Run(args)
+    assert.Nil(err)
+    conf.SetEnv(s.T(), "FHIR_STAGING_DIR", constants.TestStagingPath)
+
+    // condition: no jobs exist
+    args = []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, strconv.Itoa(Threshold)}
+    err = s.testApp.Run(args)
+    assert.Nil(err)
+
+    // create a file that was last modified before the Threshold, but accessed after it
+    modified := now.Add(-(time.Hour * (Threshold + 1)))
+    beforeJobID, before := s.setupJobFile(modified, models.JobStatusArchived, conf.GetEnv("FHIR_ARCHIVE_DIR"))
+    defer before.Close()
+
+    // create a file that is clearly after the threshold (unless the threshold is 0)
+    afterJobID, after := s.setupJobFile(now, models.JobStatusArchived, conf.GetEnv("FHIR_ARCHIVE_DIR"))
+    defer after.Close()
+
+    // condition: bad threshold value
+    args = []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, "abcde"}
+    err = s.testApp.Run(args)
+    assert.EqualError(err, "invalid value \"abcde\" for flag -threshold: parse error")
+
+    // condition: before < Threshold < after <= now
+    // a file created before the Threshold should be deleted; one created after should not
+    // we use last modified as a proxy for created, because these files should not be changed after creation
+    args = []string{"bcda", constants.CleanupArchArg, constants.ThresholdArg, strconv.Itoa(Threshold)}
+    err = s.testApp.Run(args)
+    assert.Nil(err)
+
+    assertFileNotExists(s.T(), before.Name())
+
+    beforeJob := postgrestest.GetJobByID(s.T(), s.db, beforeJobID)
+    assert.Equal(models.JobStatusExpired, beforeJob.Status)
+
+    assert.FileExists(after.Name(), "%s not found; it should have been", after.Name())
+
+    afterJob := postgrestest.GetJobByID(s.T(), s.db, afterJobID)
+    assert.Equal(models.JobStatusArchived, afterJob.Status)
+
+    // I think this is an application directory and should always exist, but that doesn't seem to be the norm
+    os.RemoveAll(conf.GetEnv("FHIR_ARCHIVE_DIR"))
+}
+
+func (s *CleanupTestSuite) TestCleanupFailed() {
+    const threshold = 30
+    modified := time.Now().Add(-(time.Hour * (threshold + 1)))
+    beforePayloadJobID, beforePayload := s.setupJobFile(modified, models.JobStatusFailed, conf.GetEnv("FHIR_PAYLOAD_DIR"))
+    beforeStagingJobID, beforeStaging := s.setupJobFile(modified, models.JobStatusFailed, conf.GetEnv("FHIR_STAGING_DIR"))
+    // Job is old enough, but does not match the status
+    completedJobID, completed := s.setupJobFile(modified, models.JobStatusCompleted, conf.GetEnv("FHIR_PAYLOAD_DIR"))
+
+    afterPayloadJobID, afterPayload := s.setupJobFile(time.Now(), models.JobStatusFailed, conf.GetEnv("FHIR_PAYLOAD_DIR"))
+    afterStagingJobID, afterStaging := s.setupJobFile(time.Now(), models.JobStatusFailed, conf.GetEnv("FHIR_STAGING_DIR"))
+
+    // Check that we can clean up jobs that do not have data
+    noDataID, noData := s.setupJobFile(modified, models.JobStatusFailed, conf.GetEnv("FHIR_STAGING_DIR"))
+    dir, _ := path.Split(noData.Name())
+    os.RemoveAll(dir)
+    assertFileNotExists(s.T(), noData.Name())
+
+    shouldExist := []*os.File{afterPayload, afterStaging, completed}
+    shouldNotExist := []*os.File{beforePayload, beforeStaging, noData}
+
+    defer func() {
+        for _, f := range append(shouldExist, shouldNotExist...) {
+            os.Remove(f.Name())
+            f.Close()
+        }
+    }()
+
+    err := s.testApp.Run([]string{"bcda", "cleanup-failed", constants.ThresholdArg, strconv.Itoa(threshold)})
+    assert.NoError(s.T(), err)
+
+    assert.Equal(s.T(), models.JobStatusFailedExpired,
+        postgrestest.GetJobByID(s.T(), s.db, beforePayloadJobID).Status)
+    assert.Equal(s.T(), models.JobStatusFailedExpired,
+        postgrestest.GetJobByID(s.T(), s.db, noDataID).Status)
+    assert.Equal(s.T(), models.JobStatusFailedExpired,
+        postgrestest.GetJobByID(s.T(), s.db, beforeStagingJobID).Status)
+
+    assert.Equal(s.T(), models.JobStatusFailed,
+        postgrestest.GetJobByID(s.T(), s.db, afterPayloadJobID).Status)
+    assert.Equal(s.T(), models.JobStatusFailed,
+        postgrestest.GetJobByID(s.T(), s.db, afterStagingJobID).Status)
+    assert.Equal(s.T(), models.JobStatusCompleted,
+        postgrestest.GetJobByID(s.T(), s.db, completedJobID).Status)
+
+    for _, f := range shouldExist {
+        assert.FileExists(s.T(), f.Name())
+    }
+    for _, f := range shouldNotExist {
+        assertFileNotExists(s.T(), f.Name())
+    }
+}
+
+func (s *CleanupTestSuite) TestCleanupCancelled() {
+    const threshold = 30
+    modified := time.Now().Add(-(time.Hour * (threshold + 1)))
+    beforePayloadJobID, beforePayload := s.setupJobFile(modified, models.JobStatusCancelled, conf.GetEnv("FHIR_PAYLOAD_DIR"))
+    beforeStagingJobID, beforeStaging := s.setupJobFile(modified, models.JobStatusCancelled, conf.GetEnv("FHIR_STAGING_DIR"))
+    // Job is old enough, but does not match the status
+    completedJobID, completed := s.setupJobFile(modified, models.JobStatusCompleted, conf.GetEnv("FHIR_PAYLOAD_DIR"))
+
+    afterPayloadJobID, afterPayload := s.setupJobFile(time.Now(), models.JobStatusCancelled, conf.GetEnv("FHIR_PAYLOAD_DIR"))
+    afterStagingJobID, afterStaging := s.setupJobFile(time.Now(), models.JobStatusCancelled, conf.GetEnv("FHIR_STAGING_DIR"))
+
+    // Check that we can clean up jobs that do not have data
+    noDataID, noData := s.setupJobFile(modified, models.JobStatusCancelled, conf.GetEnv("FHIR_STAGING_DIR"))
+    dir, _ := path.Split(noData.Name())
+    os.RemoveAll(dir)
+    assertFileNotExists(s.T(), noData.Name())
+
+    shouldExist := []*os.File{afterPayload, afterStaging, completed}
+    shouldNotExist := []*os.File{beforePayload, beforeStaging, noData}
+
+    defer func() {
+        for _, f := range append(shouldExist, shouldNotExist...) {
+            os.Remove(f.Name())
+            f.Close()
+        }
+    }()
+
+    err := s.testApp.Run([]string{"bcda", "cleanup-cancelled", constants.ThresholdArg, strconv.Itoa(threshold)})
+    assert.NoError(s.T(), err)
+
+    assert.Equal(s.T(), models.JobStatusCancelledExpired,
+        postgrestest.GetJobByID(s.T(), s.db, beforePayloadJobID).Status)
+    assert.Equal(s.T(), models.JobStatusCancelledExpired,
+        postgrestest.GetJobByID(s.T(), s.db, noDataID).Status)
+    assert.Equal(s.T(), models.JobStatusCancelledExpired,
+        postgrestest.GetJobByID(s.T(), s.db, beforeStagingJobID).Status)
+
+    assert.Equal(s.T(), models.JobStatusCancelled,
+        postgrestest.GetJobByID(s.T(), s.db, afterPayloadJobID).Status)
+    assert.Equal(s.T(), models.JobStatusCancelled,
+        postgrestest.GetJobByID(s.T(), s.db, afterStagingJobID).Status)
+    assert.Equal(s.T(), models.JobStatusCompleted,
+        postgrestest.GetJobByID(s.T(), s.db, completedJobID).Status)
+
+    for _, f := range shouldExist {
+        assert.FileExists(s.T(), f.Name())
+    }
+    for _, f := range shouldNotExist {
+        assertFileNotExists(s.T(), f.Name())
+    }
+}
diff --git a/bcdaworker/queueing/river.go b/bcdaworker/queueing/river.go
index 1e089add0..cc2d00a9f 100644
--- a/bcdaworker/queueing/river.go
+++ b/bcdaworker/queueing/river.go
@@ -9,35 +9,81 @@ import (
     "path/filepath"
     "time"

+    bcdaaws "github.com/CMSgov/bcda-app/bcda/aws"
     "github.com/CMSgov/bcda-app/bcda/constants"
     "github.com/CMSgov/bcda-app/bcda/database"
     "github.com/CMSgov/bcda-app/bcda/metrics"
     "github.com/CMSgov/bcda-app/bcda/models"
+    "github.com/CMSgov/bcda-app/bcda/utils"
+    "github.com/CMSgov/bcda-app/bcdaworker/cleanup"
     "github.com/CMSgov/bcda-app/bcdaworker/repository/postgres"
     "github.com/CMSgov/bcda-app/bcdaworker/worker"
     "github.com/CMSgov/bcda-app/conf"
     "github.com/CMSgov/bcda-app/log"
     "github.com/ccoveille/go-safecast"
+    "github.com/google/uuid"
     "github.com/pkg/errors"
     "github.com/riverqueue/river"
     "github.com/riverqueue/river/riverdriver/riverpgxv5"
+    "github.com/robfig/cron/v3"
     sloglogrus "github.com/samber/slog-logrus"
     "github.com/sirupsen/logrus"
+    "github.com/slack-go/slack"
 )

+var slackChannel = "C034CFU945C" // #bcda-alerts
+
+type CleanupJobArgs struct {
+}
+
+func (args CleanupJobArgs) Kind() string {
+    return "CleanupJob"
+}
+
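+// CleanupJobWorker is the river worker for CleanupJobArgs. The cleanup and
+// archive implementations are held as function fields so tests can inject
+// mocks; StartRiver wires in the cleanup package functions.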
+type CleanupJobWorker struct {
+    river.WorkerDefaults[CleanupJobArgs]
+    cleanupJob      func(time.Time, models.JobStatus, models.JobStatus, ...string) error
+    archiveExpiring func(time.Time) error
+}
+
+type Notifier interface {
+    PostMessageContext(context.Context, string, ...slack.MsgOption) (string, string, error)
+}
+
 // TODO: better dependency injection (db, worker, logger). Waiting for pgxv5 upgrade
 func StartRiver(numWorkers int) *queue {
     workers := river.NewWorkers()
     river.AddWorker(workers, &JobWorker{})
+    river.AddWorker(workers, &CleanupJobWorker{
+        cleanupJob:      cleanup.CleanupJob,
+        archiveExpiring: cleanup.ArchiveExpiring,
+    })
+
+    schedule, err := cron.ParseStandard("0 11,23 * * *")
+
+    if err != nil {
+        panic("Invalid cron schedule")
+    }
+
+    periodicJobs := []*river.PeriodicJob{
+        river.NewPeriodicJob(
+            schedule,
+            func() (river.JobArgs, *river.InsertOpts) {
+                return CleanupJobArgs{}, &river.InsertOpts{
+                    UniqueOpts: river.UniqueOpts{
+                        ByArgs: true,
+                    },
+                }
+            },
+            &river.PeriodicJobOpts{RunOnStart: true},
+        ),
+    }

     riverClient, err := river.NewClient(riverpgxv5.New(database.Pgxv5Pool), &river.Config{
         Queues: map[string]river.QueueConfig{
             river.QueueDefault: {MaxWorkers: numWorkers},
         },
         // TODO: whats an appropriate timeout?
-        JobTimeout: -1, // default for river is 1m, que-go had no timeout, mimicking que-go for now
-        Logger:     getSlogLogger(),
-        Workers:    workers,
+        JobTimeout:   -1, // default for river is 1m, que-go had no timeout, mimicking que-go for now
+        Logger:       getSlogLogger(),
+        Workers:      workers,
+        PeriodicJobs: periodicJobs,
     })
     if err != nil {
         panic(err)
     }
@@ -149,6 +195,102 @@ func (w *JobWorker) Work(ctx context.Context, rjob *river.Job[models.JobEnqueueArgs]) error {
     return nil
 }

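+// Work runs the cleanup pipeline (cleanup-archive, cleanup-failed,
+// cleanup-cancelled, then archive-job-files) and posts start, failure, and
+// success notifications to Slack.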
+func (w *CleanupJobWorker) Work(ctx context.Context, rjob *river.Job[CleanupJobArgs]) error {
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+
+    ctx = log.NewStructuredLoggerEntry(log.Worker, ctx)
+    _, logger := log.SetCtxLogger(ctx, "transaction_id", uuid.New())
+
+    cutoff := getCutOffTime()
+    archiveDir := conf.GetEnv("FHIR_ARCHIVE_DIR")
+    stagingDir := conf.GetEnv("FHIR_STAGING_DIR")
+    payloadDir := conf.GetEnv("PAYLOAD_DIR")
+
+    params, err := getAWSParams()
+
+    if err != nil {
+        logger.Errorf("Unable to extract Slack Token from parameter store: %+v", err)
+        return err
+    }
+
+    slackClient := slack.New(params)
+
+    _, _, err = slackClient.PostMessageContext(ctx, slackChannel, slack.MsgOptionText(
+        fmt.Sprintf("Started Archive and Clean Job Data for environment: %s.", os.Getenv("ENV")), false),
+    )
+
+    if err != nil {
+        logger.Errorf("Error sending notifier start message: %+v", err)
+    }
+
+    // Cleanup archived jobs: remove job directory and files from archive and update job status to Expired
+    if err := w.cleanupJob(cutoff, models.JobStatusArchived, models.JobStatusExpired, archiveDir, stagingDir); err != nil {
+        logger.Error(errors.Wrap(err, fmt.Sprintf("failed to process job: %s", constants.CleanupArchArg)))
+
+        _, _, err := slackClient.PostMessageContext(ctx, slackChannel, slack.MsgOptionText(
+            fmt.Sprintf("Failed: %s job in %s env.", constants.CleanupArchArg, os.Getenv("ENV")), false),
+        )
+        if err != nil {
+            logger.Errorf("Error sending notifier failure message: %+v", err)
+        }
+
+        return err
+    }
+
+    // Cleanup failed jobs: remove job directory and files from failed jobs and update job status to FailedExpired
+    if err := w.cleanupJob(cutoff, models.JobStatusFailed, models.JobStatusFailedExpired, stagingDir, payloadDir); err != nil {
+        logger.Error(errors.Wrap(err, fmt.Sprintf("failed to process job: %s", constants.CleanupFailedArg)))
+
+        _, _, err := slackClient.PostMessageContext(ctx, slackChannel, slack.MsgOptionText(
+            fmt.Sprintf("Failed: %s job in %s env.", constants.CleanupFailedArg, os.Getenv("ENV")), false),
+        )
+        if err != nil {
+            logger.Errorf("Error sending notifier failure message: %+v", err)
+        }
+
+        return err
+    }
+
+    // Cleanup cancelled jobs: remove job directory and files from cancelled jobs and update job status to CancelledExpired
+    if err := w.cleanupJob(cutoff, models.JobStatusCancelled, models.JobStatusCancelledExpired, stagingDir, payloadDir); err != nil {
+        logger.Error(errors.Wrap(err, fmt.Sprintf("failed to process job: %s", constants.CleanupCancelledArg)))
+
+        _, _, err := slackClient.PostMessageContext(ctx, slackChannel, slack.MsgOptionText(
+            fmt.Sprintf("Failed: %s job in %s env.", constants.CleanupCancelledArg, os.Getenv("ENV")), false),
+        )
+        if err != nil {
+            logger.Errorf("Error sending notifier failure message: %+v", err)
+        }
+
+        return err
+    }
+
+    // Archive expiring jobs: update job statuses and move files to an inaccessible location
+    if err := w.archiveExpiring(cutoff); err != nil {
+        logger.Error(errors.Wrap(err, fmt.Sprintf("failed to process job: %s", constants.ArchiveJobFiles)))
+
+        _, _, err := slackClient.PostMessageContext(ctx, slackChannel, slack.MsgOptionText(
+            fmt.Sprintf("Failed: %s job in %s env.", constants.ArchiveJobFiles, os.Getenv("ENV")), false),
+        )
+        if err != nil {
+            logger.Errorf("Error sending notifier failure message: %+v", err)
+        }
+
+        return err
+    }
+
+    _, _, err = slackClient.PostMessageContext(ctx, slackChannel, slack.MsgOptionText(
+        fmt.Sprintf("SUCCESS: Archive and Clean Job Data for %s environment.", os.Getenv("ENV")), false),
+    )
+
+    if err != nil {
+        logger.Errorf("Error sending notifier success message: %+v", err)
+    }
+
+    return nil
+}
+
 // TODO: once we remove que library and upgrade to pgx5 we can move the below functions into manager
 // Update the AWS Cloudwatch Metric for job queue count
 func updateJobQueueCountCloudwatchMetric(db *sql.DB, log logrus.FieldLogger) {
@@ -178,3 +320,28 @@ func getQueueJobCount(db *sql.DB, log logrus.FieldLogger) float64 {

     return float64(count)
 }
+
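+// getCutOffTime returns the cleanup cutoff: now minus ARCHIVE_THRESHOLD_HR
+// hours (default 24).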
+func getCutOffTime() time.Time {
+    cutoff := time.Now().Add(-time.Hour * time.Duration(utils.GetEnvInt("ARCHIVE_THRESHOLD_HR", 24)))
+    return cutoff
+}
+
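+// getAWSParams returns the Slack token: read from local configuration when
+// ENV is "local", otherwise fetched from the AWS parameter store.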
+func getAWSParams() (string, error) {
+    env := conf.GetEnv("ENV")
+
+    if env == "local" {
+        return conf.GetEnv("workflow-alerts"), nil
+    }
+
+    bcdaSession, err := bcdaaws.NewSession("", os.Getenv("LOCAL_STACK_ENDPOINT"))
+    if err != nil {
+        return "", err
+    }
+
+    slackToken, err := bcdaaws.GetParameter(bcdaSession, "/slack/token/workflow-alerts")
+    if err != nil {
+        return slackToken, err
+    }
+
+    return slackToken, nil
+}
diff --git a/bcdaworker/queueing/river_test.go b/bcdaworker/queueing/river_test.go
index 72d691270..6df56177b 100644
--- a/bcdaworker/queueing/river_test.go
+++ b/bcdaworker/queueing/river_test.go
@@ -14,7 +14,10 @@ import (
     "github.com/CMSgov/bcda-app/conf"
     "github.com/ccoveille/go-safecast"
     "github.com/pborman/uuid"
+    "github.com/riverqueue/river"
+    "github.com/sirupsen/logrus"
     "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/mock"
 )

 // These are set in que_test.go
@@ -27,7 +30,7 @@ func TestWork_Integration(t *testing.T) {
     // Set up the logger since we're using the real client
     client.SetLogger(logger)

-    // Reset our environment once we've finished with the test
+    // Reset our environment variables to their original values once we've finished with the test.
     defer func(origEnqueuer string) {
         conf.SetEnv(t, "QUEUE_LIBRARY", origEnqueuer)
     }(conf.GetEnv("QUEUE_LIBRARY"))
@@ -98,36 +101,71 @@ func TestWork_Integration(t *testing.T) {
     }
 }

-// Runs 100k (very simple) jobs to try to test performance, DB connections, etc
-// Commented out as something we probably only want to run very occassionally
-// func TestProcessJobPerformance_Integration(t *testing.T) {
-// 	defer func(origEnqueuer string) {
-// 		conf.SetEnv(t, "QUEUE_LIBRARY", origEnqueuer)
-// 	}(conf.GetEnv("QUEUE_LIBRARY"))
+type MockCleanupJob struct {
+    mock.Mock
+}

-// 	conf.SetEnv(t, "QUEUE_LIBRARY", "river")
+type MockArchiveExpiring struct {
+    mock.Mock
+}

-// 	q := StartRiver(1)
-// 	defer q.StopRiver()
+func (m *MockCleanupJob) CleanupJob(maxDate time.Time, currentStatus, newStatus models.JobStatus, rootDirsToClean ...string) error {
+    args := m.Called(maxDate, currentStatus, newStatus, rootDirsToClean)
+    return args.Error(0)
+}
+
+func (m *MockArchiveExpiring) ArchiveExpiring(maxDate time.Time) error {
+    args := m.Called(maxDate)
+    return args.Error(0)
+}

-// 	db := database.Connection
+func TestCleanupJobWorker_Work(t *testing.T) {
+    // Set up the logger since we're using the real client
+    var logger = logrus.New()
+    client.SetLogger(logger)

-// 	cmsID := testUtils.RandomHexID()[0:4]
-// 	aco := models.ACO{UUID: uuid.NewRandom(), CMSID: &cmsID}
-// 	postgrestest.CreateACO(t, db, aco)
-// 	job := models.Job{ACOID: aco.UUID, Status: models.JobStatusPending}
-// 	postgrestest.CreateJobs(t, db, &job)
-// 	bbPath := uuid.New()
+    // Create mock objects
+    mockCleanupJob := new(MockCleanupJob)
+    mockArchiveExpiring := new(MockArchiveExpiring)
+
+    const archivePath = "/path/to/archive"
+    const stagingPath = "/path/to/staging"
+    const payloadPath = "/path/to/payload"
+
+    // Save and set environment variables using conf.SetEnv and defer to reset them
+    defer func(archiveDir, stagingDir, payloadDir string) {
+        conf.SetEnv(t, "FHIR_ARCHIVE_DIR", archiveDir)
+        conf.SetEnv(t, "FHIR_STAGING_DIR", stagingDir)
+        conf.SetEnv(t, "PAYLOAD_DIR", payloadDir)
+    }(conf.GetEnv("FHIR_ARCHIVE_DIR"), conf.GetEnv("FHIR_STAGING_DIR"), conf.GetEnv("PAYLOAD_DIR"))
+
+    conf.SetEnv(t, "FHIR_ARCHIVE_DIR", archivePath)
+    conf.SetEnv(t, "FHIR_STAGING_DIR", stagingPath)
+    conf.SetEnv(t, "PAYLOAD_DIR", payloadPath)
+
+    mockCleanupJob.On("CleanupJob", mock.AnythingOfType("time.Time"), models.JobStatusArchived, models.JobStatusExpired, []string{archivePath, stagingPath}).Return(nil)
+    mockCleanupJob.On("CleanupJob", mock.AnythingOfType("time.Time"), models.JobStatusFailed, models.JobStatusFailedExpired, []string{stagingPath, payloadPath}).Return(nil)
+    mockCleanupJob.On("CleanupJob", mock.AnythingOfType("time.Time"), models.JobStatusCancelled, models.JobStatusCancelledExpired, []string{stagingPath, payloadPath}).Return(nil)
+    mockArchiveExpiring.On("ArchiveExpiring", mock.AnythingOfType("time.Time")).Return(nil)
+
+    // Create a worker instance
+    cleanupJobWorker := &CleanupJobWorker{
+        cleanupJob:      mockCleanupJob.CleanupJob,
+        archiveExpiring: mockArchiveExpiring.ArchiveExpiring,
+    }

-// 	defer postgrestest.DeleteACO(t, db, aco.UUID)
+    // Create a mock river.Job
+    mockJob := &river.Job[CleanupJobArgs]{
+        Args: CleanupJobArgs{},
+    }

-// 	jobID, _ := safecast.ToInt(job.ID)
+    // Call the Work function
+    err := cleanupJobWorker.Work(context.Background(), mockJob)

-// 	enqueuer := NewEnqueuer()
+    // Assert that there was no error
+    assert.NoError(t, err)

-// 	for i := 0; i <= 100_000; i++ {
-// 		jobArgs := models.JobEnqueueArgs{ID: jobID, ACOID: cmsID, BBBasePath: bbPath}
-// 		err := enqueuer.AddJob(context.Background(), jobArgs, 1)
-// 		assert.NoError(t, err)
-// 	}
-// }
+    // Assert that all expectations were met
+    mockCleanupJob.AssertExpectations(t)
+    mockArchiveExpiring.AssertExpectations(t)
+}
diff --git a/go.mod b/go.mod
index f82d9deff..74b84418c 100644
--- a/go.mod
+++ b/go.mod
@@ -48,6 +48,7 @@ require (
     github.com/ccoveille/go-safecast v1.1.0
     github.com/pashagolub/pgxmock/v4 v4.4.0
     github.com/riverqueue/river v0.14.1
+    github.com/robfig/cron/v3 v3.0.1
 )

 require (