Skip to content

Commit

Permalink
[improve][proxy] Make keep-alive interval configurable in Pulsar Proxy (
Browse files Browse the repository at this point in the history
#23981)

(cherry picked from commit eb1391a)
  • Loading branch information
lhotari committed Feb 14, 2025
1 parent fb65e36 commit 594cad2
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 280 deletions.
7 changes: 7 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ bindAddress=0.0.0.0
# If not set, the value of `InetAddress.getLocalHost().getCanonicalHostName()` is used.
advertisedAddress=

# Specifies the interval (in seconds) for sending ping messages to the client. Set to 0 to disable
# ping messages. This setting applies to client connections used for topic lookups and
# partition metadata requests. When a client establishes a broker connection via the proxy,
# the client and broker will communicate directly without the proxy intercepting the messages.
# In that case, the broker's keepAliveIntervalSeconds configuration becomes relevant.
keepAliveIntervalSeconds=30

# Enable or disable the HAProxy protocol.
# If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
haProxyProtocolEnabled=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,15 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private String advertisedAddress;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Specifies the interval (in seconds) for sending ping messages to the client. Set to 0 to disable "
+ "ping messages. This setting applies to client connections used for topic lookups and "
+ "partition metadata requests. When a client establishes a broker connection via the proxy, "
+ "the client and broker will communicate directly without the proxy intercepting the messages. "
+ "In that case, the broker's keepAliveIntervalSeconds configuration becomes relevant.")
private int keepAliveIntervalSeconds = 30;

@FieldContext(category = CATEGORY_SERVER,
doc = "Enable or disable the proxy protocol.")
private boolean haProxyProtocolEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ ConnectionPool getConnectionPool() {
}

