Skip to content

Commit

Permalink
KAFKA-17813: Moving broker endpoint class and common server connectio…
Browse files Browse the repository at this point in the history
…n id
  • Loading branch information
apoorvmittal10 committed Oct 16, 2024
1 parent 7f5238b commit 94ac636
Show file tree
Hide file tree
Showing 29 changed files with 436 additions and 127 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;

import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* BrokerEndPoint is used to connect to specific host:port pair.
* It is typically used by clients (or brokers when connecting to other brokers)
* and contains no information about the security protocol used on the connection.
* Clients should know which security protocol to use from configuration.
* This allows us to keep the wire protocol with the clients unchanged where the protocol is not needed.
*/
public class BrokerEndPoint {

private static final Pattern URI_PARSE_EXP = Pattern.compile("\\[?([0-9a-zA-Z\\-%._:]*)]?:([0-9]+)");

private final int id;
private final String host;
private final int port;

public BrokerEndPoint(int id, String host, int port) {
this.id = id;
this.host = host;
this.port = port;
}

/**
* This constructor is only used by the static parseHostPort method, which acts as a factory method
* to create a BrokerEndPoint from a host:port string.
*/
private BrokerEndPoint(String host, int port) {
this(0, host, port);
}

public int id() {
return id;
}

public String host() {
return host;
}

public int port() {
return port;
}

/**
* BrokerEndPoint URI is host:port or [ipv6_host]:port
* Note that unlike EndPoint (or listener) this URI has no security information.
*/
public static Optional<BrokerEndPoint> parseHostPort(String connectionString) {
Matcher matcher = URI_PARSE_EXP.matcher(connectionString);
if (matcher.matches()) {
try {
return Optional.of(new BrokerEndPoint(matcher.group(1), Integer.parseInt(matcher.group(2))));
} catch (NumberFormatException e) {
// Ignore
}
}
return Optional.empty();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

BrokerEndPoint that = (BrokerEndPoint) o;
return id != that.id && host.equals(that.host) && port != that.port;
}

@Override
public int hashCode() {
return Objects.hash(id, host, port);
}

public String toString() {
return String.format("BrokerEndPoint(id=%s, host=%s:%s)", id, host, port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,22 +233,6 @@ public Selector(long connectionMaxIdleMS, int failedAuthenticationDelayMs, Metri
this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, failedAuthenticationDelayMs, metrics, time, metricGrpPrefix, Collections.emptyMap(), true, channelBuilder, logContext);
}

/**
* Generates a unique connection ID for the given socket.
*
* @param socket The socket for which the connection ID is to be generated.
* @param processorId The ID of the server processor that will handle this connection.
* @param connectionIndex The index to be used in the connection ID to ensure uniqueness.
* @return A string representing the unique connection ID.
*/
public static String generateConnectionId(Socket socket, int processorId, int connectionIndex) {
String localHost = socket.getLocalAddress().getHostAddress();
int localPort = socket.getLocalPort();
String remoteHost = socket.getInetAddress().getHostAddress();
int remotePort = socket.getPort();
return localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort + "-" + processorId + "-" + connectionIndex;
}

/**
* Begin connecting to the given address and add the connection to this nioSelector associated with the given id
* number.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.network;

import java.net.Socket;
import java.util.Optional;

/**
* ServerConnectionId is used to uniquely identify a connection between a client and a server.
*/
public class ServerConnectionId {

private final String localHost;
private final int localPort;
private final String remoteHost;
private final int remotePort;
private final int processorId;
private final int index;

public ServerConnectionId(
BrokerEndPoint localEndpoint,
BrokerEndPoint remoteEndpoint,
int processorId,
int index
) {
this(localEndpoint.host(), localEndpoint.port(), remoteEndpoint.host(), remoteEndpoint.port(), processorId, index);
}

public ServerConnectionId(
String localHost,
int localPort,
String remoteHost,
int remotePort,
int processorId,
int index
) {
this.localHost = localHost;
this.localPort = localPort;
this.remoteHost = remoteHost;
this.remotePort = remotePort;
this.processorId = processorId;
this.index = index;
}

public String localHost() {
return localHost;
}

public int localPort() {
return localPort;
}

public String remoteHost() {
return remoteHost;
}

public int remotePort() {
return remotePort;
}

public int processorId() {
return processorId;
}

public int index() {
return index;
}

/**
* Returns an optional ServerConnectionId object from the given connection ID string.
*
* @param connectionIdString The connection ID string to parse.
* @return An optional ServerConnectionId object.
*/
public static Optional<ServerConnectionId> fromString(String connectionIdString) {
String[] split = connectionIdString.split("-");
if (split.length != 4) {
return Optional.empty();
}

try {
return BrokerEndPoint.parseHostPort(split[0])
.flatMap(localHost -> BrokerEndPoint.parseHostPort(split[1])
.flatMap(remoteHost -> Optional.of(new ServerConnectionId(localHost, remoteHost, Integer.parseInt(split[2]), Integer.parseInt(split[3])))));
} catch (NumberFormatException e) {
return Optional.empty();
}
}

/**
* Generates a unique connection ID for the given socket.
*
* @param socket The socket for which the connection ID is to be generated.
* @param processorId The ID of the server processor that will handle this connection.
* @param connectionIndex The index to be used in the connection ID to ensure uniqueness.
* @return A string representing the unique connection ID.
*/
public static String generateConnectionId(Socket socket, int processorId, int connectionIndex) {
String localHost = socket.getLocalAddress().getHostAddress();
int localPort = socket.getLocalPort();
String remoteHost = socket.getInetAddress().getHostAddress();
int remotePort = socket.getPort();
return localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort + "-" + processorId + "-" + connectionIndex;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.network;

import org.junit.jupiter.api.Test;

import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class BrokerEndPointTest {

@Test
public void testParseHostPort() {
Optional<BrokerEndPoint> brokerEndPoint = BrokerEndPoint.parseHostPort("myhost:9092");
assertTrue(brokerEndPoint.isPresent());
assertEquals("myhost", brokerEndPoint.get().host());
assertEquals(9092, brokerEndPoint.get().port());

brokerEndPoint = BrokerEndPoint.parseHostPort("127.0.0.1:9092");
assertTrue(brokerEndPoint.isPresent());
assertEquals("127.0.0.1", brokerEndPoint.get().host());
assertEquals(9092, brokerEndPoint.get().port());
}

@Test
public void testParseHostPortInvalid() {
// Invalid separator
Optional<BrokerEndPoint> brokerEndPoint = BrokerEndPoint.parseHostPort("myhost-9092");
assertFalse(brokerEndPoint.isPresent());

// No separator
brokerEndPoint = BrokerEndPoint.parseHostPort("myhost9092");
assertFalse(brokerEndPoint.isPresent());

// Invalid port
brokerEndPoint = BrokerEndPoint.parseHostPort("myhost:abcd");
assertFalse(brokerEndPoint.isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private static boolean maybeBeginServerReauthentication(KafkaChannel channel, Ne
}

private String id(SocketChannel channel) {
String connectionId = Selector.generateConnectionId(channel.socket(), 0, nextConnectionIndex);
String connectionId = ServerConnectionId.generateConnectionId(channel.socket(), 0, nextConnectionIndex);
if (nextConnectionIndex == Integer.MAX_VALUE)
nextConnectionIndex = 0;
else
Expand Down
Loading

0 comments on commit 94ac636

Please sign in to comment.