Skip to content

Commit

Permalink
code review and unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
muyangye committed Feb 7, 2024
1 parent f1684a9 commit eebfd54
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public class FileMetadata {

private String createdByUser;

public String test;

public String getFileId() {
return fileId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
package org.apache.streampipes.manager.file;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

public class FileHandler {

Logger logger = LoggerFactory.getLogger(FileHandler.class);

public void storeFile(String filename, InputStream fileInputStream) throws IOException {
File targetFile = makeFile(filename);
FileUtils.copyInputStreamToFile(fileInputStream, targetFile);
Expand All @@ -45,8 +50,12 @@ public void renameFile(String oldFilename, String newFilename) {
var fileInputStream = new FileInputStream(getFile(oldFilename));
deleteFile(oldFilename);
storeFile(newFilename, fileInputStream);
} catch (Exception e) {
System.out.println(e.getMessage());
} catch (FileNotFoundException e) {
logger.error(
"Failed to find the old file locally with internalFilename as the identifier, this is most likely a mismatch "
+ "between local file and FileMetadata stored in CouchDB. Raw exception message: " + e.getMessage());
} catch (IOException e) {
logger.error("Failed to save renamed file locally: " + e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,31 @@

public class MergeFilenamesAndRenameDuplicatesMigration implements Migration {

private static final String ORIGINAL_FILENAME = "originalFilename";
private static final String INTERNAL_FILENAME = "internalFilename";
private static final String ID = "_id";
protected static final String ORIGINAL_FILENAME = "originalFilename";
protected static final String INTERNAL_FILENAME = "internalFilename";
protected static final String ID = "_id";
protected static final String FILETYPE = "filetype";

private CouchDbClient couchDbClient;

private CouchDbClient couchDbClient = Utils.getCouchDbFileMetadataClient();
private ObjectMapper mapper = new ObjectMapper();

private IFileMetadataStorage fileMetadataStorage =
StorageDispatcher.INSTANCE.getNoSqlStore().getFileMetadataStorage();

private FileHandler fileHandler = new FileHandler();
// This map contains all fileMetadata grouped by originalFilename
private Map<String, List<FileMetadata>> duplicateFilenamesFileMetadataMap = new HashMap<>();

protected Map<String, List<FileMetadata>> fileMetadataGroupedByOriginalName = new HashMap<>();

private boolean isTesting = false;

/**
 * Creates a migration whose test mode is controlled by the caller.
 *
 * <p>This constructor only records the flag; it does not assign {@code couchDbClient}.
 *
 * @param testing value stored in {@code isTesting}; the rest of the class consults this flag
 *                before reading from or writing to the metadata storage and the local files
 */
public MergeFilenamesAndRenameDuplicatesMigration(boolean testing) {
  this.isTesting = testing;
}

/**
 * Creates the migration for regular (non-test) use and obtains the CouchDB client for the
 * file metadata database via {@code Utils.getCouchDbFileMetadataClient()}.
 */
public MergeFilenamesAndRenameDuplicatesMigration() {
  this.couchDbClient = Utils.getCouchDbFileMetadataClient();
}

// Starting from v0.95, StreamPipes will use a single file name as the unique identifier of files instead of an
// internal filename and an original filename. This migration merges them and renames all the files that have
Expand All @@ -59,11 +73,20 @@ public boolean shouldExecute() {
@Override
public void executeMigration() {
var couchDbRawFileMetadata = getCouchDbRawFileMetadata(getAllFileIds(fileMetadataStorage));
couchDbRawFileMetadata.forEach(rawFileMetadata -> checkDuplicateOriginalFilename(rawFileMetadata));
duplicateFilenamesFileMetadataMap.forEach(
getFileMetadataToUpdate(couchDbRawFileMetadata);
fileMetadataGroupedByOriginalName.forEach(
(originalFilename, fileMetadataList) -> update(originalFilename, fileMetadataList));
}

/**
 * Groups the given raw CouchDB documents by their original filename.
 *
 * <p>Every entry is handed to {@code checkDuplicateOriginalFilename}, which fills
 * {@code fileMetadataGroupedByOriginalName}: the key is the lower-cased original filename and the
 * value is the list of FileMetadata sharing that name (more than one element means duplicates).
 *
 * @param couchDbRawFileMetadata raw file metadata documents, one map per document
 */
protected void getFileMetadataToUpdate(List<Map<String, Object>> couchDbRawFileMetadata) {
  for (Map<String, Object> rawFileMetadata : couchDbRawFileMetadata) {
    checkDuplicateOriginalFilename(rawFileMetadata);
  }
}

/**
* Fetches all fileIds stored in CouchDB
*/
Expand Down Expand Up @@ -93,45 +116,58 @@ private Map<String, Object> convertInputStreamToMap(InputStream inputStream) {
}

/**
* Takes raw data stored in CouchDB and constructs duplicateFilenamesFileMetadataMap,
* key is (possibly) duplicated originalFilename and value is that file's FileMetadata
* Takes raw data stored in CouchDB and constructs fileMetadataGroupedByOriginalName,
* key is (possibly) duplicated originalFilename and value is that file's FileMetadata list (if duplicated)
*/
private void checkDuplicateOriginalFilename(Map<String, Object> rawFileMetadata) {
// If this file was already migrated or there was an error when converting InputStream to Map, skip it
if (rawFileMetadata.containsKey(ORIGINAL_FILENAME)) {
var originalFilename = rawFileMetadata.get(ORIGINAL_FILENAME).toString().toLowerCase();
if (!duplicateFilenamesFileMetadataMap.containsKey(originalFilename)) {
duplicateFilenamesFileMetadataMap.put(originalFilename, new ArrayList<>());
if (!fileMetadataGroupedByOriginalName.containsKey(originalFilename)) {
fileMetadataGroupedByOriginalName.put(originalFilename, new ArrayList<>());
}
var fileMetadata = fileMetadataStorage.getMetadataById(rawFileMetadata.get(ID).toString());
duplicateFilenamesFileMetadataMap.get(originalFilename).add(fileMetadata);
FileMetadata fileMetadata;
if (isTesting) {
fileMetadata = new FileMetadata();
fileMetadata.setFileId(rawFileMetadata.get(ID).toString());
fileMetadata.setFiletype(rawFileMetadata.get(FILETYPE).toString());
} else {
fileMetadata = fileMetadataStorage.getMetadataById(rawFileMetadata.get(ID).toString());
}
fileMetadataGroupedByOriginalName.get(originalFilename).add(fileMetadata);
}
}

/**
* For each of the file, calls updateFileMetadata() and updateLocalFile()
*/
private void update(String originalFilename, List<FileMetadata> fileMetadataList) {
protected void update(String originalFilename, List<FileMetadata> fileMetadataList) {
var fileMetadata = fileMetadataList.get(0);
var internalFilename = getInternalFilenameFromFileMetadata(fileMetadata);
// just name the 1st one to its originalFilename
updateFileMetadata(fileMetadata, originalFilename);
updateLocalFile(internalFilename, originalFilename);
if (!isTesting) {
var internalFilename = getInternalFilenameFromFileMetadata(fileMetadata);
updateLocalFile(internalFilename, originalFilename);
}
updateFileMetadata(fileMetadata, originalFilename, isTesting);
for (int i = 1; i < fileMetadataList.size(); ++i) {
fileMetadata = fileMetadataList.get(i);
internalFilename = getInternalFilenameFromFileMetadata(fileMetadataList.get(i));
var newFilename = createNewFileName(i, removeFileType(originalFilename), fileMetadata.getFiletype());
updateFileMetadata(fileMetadata, newFilename);
updateLocalFile(internalFilename, newFilename);
if (!isTesting) {
var internalFilename = getInternalFilenameFromFileMetadata(fileMetadata);
updateLocalFile(internalFilename, newFilename);
}
updateFileMetadata(fileMetadata, newFilename, isTesting);
}
}

/**
* Updates FileMetadata: sets new merged filename to the given filename
*/
private void updateFileMetadata(FileMetadata fileMetadata, String filename) {
private void updateFileMetadata(FileMetadata fileMetadata, String filename, boolean isTesting) {
fileMetadata.setFilename(filename);
fileMetadataStorage.updateFileMetadata(fileMetadata);
if (!isTesting) {
fileMetadataStorage.updateFileMetadata(fileMetadata);
}
}

/**
Expand All @@ -145,7 +181,7 @@ private void updateLocalFile(String internalFilename, String filename) {
/**
* Gets the old internalFilename after merging
*/
private String getInternalFilenameFromFileMetadata (FileMetadata fileMetadata) {
private String getInternalFilenameFromFileMetadata(FileMetadata fileMetadata) {
return convertInputStreamToMap(couchDbClient.find(fileMetadata.getFileId())).get(INTERNAL_FILENAME).toString();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.service.core.migrations.v095;

import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.streampipes.service.core.migrations.v095.MergeFilenamesAndRenameDuplicatesMigration.FILETYPE;
import static org.apache.streampipes.service.core.migrations.v095.MergeFilenamesAndRenameDuplicatesMigration.ID;
import static org.apache.streampipes.service.core.migrations.v095.MergeFilenamesAndRenameDuplicatesMigration.INTERNAL_FILENAME;
import static org.apache.streampipes.service.core.migrations.v095.MergeFilenamesAndRenameDuplicatesMigration.ORIGINAL_FILENAME;
import static org.junit.Assert.assertEquals;

/**
 * Unit test for {@link MergeFilenamesAndRenameDuplicatesMigration} running in test mode, i.e.
 * constructed with {@code testing = true} so that the grouping and renaming logic can be
 * exercised purely on in-memory data.
 */
public class MergeFilenamesAndRenameDuplicatesMigrationTest {

  // Three documents whose original filenames differ only by case, plus one unrelated file.
  // Every document carries its own id, mirroring the uniqueness of "_id" in CouchDB.
  private static final Map<String, Object> RAW_FILEMETADATA_1 = rawFileMetadata("id1", "file.txt", "txt");
  private static final Map<String, Object> RAW_FILEMETADATA_2 = rawFileMetadata("id2", "FILE.txt", "TXT");
  private static final Map<String, Object> RAW_FILEMETADATA_3 = rawFileMetadata("id3", "fIlE.TxT", "TxT");
  private static final Map<String, Object> RAW_FILEMETADATA_4 = rawFileMetadata("id4", "file.csv", "csv");

  private List<Map<String, Object>> couchDbRawFileMetadata;

  private MergeFilenamesAndRenameDuplicatesMigration migration;

  /**
   * Builds one raw file metadata document in the shape the migration reads.
   *
   * @param id               value stored under the document id key
   * @param originalFilename value stored under the original filename key
   * @param filetype         value stored under the filetype key
   * @return a map holding the given values plus a placeholder internal filename
   */
  private static Map<String, Object> rawFileMetadata(String id, String originalFilename, String filetype) {
    Map<String, Object> raw = new HashMap<>();
    raw.put(ID, id);
    raw.put(ORIGINAL_FILENAME, originalFilename);
    // The internal filename is not inspected by the assertions below.
    raw.put(INTERNAL_FILENAME, "doesn't matter");
    raw.put(FILETYPE, filetype);
    return raw;
  }

  @Before
  public void setUp() {
    migration = new MergeFilenamesAndRenameDuplicatesMigration(true);
    couchDbRawFileMetadata = new ArrayList<>(
        List.of(RAW_FILEMETADATA_1, RAW_FILEMETADATA_2, RAW_FILEMETADATA_3, RAW_FILEMETADATA_4));
  }

  @Test
  public void testMigration() {
    // Test that the migration successfully groups FileMetadata by originalFilename
    migration.getFileMetadataToUpdate(couchDbRawFileMetadata);
    var grouped = migration.fileMetadataGroupedByOriginalName;
    assertEquals(2, grouped.size());
    assertEquals(3, grouped.get("file.txt").size());
    assertEquals(1, grouped.get("file.csv").size());

    // Test that the migration successfully renames duplicate files
    grouped.forEach(migration::update);
    var txtFiles = grouped.get("file.txt");
    var csvFiles = grouped.get("file.csv");
    assertEquals("file.txt", txtFiles.get(0).getFilename());
    assertEquals("file(2).TXT", txtFiles.get(1).getFilename());
    assertEquals("file(3).TxT", txtFiles.get(2).getFilename());
    assertEquals("file.csv", csvFiles.get(0).getFilename());

    // Each renamed entry must still belong to the document it was created from
    assertEquals("id1", txtFiles.get(0).getFileId());
    assertEquals("id2", txtFiles.get(1).getFileId());
    assertEquals("id3", txtFiles.get(2).getFileId());
    assertEquals("id4", csvFiles.get(0).getFileId());
  }
}

0 comments on commit eebfd54

Please sign in to comment.