diff --git a/.gitignore b/.gitignore index 71fb2bf5d..b4bfff911 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,7 @@ bcdaworker/tmpdata/* bcda/pending_delete_dir/* .vault_password shared_files/decrypted/* -!shared_files/decrypted/README.md \ No newline at end of file +!shared_files/decrypted/README.md + +# Localstack volume +.localstack_volume/ diff --git a/bcda/bcdacli/cli.go b/bcda/bcdacli/cli.go index 0ba7c87fe..ef4d6561b 100644 --- a/bcda/bcdacli/cli.go +++ b/bcda/bcdacli/cli.go @@ -68,7 +68,7 @@ func setUpApp() *cli.App { r = postgres.NewRepository(db) return nil } - var acoName, acoCMSID, acoID, accessToken, acoSize, filePath, dirToDelete, environment, groupID, groupName, ips, fileType, alrFile string + var acoName, acoCMSID, acoID, accessToken, acoSize, filePath, fileSource, s3Endpoint, assumeRoleArn, dirToDelete, environment, groupID, groupName, ips, fileType, alrFile string var thresholdHr int var httpPort, httpsPort int app.Commands = []cli.Command{ @@ -475,17 +475,45 @@ func setUpApp() *cli.App { Usage: "Directory where suppression files are located", Destination: &filePath, }, + cli.StringFlag{ + Name: "filesource", + Usage: "Source of files. Must be one of 'local', 's3'. Defaults to 'local'", + Destination: &fileSource, + }, + cli.StringFlag{ + Name: "s3endpoint", + Usage: "Custom S3 endpoint", + Destination: &s3Endpoint, + }, + cli.StringFlag{ + Name: "assume-role-arn", + Usage: "Optional IAM role ARN to assume for S3", + Destination: &assumeRoleArn, + }, }, Action: func(c *cli.Context) error { ignoreSignals() db := database.Connection r := postgres.NewRepository(db) - importer := suppression.OptOutImporter{ - FileHandler: &optout.LocalFileHandler{ + + var file_handler optout.OptOutFileHandler + + if fileSource == "s3" { + file_handler = &optout.S3FileHandler{ + Logger: log.API, + Endpoint: s3Endpoint, + AssumeRoleArn: assumeRoleArn, + } + } else { + file_handler = &optout.LocalFileHandler{ Logger: log.API, PendingDeletionDir: conf.GetEnv("PENDING_DELETION_DIR"), FileArchiveThresholdHr: uint(utils.GetEnvInt("FILE_ARCHIVE_THRESHOLD_HR", 72)), - }, + } + } + + importer := suppression.OptOutImporter{ + FileHandler: file_handler, Saver: &suppression.BCDASaver{ Repo: r, }, diff --git a/bcda/bcdacli/cli_test.go b/bcda/bcdacli/cli_test.go index d56c316ea..18a8db441 100644 --- a/bcda/bcdacli/cli_test.go +++ b/bcda/bcdacli/cli_test.go @@ -776,7 +776,7 @@ func (s *CLITestSuite) TestDeleteDirectoryContents() { } -func (s *CLITestSuite) TestImportSuppressionDirectory() { +func (s *CLITestSuite) TestImportSuppressionDirectoryFromLocal() { assert := assert.New(s.T()) buf := new(bytes.Buffer) @@ -803,6 +803,33 @@ func (s *CLITestSuite) TestImportSuppressionDirectory() { } } +func (s *CLITestSuite) TestImportSuppressionDirectoryFromS3() { + assert := assert.New(s.T()) + + buf := new(bytes.Buffer) + s.testApp.Writer = buf + + path, cleanup := testUtils.CopyToS3(s.T(), "../../shared_files/synthetic1800MedicareFiles/test2/") + defer cleanup() + + args := []string{"bcda", constants.ImportSupDir, constants.DirectoryArg, path, constants.FileSourceArg, "s3", constants.S3EndpointArg, conf.GetEnv("BFD_S3_ENDPOINT")} + err := s.testApp.Run(args) + assert.Nil(err) + assert.Contains(buf.String(), constants.CompleteMedSupDataImp) + assert.Contains(buf.String(), "Files imported: 2") + assert.Contains(buf.String(), "Files failed: 0") + assert.Contains(buf.String(), "Files skipped: 0") + + fs := postgrestest.GetSuppressionFileByName(s.T(), s.db, + "T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000010", + "T#EFT.ON.ACO.NGD1800.DPRF.D190816.T0241391") + + assert.Len(fs, 2) + for _, f := range fs { + postgrestest.DeleteSuppressionFileByID(s.T(), s.db, f.ID) + } +} + func (s *CLITestSuite) TestImportSuppressionDirectory_Skipped() { assert := assert.New(s.T()) diff --git a/bcda/constants/test_constants.go b/bcda/constants/test_constants.go index 60d3e24b3..881158ba7 100644 --- a/bcda/constants/test_constants.go +++ b/bcda/constants/test_constants.go @@ -64,6 +64,8 @@ const ArchJobFiles = "archive-job-files" const DelDirContents = "delete-dir-contents" const ImportSupDir = "import-suppression-directory" const DirectoryArg = "--directory" +const FileSourceArg = "--filesource" +const S3EndpointArg = "--s3endpoint" const TestExcludeSAMHSA = "excludeSAMHSA=true" const TestSvcDate = "service-date" const FakeClientID = "fake-client-id" diff --git a/bcda/suppression/suppression_s3_test.go b/bcda/suppression/suppression_s3_test.go new file mode 100644 index 000000000..9fcba0386 --- /dev/null +++ b/bcda/suppression/suppression_s3_test.go @@ -0,0 +1,349 @@ +package suppression + +import ( + "fmt" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + log "github.com/sirupsen/logrus" + + "github.com/CMSgov/bcda-app/bcda/constants" + "github.com/CMSgov/bcda-app/bcda/database" + "github.com/CMSgov/bcda-app/bcda/models/postgres" + "github.com/CMSgov/bcda-app/bcda/models/postgres/postgrestest" + "github.com/CMSgov/bcda-app/bcda/testUtils" + "github.com/CMSgov/bcda-app/bcda/utils" + "github.com/CMSgov/bcda-app/conf" + "github.com/CMSgov/bcda-app/optout" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +type SuppressionS3TestSuite struct { + suite.Suite +} + +func (s *SuppressionS3TestSuite) createImporter() (OptOutImporter, *optout.FakeSaver) { + saver := optout.FakeSaver{} + return OptOutImporter{ + FileHandler: &optout.S3FileHandler{ + Logger: log.StandardLogger(), + Endpoint: conf.GetEnv("BFD_S3_ENDPOINT"), + }, + Saver: &saver, + Logger: log.StandardLogger(), + ImportStatusInterval: utils.GetEnvInt("SUPPRESS_IMPORT_STATUS_RECORDS_INTERVAL", 1000), + }, &saver +} + +func TestSuppressionS3TestSuite(t *testing.T) { + suite.Run(t, new(SuppressionS3TestSuite)) +} + +func (s *SuppressionS3TestSuite) TestImportSuppression() { + assert := assert.New(s.T()) + bucketName, cleanup := testUtils.CopyToS3(s.T(), "../../shared_files/synthetic1800MedicareFiles/test/T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000009") + defer cleanup() + + // 181120 file + fileTime, _ := time.Parse(time.RFC3339, "2018-11-20T10:00:00Z") + metadata := &optout.OptOutFilenameMetadata{ + Timestamp: fileTime, + FilePath: filepath.Join(bucketName, "synthetic1800MedicareFiles/test/T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000009"), + Name: constants.TestSuppressMetaFileName, + DeliveryDate: time.Now(), + } + + importer, saver := s.createImporter() + err := importer.ImportSuppressionData(metadata) + assert.Nil(err) + assert.Len(saver.Files, 1) + + suppressionFile := saver.Files[0] + assert.Equal(constants.TestSuppressMetaFileName, suppressionFile.Name) + assert.Equal(fileTime.Format("010203040506"), suppressionFile.Timestamp.UTC().Format("010203040506")) + assert.Equal(constants.ImportComplete, suppressionFile.ImportStatus) + + suppressions := saver.OptOutRecords + assert.Len(suppressions, 4) + assert.Equal("5SJ0A00AA00", suppressions[0].MBI) + assert.Equal("1-800", suppressions[0].SourceCode) + assert.Equal("4SF6G00AA00", suppressions[1].MBI) + assert.Equal("1-800", suppressions[1].SourceCode) + assert.Equal("4SH0A00AA00", suppressions[2].MBI) + assert.Equal("", suppressions[2].SourceCode) + assert.Equal("8SG0A00AA00", suppressions[3].MBI) + assert.Equal("1-800", suppressions[3].SourceCode) + + // 190816 file T#EFT.ON.ACO.NGD1800.DPRF.D190816.T0241390 + bucketName, cleanup = testUtils.CopyToS3(s.T(), "../../shared_files/synthetic1800MedicareFiles/test/T#EFT.ON.ACO.NGD1800.DPRF.D190816.T0241390") + defer cleanup() + + fileTime, _ = time.Parse(time.RFC3339, "2019-08-16T02:41:39Z") + metadata = &optout.OptOutFilenameMetadata{ + Timestamp: fileTime, + FilePath: filepath.Join(bucketName, "synthetic1800MedicareFiles/test/T#EFT.ON.ACO.NGD1800.DPRF.D190816.T0241390"), + Name: "T#EFT.ON.ACO.NGD1800.DPRF.D190816.T0241390", + DeliveryDate: time.Now(), + } + + importer, saver = s.createImporter() + err = importer.ImportSuppressionData(metadata) + assert.Nil(err) + assert.Len(saver.Files, 1) + + suppressionFile = saver.Files[0] + assert.Equal("T#EFT.ON.ACO.NGD1800.DPRF.D190816.T0241390", suppressionFile.Name) + assert.Equal(fileTime.Format("010203040506"), suppressionFile.Timestamp.UTC().Format("010203040506")) + + suppressions = saver.OptOutRecords + assert.Len(suppressions, 250) + assert.Equal("1000000019", suppressions[0].MBI) + assert.Equal("N", suppressions[0].PrefIndicator) + assert.Equal("1000039915", suppressions[20].MBI) + assert.Equal("N", suppressions[20].PrefIndicator) + assert.Equal("1000099805", suppressions[100].MBI) + assert.Equal("N", suppressions[100].PrefIndicator) + assert.Equal("1000026399", suppressions[200].MBI) + assert.Equal("N", suppressions[200].PrefIndicator) + assert.Equal("1000098787", suppressions[249].MBI) + assert.Equal("", suppressions[249].PrefIndicator) +} + +func (s *SuppressionS3TestSuite) TestImportSuppression_MissingData() { + assert := assert.New(s.T()) + + // Verify empty file is rejected + metadata := &optout.OptOutFilenameMetadata{} + importer, _ := s.createImporter() + err := importer.ImportSuppressionData(metadata) + assert.NotNil(err) + assert.Contains(err.Error(), "could not read file") + + tests := []struct { + name string + expErr string + dbError bool + }{ + {"T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000011", "failed to parse the effective date '20191301' from file", false}, + {"T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000012", "failed to parse the samhsa effective date '20191301' from file", false}, + {"T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000013", "failed to parse beneficiary link key from file", false}, + {"T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000011", "could not create suppression file record for file", true}, + } + + for _, tt := range tests { + s.T().Run(tt.name, func(t *testing.T) { + bucketName, cleanup := testUtils.CopyToS3(s.T(), fmt.Sprintf("../../shared_files/suppressionfile_MissingData/%s", tt.name)) + defer cleanup() + + fp := filepath.Join(bucketName, "suppressionfile_MissingData/"+tt.name) + metadata = &optout.OptOutFilenameMetadata{ + Timestamp: time.Now(), + FilePath: fp, + Name: tt.name, + DeliveryDate: time.Now(), + } + + importer, saver := s.createImporter() + + if tt.dbError { + db, _, err := sqlmock.New() + assert.NoError(err) + db.Close() + + importer.Saver = &BCDASaver{ + Repo: postgres.NewRepository(db), + } + } + + err = importer.ImportSuppressionData(metadata) + assert.NotNil(err) + assert.Contains(err.Error(), fmt.Sprintf("%s: %s", tt.expErr, fp)) + + if !tt.dbError { + suppressionFile := saver.Files[0] + assert.Equal(constants.ImportFail, suppressionFile.ImportStatus) + } + }) + } +} + +func (s *SuppressionS3TestSuite) TestValidate() { + assert := assert.New(s.T()) + importer, _ := s.createImporter() + + // positive + bucketName, cleanup := testUtils.CopyToS3(s.T(), "../../shared_files/synthetic1800MedicareFiles/test/T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000009") + defer cleanup() + + suppressionfilePath := filepath.Join(bucketName, "synthetic1800MedicareFiles/test/T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000009") + metadata := &optout.OptOutFilenameMetadata{Timestamp: time.Now(), FilePath: suppressionfilePath} + err := importer.validate(metadata) + assert.Nil(err) + + // bad file path + metadata.FilePath = metadata.FilePath + "/blah/" + err = importer.validate(metadata) + assert.NotNil(err) + assert.Contains(err.Error(), "could not read file "+metadata.FilePath) + + // invalid file header + bucketName, cleanup = testUtils.CopyToS3(s.T(), "../../shared_files/suppressionfile_BadHeader/T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000009") + defer cleanup() + + metadata.FilePath = filepath.Join(bucketName, "suppressionfile_BadHeader/T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000009") + err = importer.validate(metadata) + assert.EqualError(err, "invalid file header for file: "+metadata.FilePath) + + // missing record count + bucketName, cleanup = testUtils.CopyToS3(s.T(), "../../shared_files/suppressionfile_MissingData/T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000009") + defer cleanup() + + metadata.FilePath = filepath.Join(bucketName, "suppressionfile_MissingData/T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000009") + err = importer.validate(metadata) + assert.EqualError(err, "failed to parse record count from file: "+metadata.FilePath) + + // incorrect record count + bucketName, cleanup = testUtils.CopyToS3(s.T(), "../../shared_files/suppressionfile_MissingData/T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000010") + defer cleanup() + + metadata.FilePath = filepath.Join(bucketName, "suppressionfile_MissingData/T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000010") + err = importer.validate(metadata) + assert.EqualError(err, "incorrect number of records found from file: '"+metadata.FilePath+"'. Expected record count: 5, Actual record count: 4") +} + +func (s *SuppressionS3TestSuite) TestLoadOptOutFiles() { + assert := assert.New(s.T()) + importer, _ := s.createImporter() + + bucketName, cleanup := testUtils.CopyToS3(s.T(), fmt.Sprintf("../../shared_files/%s", constants.TestSynthMedFilesPath)) + defer cleanup() + + filePath := filepath.Join(bucketName, constants.TestSynthMedFilesPath) + suppresslist, skipped, err := importer.FileHandler.LoadOptOutFiles(filePath) + assert.Nil(err) + assert.Equal(2, len(*suppresslist)) + assert.Equal(0, skipped) + + bucketName, cleanup = testUtils.CopyToS3(s.T(), "../../shared_files/suppressionfile_BadFileNames/") + defer cleanup() + + filePath = filepath.Join(bucketName, "suppressionfile_BadFileNames/") + suppresslist, skipped, err = importer.FileHandler.LoadOptOutFiles(filePath) + assert.Nil(err) + assert.Equal(0, len(*suppresslist)) + assert.Equal(2, skipped) +} + +func (s *SuppressionS3TestSuite) TestCleanupSuppression() { + assert := assert.New(s.T()) + importer, _ := s.createImporter() + + var suppresslist []*optout.OptOutFilenameMetadata + + // failed import: file that's within the threshold - stay put + fileTime, _ := time.Parse(time.RFC3339, "2018-11-20T10:00:09Z") + + bucketName := "doesnt-matter" + + metadata := &optout.OptOutFilenameMetadata{ + Name: constants.TestSuppressMetaFileName, + Timestamp: fileTime, + FilePath: filepath.Join(bucketName, "suppressionfile_BadHeader/T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000009"), + Imported: false, + DeliveryDate: time.Now(), + } + + // failed import: file that's over the threshold - should still be kept until S3 auto-cleanup + fileTime, _ = time.Parse(time.RFC3339, "2018-11-20T10:00:00Z") + + metadata2 := &optout.OptOutFilenameMetadata{ + Name: constants.TestSuppressBadPath, + Timestamp: fileTime, + FilePath: filepath.Join(bucketName, "suppressionfile_BadFileNames/T#EFT.ON.ACO.NGD1800.FRPD.D191220.T1000009"), + Imported: false, + DeliveryDate: fileTime, + } + + bucketName, cleanup := testUtils.CopyToS3(s.T(), "../../shared_files/suppressionfile_BadFileNames/T#EFT.ON.ACO.NGD1800.DPRF.D190117.T9909420") + defer cleanup() + + // successful import: should move + metadata3 := &optout.OptOutFilenameMetadata{ + Name: "T#EFT.ON.ACO.NGD1800.DPRF.D190117.T9909420", + Timestamp: fileTime, + FilePath: filepath.Join(bucketName, "suppressionfile_BadFileNames/T#EFT.ON.ACO.NGD1800.DPRF.D190117.T9909420"), + Imported: true, + DeliveryDate: time.Now(), + } + + suppresslist = []*optout.OptOutFilenameMetadata{metadata, metadata2, metadata3} + err := importer.FileHandler.CleanupOptOutFiles(suppresslist) + assert.Nil(err) + + objects := testUtils.ListS3Objects(s.T(), bucketName, "") + assert.True(len(objects) == 0) +} + +func (s *SuppressionS3TestSuite) TestImportSuppressionDirectoryTable() { + assert := assert.New(s.T()) + importer, _ := s.createImporter() + db := database.Connection + + importer.Saver = &BCDASaver{ + Repo: postgres.NewRepository(db), + } + + tests := []struct { + name string + directory string + success int + failure int + skipped int + errorExpected bool + errMessage string + deleteFiles bool + insertCarriage bool + }{ + {name: "Valid test", directory: "../../shared_files/synthetic1800MedicareFiles/test2/", success: 2, failure: 0, skipped: 0, errorExpected: false, errMessage: "", deleteFiles: true}, + {name: "Import failure", directory: "../../shared_files/suppressionfile_BadHeader/", success: 0, failure: 1, skipped: 0, errorExpected: true, errMessage: "one or more suppression files failed to import correctly", deleteFiles: false}, + {name: "Skipped import", directory: "../../shared_files/suppressionfile_BadFileNames/", success: 0, failure: 0, skipped: 2, errorExpected: false, errMessage: "", deleteFiles: false}, + {name: "Carriage char in path", directory: "../../shared_files/suppressionfile_BadFileNames/", success: 0, failure: 0, skipped: 0, errorExpected: true, errMessage: "The specified bucket does not exist", deleteFiles: false, insertCarriage: true}, + } + + for _, tt := range tests { + s.T().Run(tt.name, func(t *testing.T) { + bucketName, cleanup := testUtils.CopyToS3(s.T(), tt.directory) + defer cleanup() + + if tt.insertCarriage { + bucketName += "\n" + } + + success, failure, skipped, err := importer.ImportSuppressionDirectory(bucketName) + if tt.errorExpected { + assert.Equal(true, strings.Contains(err.Error(), tt.errMessage)) + } else { + assert.Nil(err) + } + assert.Equal(tt.success, success) + assert.Equal(tt.failure, failure) + assert.Equal(tt.skipped, skipped) + + if tt.deleteFiles { + fs := postgrestest.GetSuppressionFileByName(s.T(), db, + "T#EFT.ON.ACO.NGD1800.DPRF.D181120.T1000010", + "T#EFT.ON.ACO.NGD1800.DPRF.D190816.T0241391") + assert.Len(fs, 2) + for _, f := range fs { + postgrestest.DeleteSuppressionFileByID(s.T(), db, f.ID) + } + } + + }) + } +} diff --git a/bcda/suppression/suppression_test.go b/bcda/suppression/suppression_test.go index a427c9ffa..b12589850 100644 --- a/bcda/suppression/suppression_test.go +++ b/bcda/suppression/suppression_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/DATA-DOG/go-sqlmock" log "github.com/sirupsen/logrus" "github.com/CMSgov/bcda-app/bcda/constants" @@ -167,13 +168,15 @@ func (s *SuppressionTestSuite) TestImportSuppression_MissingData() { } importer, saver := s.createImporter() - db := database.Connection if tt.dbError { + db, _, err := sqlmock.New() + assert.NoError(err) + db.Close() + importer.Saver = &BCDASaver{ Repo: postgres.NewRepository(db), } - db.Close() } err = importer.ImportSuppressionData(metadata) diff --git a/bcda/testUtils/utils.go b/bcda/testUtils/utils.go index 11bc26207..a4f897c91 100644 --- a/bcda/testUtils/utils.go +++ b/bcda/testUtils/utils.go @@ -14,12 +14,18 @@ import ( "os" "path/filepath" "strconv" + "strings" "testing" "time" "github.com/CMSgov/bcda-app/bcda/constants" "github.com/CMSgov/bcda-app/conf" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/go-chi/chi/v5" + "github.com/google/uuid" "github.com/sirupsen/logrus" "github.com/otiai10/copy" @@ -133,6 +139,121 @@ func CopyToTemporaryDirectory(t *testing.T, src string) (string, func()) { return newPath, cleanup } +// CopyToS3 copies all of the content found at src into a temporary S3 folder within localstack. +// The path to the temporary S3 directory is returned along with a function that can be called to clean up the data. +func CopyToS3(t *testing.T, src string) (string, func()) { + tempBucket, err := uuid.NewUUID() + + if err != nil { + t.Fatalf("Failed to generate temporary path name: %s", err.Error()) + } + + endpoint := conf.GetEnv("BFD_S3_ENDPOINT") + + config := aws.Config{ + Region: aws.String("us-east-1"), + S3ForcePathStyle: aws.Bool(true), + Endpoint: &endpoint, + } + + sess, err := session.NewSessionWithOptions(session.Options{ + Config: config, + }) + + if err != nil { + t.Fatalf("Failed to create new session for S3: %s", err.Error()) + } + + svc := s3.New(sess) + + _, err = svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(tempBucket.String()), + }) + + if err != nil { + t.Fatalf("Failed to create bucket %s: %s", tempBucket.String(), err.Error()) + } + + uploader := s3manager.NewUploader(sess) + + err = filepath.Walk(src, func(path string, info os.FileInfo, err error) error { + if err != nil { + t.Fatalf("Unexpected error reading path") + } + + if info.IsDir() { + return nil + } + + f, err := os.Open(filepath.Clean(path)) + if err != nil { + return err + } + + key := strings.TrimPrefix(path, "../../shared_files/") + + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(tempBucket.String()), + Key: aws.String(key), + Body: f, + }) + + fmt.Printf("Uploaded file in bucket %s, key %s\n", tempBucket.String(), info.Name()) + return err + }) + + if err != nil { + t.Fatalf("Failed to upload files to S3: %s", err.Error()) + } + + cleanup := func() { + svc := s3.New(sess) + iter := s3manager.NewDeleteListIterator(svc, &s3.ListObjectsInput{ + Bucket: aws.String(tempBucket.String()), + }) + + // Traverse iterator deleting each object + if err := s3manager.NewBatchDeleteWithClient(svc).Delete(aws.BackgroundContext(), iter); err != nil { + log.Printf("Unable to delete objects from bucket %s, %s\n", tempBucket, err) + } + } + + return tempBucket.String(), cleanup +} + +func ListS3Objects(t *testing.T, bucket string, prefix string) []*s3.Object { + endpoint := conf.GetEnv("BFD_S3_ENDPOINT") + + config := aws.Config{ + Region: aws.String("us-east-1"), + S3ForcePathStyle: aws.Bool(true), + Endpoint: &endpoint, + } + + sess, err := session.NewSessionWithOptions(session.Options{ + Config: config, + }) + + if err != nil { + t.Fatalf("Failed to create new session for S3: %s", err.Error()) + } + + svc := s3.New(sess) + + fmt.Printf("Listing objects in bucket %s, prefix %s", bucket, prefix) + + resp, err := svc.ListObjects(&s3.ListObjectsInput{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + }) + + if err != nil { + t.Fatalf("Failed to list objects in S3 bucket %s, prefix %s: %s", bucket, prefix, err) + } + + return resp.Contents +} + // GetRandomIPV4Address returns a random IPV4 address using rand.Read() to generate the values. func GetRandomIPV4Address(t *testing.T) string { data := make([]byte, 3) @@ -180,12 +301,12 @@ func MakeTestServerWithIntrospectEndpoint(activeToken bool) *httptest.Server { ) buf, err := ioutil.ReadAll(r.Body) if err != nil { - fmt.Printf("Unexpected error creating test server: Error in reading request body: %s", err.Error()) + fmt.Printf("Unexpected error creating test server: Error in reading request body: %s\n", err.Error()) return } if unmarshalErr := json.Unmarshal(buf, &input); unmarshalErr != nil { - fmt.Printf("Unexpected error creating test server: Error in unmarshalling the buffered input to JSON: %s", unmarshalErr.Error()) + fmt.Printf("Unexpected error creating test server: Error in unmarshalling the buffered input to JSON: %s\n", unmarshalErr.Error()) return } @@ -195,7 +316,7 @@ func MakeTestServerWithIntrospectEndpoint(activeToken bool) *httptest.Server { _, responseWriterErr := w.Write(body) if responseWriterErr != nil { - fmt.Printf("Unexpected error creating test server: Error reading request body: %s", responseWriterErr.Error()) + fmt.Printf("Unexpected error creating test server: Error reading request body: %s\n", responseWriterErr.Error()) } }) diff --git a/docker-compose.test.yml b/docker-compose.test.yml index 88494fadc..6dba1b2bc 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -6,9 +6,17 @@ services: build: context: . dockerfile: Dockerfiles/Dockerfile.tests + depends_on: + localstack: + condition: service_healthy env_file: - ./shared_files/decrypted/local.env environment: + # Set default values for Localstack to work + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-foobar} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-foobar} + - BFD_S3_ENDPOINT=${BFD_S3_ENDPOINT:-http://localstack:4566} + - DATABASE_URL=postgresql://postgres:toor@db-unit-test:5432/bcda_test?sslmode=disable - QUEUE_DATABASE_URL=postgresql://postgres:toor@db-unit-test:5432/bcda_test?sslmode=disable - GOLANGCI_LINT_CACHE=/root/.cache/go-build @@ -27,7 +35,26 @@ services: volumes: - ./db/testing/docker-entrypoint-initdb.d/:/docker-entrypoint-initdb.d/ # Pass a flag so we'll log all queries executed on the test db. - command: ["postgres", "-c", "log_statement=all"] + command: [ "postgres", "-c", "log_statement=all" ] + # Spin up a local S3 server for CCLF and Opt Out File import testing + localstack: + image: localstack/localstack:latest + environment: + - AWS_DEFAULT_REGION=us-east-1 + - GATEWAY_LISTEN=0.0.0.0:4566 + - SERVICES=s3 + - DEBUG=1 + ports: + - '4566-4583:4566-4583' + volumes: + - "./.localstack_volume:/var/lib/localstack" + - '/var/run/docker.sock:/var/run/docker.sock' + healthcheck: + test: "curl --silent --fail localstack:4566/_localstack/health | grep '\"s3\": \"available\"'" + interval: 10s + retries: 12 + start_period: 30s + timeout: 10s postman_test: build: context: . diff --git a/docker-compose.yml b/docker-compose.yml index a017cce57..e7d4d6b58 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,7 +25,7 @@ services: args: ENVIRONMENT: development entrypoint: "" - command: ["../scripts/watch.sh", "api", "bcda", "start-api"] + command: [ "../scripts/watch.sh", "api", "bcda", "start-api" ] env_file: - ./shared_files/decrypted/local.env environment: @@ -48,7 +48,7 @@ services: args: ENVIRONMENT: development entrypoint: "" - command: ["../scripts/watch.sh", "worker", "bcdaworker"] + command: [ "../scripts/watch.sh", "worker", "bcdaworker" ] env_file: - ./shared_files/decrypted/local.env environment: diff --git a/go.mod b/go.mod index cd04cbaa1..12db73bde 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/BurntSushi/toml v0.4.1 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/Pallinder/go-randomdata v1.2.0 - github.com/aws/aws-sdk-go v1.44.47 + github.com/aws/aws-sdk-go v1.48.9 github.com/bgentry/que-go v1.0.1 github.com/cenkalti/backoff/v4 v4.1.3 github.com/dgrijalva/jwt-go v3.2.1-0.20180309185540-3c771ce311b7+incompatible @@ -38,8 +38,8 @@ require ( github.com/tsenart/vegeta v12.7.0+incompatible github.com/urfave/cli v1.22.9 github.com/xo/usql v0.8.2 - golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d - golang.org/x/text v0.3.7 + golang.org/x/crypto v0.14.0 + golang.org/x/text v0.13.0 gotest.tools/gotestsum v1.6.2 ) @@ -198,13 +198,13 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect - golang.org/x/mod v0.5.1 // indirect + golang.org/x/mod v0.8.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect - golang.org/x/sys v0.0.0-20220823224334-20c2bfdbfe24 // indirect - golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect - golang.org/x/tools v0.1.9 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/term v0.13.0 // indirect + golang.org/x/tools v0.6.0 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect gonum.org/v1/gonum v0.11.0 // indirect google.golang.org/api v0.56.0 // indirect diff --git a/go.sum b/go.sum index f43e77b83..6a6b801be 100644 --- a/go.sum +++ b/go.sum @@ -201,6 +201,8 @@ github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/ github.com/aws/aws-sdk-go v1.37.25/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.44.47 h1:uyiNvoR4wfZ8Bp4ghgbyzGFIg5knjZMUAd5S9ba9qNU= github.com/aws/aws-sdk-go v1.44.47/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= +github.com/aws/aws-sdk-go v1.48.9 h1:vqzjg5FCi/QDWTEenBs65gu57GJdvkqZ0+5steFb44g= +github.com/aws/aws-sdk-go v1.48.9/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/bazelbuild/rules_go v0.24.5 h1:8S5qilf+Il5/TPMZQIOfzQDAZtkhB4jALiAnwRuisDM= github.com/bazelbuild/rules_go v0.24.5/go.mod h1:MC23Dc/wkXEyk3Wpq6lCqz0ZAYOZDw2DR5y3N1q2i7M= github.com/beltran/gohive v1.4.0 h1:aACOAXFo1ok3D4eRXmKxWuqxJ/1aje7mzUhAADgPq0A= @@ -1538,6 +1540,7 @@ golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1589,6 +1592,7 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1654,6 +1658,7 @@ golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c h1:JVAXQ10yGGVbSyoer5VILysz6YKjdNT2bsvlayjqhes= golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1685,6 +1690,7 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180224232135-f6cff0780e54/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1797,10 +1803,12 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220823224334-20c2bfdbfe24 h1:TyKJRhyo17yWxOMCTHKWrc5rddHORMlnZ/j57umaUd8= golang.org/x/sys v0.0.0-20220823224334-20c2bfdbfe24/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1812,6 +1820,7 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1908,6 +1917,7 @@ golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.9 h1:j9KsMiaP1c3B0OTQGth0/k+miLGTgLsAFUCrF2vLcF8= golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/optout/file_handler.go b/optout/file_handler.go index dafafe05e..5ee4ad312 100644 --- a/optout/file_handler.go +++ b/optout/file_handler.go @@ -7,7 +7,14 @@ import ( // File handlers can load opt out files from a given source and can optionally clean them up afterwards. // This interface allows us to implement file loading from multiple sources, including local directories and AWS S3. type OptOutFileHandler interface { + // Load opt out files from the given path. + // + // Return a list of metadata parsed from valid filenames, + // and the number of files skipped due to unknown filenames. LoadOptOutFiles(path string) (suppressList *[]*OptOutFilenameMetadata, skipped int, err error) + // Cleanup any opt out files that were successfully imported, and handle + // any files that failed to be imported. CleanupOptOutFiles(suppressList []*OptOutFilenameMetadata) error + // Open a given opt out file, specified by the metadata struct. OpenFile(metadata *OptOutFilenameMetadata) (*bufio.Scanner, func(), error) } diff --git a/optout/local_file_handler.go b/optout/local_file_handler.go index d769764d9..225399a54 100644 --- a/optout/local_file_handler.go +++ b/optout/local_file_handler.go @@ -11,6 +11,7 @@ import ( "github.com/sirupsen/logrus" ) +// LocalFileHandler manages files from local directories. type LocalFileHandler struct { Logger logrus.FieldLogger PendingDeletionDir string diff --git a/optout/s3_file_handler.go b/optout/s3_file_handler.go index f87aa8488..89a5a77f2 100644 --- a/optout/s3_file_handler.go +++ b/optout/s3_file_handler.go @@ -1 +1,220 @@ package optout + +import ( + "bufio" + "bytes" + "fmt" + "strings" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/sirupsen/logrus" +) + +// S3FileHandler manages files located on AWS S3. +type S3FileHandler struct { + Logger logrus.FieldLogger + // Optional S3 endpoint to use for connection. + Endpoint string + // Optional role to assume when connecting to S3. + AssumeRoleArn string +} + +// Define logger functions to ensure that logs get sent to: +// 1. Splunk (Logger.*) +// 2. stdout (Jenkins) + +func (handler *S3FileHandler) Infof(format string, rest ...interface{}) { + fmt.Printf(format, rest...) + handler.Logger.Infof(format, rest...) +} + +func (handler *S3FileHandler) Warningf(format string, rest ...interface{}) { + fmt.Printf(format, rest...) + handler.Logger.Warningf(format, rest...) +} + +func (handler *S3FileHandler) Errorf(format string, rest ...interface{}) { + fmt.Printf(format, rest...) + handler.Logger.Errorf(format, rest...) +} + +func (handler *S3FileHandler) LoadOptOutFiles(path string) (suppressList *[]*OptOutFilenameMetadata, skipped int, err error) { + var result []*OptOutFilenameMetadata + + bucket, prefix, err := parseS3Uri(path) + if err != nil { + handler.Errorf("Failed to parse S3 path: %s", err) + return &result, skipped, err + } + + sess, err := handler.createSession() + if err != nil { + handler.Errorf("Failed to create S3 session: %s", err) + return &result, skipped, err + } + + svc := s3.New(sess) + + handler.Infof("Listing objects in bucket %s, prefix %s", bucket, prefix) + + resp, err := svc.ListObjects(&s3.ListObjectsInput{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + }) + + if err != nil { + handler.Errorf("Failed to list objects in S3 bucket %s, prefix %s: %s", bucket, prefix, err) + return &result, skipped, err + } + + for _, obj := range resp.Contents { + metadata, err := ParseMetadata(*obj.Key) + metadata.FilePath = fmt.Sprintf("s3://%s/%s", bucket, *obj.Key) + metadata.DeliveryDate = *obj.LastModified + + if err != nil { + // Skip files with a bad name. An unknown file in this dir isn't a blocker + handler.Warningf("Unknown file found: %s. Skipping.", metadata) + skipped = skipped + 1 + continue + } + + result = append(result, &metadata) + } + + return &result, skipped, err +} + +func (handler *S3FileHandler) OpenFile(metadata *OptOutFilenameMetadata) (*bufio.Scanner, func(), error) { + handler.Infof("Opening file %s", metadata.FilePath) + bucket, file, err := parseS3Uri(metadata.FilePath) + if err != nil { + return nil, nil, err + } + + sess, err := handler.createSession() + if err != nil { + return nil, nil, err + } + + downloader := s3manager.NewDownloader(sess) + buff := &aws.WriteAtBuffer{} + numBytes, err := downloader.Download(buff, &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(file), + }) + + if err != nil { + handler.Errorf("Failed to download bucket %s, key %s", bucket, file) + return nil, nil, err + } + + handler.Infof("file downloaded: size=%d", numBytes) + + byte_arr := buff.Bytes() + sc := bufio.NewScanner(bytes.NewReader(byte_arr)) + return sc, func() {}, err +} + +func (handler *S3FileHandler) CleanupOptOutFiles(suppresslist []*OptOutFilenameMetadata) error { + sess, err := handler.createSession() + if err != nil { + return err + } + + errCount := 0 + for _, suppressionFile := range suppresslist { + if !suppressionFile.Imported { + // Don't do anything. The S3 bucket should have a retention policy that + // automatically cleans up files after a specified period of time, + handler.Warningf("File %s was not imported successfully. Skipping cleanup.", suppressionFile) + continue + } + + handler.Infof("Cleaning up file %s", suppressionFile) + + bucket, file, err := parseS3Uri(suppressionFile.FilePath) + if err != nil { + return err + } + + svc := s3.New(sess) + _, err = svc.DeleteObject(&s3.DeleteObjectInput{Bucket: aws.String(bucket), Key: aws.String(file)}) + + if err != nil { + handler.Errorf("File %s failed to clean up properly, error occurred while deleting object: %v", suppressionFile, err) + errCount++ + continue + } + + err = svc.WaitUntilObjectNotExists(&s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(file), + }) + + if err != nil { + handler.Errorf("File %s failed to clean up properly, error occurred while waiting for object to be deleted: %v", suppressionFile, err) + errCount++ + continue + } + + handler.Infof("File %s successfully ingested and deleted from S3.", suppressionFile) + } + + if errCount > 0 { + return fmt.Errorf("%d files could not be cleaned up", errCount) + } + + return nil +} + +// Creates a new AWS S3 session. If the handler is given a custom S3 endpoint +// and/or IAM role ARN to assume, the new session connects using those parameters. +func (handler *S3FileHandler) createSession() (*session.Session, error) { + sess := session.Must(session.NewSession()) + + config := aws.Config{ + Region: aws.String("us-east-1"), + } + + if handler.Endpoint != "" { + config.S3ForcePathStyle = aws.Bool(true) + config.Endpoint = &handler.Endpoint + } + + if handler.AssumeRoleArn != "" { + config.Credentials = stscreds.NewCredentials( + sess, + handler.AssumeRoleArn, + ) + } + + return session.NewSessionWithOptions(session.Options{ + Config: config, + }) +} + +// Parses an S3 URI and returns the bucket and key. +// +// @example: +// input: s3://my-bucket/path/to/file +// output: "my-bucket", "path/to/file" +// +// @example +// input: s3://my-bucket +// output: "my-bucket", "" +// +func parseS3Uri(str string) (bucket string, key string, err error) { + workingString := strings.TrimPrefix(str, "s3://") + resultArr := strings.SplitN(workingString, "/", 2) + + if len(resultArr) == 1 { + return resultArr[0], "", nil + } + + return resultArr[0], resultArr[1], nil +} diff --git a/shared_files/encrypted/local.env b/shared_files/encrypted/local.env index e8a85fb6f..956050224 100644 --- a/shared_files/encrypted/local.env +++ b/shared_files/encrypted/local.env @@ -1,120 +1,122 @@ $ANSIBLE_VAULT;1.1;AES256 -30363438373263356261393039326337306530663461643064633935363463636135383366633561 -6336626332313264616363356438626138323939626133380a633561386664306466373765363537 -30373566663566393131656332343063636566313465313530343937636636383032393731613061 -6564313139383439320aa656439353834613832343030313735 +38393064323536323265653164316663353461396639346436326162326536333334656265363339 +6362343135616331310a343165393236613163306664333635356434383532336536363733623162 +64643065396638343638616434653532616366306531346331626162613033333631643533323762 +38363839353565366566646561626436663031663239383335316533343263663534303263356539 +61373231376566393732336339636534343834626561356266366562643064313537303262313164 +66653165326662323139613662386637323539343565396436663661363562623665323533396533 +39626639306266643062373761343834633633623163666265396265346537616332393732363336 +63626234653738613462666132626163366237373466653564663136663531666139323139303836 +35666363363261303835303535636164653437333464333362313931643762333933323135366461 +30633065303861363334376533363365643739333332336336616536346436363437623330353338 +35626332393537326336616563343865396537313739343330633537356531383331653133616130 +65333137646637613866633464323630303765653463353532336439353663653661633661643635 +62643463373465313766326333613363386538373034393437373030373662366332326362613166 +30383731396233383037303532383336366361343835323735396334343337626138616661646132 +35356238363866613036623365316464663562303634396264653866373766643736653739316461 +62373364323533656336623533383836663132346333363264663964653634653734326134383666 +37373734346333396236303937653262326666303234666635306533383836656532386134653364 +66363065393736306265306633656230333238376439623738663062333163366234323737326437 +38343666343235343865393863306334366539616461313030613435646539373239613062373338 +30626235336666376437383132633232336137313137363862623633366331333736653130653261 +34636463346531343839336437323932333530613933353835303364353437383362363463336137 +33336435613434616530373131356164623134616436323736663532323864326431396265396138 +37306565363763663436356166653366373436393436396232323031643565363131643366313835 +63613930383439306231363039633232373862373330323632396138366432326436363962313364 +31356266346163373163383233663330633835373063666138306465646565636266373633363336 +34623833613865656136626238373238333863646133303031666530613865333063303134396261 +38386432353361616335316665363033646434653962306630383963626466613739343831323330 +37646162633934643365663537646261396430613033633461353565386363633231376365313636 +61626135663737303037343434306335643136373361643033386430316365326534303032303331 +37623736363965333432316366373661303064616165343832626536343131396439343033636639 +34363634356461353137396164623562646331306139373334376263626263663265663232326535 +37643530613338346134326232366438623539333437356130343561653330623464646462346136 +38626239656134613962353166323932336432313635653430616566393863356666613065636136 +35313133313137306161353764633835316161353833393533653739653033373534313266356264 +33366131643039613539316535303533333937353738363933646431633634643762623733623934 +61643561396139653633383738343639646135666134366433376538323863363762303866313266 +37376461363836356565383561316430323763643930623233313631383061396633663665343261 +37373833646436343738303538316163643036316630333863353136363036393432653863343335 +31323239383732653433336666393233373235393163623339323166323531396237643261303666 +62323036353739613635633436323237383561653731363631393234393965626330343761336263 +62303138343465616234383637363730663638646530323264323763306437363762653132353061 +66646435353562333038313162396330323634663865386635313635393765336530646530373835 +39633461653462353862356365396437656432623433316461613130306362333230653362616261 +62393836323939313165356339366438316332356161346336333733333464653163623238663363 +61613036323138343534373839613539313030316363313462626561316632323332313434656365 +34653434633337663061343431393336363637643433373862633833326339643664646630336336 +38393936643139343830666237393538633635343730316433633862353836346163646333323638 +62663230393262353633326238393164643765353035323465613766663461373139343062653239 +34613539393832316664303139646235336666373664356338363162663030383636386539626365 +30623965303130393433363038353838663964313138386166646432653132393534303461353437 +33666637333566386635343731326337343866663164313262623131626164303639313233373738 +63663362343836363462313831333432373866306662303337343130353839393232323331306566 +38393233656231313262336238633534313833636364376535333766346439663033643731633634 +35376163616435386139613962613366393436393933633437616664626138656132376266633031 +30316161396235306539396464383065386230663931366237643138326434613531383130333464 +38626262393663396365373937623666306365373663303934663432613066316462316434623964 +62343934636464323934333661633834633333313864613332333362333834333534363635616162 +31663265383435303438303063633464633538663162383236346666383836383463623634633736 +65376632373338393466316362626364356539383861343562326364623665626464653633306666 +30646335646432383738303236353065383037313066643366363433316566333862396238346638 +32303933346631356230353965363266653163303439313732336339613231666333663138643635 +63313966656366623337386466366634623365303863396137323166636166373734623836326238 +31313438613536386565333764393466653332396162336232303461386639373037303832643464 +36616361393736343336343664316462636532336335303164313966653839333438366431653462 +36643637373538643365336363393534313438646664376663316233353138373233363161643737 +33363231626566613936316539303334333730323265663163396339653166633931306432303934 +30363433363063303262383162646663346661373531306661326666353737356264663763323265 +30656563363561313064393532613465333235346630313332323432353836636132363939386562 +39643232373064356466663766636261393164653037623732636236343533343034376666316537 +36323432663539306466313732313462613036373436373135333134353936333438633336363965 +30323066663432623533656336356165323261393437653331663066313437623964306266386564 +62376562336663393236303336656430626530386338666565373339396234643766623731613864 +39383139303337656139626139653033616531633538386535353664313631326264383761643738 +65383965373663366139343535376432666665373365343964373331386566323031643738383830 +61663165336235353136383137646138643337653266343063653838326366613562376163326564 +62623635386366653466666435346339333339356661386335376566656237323735613265316234 +66373135623433316463376435613763376636343333643539306437383437636563613265643561 +63333038383865383839383839626637643565666131333462363864326636373462386265663665 +61323033363036363638613137663334623230383266663833373764386166353266376232393631 +66303638656230653565616231643135313934356562303966353531343431613537656462663331 +63663434353162306434326332393534356663663662326138333631316632656435383239626265 +62313565366439646435616637386536643332313963666537386464623864396438373531363262 +37353663306637373662643632636463643761386365626133383034656530333161646437323961 +38383435623363666565353637326230373637626636333436333231666363643233373731343939 +62653965363730373737303961333037666661666664363232653438386661323334336335313239 +31316633333431313631616632323936333365373137313362653938396534343766346437613338 +36386339373364316539376235373839646432313134653764663664633866323563666630333530 +35336632396435363863653939363661373239323761663131313365313036626439373262376166 +65613436633031323062316136393234366630386532633032363731386464613738393064393164 +62346665646432663436646364323531663931323839396532346339373661656135313932623764 +33376532316561333533613134653966656363303436363163666563626539326662353364303861 +35356331383264633937353733663139316364666336613539666232633465636439383562306630 +33313265343138383834396236383363613536346461366436333663376533393864323738353230 +37306464396661373266306139656539373435383266393032356561656661653731643261643734 +66393539393065356164353263363532376366663561306534653535326163646366346565643634 +35386531346666643062373935653133323238333565343765633232383633366233643162653133 +35373866363832316432336234333232363331316333333636666565653739356166373833653332 +34323835383136323964313035336435366565663164386137343832353263376139623665373337 +33346565393366393563636566636131616339663564653830373231363032666530613161613438 +63613339393065333964386438313732646164346634616430383266333238303166633262633730 +32633065373762393330323231383237333364613362643134393962653464393233323330343133 +62333630353163333365633264343134646635356436633734383962623233366330383133623863 +30323364666463633736323663326638663933333638363833633734353065616439623431653061 +30386263306136643532353835376262323065343438626331306335353561323530643261363634 +34646565663130653061653631646234626530626636643538373937313362353631353735376130 +65373031613964623635303863626264626538626366336663396334336236623635363232633539 +30616537313038353166313866383862306335336234323336663463313338346239633633316231 +34383463323434303365383636663366306237346466333138616465633331306164613932613062 +66653464393235633737336666623931666234386664613365663564353535346262383439653664 +66313532656134383038313033643633646133643764646135653233356135383765313265323466 +37663133343865353664613433613139313636646635336232333562313264333334393463373731 +30303134663364626532396534396665613332316465303332303332333762396461373038333232 +63303430366433343364613863303834643130613634356633393831656636353763346438613064 +66383239313363343866313763373461623134303762383366386536666363373061613732303663 +39666365383836393061356161343936643330323263316535393930666633393637393833613937 +33643237646134316631363533333738326565306237326666393239363563646631653635383962 +36646439643134643632316564383563636566626537643939663666316138613836323130316430 +34313837303539663936633037373039353834376537623238623661636664643565363737613633 +36323930306462653331616135343031636631316137373233616533643633373539