Skip to content

Commit c098741

Browse files
jongyoulclaude
andcommitted
[ZEPPELIN-6400] Move InterpreterLauncher/RecoveryStorage to zeppelin-zengine
Move server-side-only classes (InterpreterLauncher, InterpreterClient, InterpreterLaunchContext, RecoveryStorage, InterpreterLauncherTest) from zeppelin-interpreter to zeppelin-zengine. This allows launcher plugins and recovery storage to keep using ZeppelinConfiguration directly, eliminating unnecessary Properties conversion code. Reverts launcher plugins and zengine launcher/recovery files to their original ZeppelinConfiguration-based implementations. Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent 1050622 commit c098741

File tree

22 files changed

+142
-220
lines changed

22 files changed

+142
-220
lines changed

zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616
*/
1717
package org.apache.zeppelin.interpreter.launcher;
1818

19+
import org.apache.zeppelin.conf.ZeppelinConfiguration;
1920
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
2223

2324
import java.io.IOException;
2425
import java.util.Map;
25-
import java.util.Properties;
2626

2727
/**
2828
* Interpreter Launcher which use shell script to launch the interpreter process.
@@ -32,9 +32,9 @@ public class DockerInterpreterLauncher extends InterpreterLauncher {
3232

3333
private InterpreterLaunchContext context;
3434

35-
public DockerInterpreterLauncher(Properties zProperties, RecoveryStorage recoveryStorage)
35+
public DockerInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage)
3636
throws IOException {
37-
super(zProperties, recoveryStorage);
37+
super(zConf, recoveryStorage);
3838
}
3939

4040
@Override
@@ -53,17 +53,17 @@ public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws
5353

5454
StandardInterpreterLauncher interpreterLauncher = null;
5555
if (isSpark()) {
56-
interpreterLauncher = new SparkInterpreterLauncher(zProperties, recoveryStorage);
56+
interpreterLauncher = new SparkInterpreterLauncher(zConf, recoveryStorage);
5757
} else if (isFlink()) {
58-
interpreterLauncher = new FlinkInterpreterLauncher(zProperties, recoveryStorage);
58+
interpreterLauncher = new FlinkInterpreterLauncher(zConf, recoveryStorage);
5959
} else {
60-
interpreterLauncher = new StandardInterpreterLauncher(zProperties, recoveryStorage);
60+
interpreterLauncher = new StandardInterpreterLauncher(zConf, recoveryStorage);
6161
}
6262
Map<String, String> env = interpreterLauncher.buildEnvFromProperties(context);
6363

6464
return new DockerInterpreterProcess(
65-
zProperties,
66-
zProperties.getProperty("zeppelin.docker.container.image", "apache/zeppelin"),
65+
zConf,
66+
zConf.getDockerContainerImage(),
6767
context.getInterpreterGroupId(),
6868
context.getInterpreterSettingGroup(),
6969
context.getInterpreterSettingName(),

zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
import org.apache.commons.io.FileUtils;
5454
import org.apache.commons.io.filefilter.FileFilterUtils;
5555
import org.apache.commons.lang.StringUtils;
56+
import org.apache.zeppelin.conf.ZeppelinConfiguration;
57+
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
5658
import org.apache.zeppelin.interpreter.launcher.utils.TarFileEntry;
5759
import org.apache.zeppelin.interpreter.launcher.utils.TarUtils;
5860
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
@@ -61,6 +63,7 @@
6163
import org.slf4j.LoggerFactory;
6264

6365
import static java.nio.charset.StandardCharsets.UTF_8;
66+
import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB;
6467

6568
public class DockerInterpreterProcess extends RemoteInterpreterProcess {
6669
private static final Logger LOGGER = LoggerFactory.getLogger(DockerInterpreterProcess.class);
@@ -90,7 +93,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
9093
@VisibleForTesting
9194
boolean uploadLocalLibToContainter = true;
9295

93-
private Properties zProperties;
96+
private ZeppelinConfiguration zConf;
9497

9598
private String zeppelinHome;
9699
private final String containerZeppelinHome;
@@ -104,7 +107,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
104107
private static final String CONTAINER_UPLOAD_TAR_DIR = "/tmp/zeppelin-tar";
105108

106109
public DockerInterpreterProcess(
107-
Properties zProperties,
110+
ZeppelinConfiguration zConf,
108111
String containerImage,
109112
String interpreterGroupId,
110113
String interpreterGroupName,
@@ -125,22 +128,20 @@ public DockerInterpreterProcess(
125128
this.properties = properties;
126129
this.envs = new HashMap<>(envs);
127130

128-
this.zProperties = zProperties;
131+
this.zConf = zConf;
129132
this.containerName = interpreterGroupId.toLowerCase();
130133

131-
containerZeppelinHome = zProperties.getProperty(
132-
"zeppelin.docker.container.home", "/opt/zeppelin");
133-
containerSparkHome = zProperties.getProperty(
134-
"zeppelin.docker.container.spark.home", "/opt/spark");
135-
uploadLocalLibToContainter = Boolean.parseBoolean(
136-
zProperties.getProperty("zeppelin.docker.upload.local.lib.to.container", "true"));
134+
containerZeppelinHome = zConf.getString(ConfVars.ZEPPELIN_DOCKER_CONTAINER_HOME);
135+
containerSparkHome = zConf.getString(ConfVars.ZEPPELIN_DOCKER_CONTAINER_SPARK_HOME);
136+
uploadLocalLibToContainter = zConf.getBoolean(
137+
ConfVars.ZEPPELIN_DOCKER_UPLOAD_LOCAL_LIB_TO_CONTAINTER);
137138

138139
try {
139140
this.zeppelinHome = getZeppelinHome();
140141
} catch (IOException e) {
141142
LOGGER.error(e.getMessage(), e);
142143
}
143-
dockerHost = zProperties.getProperty("zeppelin.docker.host", "http://0.0.0.0:2375");
144+
dockerHost = zConf.getString(ConfVars.ZEPPELIN_DOCKER_HOST);
144145
}
145146

146147
@Override
@@ -327,8 +328,7 @@ List<String> getListEnvs() {
327328
envs.remove("PATH");
328329

329330
// set container time zone
330-
envs.put("TZ", zProperties.getProperty("zeppelin.docker.time.zone",
331-
java.util.TimeZone.getDefault().getID()));
331+
envs.put("TZ", zConf.getString(ConfVars.ZEPPELIN_DOCKER_TIME_ZONE));
332332

333333
List<String> listEnv = new ArrayList<>();
334334
for (Map.Entry<String, String> entry : this.envs.entrySet()) {
@@ -498,7 +498,7 @@ private void copyRunFileToContainer(String containerId)
498498
copyFiles.putIfAbsent(intpKeytab, intpKeytab);
499499
}
500500
// 3.5) zeppelin server keytab file
501-
String zeppelinServerKeytab = zProperties.getProperty("zeppelin.server.kerberos.keytab", "");
501+
String zeppelinServerKeytab = zConf.getString(ZEPPELIN_SERVER_KERBEROS_KEYTAB);
502502
if (!StringUtils.isBlank(zeppelinServerKeytab)) {
503503
copyFiles.putIfAbsent(zeppelinServerKeytab, zeppelinServerKeytab);
504504
}
@@ -636,10 +636,9 @@ boolean isSpark() {
636636

637637
private String getZeppelinHome() throws IOException {
638638
// check zeppelinHome is exist
639-
String zeppelinHomePath = zProperties.getProperty("zeppelin.home", "./");
640-
File fileZeppelinHome = new File(zeppelinHomePath);
639+
File fileZeppelinHome = new File(zConf.getZeppelinHome());
641640
if (fileZeppelinHome.exists() && fileZeppelinHome.isDirectory()) {
642-
return zeppelinHomePath;
641+
return zConf.getZeppelinHome();
643642
}
644643

645644
throw new IOException("Can't find zeppelin home path!");

zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,16 @@
3232
import static org.junit.jupiter.api.Assertions.assertNotNull;
3333
import static org.junit.jupiter.api.Assertions.assertTrue;
3434
import static org.mockito.Mockito.spy;
35+
import static org.mockito.Mockito.when;
3536

3637
class DockerInterpreterProcessTest {
3738

3839
protected static ZeppelinConfiguration zConf = spy(ZeppelinConfiguration.load());
3940

4041
@Test
4142
void testCreateIntpProcess() throws IOException {
42-
Properties zProperties = new Properties();
43-
zProperties.putAll(zConf.getCompleteConfiguration());
4443
DockerInterpreterLauncher launcher
45-
= new DockerInterpreterLauncher(zProperties, null);
44+
= new DockerInterpreterLauncher(zConf, null);
4645
Properties properties = new Properties();
4746
properties.setProperty(
4847
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000");
@@ -63,25 +62,21 @@ void testCreateIntpProcess() throws IOException {
6362

6463
@Test
6564
void testEnv() throws IOException {
65+
when(zConf.getString(ConfVars.ZEPPELIN_DOCKER_CONTAINER_SPARK_HOME))
66+
.thenReturn("my-spark-home");
67+
when(zConf.getBoolean(ConfVars.ZEPPELIN_DOCKER_UPLOAD_LOCAL_LIB_TO_CONTAINTER))
68+
.thenReturn(false);
69+
when(zConf.getString(ConfVars.ZEPPELIN_DOCKER_HOST))
70+
.thenReturn("http://my-docker-host:2375");
71+
6672
Properties properties = new Properties();
6773
properties.setProperty(
6874
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000");
6975
HashMap<String, String> envs = new HashMap<String, String>();
7076
envs.put("MY_ENV1", "V1");
7177

72-
Properties zProps = new Properties();
73-
zProps.putAll(zConf.getCompleteConfiguration());
74-
zProps.setProperty(
75-
ConfVars.ZEPPELIN_DOCKER_CONTAINER_SPARK_HOME.getVarName(),
76-
"my-spark-home");
77-
zProps.setProperty(
78-
ConfVars.ZEPPELIN_DOCKER_UPLOAD_LOCAL_LIB_TO_CONTAINTER.getVarName(),
79-
"false");
80-
zProps.setProperty(
81-
ConfVars.ZEPPELIN_DOCKER_HOST.getVarName(),
82-
"http://my-docker-host:2375");
8378
DockerInterpreterProcess intp = spy(new DockerInterpreterProcess(
84-
zProps,
79+
zConf,
8580
"interpreter-container:1.0",
8681
"shared_process",
8782
"sh",
@@ -106,10 +101,8 @@ void testTemplateBindings() throws IOException {
106101
HashMap<String, String> envs = new HashMap<String, String>();
107102
envs.put("MY_ENV1", "V1");
108103

109-
Properties zProps = new Properties();
110-
zProps.putAll(zConf.getCompleteConfiguration());
111104
DockerInterpreterProcess intp = new DockerInterpreterProcess(
112-
zProps,
105+
zConf,
113106
"interpreter-container:1.0",
114107
"shared_process",
115108
"sh",

zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.common.base.CharMatcher;
2121
import com.google.common.collect.Sets;
2222
import org.apache.commons.lang3.StringUtils;
23+
import org.apache.zeppelin.conf.ZeppelinConfiguration;
2324
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
@@ -42,8 +43,8 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
4243
"local", "remote", "yarn", "yarn-application", "kubernetes-application");
4344

4445

45-
public FlinkInterpreterLauncher(Properties zProperties, RecoveryStorage recoveryStorage) {
46-
super(zProperties, recoveryStorage);
46+
public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
47+
super(zConf, recoveryStorage);
4748
}
4849

4950
@Override
@@ -156,7 +157,7 @@ private String chooseFlinkAppJar(String flinkHome) throws IOException {
156157
}
157158
final String flinkScalaVersion = scalaVersion;
158159
File flinkInterpreterFolder =
159-
new File(zProperties.getProperty("zeppelin.interpreter.dir", "interpreter"), "flink");
160+
new File(zConf.getInterpreterDir(), "flink");
160161
List<File> flinkScalaJars =
161162
Arrays.stream(flinkInterpreterFolder
162163
.listFiles(file -> file.getName().endsWith(".jar")))

zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import java.net.UnknownHostException;
2424
import java.util.HashMap;
2525
import java.util.Map;
26-
import java.util.Properties;
2726

27+
import org.apache.zeppelin.conf.ZeppelinConfiguration;
2828
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
2929
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
3030
import org.slf4j.Logger;
@@ -41,8 +41,8 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
4141
private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
4242
private final KubernetesClient client;
4343

44-
public K8sStandardInterpreterLauncher(Properties zProperties, RecoveryStorage recoveryStorage) {
45-
super(zProperties, recoveryStorage);
44+
public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
45+
super(zConf, recoveryStorage);
4646
client = new DefaultKubernetesClient();
4747
}
4848

@@ -68,7 +68,7 @@ private String getZeppelinService(InterpreterLaunchContext context) throws IOExc
6868
//The namespace of zeppelin server can only be read from Config.KUBERNETES_NAMESPACE_PATH while it runs in k8s cluster, it may be different from the namespace of interpreter
6969
String serverNamespace = K8sUtils.getCurrentK8sNamespace();
7070
return String.format("%s.%s.svc",
71-
zProperties.getProperty("zeppelin.k8s.service.name", "zeppelin-server"),
71+
zConf.getK8sServiceName(),
7272
serverNamespace);
7373
} else {
7474
return context.getIntpEventServerHost();
@@ -96,7 +96,7 @@ private int getZeppelinServiceRpcPort(InterpreterLaunchContext context) {
9696
* Only if a spark interpreter process is running, user impersonation should be possible for --proxy-user
9797
*/
9898
private boolean isUserImpersonateForSparkInterpreter(InterpreterLaunchContext context) {
99-
return Boolean.parseBoolean(zProperties.getProperty("zeppelin.impersonate.spark.proxy.user", "true")) &&
99+
return zConf.getZeppelinImpersonateSparkProxyUser() &&
100100
context.getOption().isUserImpersonate() &&
101101
"spark".equalsIgnoreCase(context.getInterpreterSettingGroup());
102102
}
@@ -107,22 +107,22 @@ public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws
107107

