Skip to content

Commit

Permalink
KAFKA-17813: Moving broker endpoint class and common server connectio…
Browse files Browse the repository at this point in the history
…n id (apache#17519)


Reviewers: Chia-Ping Tsai <[email protected]>, Kuan-Po Tseng <[email protected]>, Jun Rao <[email protected]>
  • Loading branch information
apoorvmittal10 authored and agiri23 committed Nov 2, 2024
1 parent c8a5a22 commit 59eb5dd
Show file tree
Hide file tree
Showing 28 changed files with 472 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,22 +233,6 @@ public Selector(long connectionMaxIdleMS, int failedAuthenticationDelayMs, Metri
this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, failedAuthenticationDelayMs, metrics, time, metricGrpPrefix, Collections.emptyMap(), true, channelBuilder, logContext);
}

/**
* Generates a unique connection ID for the given socket.
*
* @param socket The socket for which the connection ID is to be generated.
* @param processorId The ID of the server processor that will handle this connection.
* @param connectionIndex The index to be used in the connection ID to ensure uniqueness.
* @return A string representing the unique connection ID.
*/
public static String generateConnectionId(Socket socket, int processorId, int connectionIndex) {
String localHost = socket.getLocalAddress().getHostAddress();
int localPort = socket.getLocalPort();
String remoteHost = socket.getInetAddress().getHostAddress();
int remotePort = socket.getPort();
return localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort + "-" + processorId + "-" + connectionIndex;
}

