KAFKA-17813: Moving broker endpoint class and common server connection id #17519

Merged
merged 10 commits into from
Oct 22, 2024

Conversation

apoorvmittal10
Collaborator

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@chia7712
Contributor

@apoorvmittal10 I love this cleanup and migration. Could you please rebase code to fix conflicts as #17473 is already merged

@apoorvmittal10
Collaborator Author

@apoorvmittal10 I love this cleanup and migration. Could you please rebase code to fix conflicts as #17473 is already merged

Thanks @chia7712, sure. I'll force-push the branch once #17474 is also merged, as the changes are on top of that. Hence I am keeping the PR in draft mode until then.

@apoorvmittal10
Collaborator Author

@chia7712 Sorry, #17474 is not required for this PR. I have rebased it and marked it ready for review. Could you please take a look?

@apoorvmittal10 marked this pull request as ready for review October 17, 2024 07:50
@apoorvmittal10
Collaborator Author

Looking into the test failures.

Member

@brandboat left a comment

Thanks for the refactor !!!

Contributor

@chia7712 left a comment

@apoorvmittal10 thanks for this refactor

}

try {
return BrokerEndPoint.parseHostPort(split[0])
Contributor

            return BrokerEndPoint.parseHostPort(split[0])
                .flatMap(localHost -> BrokerEndPoint.parseHostPort(split[1])
                    .map(remoteHost -> new ServerConnectionId(localHost, remoteHost, Integer.parseInt(split[2]), Integer.parseInt(split[3]))));

We don't need to wrap it in Optional.of.
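
A minimal Java sketch of why the extra wrapper is redundant. The `parsePort` helper is hypothetical and only stands in for `parseHostPort`; the point is that `flatMap` followed by `map` already produces an `Optional`, so wrapping the chain in `Optional.of` would only nest it.

```java
import java.util.Optional;

final class OptionalChainSketch {
    // Hypothetical stand-in for a parser that may fail: empty on malformed input.
    static Optional<Integer> parsePort(String s) {
        try {
            return Optional.of(Integer.parseInt(s));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // flatMap + map already yields Optional<String>; no Optional.of(...) wrapper is needed.
    static Optional<String> describe(String local, String remote) {
        return parsePort(local)
            .flatMap(l -> parsePort(remote)
                .map(r -> l + "->" + r));
    }

    public static void main(String[] args) {
        System.out.println(describe("9092", "9093")); // Optional[9092->9093]
        System.out.println(describe("9092", "oops")); // Optional.empty
    }
}
```

If either parse step fails, the whole chain collapses to an empty `Optional` without any explicit branching.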

Contributor

Additionally, BrokerEndPoint.parseHostPort is used by ServerConnectionId only, so maybe we can inline it?

Collaborator Author

Thanks for the suggestion; I have addressed the first comment. Sorry, I didn't understand the comment about inlining.

Contributor

I think Chia is suggesting moving BrokerEndPoint.parseHostPort into this class as a static method. Since the connection string doesn't contain a brokerId, it doesn't quite fit into BrokerEndPoint.

Collaborator Author

I have moved it into the same class now.

@@ -1221,7 +1206,7 @@ private[kafka] class Processor(
   private def processDisconnected(): Unit = {
     selector.disconnected.keySet.forEach { connectionId =>
       try {
-        val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
+        val remoteHost = ServerConnectionId.fromString(connectionId).orElseThrow { () =>
           throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
Contributor

Maybe ServerConnectionId.fromString can throw IllegalStateException instead of returning Optional?

Collaborator Author

I was trying to keep the refactor minimal while preserving the original context. I'll look into this as well.
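
For illustration, a hedged Java sketch of the two styles being weighed here. The `fromString` below is a hypothetical stand-in, not the real `ServerConnectionId.fromString`; it returns an `Optional`, and `require` shows how a caller turns an empty result into an `IllegalStateException`.

```java
import java.util.Optional;

final class FromStringStyles {
    // Hypothetical stand-in for a fromString parser: empty on malformed input.
    static Optional<Integer> fromString(String s) {
        try {
            return Optional.of(Integer.parseInt(s));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Caller-side handling: unwrap the value or raise IllegalStateException.
    // The supplier returns the exception; orElseThrow does the throwing.
    static int require(String s) {
        return fromString(s).orElseThrow(
            () -> new IllegalStateException("connectionId has unexpected format: " + s));
    }
}
```

Returning `Optional` leaves the failure policy to each caller, whereas throwing inside the parser would centralize it at the cost of forcing every caller onto the exception path.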

@apoorvmittal10
Collaborator Author

@chia7712 @brandboat Thanks for the review. Could you please take another look?

Member

@brandboat left a comment

LGTM, thanks !

Contributor

@junrao left a comment

@apoorvmittal10 : Thanks for the PR. Left a few comments.

* limitations under the License.
*/

package org.apache.kafka.common.network;
Contributor

This is only used on the server. Could we put it under the server-common module?

Collaborator Author

Makes sense, but then we would have to remove the generateConnectionId method from this class, as it is used by NioEchoServer. If we remove it, the connectionId-related methods in Selector.java and SocketServer.scala would no longer be consolidated. WDYT?

Contributor

Got it. We can keep it in the client module then.

/**
* ServerConnectionId is used to uniquely identify a connection between a client and a server.
*/
public class ServerConnectionId {
Contributor

ServerConnectionId => ConnectionId ?

Contributor

If we keep this class in the client module, it's better to give it a more accurate name. Perhaps ConnectionIdOnServer?

Collaborator Author

Hmm, I felt ServerConnectionId is better since it is prefixed with "Server" itself. I can write the class description appropriately. If you still think it's not appropriate, I'll rename it to ConnectionIdOnServer. Please let me know.

Contributor

ServerConnectionId needs to remain in the clients module since it's also used by the clients' test code. Perhaps we could rewrite NioEchoServer to use a different connection ID scheme to decouple it. In #16460, we wanted to add a more meaningful string, but that's not necessary for aligning with the core server code.

Contributor

@apoorvmittal10 what do you think about moving ServerConnectionId to the server module?

Collaborator Author

@chia7712 We can't do that, as discussed here: #17519 (comment)

try {
return BrokerEndPoint.parseHostPort(split[0])
.flatMap(localHost -> BrokerEndPoint.parseHostPort(split[1])
.map(remoteHost -> new ServerConnectionId(localHost, remoteHost, Integer.parseInt(split[2]), Integer.parseInt(split[3]))));
Contributor

indentation

Collaborator Author

I have changed the way it's written now.

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;
Contributor

This is only used on the server. Could we put it under the server-common module?

Collaborator Author

As the other class in clients uses BrokerEndPoint now, I kept it in clients itself, under network. That's the only place I found where classes shared by clients and server can go.

Collaborator Author

Moved to server-common, as we have now switched to Map.Entry as per @chia7712's comment.

@apoorvmittal10
Collaborator Author

I have addressed the comments; one about the name is pending. If the name ServerConnectionId doesn't seem right, I'll change it. I have added the class description, though.

* Note that unlike EndPoint (or listener) this URI has no security information.
*/
// Visible for testing
static Optional<BrokerEndPoint> parseHostPort(String connectionString) {
Contributor

Could you please replace BrokerEndPoint with Map.Entry? We are on Java 11, so it is easier to create an Entry now.

Collaborator Author

Done. I'm not a big fan of Map.Entry, but it seems sensible here as there was no broker id for BrokerEndPoint.
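
A short sketch of the `Map.Entry` approach, assuming a hypothetical `parseHostPort` that splits on the last colon instead of using the PR's regex. `Map.entry` (available since Java 9) creates an immutable key/value pair, which is enough for a host and port with no broker id.

```java
import java.util.Map;
import java.util.Optional;

final class HostPortEntrySketch {
    // Hypothetical parser: splits on the last ':' and returns (host, port) as a Map.Entry.
    static Optional<Map.Entry<String, Integer>> parseHostPort(String s) {
        int idx = s.lastIndexOf(':');
        if (idx <= 0 || idx == s.length() - 1) {
            return Optional.empty();
        }
        try {
            int port = Integer.parseInt(s.substring(idx + 1));
            // Map.entry builds an immutable entry; null keys or values are rejected.
            return Optional.of(Map.entry(s.substring(0, idx), port));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        parseHostPort("localhost:9092")
            .ifPresent(e -> System.out.println(e.getKey() + " / " + e.getValue()));
    }
}
```

The trade-off mentioned in the reply is visible here: `getKey()` and `getValue()` are less descriptive than `host()` and `port()`, but no dedicated class is needed.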

@apoorvmittal10
Collaborator Author

I have resolved the comments, and the build passed.

Contributor

@junrao left a comment

@apoorvmittal10 : Thanks for the updated PR. A couple more comments.

@@ -25,7 +25,7 @@ import java.util
 import java.util.Optional
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import kafka.cluster.{BrokerEndPoint, EndPoint}
+import kafka.cluster.{EndPoint}
Contributor

We can remove the brackets since there is a single import.

Collaborator Author

Missed it; I thought spotlessApply would fix it. Done.

assertEquals(9092, brokerEndPoint.get().port());

// IPv6 endpoint
brokerEndPoint = ServerConnectionId.parseHostPort("[2001:db8::1]:9092");
Contributor

Hmm, an interesting thing is that if we take a ServerConnectionId with a host name of [2001:db8::1] and call ServerConnectionId.fromString, we will get a slightly different ServerConnectionId since the hostname will not have the brackets. It's not an issue now, but it may cause confusion in the future.

Contributor

ServerConnectionId doesn't face ambiguity like a URL does, so in my opinion, the hostname in ServerConnectionId doesn't need brackets.

Collaborator Author

Do I need to fix anything here? I have just ported the existing regex for parsing from Scala to Java.

Collaborator Author

Yeah, this is not required. I have pushed an additional test for the same.

Contributor

@chia7712 left a comment

LGTM

@apoorvmittal10
Collaborator Author

@junrao Thanks for the feedback and review. Please let me know if I need to address anything more. The build has passed for the PR.

Contributor

@junrao left a comment

@apoorvmittal10 : Thanks for the updated PR. A couple more comments.

*/
public class ServerConnectionId {

private static final Pattern URI_PARSE_EXP = Pattern.compile("\\[?([0-9a-zA-Z\\-%._:]*)]?:([0-9]+)");
Contributor

A couple of things.

  1. Utils.HOST_PORT_PATTERN is more general than this. We could just reuse the former instead.
  2. It seems that we have an existing bug. InetAddress.getHostAddress() doesn't include brackets. When we generate a parsable string for host:port, we need to add brackets around the host part since it can contain colons. For example, we need to do that for bootstrap.servers. However, in Processor, when generating a connection string, we just concatenate the host and the port without adding the brackets. This means that when we try to convert the connection string back to a ConnectionId, we can't parse the hostname properly if it's IPv6. To fix this, it seems that we need to add brackets to the hostname when generating the connection string.

Collaborator Author

Utils.HOST_PORT_PATTERN is more general than this. We could just reuse the former instead.

I initially looked at it but didn't use it, as it also includes protocol parsing, which we don't require. Moreover, I wanted to port the same Scala version to Java.

This means that when we try to convert the connection string back to ConnectionId, we can't parse the hostname properly if it's IPV6. To fix this, it seems that we need to add brackets to the hostname when generating the connection string.

I don't think it's a bug; I am writing a test case for it as well. The regex splits on the last ":", hence the host and port will be constructed correctly.

Collaborator Author

I have pushed a test case for IPv6 as well.
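
The behaviour discussed in this thread can be checked with a small Java sketch. The pattern is copied from the snippet under review; the `parse` helper and class name are hypothetical. Because the host group is greedy and `matches()` requires the whole input to match, the regex backtracks to the last colon, so an unbracketed IPv6 host is split correctly. A bracketed host is also accepted, but the brackets are dropped from the captured host.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class BracketTolerantParse {
    // Pattern copied from the review thread: brackets around the host are optional.
    static final Pattern URI_PARSE_EXP =
        Pattern.compile("\\[?([0-9a-zA-Z\\-%._:]*)]?:([0-9]+)");

    // Hypothetical helper: returns {host, port}, or null when the input does not match.
    static String[] parse(String s) {
        Matcher m = URI_PARSE_EXP.matcher(s);
        return m.matches() ? new String[] {m.group(1), m.group(2)} : null;
    }

    public static void main(String[] args) {
        String[] plain = parse("2001:db8::1:9092");
        String[] bracketed = parse("[2001:db8::1]:9092");
        // Both inputs yield the same host, without brackets, and the same port.
        System.out.println(plain[0] + " " + plain[1]);
        System.out.println(bracketed[0] + " " + bracketed[1]);
    }
}
```

Since both inputs produce the same host, re-serializing a parsed bracketed string as host:port does not reproduce the original text, which is the round-trip difference raised earlier in the review.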

Contributor

Thanks for the explanation. It's fine to have a separate regex just for ServerConnectionId. If so, the host name in the connection string will never have brackets in it, right? Then, it would be useful to change the regex, the comment and the tests to remove the support for brackets accordingly.

Collaborator Author

If so, the host name in the connection string will never have brackets in it, right?

I am not sure about this, but the code handles hostnames with or without brackets. I think that's the right behaviour. The test cases I added verify that as well.

Collaborator Author

Also, I am just trying to be consistent with the previous Scala version of the code.

Contributor

I am not sure about this but the code handles with or without brackets. I think it's the right behaviour.

Well, we create the connection string in two paths. The first one gets the hostname from InetAddress.getHostAddress(), which shouldn't include brackets according to https://groups.google.com/g/golang-dev/c/AiC3MffM4-U. The second one gets the hostname from ServerConnectionId.parseHostPort(), which strips out the brackets if present.

Collaborator Author

Sure @junrao, I have made the change.

@apoorvmittal10
Collaborator Author

@junrao I have addressed the comment: removed the brackets and corrected the tests and comments. Could you please review?

Contributor

@junrao left a comment

@apoorvmittal10 : Thanks for the updated PR. Just a minor comment.

// The regex for parsing the URI string. The URI string should be in the format of "host:port",
// where host can be an IPv4 address or an IPv6 address.
// Note: The IPv6 address should not be enclosed in square brackets.
private static final Pattern URI_PARSE_EXP = Pattern.compile("([0-9a-zA-Z\\-%._:]*):([0-9]+)");
Contributor

Since this is not really a URI, should we rename URI_PARSE_EXP to something like HOST_PORT_PARSE_EXP?

Collaborator Author

Done.
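
As a quick check of the bracket-free pattern quoted above, here is a hedged Java sketch. The constant uses the name suggested in review (the exact name in the merged code is an assumption), and `accepts` is a hypothetical helper. IPv4 and unbracketed IPv6 inputs match, while a bracketed IPv6 host no longer does because `[` is not in the allowed character class.

```java
import java.util.regex.Pattern;

final class BracketFreeParse {
    // Pattern from the revised snippet, under the name suggested in review (assumed).
    static final Pattern HOST_PORT_PARSE_EXP =
        Pattern.compile("([0-9a-zA-Z\\-%._:]*):([0-9]+)");

    // Hypothetical helper: true when the whole input is a valid host:port string.
    static boolean accepts(String s) {
        return HOST_PORT_PARSE_EXP.matcher(s).matches();
    }

    public static void main(String[] args) {
        System.out.println(accepts("127.0.0.1:9092"));      // true
        System.out.println(accepts("2001:db8::1:9092"));    // true
        System.out.println(accepts("[2001:db8::1]:9092"));  // false
    }
}
```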

Contributor

@junrao left a comment

@apoorvmittal10 : Thanks for the updated PR. LGTM

@junrao merged commit 25a3590 into apache:trunk on Oct 22, 2024
6 checks passed
abhishekgiri23 pushed a commit to abhishekgiri23/kafka that referenced this pull request Nov 2, 2024
4 participants