108108
return new K8sRemoteInterpreterProcess(
109109
client,
110-
K8sUtils.getInterpreterNamespace(context.getProperties(), zProperties),
111-
new File(zProperties.getProperty("zeppelin.k8s.template.dir", "k8s"), "interpreter"),
112-
zProperties.getProperty("zeppelin.k8s.interpreter.container.image"),
110+
K8sUtils.getInterpreterNamespace(context.getProperties(), zConf),
111+
new File(zConf.getK8sTemplatesDir(), "interpreter"),
112+
zConf.getK8sContainerImage(),
113113
context.getInterpreterGroupId(),
114114
context.getInterpreterSettingGroup(),
115115
context.getInterpreterSettingName(),
116116
context.getProperties(),
117117
buildEnvFromProperties(context),
118118
getZeppelinService(context),
119119
getZeppelinServiceRpcPort(context),
120-
Boolean.parseBoolean(zProperties.getProperty("zeppelin.k8s.portforward", "false")),
121-
zProperties.getProperty("zeppelin.k8s.spark.container.image"),
120+
zConf.getK8sPortForward(),
121+
zConf.getK8sSparkContainerImage(),
122122
getConnectTimeout(context),
123123
getConnectPoolSize(context),
124124
isUserImpersonateForSparkInterpreter(context),
125-
Boolean.parseBoolean(zProperties.getProperty("zeppelin.k8s.timeout.during.pending", "true")));
125+
zConf.getK8sTimeoutDuringPending());
126126
}
127127

