Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#4278] feat(filesystem): Refactor the getFileLocation logics in hadoop GVFS #4320

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
Expand All @@ -40,12 +41,16 @@
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.exceptions.AlreadyExistsException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
Expand Down Expand Up @@ -358,7 +363,6 @@ public boolean dropFileset(NameIdentifier ident) {
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
// TODO we need move some check logics in the Hadoop / Python GVFS to here.
Preconditions.checkArgument(subPath != null, "subPath must not be null");
String processedSubPath;
if (!subPath.trim().isEmpty() && !subPath.trim().startsWith(SLASH)) {
Expand All @@ -369,11 +373,56 @@ public String getFileLocation(NameIdentifier ident, String subPath)

Fileset fileset = loadFileset(ident);

boolean isSingleFile = checkSingleFile(fileset);
// if the storage location is a single file, it cannot have sub path to access.
if (isSingleFile && StringUtils.isBlank(processedSubPath)) {
throw new GravitinoRuntimeException(
"Sub path should always be blank, because the fileset only mounts a single file.");
}
xloya marked this conversation as resolved.
Show resolved Hide resolved

// do checks for some data operations.
if (hasCallerContext()) {
Map<String, String> contextMap = CallerContext.CallerContextHolder.get().context();
String operation =
contextMap.getOrDefault(
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
FilesetDataOperation.UNKNOWN.name());
if (!FilesetDataOperation.checkValid(operation)) {
LOG.warn(
"The data operation: {} is not valid, we cannot do some checks for this operation.",
operation);
} else {
FilesetDataOperation dataOperation = FilesetDataOperation.valueOf(operation);
switch (dataOperation) {
case RENAME:
// Fileset only mounts a single file, the storage location of the fileset cannot be
// renamed; Otherwise the metadata in the Gravitino server may be inconsistent.
if (isSingleFile) {
throw new GravitinoRuntimeException(
"Cannot rename the fileset: %s which only mounts to a single file.", ident);
}
// if the sub path is blank, it cannot be renamed,
// otherwise the metadata in the Gravitino server may be inconsistent.
if (StringUtils.isBlank(processedSubPath)
|| (processedSubPath.startsWith(SLASH) && processedSubPath.length() == 1)) {
throw new GravitinoRuntimeException(
"subPath cannot be blank when need to rename a file or a directory.");
}
break;
default:
break;
}
}
}
xloya marked this conversation as resolved.
Show resolved Hide resolved

String fileLocation;
// subPath cannot be null, so we only need check if it is blank
if (StringUtils.isBlank(processedSubPath)) {
// 1. if the storage location is a single file, we pass the storage location directly
// 2. if the processed sub path is blank, we pass the storage location directly
if (isSingleFile || StringUtils.isBlank(processedSubPath)) {
fileLocation = fileset.storageLocation();
} else {
// the processed sub path always starts with "/" if it is not blank,
// so we can safely remove the tailing slash if storage location ends with "/".
String storageLocation =
fileset.storageLocation().endsWith(SLASH)
? fileset.storageLocation().substring(0, fileset.storageLocation().length() - 1)
Expand Down Expand Up @@ -672,4 +721,24 @@ static Path formalizePath(Path path, Configuration configuration) throws IOExcep
FileSystem defaultFs = FileSystem.get(configuration);
return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
}

private boolean hasCallerContext() {
return CallerContext.CallerContextHolder.get() != null
&& CallerContext.CallerContextHolder.get().context() != null
&& !CallerContext.CallerContextHolder.get().context().isEmpty();
}

private boolean checkSingleFile(Fileset fileset) {
try {
Path locationPath = new Path(fileset.storageLocation());
return locationPath.getFileSystem(hadoopConf).getFileStatus(locationPath).isFile();
} catch (FileNotFoundException e) {
// We should always return false here, same with the logic in `FileSystem.isFile(Path f)`.
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about the behavior of returning false here, can you explain why it should be false?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation of the isFile() method of Hadoop FileSystem is referenced here. Since this method has been deprecated in Hadoop 3, its implementation is directly copied here.
image

} catch (IOException e) {
throw new GravitinoRuntimeException(
"Exception occurs when checking whether fileset: %s mounts a single file, exception: %s",
fileset.name(), e.getCause());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can change to like:

throw new GravitinoRuntimeException(e, "xxxxx", xx, xx)

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.PropertiesMetadata;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
Expand Down Expand Up @@ -800,6 +804,85 @@ public void testGetFileLocation() throws IOException {
String fileLocation3 = ops.getFileLocation(filesetIdent, subPath4);
Assertions.assertEquals(fileset.storageLocation(), fileLocation3);
}

// test mount a single file
String filesetName2 = "test_get_file_location_2";
String filesetLocation2 =
TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName2;
Path filesetLocationPath2 = new Path(filesetLocation2);
createFileset(filesetName2, schemaName, comment, Fileset.Type.MANAGED, null, filesetLocation2);
try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store);
FileSystem localFileSystem = filesetLocationPath2.getFileSystem(new Configuration())) {
ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName2);
// replace fileset location to a single file
Assertions.assertTrue(localFileSystem.exists(filesetLocationPath2));
Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath2).isDirectory());
localFileSystem.delete(filesetLocationPath2, true);
localFileSystem.create(filesetLocationPath2);
Assertions.assertTrue(localFileSystem.exists(filesetLocationPath2));
Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath2).isFile());

