-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17564: Move BrokerFeatures to server module #17228
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @m1a2st
I have few comments, PTAL
server/src/main/java/org/apache/kafka/server/BrokerFeatures.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/apache/kafka/server/BrokerFeatures.java
Outdated
Show resolved
Hide resolved
@frankvicky thanks for your comments, addressed them. |
# Conflicts: # core/src/test/scala/unit/kafka/controller/KafkaControllerTest.scala # core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
public void setSupportedFeatures(Features<SupportedVersionRange> newFeatures) { | ||
Map<String, SupportedVersionRange> combined = new HashMap<>(supportedFeatures.features()); | ||
combined.putAll(newFeatures.features()); | ||
supportedFeatures = Features.supportedFeatures(combined); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is weird to make supportedFeatures
mutable for testing-only. Could you please add a new helper for testing?
public static BrokerFeatures createDefault(boolean unstableFeatureVersionsEnabled, Features<SupportedVersionRange> newFeatures) {
Map<String, SupportedVersionRange> combined = new HashMap<>(defaultSupportedFeatures(unstableFeatureVersionsEnabled).features());
combined.putAll(newFeatures.features());
return new BrokerFeatures(Features.supportedFeatures(combined));
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but the volatile
can't use with final
keyword, in this case I think we should reserve the volatile
. If we can't make supportedFeatures
immutable, Do we still need to use this helper?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we can't make supportedFeatures immutable, Do we still need to use this helper?
Yes, I proposed making supportedFeatures immutable. Are there any production use cases where supportedFeatures would need to be changed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right, I will make it to final.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@m1a2st thanks for your patch!
.collect(HashMap::new, (m, e) -> { | ||
String name = e.getKey(); | ||
SupportedVersionRange versionRange = e.getValue(); | ||
if ("kraft.version".equals(name)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return supportedFeatures.features().entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey,
e -> e.getKey().equals(KRaftVersion.FEATURE_NAME) ? 0 : e.getValue().max()));
.entrySet() | ||
.stream() | ||
.map(entry -> transferToIncompatibleFeaturesInfo(supportedFeatures, entry)) | ||
.filter(info -> info.errorReason != null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe transferToIncompatibleFeaturesInfo
should return the feature with error to avoid extra filter?
} | ||
|
||
public static Map<String, VersionRange> createDefaultFeatureMap(BrokerFeatures features) { | ||
Map<String, SupportedVersionRange> supportedFeatures = features.supportedFeatures.features(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return features.supportedFeatures.features()
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> VersionRange.of(e.getValue().min(), e.getValue().max())));
Map<String, Short> finalizedFeatures, | ||
boolean logIncompatibilities) { | ||
Map<IncompatibleFeaturesInfo, String> incompatibleFeaturesInfoToErrorReason = | ||
transferToIncompatibleFeaturesInfoToErrorCode(supportedFeatures, finalizedFeatures); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please inline transferToIncompatibleFeaturesInfoToErrorCode? We can loop through the features and log any incompatible ones. This would simplify the code and provide a slight speed improvement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If use StringBuilder to resolve this errorReason, we should deal with last two ", " so the code will be like
private static Map<String, Short> incompatibleFeatures(Features<SupportedVersionRange> supportedFeatures,
Map<String, Short> finalizedFeatures,
boolean logIncompatibilities) {
IncompatibleFeaturesInfosToErrorReasons incompatibleFeaturesInfoToErrorReason =
transferToIncompatibleFeaturesInfoToErrorCode(supportedFeatures, finalizedFeatures);
if (logIncompatibilities && !incompatibleFeaturesInfoToErrorReason.errorReason.isEmpty()) {
log.warn("Feature incompatibilities seen: {}", incompatibleFeaturesInfoToErrorReason.errorReason);
}
return incompatibleFeaturesInfoToErrorReason.incompatibleFeaturesInfo;
}
private static IncompatibleFeaturesInfosToErrorReasons transferToIncompatibleFeaturesInfoToErrorCode(
Features<SupportedVersionRange> supportedFeatures,
Map<String, Short> finalizedFeatures
) {
Map<String, Short> incompatibleFeaturesInfo = new HashMap<>();
StringBuilder errorReasons = new StringBuilder();
finalizedFeatures.forEach((feature, versionLevels) -> {
SupportedVersionRange supportedVersions = supportedFeatures.get(feature);
if (supportedVersions == null) {
incompatibleFeaturesInfo.put(feature, versionLevels);
errorReasons.append(format("{feature=%s, reason='Unsupported feature'}, ", feature));
} else if (supportedVersions.isIncompatibleWith(versionLevels)) {
incompatibleFeaturesInfo.put(feature, versionLevels);
errorReasons.append(format("{feature=%s, reason='%s is incompatible with %s'}, ", feature, versionLevels, supportedVersions));
}
});
return new IncompatibleFeaturesInfosToErrorReasons(
incompatibleFeaturesInfo,
errorReasons.length() == 0 ? "" :
errorReasons.delete(errorReasons.length() - 2, errorReasons.length()).toString()
);
}
private static class IncompatibleFeaturesInfosToErrorReasons {
final Map<String, Short> incompatibleFeaturesInfos;
final String errorReason;
IncompatibleFeaturesInfosToErrorReasons(Map<String, Short> incompatibleFeaturesInfos, String errorReason) {
this.incompatibleFeaturesInfos = incompatibleFeaturesInfos;
this.errorReason = errorReason;
}
}
I prefer to use the List to resolve this, It is more readable
private static Map<String, Short> incompatibleFeatures(Features<SupportedVersionRange> supportedFeatures,
Map<String, Short> finalizedFeatures,
boolean logIncompatibilities) {
IncompatibleFeaturesInfosToErrorReasons incompatibleFeaturesInfoToErrorReason =
transferToIncompatibleFeaturesInfoToErrorCode(supportedFeatures, finalizedFeatures);
if (logIncompatibilities && !incompatibleFeaturesInfoToErrorReason.errorReason.isEmpty()) {
log.warn("Feature incompatibilities seen: {}", incompatibleFeaturesInfoToErrorReason.errorReason);
}
return incompatibleFeaturesInfoToErrorReason.incompatibleFeaturesInfos;
}
private static IncompatibleFeaturesInfosToErrorReasons transferToIncompatibleFeaturesInfoToErrorCode(
Features<SupportedVersionRange> supportedFeatures,
Map<String, Short> finalizedFeatures
) {
Map<String, Short> incompatibleFeaturesInfo = new HashMap<>();
List<String> errorReasons = new ArrayList<>();
finalizedFeatures.forEach((feature, versionLevels) -> {
SupportedVersionRange supportedVersions = supportedFeatures.get(feature);
if (supportedVersions == null) {
incompatibleFeaturesInfo.put(feature, versionLevels);
errorReasons.add(format("{feature=%s, reason='Unknown feature'}", feature));
} else if (supportedVersions.isIncompatibleWith(versionLevels)) {
incompatibleFeaturesInfo.put(feature, versionLevels);
errorReasons.add(format("{feature=%s, reason='%s is incompatible with %s'}", feature, versionLevels, supportedVersions));
}
});
return new IncompatibleFeaturesInfosToErrorReasons(
incompatibleFeaturesInfo,
String.join(", ", errorReasons)
);
}
private static class IncompatibleFeaturesInfosToErrorReasons {
final Map<String, Short> incompatibleFeaturesInfos;
final String errorReason;
IncompatibleFeaturesInfosToErrorReasons(Map<String, Short> incompatibleFeaturesInfos, String errorReason) {
this.incompatibleFeaturesInfos = incompatibleFeaturesInfos;
this.errorReason = errorReason;
}
}
WDYT?
Jira: https://issues.apache.org/jira/browse/KAFKA-17564
Committer Checklist (excluded from commit message)