128128
protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {

zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
import org.apache.commons.lang3.RandomStringUtils;
3131
import org.apache.commons.lang3.StringUtils;
32+
import org.apache.zeppelin.conf.ZeppelinConfiguration;
33+
3234
import io.fabric8.kubernetes.client.Config;
3335

3436
public class K8sUtils {
@@ -102,21 +104,21 @@ public static String getCurrentK8sNamespace() {
102104

103105
/**
104106
* Get the namespace of the interpreter.
105-
* Check Order: zeppelin.k8s.interpreter.namespace -> getCurrentK8sNamespace() -> zProperties.getProperty("zeppelin.k8s.namespace")
107+
* Check Order: zeppelin.k8s.interpreter.namespace -> getCurrentK8sNamespace() -> zConf.getK8sNamepsace()
106108
* @param properties
107-
* @param zProperties
109+
* @param zConf
108110
* @return the interpreter namespace
109111
* @throws IOException
110112
*/
111-
public static String getInterpreterNamespace(Properties properties, Properties zProperties) throws IOException {
113+
public static String getInterpreterNamespace(Properties properties, ZeppelinConfiguration zConf) throws IOException {
112114
if(properties.containsKey("zeppelin.k8s.interpreter.namespace")){
113115
return properties.getProperty("zeppelin.k8s.interpreter.namespace");
114116
}
115117

116118
if (isRunningOnKubernetes()) {
117119
return getCurrentK8sNamespace();
118120
} else {
119-
return zProperties.getProperty("zeppelin.k8s.namespace", "default");
121+
return zConf.getK8sNamepsace();
120122
}
121123
}
122124

zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ void setUp() {
4343
void testK8sLauncher() throws IOException {
4444
// given
4545
ZeppelinConfiguration zConf = ZeppelinConfiguration.load();
46-
Properties zProperties = new Properties();
47-
zProperties.putAll(zConf.getCompleteConfiguration());
48-
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zProperties, null);
46+
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
4947
Properties properties = new Properties();
5048
properties.setProperty("ENV_1", "VALUE_1");
5149
properties.setProperty("property_1", "value_1");
@@ -74,9 +72,7 @@ void testK8sLauncher() throws IOException {
7472
void testK8sLauncherWithSparkAndUserImpersonate() throws IOException {
7573
// given
7674
ZeppelinConfiguration zConf = ZeppelinConfiguration.load();
77-
Properties zProperties = new Properties();
78-
zProperties.putAll(zConf.getCompleteConfiguration());
79-
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zProperties, null);
75+
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
8076
Properties properties = new Properties();
8177
properties.setProperty("ENV_1", "VALUE_1");
8278
properties.setProperty("property_1", "value_1");
@@ -110,9 +106,7 @@ void testK8sLauncherWithSparkAndUserImpersonate() throws IOException {
110106
void testK8sLauncherWithSparkAndWithoutUserImpersonate() throws IOException {
111107
// given
112108
ZeppelinConfiguration zConf = ZeppelinConfiguration.load();
113-
Properties zProperties = new Properties();
114-
zProperties.putAll(zConf.getCompleteConfiguration());
115-
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zProperties, null);
109+
K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
116110
Properties properties = new Properties();
117111
properties.setProperty("ENV_1", "VALUE_1");
118112
properties.setProperty("property_1", "value_1");

0 commit comments

Comments
 (0)