public ProxyConnection(ProxyService proxyService, DnsAddressResolverGroup dnsAddressResolverGroup) {
super(30, TimeUnit.SECONDS);
super(proxyService.getConfiguration().getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
this.service = proxyService;
this.dnsAddressResolverGroup = dnsAddressResolverGroup;
this.state = State.Init;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void testBrokerUrlCheck() throws IOException {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
new ProxyServiceStarter(ProxyServiceStarterTest.getArgs());
fail("brokerServiceURL must start with pulsar://");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("brokerServiceURL must start with pulsar://"));
Expand All @@ -161,7 +161,7 @@ public void testBrokerUrlCheck() throws IOException {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
new ProxyServiceStarter(ProxyServiceStarterTest.getArgs());
fail("brokerServiceURLTLS must start with pulsar+ssl://");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("brokerServiceURLTLS must start with pulsar+ssl://"));
Expand All @@ -174,7 +174,7 @@ public void testBrokerUrlCheck() throws IOException {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
new ProxyServiceStarter(ProxyServiceStarterTest.getArgs());
fail("brokerServiceURL does not support multi urls yet");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("does not support multi urls yet"));
Expand All @@ -188,7 +188,7 @@ public void testBrokerUrlCheck() throws IOException {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
new ProxyServiceStarter(ProxyServiceStarterTest.getArgs());
fail("brokerServiceURLTLS does not support multi urls yet");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("does not support multi urls yet"));
Expand All @@ -202,7 +202,7 @@ public void testBrokerUrlCheck() throws IOException {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
new ProxyServiceStarter(ProxyServiceStarterTest.getArgs());
fail("brokerWebServiceURL does not support multi urls yet");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("does not support multi urls yet"));
Expand All @@ -216,7 +216,7 @@ public void testBrokerUrlCheck() throws IOException {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
new ProxyServiceStarter(ProxyServiceStarterTest.getArgs());
fail("brokerWebServiceURLTLS does not support multi urls yet");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("does not support multi urls yet"));
Expand All @@ -230,7 +230,7 @@ public void testBrokerUrlCheck() throws IOException {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
new ProxyServiceStarter(ProxyServiceStarterTest.getArgs());
fail("functionWorkerWebServiceURL does not support multi urls yet");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("does not support multi urls yet"));
Expand All @@ -244,7 +244,7 @@ public void testBrokerUrlCheck() throws IOException {
theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(), Mockito.any()))
.thenReturn(configuration);
try {
new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
new ProxyServiceStarter(ProxyServiceStarterTest.getArgs());
fail("functionWorkerWebServiceURLTLS does not support multi urls yet");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("does not support multi urls yet"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
@Slf4j
@Test(groups = "broker")
public class ProxyOriginalClientIPTest extends MockedPulsarServiceBaseTest {
static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"};
HttpClient httpClient;
ProxyServiceStarter serviceStarter;
String webServiceUrl;
Expand All @@ -49,7 +48,7 @@ public class ProxyOriginalClientIPTest extends MockedPulsarServiceBaseTest {
@BeforeClass
protected void setup() throws Exception {
internalSetup();
serviceStarter = new ProxyServiceStarter(ARGS, proxyConfig -> {
serviceStarter = new ProxyServiceStarter(ProxyServiceStarterTest.getArgs(), proxyConfig -> {
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setBrokerWebServiceURL(pulsar.getWebServiceAddress());
proxyConfig.setWebServicePort(Optional.of(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class ProxyServiceStarterDisableZeroCopyTest extends ProxyServiceStarterT
@BeforeClass
protected void setup() throws Exception {
internalSetup();
serviceStarter = new ProxyServiceStarter(ARGS, null, true);
serviceStarter = new ProxyServiceStarter(getArgs(), null, true);
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
serviceStarter.getConfig().setWebServicePort(Optional.of(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,23 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -50,17 +57,38 @@
import org.testng.annotations.Test;

public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {

public static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"};

protected ProxyServiceStarter serviceStarter;
protected String serviceUrl;
private static File proxyConfFileForTests;

@SneakyThrows
public static String[] getArgs() {
if (proxyConfFileForTests == null) {
// load the properties from the proxy.conf file
Properties properties = new Properties();
try (InputStream inputStream = new FileInputStream("../conf/proxy.conf")) {
properties.load(inputStream);
}
// set dummy values for the required properties so that validation is passed
properties.setProperty("brokerServiceURL", "pulsar://0.0.0.0:0");
properties.setProperty("brokerWebServiceURL", "http://0.0.0.0:0");
// change keepAliveIntervalSeconds default value so that it's possible to validate that it's configured
properties.setProperty("keepAliveIntervalSeconds", "25");
// write the properties to a temporary file
proxyConfFileForTests = File.createTempFile("proxy", ".conf");
proxyConfFileForTests.deleteOnExit();
try (OutputStream out = new FileOutputStream(proxyConfFileForTests)) {
properties.store(out, null);
}
}
return new String[] { "-c", proxyConfFileForTests.getAbsolutePath() };
}

@Override
@BeforeClass
protected void setup() throws Exception {
internalSetup();
serviceStarter = new ProxyServiceStarter(ARGS, null, true);
serviceStarter = new ProxyServiceStarter(getArgs(), null, true);
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
serviceStarter.getConfig().setWebServicePort(Optional.of(0));
Expand Down Expand Up @@ -100,6 +128,11 @@ public void testProducer() throws Exception {
}
}

@Test
public void testKeepAliveIntervalSecondsIsConfigured() throws Exception {
assertEquals(serviceStarter.getConfig().getKeepAliveIntervalSeconds(), 25);
}

@Test
public void testProduceAndConsumeMessageWithWebsocket() throws Exception {
@Cleanup("stop")
Expand Down Expand Up @@ -180,7 +213,7 @@ public void testProxyClientAuthentication() throws Exception {



ProxyServiceStarter serviceStarter = new ProxyServiceStarter(ARGS, null, true);
ProxyServiceStarter serviceStarter = new ProxyServiceStarter(getArgs(), null, true);
initConfig.accept(serviceStarter.getConfig());
// ProxyServiceStarter will throw an exception when Authentication#start is failed
serviceStarter.getConfig().setBrokerClientAuthenticationPlugin(ExceptionAuthentication1.class.getName());
Expand All @@ -192,7 +225,7 @@ public void testProxyClientAuthentication() throws Exception {
assertTrue(serviceStarter.getProxyClientAuthentication() instanceof ExceptionAuthentication1);
}

serviceStarter = new ProxyServiceStarter(ARGS, null, true);
serviceStarter = new ProxyServiceStarter(getArgs(), null, true);
initConfig.accept(serviceStarter.getConfig());
// ProxyServiceStarter will throw an exception when Authentication#start and Authentication#close are failed
serviceStarter.getConfig().setBrokerClientAuthenticationPlugin(ExceptionAuthentication2.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
*/
package org.apache.pulsar.proxy.server;

import static org.apache.pulsar.proxy.server.ProxyServiceStarterTest.getArgs;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -35,17 +44,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;

import static org.apache.pulsar.proxy.server.ProxyServiceStarterTest.ARGS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
private ProxyServiceStarter serviceStarter;
private String serviceUrl;
Expand All @@ -55,7 +53,7 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
@BeforeClass
protected void setup() throws Exception {
internalSetup();
serviceStarter = new ProxyServiceStarter(ARGS, null, true);
serviceStarter = new ProxyServiceStarter(getArgs(), null, true);
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
Expand Down
Loading

0 comments on commit 594cad2

Please sign in to comment.