Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter invalid partitionID #3025

Merged
merged 18 commits into from
Mar 12, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@
import com.github.ambry.clustermap.DataNodeConfigSourceType;
import org.json.JSONObject;

import static com.github.ambry.config.RouterConfig.*;


/**
* The configs for resource state.
@@ -43,6 +45,7 @@ public class ClusterMapConfig {
"clustermap.delete.data.from.datanode.config.in.property.store.clean.up.task";
private static final String MAX_REPLICAS_ALL_DATACENTERS = "max-replicas-all-datacenters";
public static final String IS_AUTO_REGISTRATION_ENABLED = "clustermap.auto.registration.enabled";
public static final String PARTITION_FILTERING_ENABLED = "clustermap.enable.partition.filtering";

/**
* The factory class used to get the resource state policies.
@@ -384,6 +387,17 @@ public class ClusterMapConfig {
@Default("false")
public final boolean clusterMapAutoRegistrationEnabled;

/**
 * The minimum number of successful responses required for a put operation.
 * NOTE(review): this mirrors the router's put success target (see RouterConfig);
 * keeping a second copy in ClusterMapConfig risks the two drifting apart — confirm intentional.
 */
@Config(ROUTER_PUT_SUCCESS_TARGET)
@Default("2")
public final int routerPutSuccessTarget;

/** Whether partitions that fail validation should be filtered out of the cluster map view. */
@Config(PARTITION_FILTERING_ENABLED)
@Default("false")
public final boolean clusterMapPartitionFilteringEnabled;

