-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MINOR: migrate BrokerCompressionTest to storage module #19277
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. Leave a minor comment.
/* Configure broker-side compression */ | ||
UnifiedLog log = UnifiedLog.create( | ||
logDir, | ||
new LogConfig(logProps), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we change this like following, so we don't need logProps
?
new LogConfig(Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, brokerCompressionType.name)),
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Thank you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@TaiJuWu : Thanks for the patch.
I have a few comments.
List<Arguments> args = new ArrayList<>(); | ||
for (BrokerCompressionType brokerCompression : BrokerCompressionType.values()) { | ||
for (CompressionType messageCompression : CompressionType.values()) { | ||
args.add(Arguments.of(messageCompression, brokerCompression)); | ||
} | ||
} | ||
return args.stream(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return Arrays.stream(BrokerCompressionType.values())
.flatMap(brokerCompression -> Arrays.stream(CompressionType.values())
.map(messageCompression -> Arguments.of(messageCompression, brokerCompression)));
return fetchInfo.records.batches().iterator().next(); | ||
} | ||
|
||
private static Stream<Arguments> parameters() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please give it a meaningful name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the name to allCompressionParameters
.
Thanks for suggesting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this patch, left some nit comments
} | ||
} | ||
|
||
private static RecordBatch readBatch(UnifiedLog log, int offset) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The parameter offset
is always 0, so I think we can remove it.
if (brokerCompressionType != BrokerCompressionType.PRODUCER) { | ||
RecordBatch batch = readBatch(log, 0); | ||
Compression targetCompression = BrokerCompressionType.targetCompression(log.config().compression, null); | ||
assertEquals(targetCompression.type(), batch.compressionType(), "Compression at offset 0 should produce " + brokerCompressionType); | ||
} else { | ||
assertEquals(messageCompressionType, readBatch(log, 0).compressionType(), "Compression at offset 0 should produce " + messageCompressionType); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RecordBatch batch = readBatch(log, 0);
can outside the if-else condition.
There are two change for this PR.
BrokerCompressionTest
from core to storageBrokerCompressionTest
from scala to java