Skip to content

Commit f0ec07b

Browse files
thetumbledfengwenzhi
andauthored
[fix][client] ControlledClusterFailover avoid unnecessary reconnection. (#25178)
Co-authored-by: fengwenzhi <[email protected]>
1 parent a2f888a commit f0ec07b

File tree

2 files changed

+15
-0
lines changed

2 files changed

+15
-0
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder;
4545
import org.apache.pulsar.client.api.PulsarClient;
4646
import org.apache.pulsar.client.api.ServiceUrlProvider;
47+
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
4748
import org.apache.pulsar.client.util.ExecutorProvider;
4849
import org.apache.pulsar.common.util.ObjectMapperFactory;
4950
import org.asynchttpclient.AsyncHttpClient;
@@ -119,6 +120,15 @@ public void initialize(PulsarClient client) {
119120
.addHeader("Accept", "application/json");
120121
headers.forEach(requestBuilder::addHeader);
121122

123+
// Initialize currentControlledConfiguration from client's current configuration
124+
// to avoid unnecessary reconnection on first scheduled check when the configuration hasn't changed
125+
ClientConfigurationData conf = pulsarClient.getConfiguration();
126+
this.currentControlledConfiguration = new ControlledConfiguration();
127+
this.currentControlledConfiguration.setServiceUrl(currentPulsarServiceUrl);
128+
this.currentControlledConfiguration.setTlsTrustCertsFilePath(conf.getTlsTrustCertsFilePath());
129+
this.currentControlledConfiguration.setAuthPluginClassName(conf.getAuthPluginClassName());
130+
this.currentControlledConfiguration.setAuthParamsString(conf.getAuthParams());
131+
122132
// start to check service url every 30 seconds
123133
this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
124134
ControlledConfiguration controlledConfiguration = null;

pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import lombok.Cleanup;
2727
import org.apache.pulsar.client.api.Authentication;
2828
import org.apache.pulsar.client.api.ServiceUrlProvider;
29+
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
2930
import org.asynchttpclient.Request;
3031
import org.awaitility.Awaitility;
3132
import org.mockito.Mockito;
@@ -57,7 +58,9 @@ public void testBuildControlledClusterFailoverInstance() throws Exception {
5758

5859
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
5960
ConnectionPool connectionPool = mock(ConnectionPool.class);
61+
ClientConfigurationData clientConf = new ClientConfigurationData();
6062
when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
63+
when(pulsarClient.getConfiguration()).thenReturn(clientConf);
6164
controlledClusterFailover.initialize(pulsarClient);
6265

6366
Request request = controlledClusterFailover.getRequestBuilder().build();
@@ -97,7 +100,9 @@ public void testControlledClusterFailoverSwitch() throws Exception {
97100
ControlledClusterFailover controlledClusterFailover = Mockito.spy((ControlledClusterFailover) provider);
98101
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
99102
ConnectionPool connectionPool = mock(ConnectionPool.class);
103+
ClientConfigurationData clientConf = new ClientConfigurationData();
100104
when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
105+
when(pulsarClient.getConfiguration()).thenReturn(clientConf);
101106

102107
controlledClusterFailover.initialize(pulsarClient);
103108

0 commit comments

Comments
 (0)