Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hotfix] Improve log message for becomeLeaderAndFollower #667

Merged
merged 1 commit into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ public final class Replica {
/** The manager that manages the ISR expand and shrink. */
private final AdjustIsrManager adjustIsrManager;

private final List<String> partitionKeys;
private final Schema schema;
private final TableConfig tableConfig;
// logFormat and arrowCompressionInfo are used in hot-path, so cache them here.
Expand Down Expand Up @@ -234,7 +233,6 @@ public Replica(
this.tableConfig = tableInfo.getTableConfig();
this.logFormat = tableConfig.getLogFormat();
this.arrowCompressionInfo = tableConfig.getArrowCompressionInfo();
this.partitionKeys = tableInfo.getPartitionKeys();
this.snapshotContext = snapshotContext;
// create a closeable registry for the replica
this.closeableRegistry = new CloseableRegistry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.FencedLeaderEpochException;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.exception.InvalidCoordinatorException;
import com.alibaba.fluss.exception.InvalidRequiredAcksException;
import com.alibaba.fluss.exception.LogOffsetOutOfRangeException;
Expand All @@ -31,8 +30,6 @@
import com.alibaba.fluss.exception.UnknownTableOrBucketException;
import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.SchemaInfo;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
Expand Down Expand Up @@ -331,7 +328,7 @@ public void becomeLeaderOrFollower(
replicaStateChangeLock,
() -> {
// check or apply coordinator epoch.
validateAndApplyCoordinatorEpoch(requestCoordinatorEpoch);
validateAndApplyCoordinatorEpoch(requestCoordinatorEpoch, "notifyLeaderAndIsr");

List<NotifyLeaderAndIsrData> replicasToBeLeader = new ArrayList<>();
List<NotifyLeaderAndIsrData> replicasToBeFollower = new ArrayList<>();
Expand Down Expand Up @@ -550,7 +547,7 @@ public void stopReplicas(
replicaStateChangeLock,
() -> {
// check or apply coordinator epoch.
validateAndApplyCoordinatorEpoch(requestCoordinatorEpoch);
validateAndApplyCoordinatorEpoch(requestCoordinatorEpoch, "stopReplicas");

// store the deleted table id and the table dir path so that the table dir can be
// deleted after all the buckets of this table have been deleted.
Expand Down Expand Up @@ -629,7 +626,8 @@ public void notifyRemoteLogOffsets(
() -> {
// check or apply coordinator epoch.
validateAndApplyCoordinatorEpoch(
notifyRemoteLogOffsetsData.getCoordinatorEpoch());
notifyRemoteLogOffsetsData.getCoordinatorEpoch(),
"notifyRemoteLogOffsets");
// update the remote log offsets and delete local segments already copied to
// remote.
TableBucket tb = notifyRemoteLogOffsetsData.getTableBucket();
Expand All @@ -650,7 +648,8 @@ public void notifyKvSnapshotOffset(
() -> {
// check or apply coordinator epoch.
validateAndApplyCoordinatorEpoch(
notifyKvSnapshotOffsetData.getCoordinatorEpoch());
notifyKvSnapshotOffsetData.getCoordinatorEpoch(),
"notifyKvSnapshotOffset");
// update the snapshot offset.
TableBucket tb = notifyKvSnapshotOffsetData.getTableBucket();
LogTablet logTablet = getReplicaOrException(tb).getLogTablet();
Expand All @@ -668,7 +667,8 @@ public void notifyLakeTableOffset(
() -> {
// check or apply coordinator epoch.
validateAndApplyCoordinatorEpoch(
notifyLakeTableOffsetData.getCoordinatorEpoch());
notifyLakeTableOffsetData.getCoordinatorEpoch(),
"notifyLakeTableOffset");

Map<TableBucket, LakeBucketOffset> lakeBucketOffsets =
notifyLakeTableOffsetData.getLakeBucketOffsets();
Expand Down Expand Up @@ -821,10 +821,10 @@ private void addFetcherForReplicas(
new NotifyLeaderAndIsrResultForBucket(
tb,
ApiError.fromThrowable(
new NotLeaderOrFollowerException(
new StorageException(
String.format(
"Could not find leader for follower replica %s while make "
+ "leader for table bucket %s",
+ "follower for %s.",
serverId, tb)))));
} else {
// fetch from leader server node with internal endpoint.
Expand All @@ -835,16 +835,20 @@ private void addFetcherForReplicas(
// error to let CoordinatorServer retry sending makeLeaderOrFollower request.
// This situation happens if the leader serverNode is offline and
// has not recovered yet.
LOG.error(
"Could not find leader {} in server metadata for replica {} while make follower for {}.",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be much better to print the leader id.

leaderId,
serverId,
tb);
result.put(
tb,
new NotifyLeaderAndIsrResultForBucket(
tb,
ApiError.fromThrowable(
new NotLeaderOrFollowerException(
new StorageException(
String.format(
"Could not find leader in server metadata by id "
+ "for replica %s while make follower",
replica)))));
"Could not find leader %d in server metadata for replica %d while make follower for %s.",
leaderId, serverId, tb)))));
} else {
// For the replicas whose leader id has been set and whose leader server id is in the
// metadata, we need to add a fetcher.
Expand Down Expand Up @@ -1422,14 +1426,14 @@ private void truncateToHighWatermark(List<Replica> replicas) {
}
}

private void validateAndApplyCoordinatorEpoch(int requestCoordinatorEpoch) {
private void validateAndApplyCoordinatorEpoch(int requestCoordinatorEpoch, String requestName) {
if (requestCoordinatorEpoch < this.coordinatorEpoch) {
String errorMessage =
String.format(
"invalid coordinator epoch %s in stopReplica request, "
"invalid coordinator epoch %s in %s request, "
+ "The latest known coordinator epoch is %s.",
requestCoordinatorEpoch, this.coordinatorEpoch);
LOG.warn("Ignore the stopReplica request because {}", errorMessage);
requestCoordinatorEpoch, requestName, this.coordinatorEpoch);
LOG.warn("Ignore the {} request because {}", requestName, errorMessage);
throw new InvalidCoordinatorException(errorMessage);
} else {
this.coordinatorEpoch = requestCoordinatorEpoch;
Expand Down Expand Up @@ -1514,19 +1518,6 @@ private boolean isRequiredAcksInvalid(int requiredAcks) {
return requiredAcks != 0 && requiredAcks != 1 && requiredAcks != -1;
}

/**
 * Looks up the schema of the given table through the ZooKeeper client.
 *
 * <p>The current schema id of the table is resolved first, and the schema info registered under
 * that id is then fetched.
 *
 * @param tablePath the path of the table whose schema is requested
 * @return the schema carried by the schema info found for the table's current schema id
 * @throws FlussRuntimeException if no schema info is present for the current schema id
 * @throws Exception declared by the signature; presumably raised by the ZooKeeper client calls
 *     (confirm against the client API)
 */
private Schema getSchemaFromZk(TablePath tablePath) throws Exception {
    final int currentSchemaId = zkClient.getCurrentSchemaId(tablePath);
    final Optional<SchemaInfo> maybeSchemaInfo =
            zkClient.getSchemaById(tablePath, currentSchemaId);

    // Guard clause: fail fast when ZooKeeper holds no schema info for the resolved id.
    if (!maybeSchemaInfo.isPresent()) {
        throw new FlussRuntimeException(
                String.format("The schema of table %s not found in zookeeper.", tablePath));
    }

    return maybeSchemaInfo.get().getSchema();
}

@VisibleForTesting
public DelayedOperationManager<DelayedWrite<?>> getDelayedWriteManager() {
return delayedWriteManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ void testBecomeLeaderOrFollowerWithOneTabletServerOffline() throws Exception {
.get();
result = getNotifyLeaderAndIsrResponseData(notifyLeaderAndIsrResponse);
assertThat(result.size()).isEqualTo(1);
assertThat(result.get(0).getErrorCode()).isEqualTo(Errors.NOT_LEADER_OR_FOLLOWER.code());
assertThat(result.get(0).getErrorCode()).isEqualTo(Errors.STORAGE_EXCEPTION.code());
}

private static void assertPutKvResponse(PutKvResponse putKvResponse) {
Expand Down