Skip to content

Commit 4e9241b

Browse files
rmdmattinglyRay Mattingly
authored andcommitted
Improved operation interceptor instantiation (not yet merged upstream) (#210)
Co-authored-by: Ray Mattingly <rmattingly@hubspot.com>
1 parent e6cb400 commit 4e9241b

2 files changed

Lines changed: 29 additions & 24 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
257257

258258
private final RetryingCallerInterceptor interceptor;
259259

260-
private final OperationInterceptorFactory operationInterceptorFactory;
261-
262260
/**
263261
* Cluster registry of basic info such as clusterid and meta region location.
264262
*/
@@ -339,7 +337,6 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
339337

340338
this.stats = ServerStatisticTracker.create(conf);
341339
this.interceptor = new RetryingCallerInterceptorFactory(conf).build();
342-
this.operationInterceptorFactory = createOperationInterceptorFactory(conf);
343340

344341
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
345342

@@ -373,7 +370,7 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
373370
connectionAttributes);
374371
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
375372
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, connectionConfig,
376-
interceptor, this.stats, this.metrics, operationInterceptorFactory);
373+
interceptor, this.stats, this.metrics);
377374
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
378375

379376
// Do we publish the status?
@@ -2344,7 +2341,7 @@ public TableState getTableState(TableName tableName) throws IOException {
23442341
@Override
23452342
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
23462343
return RpcRetryingCallerFactory.instantiate(conf, connectionConfig, this.interceptor,
2347-
this.stats, metrics, createOperationInterceptorFactory(conf));
2344+
this.stats, metrics);
23482345
}
23492346

23502347
@Override
@@ -2422,18 +2419,4 @@ public String getClusterId() {
24222419
}
24232420
return null;
24242421
}
2425-
2426-
private static OperationInterceptorFactory createOperationInterceptorFactory(Configuration conf) {
2427-
String clazz = conf.get(OperationInterceptorFactory.HBASE_CLIENT_OPERATION_INTERCEPTOR_IMPL);
2428-
if (clazz == null || clazz.isEmpty()) {
2429-
return OperationInterceptorFactory.NO_OP;
2430-
}
2431-
try {
2432-
Class<? extends OperationInterceptorFactory> factoryClass =
2433-
conf.getClassByName(clazz).asSubclass(OperationInterceptorFactory.class);
2434-
return ReflectionUtils.newInstance(factoryClass, conf);
2435-
} catch (ClassNotFoundException e) {
2436-
throw new RuntimeException("Failed to load OperationInterceptorFactory class: " + clazz, e);
2437-
}
2438-
}
24392422
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@
2121
import org.apache.hadoop.conf.Configuration;
2222
import org.apache.hadoop.hbase.util.ReflectionUtils;
2323
import org.apache.yetus.audience.InterfaceAudience;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
2426

2527
/**
2628
* Factory to create an {@link RpcRetryingCaller}
2729
*/
2830
@InterfaceAudience.Private
2931
public class RpcRetryingCallerFactory {
3032

33+
private static final Logger LOG = LoggerFactory.getLogger(RpcRetryingCallerFactory.class);
34+
3135
/** Configuration key for a custom {@link RpcRetryingCaller} */
3236
public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
3337
private final ConnectionConfiguration connectionConf;
@@ -38,12 +42,12 @@ public class RpcRetryingCallerFactory {
3842

3943
public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf) {
4044
this(conf, connectionConf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null,
41-
OperationInterceptorFactory.NO_OP);
45+
createOperationInterceptorFactory(conf));
4246
}
4347

4448
public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf,
4549
RetryingCallerInterceptor interceptor, MetricsConnection metrics) {
46-
this(conf, connectionConf, interceptor, metrics, OperationInterceptorFactory.NO_OP);
50+
this(conf, connectionConf, interceptor, metrics, createOperationInterceptorFactory(conf));
4751
}
4852

4953
public RpcRetryingCallerFactory(Configuration conf, ConnectionConfiguration connectionConf,
@@ -90,21 +94,23 @@ public static RpcRetryingCallerFactory instantiate(Configuration configuration,
9094
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
9195
ConnectionConfiguration connectionConf, MetricsConnection metrics) {
9296
return instantiate(configuration, connectionConf,
93-
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, metrics);
97+
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null, metrics,
98+
createOperationInterceptorFactory(configuration));
9499
}
95100

96101
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
97102
ConnectionConfiguration connectionConf, ServerStatisticTracker stats,
98103
MetricsConnection metrics) {
99104
return instantiate(configuration, connectionConf,
100-
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, metrics);
105+
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, stats, metrics,
106+
createOperationInterceptorFactory(configuration));
101107
}
102108

103109
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
104110
ConnectionConfiguration connectionConf, RetryingCallerInterceptor interceptor,
105111
ServerStatisticTracker stats, MetricsConnection metrics) {
106112
return instantiate(configuration, connectionConf, interceptor, stats, metrics,
107-
OperationInterceptorFactory.NO_OP);
113+
createOperationInterceptorFactory(configuration));
108114
}
109115

110116
public static RpcRetryingCallerFactory instantiate(Configuration configuration,
@@ -125,4 +131,20 @@ public static RpcRetryingCallerFactory instantiate(Configuration configuration,
125131
}
126132
return factory;
127133
}
134+
135+
private static OperationInterceptorFactory createOperationInterceptorFactory(Configuration conf) {
136+
String clazz = conf.get(OperationInterceptorFactory.HBASE_CLIENT_OPERATION_INTERCEPTOR_IMPL);
137+
if (clazz == null || clazz.isEmpty()) {
138+
return OperationInterceptorFactory.NO_OP;
139+
}
140+
try {
141+
Class<? extends OperationInterceptorFactory> factoryClass =
142+
conf.getClassByName(clazz).asSubclass(OperationInterceptorFactory.class);
143+
return ReflectionUtils.newInstance(factoryClass, conf);
144+
} catch (ClassNotFoundException e) {
145+
LOG.warn("Failed to load OperationInterceptorFactory class: {}, using NO_OP instead", clazz,
146+
e);
147+
return OperationInterceptorFactory.NO_OP;
148+
}
149+
}
128150
}

0 commit comments

Comments
 (0)