[#4278] feat(filesystem): Refactor the getFileLocation logic in Hadoop GVFS #4320

Merged · 12 commits · Sep 23, 2024
@@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
@@ -40,12 +41,16 @@
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.exceptions.AlreadyExistsException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
@@ -358,7 +363,6 @@ public boolean dropFileset(NameIdentifier ident) {
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
// TODO we need move some check logics in the Hadoop / Python GVFS to here.
Preconditions.checkArgument(subPath != null, "subPath must not be null");
String processedSubPath;
if (!subPath.trim().isEmpty() && !subPath.trim().startsWith(SLASH)) {
@@ -369,11 +373,51 @@ public String getFileLocation(NameIdentifier ident, String subPath)

Fileset fileset = loadFileset(ident);

boolean isMountSingleFile = checkMountsSingleFile(fileset);
if (isMountSingleFile) {
// if the storage location is a single file, it cannot be accessed via a sub path.
Preconditions.checkArgument(
StringUtils.isBlank(processedSubPath),
"Sub path should always be blank, because the fileset only mounts a single file.");
}

// do validity checks for specific data operations.
CallerContext callerContext = CallerContext.CallerContextHolder.get();
if (callerContext != null
&& callerContext.context() != null
&& !callerContext.context().isEmpty()) {
Map<String, String> contextMap = CallerContext.CallerContextHolder.get().context();
String operation = contextMap.get(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION);
if (StringUtils.isNotBlank(operation)) {
Preconditions.checkArgument(
FilesetDataOperation.checkValid(operation),
String.format("The data operation: %s is not valid.", operation));
FilesetDataOperation dataOperation = FilesetDataOperation.valueOf(operation);
if (dataOperation == FilesetDataOperation.RENAME) {
// If the fileset only mounts a single file, its storage location cannot be renamed;
// otherwise the metadata in the Gravitino server may become inconsistent.
Preconditions.checkArgument(
StringUtils.isNotBlank(processedSubPath)
&& processedSubPath.startsWith(SLASH)
&& processedSubPath.length() > 1,
"subPath cannot be blank when need to rename a file or a directory.");
Preconditions.checkArgument(
!isMountSingleFile,
String.format(
"Cannot rename the fileset: %s which only mounts to a single file.", ident));
}
}
}

String fileLocation;
// subPath cannot be null, so we only need check if it is blank
if (StringUtils.isBlank(processedSubPath)) {
// 1. if the storage location is a single file, we pass the storage location directly
// 2. if the processed sub path is blank, we pass the storage location directly
if (isMountSingleFile || StringUtils.isBlank(processedSubPath)) {
fileLocation = fileset.storageLocation();
} else {
// the processed sub path always starts with "/" if it is not blank,
// so we can safely remove the trailing slash if the storage location ends with "/".
String storageLocation =
fileset.storageLocation().endsWith(SLASH)
? fileset.storageLocation().substring(0, fileset.storageLocation().length() - 1)
@@ -672,4 +716,18 @@ static Path formalizePath(Path path, Configuration configuration) throws IOException
FileSystem defaultFs = FileSystem.get(configuration);
return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
}

private boolean checkMountsSingleFile(Fileset fileset) {
try {
Path locationPath = new Path(fileset.storageLocation());
return locationPath.getFileSystem(hadoopConf).getFileStatus(locationPath).isFile();
} catch (FileNotFoundException e) {
// We should always return false here, the same as the logic in `FileSystem.isFile(Path f)`.
return false;
Contributor:

I'm not sure about the behavior of returning false here; can you explain why it should be false?

Collaborator (Author):

The implementation of the isFile() method of Hadoop FileSystem is referenced here. Since this method has been deprecated in Hadoop 3, its implementation is directly copied here.
(screenshot of the Hadoop FileSystem.isFile(Path) implementation)
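
For context, a short sketch of the deprecated Hadoop 2.x helper being mirrored, paraphrased from org.apache.hadoop.fs.FileSystem (a reference sketch, not code from this PR):

@Deprecated
public boolean isFile(Path f) throws IOException {
  try {
    return getFileStatus(f).isFile();
  } catch (FileNotFoundException e) {
    // A missing path is reported as "not a file" rather than as an error.
    return false;
  }
}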

} catch (IOException e) {
throw new GravitinoRuntimeException(
"Exception occurs when checking whether fileset: %s mounts a single file, exception: %s",
fileset.name(), e.getCause());
Contributor:

I think we can change it to something like:

throw new GravitinoRuntimeException(e, "xxxxx", xx, xx)

Collaborator (Author):

Done
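
Presumably the resolved code passes the cause to the exception directly, assuming GravitinoRuntimeException exposes the (Throwable, String, Object...) constructor that the suggestion above implies; a sketch:

throw new GravitinoRuntimeException(
    e, // preserve the original cause instead of only its message
    "Exception occurs when checking whether fileset: %s mounts a single file",
    fileset.name());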

}
}
}
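
Taken together, the new checks mean a client declares the intended data operation through the CallerContext thread-local before resolving a file location. A minimal usage sketch mirroring the tests below (ops and filesetIdent are illustrative stand-ins):

Map<String, String> contextMap = Maps.newHashMap();
contextMap.put(
    FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
    FilesetDataOperation.RENAME.name());
CallerContext.CallerContextHolder.set(
    CallerContext.builder().withContext(contextMap).build());
try {
  // Throws IllegalArgumentException when the fileset mounts a single file
  // or when the sub path is blank for a RENAME.
  String location = ops.getFileLocation(filesetIdent, "/year=2024/test.parquet");
} finally {
  // Always clear the thread-local to avoid leaking the context.
  CallerContext.CallerContextHolder.remove();
}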
@@ -59,6 +59,9 @@
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.PropertiesMetadata;
@@ -800,6 +803,85 @@ public void testGetFileLocation() throws IOException {
String fileLocation3 = ops.getFileLocation(filesetIdent, subPath4);
Assertions.assertEquals(fileset.storageLocation(), fileLocation3);
}

// test mounting a single file
String filesetName2 = "test_get_file_location_2";
String filesetLocation2 =
TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName2;
Path filesetLocationPath2 = new Path(filesetLocation2);
createFileset(filesetName2, schemaName, comment, Fileset.Type.MANAGED, null, filesetLocation2);
try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store);
FileSystem localFileSystem = filesetLocationPath2.getFileSystem(new Configuration())) {
ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName2);
// replace the fileset location with a single file
Assertions.assertTrue(localFileSystem.exists(filesetLocationPath2));
Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath2).isDirectory());
localFileSystem.delete(filesetLocationPath2, true);
localFileSystem.create(filesetLocationPath2);
Assertions.assertTrue(localFileSystem.exists(filesetLocationPath2));
Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath2).isFile());

String subPath = "/year=2024/month=07/day=22/test.parquet";
Map<String, String> contextMap = Maps.newHashMap();
contextMap.put(
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
FilesetDataOperation.RENAME.name());
CallerContext callerContext = CallerContext.builder().withContext(contextMap).build();
CallerContext.CallerContextHolder.set(callerContext);

Assertions.assertThrows(
IllegalArgumentException.class, () -> ops.getFileLocation(filesetIdent, subPath));
} finally {
CallerContext.CallerContextHolder.remove();
}

// test rename with an empty subPath
String filesetName3 = "test_get_file_location_3";
String filesetLocation3 =
TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName3;
Path filesetLocationPath3 = new Path(filesetLocation3);
createFileset(filesetName3, schemaName, comment, Fileset.Type.MANAGED, null, filesetLocation3);
try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store);
FileSystem localFileSystem = filesetLocationPath3.getFileSystem(new Configuration())) {
ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName3);
// replace the fileset location with a single file
Assertions.assertTrue(localFileSystem.exists(filesetLocationPath3));
Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath3).isDirectory());
localFileSystem.delete(filesetLocationPath3, true);
localFileSystem.create(filesetLocationPath3);
Assertions.assertTrue(localFileSystem.exists(filesetLocationPath3));
Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath3).isFile());

Map<String, String> contextMap = Maps.newHashMap();
contextMap.put(
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
FilesetDataOperation.RENAME.name());
CallerContext callerContext = CallerContext.builder().withContext(contextMap).build();
CallerContext.CallerContextHolder.set(callerContext);
Assertions.assertThrows(
IllegalArgumentException.class, () -> ops.getFileLocation(filesetIdent, ""));
}

// test a storage location ending with "/"
String filesetName4 = "test_get_file_location_4";
String filesetLocation4 =
TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName4 + "/";
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);
Fileset mockFileset = Mockito.mock(Fileset.class);
when(mockFileset.name()).thenReturn(filesetName4);
when(mockFileset.storageLocation()).thenReturn(filesetLocation4);

try (HadoopCatalogOperations mockOps = Mockito.mock(HadoopCatalogOperations.class)) {
mockOps.hadoopConf = new Configuration();
when(mockOps.loadFileset(filesetIdent)).thenReturn(mockFileset);
String subPath = "/test/test.parquet";
when(mockOps.getFileLocation(filesetIdent, subPath)).thenCallRealMethod();
String fileLocation = mockOps.getFileLocation(filesetIdent, subPath);
Assertions.assertEquals(
String.format("%s%s", mockFileset.storageLocation(), subPath.substring(1)), fileLocation);
}
}

private static Stream<Arguments> locationArguments() {

This file was deleted.