/**
* Begin connecting to the given address and add the connection to this nioSelector associated with the given id
* number.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.network;

import java.net.Socket;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* ServerConnectionId is used to uniquely identify a connection on server for the client. The
* connection id is in the format of "localHost:localPort-remoteHost:remotePort-processorId-index".
* The processorId is the id of the processor that will handle this connection and the index is
* used to ensure uniqueness.
*/
public class ServerConnectionId {

// The regex for parsing the host:port string, where host can be an IPv4 address or an IPv6 address.
// Note: The IPv6 address should not be enclosed in square brackets.
private static final Pattern HOST_PORT_PARSE_EXP = Pattern.compile("([0-9a-zA-Z\\-%._:]*):([0-9]+)");

private final String localHost;
private final int localPort;
private final String remoteHost;
private final int remotePort;
private final int processorId;
private final int index;

public ServerConnectionId(
String localHost,
int localPort,
String remoteHost,
int remotePort,
int processorId,
int index
) {
this.localHost = localHost;
this.localPort = localPort;
this.remoteHost = remoteHost;
this.remotePort = remotePort;
this.processorId = processorId;
this.index = index;
}

private ServerConnectionId(
Map.Entry<String, Integer> localEndpoint,
Map.Entry<String, Integer> remoteEndpoint,
int processorId,
int index
) {
this(localEndpoint.getKey(), localEndpoint.getValue(), remoteEndpoint.getKey(), remoteEndpoint.getValue(), processorId, index);
}

public String localHost() {
return localHost;
}

public int localPort() {
return localPort;
}

public String remoteHost() {
return remoteHost;
}

public int remotePort() {
return remotePort;
}

public int processorId() {
return processorId;
}

public int index() {
return index;
}

/**
* Returns an optional ServerConnectionId object from the given connection ID string.
*
* @param connectionIdString The connection ID string to parse.
* @return An optional ServerConnectionId object.
*/
public static Optional<ServerConnectionId> fromString(String connectionIdString) {
String[] split = connectionIdString.split("-");
if (split.length != 4) {
return Optional.empty();
}

try {
return parseHostPort(split[0]).flatMap(localHost -> parseHostPort(split[1]).map(
remoteHost -> new ServerConnectionId(localHost, remoteHost, Integer.parseInt(split[2]), Integer.parseInt(split[3]))));
} catch (NumberFormatException e) {
return Optional.empty();
}
}

/**
* Generates a unique connection ID for the given socket.
*
* @param socket The socket for which the connection ID is to be generated.
* @param processorId The ID of the server processor that will handle this connection.
* @param connectionIndex The index to be used in the connection ID to ensure uniqueness.
* @return A string representing the unique connection ID.
*/
public static String generateConnectionId(Socket socket, int processorId, int connectionIndex) {
String localHost = socket.getLocalAddress().getHostAddress();
int localPort = socket.getLocalPort();
String remoteHost = socket.getInetAddress().getHostAddress();
int remotePort = socket.getPort();
return localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort + "-" + processorId + "-" + connectionIndex;
}

/**
* Map entry consists of host:port or ipv6_host:port
*/
// Visible for testing
static Optional<Map.Entry<String, Integer>> parseHostPort(String connectionString) {
Matcher matcher = HOST_PORT_PARSE_EXP.matcher(connectionString);
if (matcher.matches()) {
try {
return Optional.of(Map.entry(matcher.group(1), Integer.parseInt(matcher.group(2))));
} catch (NumberFormatException e) {
// Ignore
}
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private static boolean maybeBeginServerReauthentication(KafkaChannel channel, Ne
}

private String id(SocketChannel channel) {
String connectionId = Selector.generateConnectionId(channel.socket(), 0, nextConnectionIndex);
String connectionId = ServerConnectionId.generateConnectionId(channel.socket(), 0, nextConnectionIndex);
if (nextConnectionIndex == Integer.MAX_VALUE)
nextConnectionIndex = 0;
else
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.network;

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Map;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ServerConnectionIdTest {

@Test
public void testFromString() {
// Test valid connection id
String connectionIdString = "localhost:9092-localhost:9093-1-2";
Optional<ServerConnectionId> serverConnectionIdOptional = ServerConnectionId.fromString(connectionIdString);
assertTrue(serverConnectionIdOptional.isPresent());
ServerConnectionId serverConnectionId = serverConnectionIdOptional.get();

assertEquals("localhost", serverConnectionId.localHost());
assertEquals(9092, serverConnectionId.localPort());
assertEquals("localhost", serverConnectionId.remoteHost());
assertEquals(9093, serverConnectionId.remotePort());
assertEquals(1, serverConnectionId.processorId());
assertEquals(2, serverConnectionId.index());

connectionIdString = "localhost:9092-127.0.0.1:9093-0-0";
serverConnectionIdOptional = ServerConnectionId.fromString(connectionIdString);
assertTrue(serverConnectionIdOptional.isPresent());
serverConnectionId = serverConnectionIdOptional.get();

assertEquals("localhost", serverConnectionId.localHost());
assertEquals(9092, serverConnectionId.localPort());
assertEquals("127.0.0.1", serverConnectionId.remoteHost());
assertEquals(9093, serverConnectionId.remotePort());
assertEquals(0, serverConnectionId.processorId());
assertEquals(0, serverConnectionId.index());

// IPv6 endpoints
connectionIdString = "2001:db8:0:0:0:0:0:1:9092-127.0.0.1:9093-1-2";
serverConnectionIdOptional = ServerConnectionId.fromString(connectionIdString);
assertTrue(serverConnectionIdOptional.isPresent());
serverConnectionId = serverConnectionIdOptional.get();

assertEquals("2001:db8:0:0:0:0:0:1", serverConnectionId.localHost());
assertEquals(9092, serverConnectionId.localPort());
assertEquals("127.0.0.1", serverConnectionId.remoteHost());
assertEquals(9093, serverConnectionId.remotePort());
assertEquals(1, serverConnectionId.processorId());
assertEquals(2, serverConnectionId.index());

connectionIdString = "2002:db9:1:0:0:0:0:1:9092-2001:db8::1:9093-0-1";
serverConnectionIdOptional = ServerConnectionId.fromString(connectionIdString);
assertTrue(serverConnectionIdOptional.isPresent());
serverConnectionId = serverConnectionIdOptional.get();

assertEquals("2002:db9:1:0:0:0:0:1", serverConnectionId.localHost());
assertEquals(9092, serverConnectionId.localPort());
assertEquals("2001:db8::1", serverConnectionId.remoteHost());
assertEquals(9093, serverConnectionId.remotePort());
assertEquals(0, serverConnectionId.processorId());
assertEquals(1, serverConnectionId.index());
}

@Test
public void testFromStringInvalid() {
// Test invalid connection id params length
String connectionIdString = "localhost:9092-localhost:9093-1";
assertFalse(ServerConnectionId.fromString(connectionIdString).isPresent());

connectionIdString = "localhost:9092-localhost:9093-1-2-3";
assertFalse(ServerConnectionId.fromString(connectionIdString).isPresent());

// Invalid separator
connectionIdString = "localhost-9092-localhost:9093-1-2";
assertFalse(ServerConnectionId.fromString(connectionIdString).isPresent());

connectionIdString = "localhost:9092:localhost-9093-1-2";
assertFalse(ServerConnectionId.fromString(connectionIdString).isPresent());

// No separator in port
connectionIdString = "localhost9092-localhost:9093-1-2";
assertFalse(ServerConnectionId.fromString(connectionIdString).isPresent());

connectionIdString = "localhost:9092-localhost9093-1-2";
assertFalse(ServerConnectionId.fromString(connectionIdString).isPresent());

// Invalid port
connectionIdString = "localhost:abcd-localhost:9093-1-2";
assertFalse(ServerConnectionId.fromString(connectionIdString).isPresent());

connectionIdString = "localhost:9092-localhost:abcd-1-2";
assertFalse(ServerConnectionId.fromString(connectionIdString).isPresent());

// Invalid processorId
connectionIdString = "localhost:9092-localhost:9093-a-2";
assertFalse(ServerConnectionId.fromString(connectionIdString).isPresent());

// Invalid index
connectionIdString = "localhost:9092-localhost:9093-1-b";
assertFalse(ServerConnectionId.fromString(connectionIdString).isPresent());

// Invalid IPv6 address
connectionIdString = "[2001:db8:0:0:0:0:0:1]:9092-127.0.0.1:9093-1-2";
assertFalse(ServerConnectionId.fromString(connectionIdString).isPresent());
}

@Test
public void testGenerateConnectionId() throws IOException {
Socket socket = mock(Socket.class);
when(socket.getLocalAddress()).thenReturn(InetAddress.getByName("127.0.0.1"));
when(socket.getLocalPort()).thenReturn(9092);
when(socket.getInetAddress()).thenReturn(InetAddress.getByName("127.0.0.1"));
when(socket.getPort()).thenReturn(9093);

assertEquals("127.0.0.1:9092-127.0.0.1:9093-0-0", ServerConnectionId.generateConnectionId(socket, 0, 0));
assertEquals("127.0.0.1:9092-127.0.0.1:9093-1-2", ServerConnectionId.generateConnectionId(socket, 1, 2));
}

@Test
public void testGenerateConnectionIdIpV6() throws IOException {
Socket socket = mock(Socket.class);
// The test should pass when the address is enclosed in brackets for socket. As getHostAddress()
// returns the address without brackets.
when(socket.getLocalAddress()).thenReturn(InetAddress.getByName("[2001:db8::1]"));
when(socket.getLocalPort()).thenReturn(9092);
when(socket.getInetAddress()).thenReturn(InetAddress.getByName("127.0.0.1"));
when(socket.getPort()).thenReturn(9093);

assertEquals("2001:db8:0:0:0:0:0:1:9092-127.0.0.1:9093-1-2", ServerConnectionId.generateConnectionId(socket, 1, 2));

when(socket.getLocalAddress()).thenReturn(InetAddress.getByName("[2002:db9:1::1]"));
when(socket.getLocalPort()).thenReturn(9092);
when(socket.getInetAddress()).thenReturn(InetAddress.getByName("[2001:db8::1]"));
when(socket.getPort()).thenReturn(9093);

assertEquals("2002:db9:1:0:0:0:0:1:9092-2001:db8:0:0:0:0:0:1:9093-1-2", ServerConnectionId.generateConnectionId(socket, 1, 2));

// Without brackets
when(socket.getLocalAddress()).thenReturn(InetAddress.getByName("2002:db9:1::1"));
when(socket.getLocalPort()).thenReturn(9092);
when(socket.getInetAddress()).thenReturn(InetAddress.getByName("2001:db8::1"));
when(socket.getPort()).thenReturn(9093);

assertEquals("2002:db9:1:0:0:0:0:1:9092-2001:db8:0:0:0:0:0:1:9093-1-2", ServerConnectionId.generateConnectionId(socket, 1, 2));
}

@Test
public void testParseHostPort() {
Optional<Map.Entry<String, Integer>> hostPortEntry = ServerConnectionId.parseHostPort("myhost:9092");
assertTrue(hostPortEntry.isPresent());
assertEquals("myhost", hostPortEntry.get().getKey());
assertEquals(9092, hostPortEntry.get().getValue());

hostPortEntry = ServerConnectionId.parseHostPort("127.0.0.1:9092");
assertTrue(hostPortEntry.isPresent());
assertEquals("127.0.0.1", hostPortEntry.get().getKey());
assertEquals(9092, hostPortEntry.get().getValue());

// IPv6 endpoint
hostPortEntry = ServerConnectionId.parseHostPort("2001:db8::1:9092");
assertTrue(hostPortEntry.isPresent());
assertEquals("2001:db8::1", hostPortEntry.get().getKey());
assertEquals(9092, hostPortEntry.get().getValue());
}

@Test
public void testParseHostPortInvalid() {
// Invalid separator
Optional<Map.Entry<String, Integer>> hostPortEntry = ServerConnectionId.parseHostPort("myhost-9092");
assertFalse(hostPortEntry.isPresent());

// No separator
hostPortEntry = ServerConnectionId.parseHostPort("myhost9092");
assertFalse(hostPortEntry.isPresent());

// Invalid port
hostPortEntry = ServerConnectionId.parseHostPort("myhost:abcd");
assertFalse(hostPortEntry.isPresent());

// Invalid IPv6 endpoint
hostPortEntry = ServerConnectionId.parseHostPort("[2001:db8::1]:9092");
assertFalse(hostPortEntry.isPresent());
}
}
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/cluster/Broker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.metadata.{BrokerRegistration, VersionRange}
import org.apache.kafka.server.authorizer.AuthorizerServerInfo
import org.apache.kafka.server.network.BrokerEndPoint

import scala.collection.Seq
import scala.jdk.CollectionConverters._
Expand Down
Loading

0 comments on commit 59eb5dd

Please sign in to comment.