String subPath = "/year=2024/month=07/day=22/test.parquet";
Map<String, String> contextMap = Maps.newHashMap();
contextMap.put(
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
FilesetDataOperation.RENAME.name());
CallerContext callerContext = CallerContext.builder().withContext(contextMap).build();
CallerContext.CallerContextHolder.set(callerContext);

Assertions.assertThrows(
GravitinoRuntimeException.class, () -> ops.getFileLocation(filesetIdent, subPath));
} finally {
CallerContext.CallerContextHolder.remove();
}

// test rename with an empty subPath
String filesetName3 = "test_get_file_location_3";
String filesetLocation3 =
TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName3;
Path filesetLocationPath3 = new Path(filesetLocation3);
createFileset(filesetName3, schemaName, comment, Fileset.Type.MANAGED, null, filesetLocation3);
try (HadoopCatalogOperations ops = new HadoopCatalogOperations(store);
FileSystem localFileSystem = filesetLocationPath3.getFileSystem(new Configuration())) {
ops.initialize(Maps.newHashMap(), randomCatalogInfo(), HADOOP_PROPERTIES_METADATA);
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, filesetName3);
// replace fileset location to a single file
Assertions.assertTrue(localFileSystem.exists(filesetLocationPath3));
Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath3).isDirectory());
localFileSystem.delete(filesetLocationPath3, true);
localFileSystem.create(filesetLocationPath3);
Assertions.assertTrue(localFileSystem.exists(filesetLocationPath3));
Assertions.assertTrue(localFileSystem.getFileStatus(filesetLocationPath3).isFile());

Map<String, String> contextMap = Maps.newHashMap();
contextMap.put(
FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
FilesetDataOperation.RENAME.name());
CallerContext callerContext = CallerContext.builder().withContext(contextMap).build();
CallerContext.CallerContextHolder.set(callerContext);
Assertions.assertThrows(
GravitinoRuntimeException.class, () -> ops.getFileLocation(filesetIdent, ""));
}

// test storage location end with "/"
String filesetName4 = "test_get_file_location_4";
String filesetLocation4 =
TEST_ROOT_PATH + "/" + catalogName + "/" + schemaName + "/" + filesetName4 + "/";
NameIdentifier filesetIdent = NameIdentifier.of("m1", "c1", schemaName, name);
Fileset mockFileset = Mockito.mock(Fileset.class);
when(mockFileset.name()).thenReturn(filesetName4);
when(mockFileset.storageLocation()).thenReturn(filesetLocation4);

try (HadoopCatalogOperations mockOps = Mockito.mock(HadoopCatalogOperations.class)) {
mockOps.hadoopConf = new Configuration();
when(mockOps.loadFileset(filesetIdent)).thenReturn(mockFileset);
String subPath = "/test/test.parquet";
when(mockOps.getFileLocation(filesetIdent, subPath)).thenCallRealMethod();
String fileLocation = mockOps.getFileLocation(filesetIdent, subPath);
Assertions.assertEquals(
String.format("%s%s", mockFileset.storageLocation(), subPath.substring(1)), fileLocation);
}
}

private static Stream<Arguments> locationArguments() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,16 +664,25 @@ public void testGetFileLocationWithInvalidAuditHeaders() {
try {
String filesetName = GravitinoITUtils.genRandomName("fileset");
NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName);
Assertions.assertFalse(catalog.asFilesetCatalog().filesetExists(filesetIdent));
Fileset expectedFileset =
catalog
.asFilesetCatalog()
.createFileset(
filesetIdent,
"fileset comment",
Fileset.Type.MANAGED,
generateLocation(filesetName),
Maps.newHashMap());

Map<String, String> context = new HashMap<>();
// this is an invalid internal client type.
// this is an invalid internal client type, but the server will return normally
context.put(FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE, "test");
CallerContext callerContext = CallerContext.builder().withContext(context).build();
CallerContext.CallerContextHolder.set(callerContext);

Assertions.assertThrows(
IllegalArgumentException.class,
() -> catalog.asFilesetCatalog().getFileLocation(filesetIdent, "/test.par"));
String fileLocation = catalog.asFilesetCatalog().getFileLocation(filesetIdent, "/test.par");
Assertions.assertEquals(expectedFileset.storageLocation() + "/test.par", fileLocation);
} finally {
CallerContext.CallerContextHolder.remove();
}
Expand Down

This file was deleted.

Loading
Loading