public ClusterMapConfig(VerifiableProperties verifiableProperties) {
clusterMapFixedTimeoutDatanodeErrorThreshold =
verifiableProperties.getIntInRange("clustermap.fixedtimeout.datanode.error.threshold", 3, 1, 100);
@@ -470,5 +484,7 @@ public ClusterMapConfig(VerifiableProperties verifiableProperties) {
0, Long.MAX_VALUE);
clusterMapIgnoreDownwardStateTransition = verifiableProperties.getBoolean(IGNORE_DOWNWARD_STATE_TRANSITION, false);
clusterMapAutoRegistrationEnabled = verifiableProperties.getBoolean(IS_AUTO_REGISTRATION_ENABLED, false);
routerPutSuccessTarget = verifiableProperties.getIntInRange(ROUTER_PUT_SUCCESS_TARGET, 2, 1, Integer.MAX_VALUE);
clusterMapPartitionFilteringEnabled = verifiableProperties.getBoolean(PARTITION_FILTERING_ENABLED, false);
}
}
Original file line number Diff line number Diff line change
@@ -77,4 +77,15 @@ void getReplicaIdsByStates(Map<ReplicaState, List<R>> replicasByState, P partiti
* @return a collection of partitions in this cluster.
*/
Collection<P> getPartitions();

/**
 * Return whether the given partition is considered valid for use.
 * NOTE(review): the Helix implementation compares the hosting resource's replication
 * factor against the router put success target rather than comparing external view to
 * ideal state — confirm the intended contract and align this javadoc with it.
 * @param partitionID the partition id, in string form
 * @return {@code true} if the partition is valid
 */
boolean isValidPartition(String partitionID);

/**
 * Return whether partition filtering is enabled, i.e. whether callers should exclude
 * partitions for which {@link #isValidPartition(String)} returns {@code false}.
 * @return {@code true} if partition filtering is enabled
 */
boolean isPartitionFilteringEnabled();
}
Original file line number Diff line number Diff line change
@@ -715,7 +715,19 @@ static class PartitionSelectionHelper implements ClusterMapChangeListener {
this.clusterManagerQueryHelper = clusterManagerQueryHelper;
this.defaultPartitionClass = defaultPartitionClass;
this.clusterManagerMetrics = clusterManagerMetrics;
updatePartitions(clusterManagerQueryHelper.getPartitions(), localDatacenterName);

Collection<? extends PartitionId> partitions = clusterManagerQueryHelper.getPartitions();
Collection<PartitionId> filteredPartions = new ArrayList<>();
if (clusterManagerQueryHelper.isPartitionFilteringEnabled()) {
for (PartitionId partition : partitions) {
if (clusterManagerQueryHelper.isValidPartition(partition.toString())){
filteredPartions.add(partition);
}
}
updatePartitions(filteredPartions, localDatacenterName);
} else {
updatePartitions(partitions, localDatacenterName);
}
logger.debug("Number of partitions in data center {} {}", localDatacenterName, allPartitions.size());
for (Map.Entry<String, SortedMap<Integer, List<PartitionId>>> entry : partitionIdsByClassAndLocalReplicaCount.entrySet()) {
logger.debug("Partition class {}, partitions {}", entry.getKey(), entry.getValue().values());
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.router.Router;
import com.github.ambry.utils.Pair;
import com.github.ambry.utils.SystemTime;
import java.io.IOException;
@@ -65,6 +67,7 @@
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.ambry.config.RouterConfig;

import static com.github.ambry.clustermap.ClusterMapSnapshotConstants.*;
import static com.github.ambry.clustermap.ClusterMapUtils.*;
@@ -1409,6 +1412,32 @@ public Collection<AmbryPartition> getPartitions() {
return new ArrayList<>(partitionMap.values());
}

/**
 * @return whether partition filtering is enabled, driven by the
 * {@code clustermap.enable.partition.filtering} config.
 */
public boolean isPartitionFilteringEnabled() {
return clusterMapConfig.clusterMapPartitionFilteringEnabled;
}

/**
 * Checks whether a partition can satisfy the configured router put success target,
 * based on the replication factor of the resource hosting the partition in the
 * local datacenter.
 * @param partitionID the partition id, in string form
 * @return {@code true} if the local resource's replication factor is at least the
 *         router put success target; {@code false} if it is lower, or if any lookup
 *         fails (missing resource, tag, or resource property) — all failures are
 *         treated as "invalid".
 */
@Override
public boolean isValidPartition(String partitionID) {
try {
// First resource mapped to this partition in the local DC; throws NoSuchElementException if none.
String resource = getResourceForPartitionInLocalDc(partitionID).iterator().next();
String tag = dcToResourceNameToTag.get(clusterMapConfig.clusterMapDatacenterName).get(resource);
ResourceProperty resourceProperty =
dcToTagToResourceProperty.get(clusterMapConfig.clusterMapDatacenterName).get(tag);
return resourceProperty.replicationFactor >= clusterMapConfig.routerPutSuccessTarget;
} catch (Exception e) {
// NOTE(review): broad catch silently maps lookup failures (NPE / NoSuchElementException)
// to "invalid"; consider logging at debug level to aid troubleshooting.
return false;
}
}

/**
* @return the count of partitions in this cluster.
*/
Original file line number Diff line number Diff line change
@@ -371,5 +371,15 @@ public Collection<Disk> getDisks(DataNode dataNode) {
public Collection<Partition> getPartitions() {
return partitionMap.values();
}

@Override
public boolean isValidPartition(String partitionID) {
// Static cluster maps have no Helix ideal state / resource metadata to validate against.
throw new UnsupportedOperationException("Not supported in static cluster map");
}

@Override
public boolean isPartitionFilteringEnabled() {
// Partition filtering is a Helix-only feature; static cluster maps never filter.
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -156,6 +156,16 @@ public Collection<P> getPartitions() {
return null;
}

@Override
public boolean isValidPartition(String partitionID) {
// Test helper: every partition is treated as valid.
return true;
}

@Override
public boolean isPartitionFilteringEnabled() {
// Test helper: filtering is disabled so all partitions are visible.
return false;
}

/**
* Increment the sealed state counter by one.
*/
Original file line number Diff line number Diff line change
@@ -62,6 +62,36 @@ public void areAllReplicasForPartitionUpTest() {
assertTrue("All replicas should be up", ClusterMapUtils.areAllReplicasForPartitionUp(partitionId));
}

/**
 * Verifies that when partition filtering is enabled, {@code PartitionSelectionHelper}
 * only exposes partitions for which {@code isValidPartition} returns {@code true}.
 */
@Test
public void partitionSelectionFilterTest() {
final String localDc = "DC1";
final String partitionClass = "max-replicas-all-sites";
final int minLocalReplicas = 3;

// Three nodes in a single DC hosting two partitions of the same class.
List<MockDataNodeId> nodes = Arrays.asList(getDataNodeId("dc1dn1", localDc),
getDataNodeId("dc1dn2", localDc), getDataNodeId("dc1dn3", localDc));
MockPartitionId validPartition = new MockPartitionId(1, partitionClass, nodes, 0);
MockPartitionId invalidPartition = new MockPartitionId(2, partitionClass, nodes, 0);

Collection<MockPartitionId> allPartitions = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(validPartition, invalidPartition)));
// Filtering enabled: partition 1 passes validation, partition 2 does not.
ClusterManagerQueryHelper queryHelper = Mockito.mock(ClusterManagerQueryHelper.class);
doReturn(allPartitions).when(queryHelper).getPartitions();
doReturn(true).when(queryHelper).isPartitionFilteringEnabled();
doReturn(true).when(queryHelper).isValidPartition("Partition[1]");
doReturn(false).when(queryHelper).isValidPartition("Partition[2]");

ClusterMapUtils.PartitionSelectionHelper helper =
new ClusterMapUtils.PartitionSelectionHelper(queryHelper, localDc, minLocalReplicas,
partitionClass, null);

// Only the valid partition should survive filtering.
Set<MockPartitionId> expected = new HashSet<>(Collections.singletonList(validPartition));
assertCollectionEquals("Partitions returned not as expected", expected, helper.getPartitions(partitionClass));
}

/**
* Tests for all functions in {@link ClusterMapUtils.PartitionSelectionHelper}
*/
@@ -100,6 +130,7 @@ public void partitionSelectionHelperTest() {
new HashSet<>(Arrays.asList(everywhere1, everywhere2, majorDc11, majorDc12, majorDc21, majorDc22)));
ClusterManagerQueryHelper mockClusterManagerQueryHelper = Mockito.mock(ClusterManagerQueryHelper.class);
doReturn(allPartitionIdsMain).when(mockClusterManagerQueryHelper).getPartitions();
doReturn(false).when(mockClusterManagerQueryHelper).isPartitionFilteringEnabled();
ClusterMapUtils.PartitionSelectionHelper psh =
new ClusterMapUtils.PartitionSelectionHelper(mockClusterManagerQueryHelper, null, minimumLocalReplicaCount,
maxReplicasAllSites, null);
@@ -188,6 +219,7 @@ public void partitionWithDifferentReplicaCntTest() {
MockPartitionId partition3 = new MockPartitionId(3, partitionClass, dataNodeIdList.subList(0, 4), 0);
List<MockPartitionId> allPartitions = Arrays.asList(partition1, partition2, partition3);
ClusterManagerQueryHelper mockClusterManagerQueryHelper = Mockito.mock(ClusterManagerQueryHelper.class);
doReturn(false).when(mockClusterManagerQueryHelper).isPartitionFilteringEnabled();
doReturn(allPartitions).when(mockClusterManagerQueryHelper).getPartitions();
int minimumLocalReplicaCount = 3;
ClusterMapUtils.PartitionSelectionHelper psh =
@@ -260,7 +292,6 @@ public void testReserveMetadataBlobId() throws Exception {
properties.setProperty(RouterConfig.ROUTER_HOSTNAME, "localhost");
properties.setProperty(RouterConfig.ROUTER_DATACENTER_NAME, "DEV");
RouterConfig routerConfig = new RouterConfig(new VerifiableProperties(properties));

// test a simple success case.
BlobId blobId =
ClusterMapUtils.reserveMetadataBlobId(partitionClass, partitionsToExclude, reservedMetadataIdMetrics,
Original file line number Diff line number Diff line change
@@ -309,6 +309,11 @@ public List<AmbryReplica> getReplicaIdsForPartition(AmbryPartition partition) {
return new ArrayList<>(partitionToReplicas.get(partition));
}

@Override
public List<String> getResourceNamesForPartition(AmbryPartition partition) {
// Test helper does not model Helix resources.
throw new UnsupportedOperationException("Temporarily unsupported");
}

@Override
public List<AmbryReplica> getReplicaIdsByState(AmbryPartition partition, ReplicaState state, String dcName) {
throw new UnsupportedOperationException("Temporarily unsupported");
@@ -342,6 +347,16 @@ public Collection<AmbryPartition> getPartitions() {
return partitionToReplicas.keySet();
}

@Override
public boolean isValidPartition(String partitionID) {
// Test helper: every partition is treated as valid.
return true;
}

@Override
public boolean isPartitionFilteringEnabled() {
// Test helper: filtering is disabled so all partitions are visible.
return false;
}

/**
* Associate the replica with the given partition.
* @param partition the {@link AmbryPartition}.
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
@@ -41,6 +42,7 @@
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixManager;
@@ -69,6 +71,7 @@
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
import static org.mockito.Mockito.*;


/**
@@ -898,6 +901,37 @@ public void helixInitiatedIdealStateChangeTest() throws Exception {
.containsKey(partitionName));
}

/**
 * Tests {@code HelixClusterManagerQueryHelper.isValidPartition(String)}: a partition is
 * valid only when the replication factor of its backing resource is at least the
 * configured router put success target.
 */
@Test
public void helixClusterPartitionFilterTest() throws Exception {
assumeTrue(!useComposite && !overrideEnabled);
HelixClusterManager helixClusterManager = (HelixClusterManager) clusterManager;
HelixClusterManager.HelixClusterManagerQueryHelper clusterHelper = helixClusterManager.new HelixClusterManagerQueryHelper();
verifyInitialClusterChanges(helixClusterManager, helixCluster, new String[]{localDc});

// Pick an arbitrary partition and tag its resource so the tag -> resource-property lookup succeeds.
String partitionID = String.valueOf(clusterHelper.getPartitions().iterator().next().getId());
List<String> resourceNames = helixCluster.getResources(localDc);
String resourceName = resourceNames.get(0);
String tag = "TAG_100000";
IdealState idealState = helixCluster.getResourceIdealState(resourceName, localDc);
idealState.setInstanceGroupTag(tag);
helixCluster.refreshIdealState();

// set to 10 to make function fail
// NOTE(review): mutating a final config field via reflection is fragile across JDK versions;
// prefer rebuilding ClusterMapConfig from properties if this ever breaks.
Field field = ClusterMapConfig.class.getDeclaredField("routerPutSuccessTarget");
field.setAccessible(true);
field.set(clusterMapConfig, 10);
boolean isValid = clusterHelper.isValidPartition(partitionID);
assertFalse(isValid);

// set to 3 to make function pass
field.set(clusterMapConfig, 3);
isValid = clusterHelper.isValidPartition(partitionID);
assertTrue(isValid);
}

/**
* Test duplicate partition ids in ideal state.
* @throws Exception
Original file line number Diff line number Diff line change
@@ -209,6 +209,7 @@ public MockClusterMap(boolean enableSSLPorts, boolean enableHttp2Ports, int numN
.findFirst()
.get();
doReturn(partitions.values()).when(mockClusterManagerQueryHelper).getPartitions();
doReturn(false).when(mockClusterManagerQueryHelper).isPartitionFilteringEnabled();
partitionSelectionHelper =
new ClusterMapUtils.PartitionSelectionHelper(mockClusterManagerQueryHelper, localDatacenterName,
Math.min(defaultPartition.getReplicaIds().size(), 3), DEFAULT_PARTITION_CLASS, null);
Original file line number Diff line number Diff line change
@@ -67,6 +67,7 @@
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
import static org.mockito.Mockito.*;


@RunWith(Parameterized.class)