Skip to content

Commit d9137de

Browse files
authored
Get Kafka env vars using Strimzi approach in data-generator (#79)
1 parent 2c0d7c6 commit d9137de

File tree

2 files changed

+45
-21
lines changed

2 files changed

+45
-21
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copied from https://github.com/strimzi/client-examples/blob/main/java/common/src/main/java/io/strimzi/common/ConfigUtil.java
2+
package com.github.streamshub.kafka.data.generator;
3+
4+
import java.util.Locale;
5+
import java.util.Properties;
6+
import java.util.stream.Collectors;
7+
import java.util.Map;
8+
9+
10+
/**
11+
* Provides utility methods for managing common configuration properties.
12+
*/
13+
public class ConfigUtil {
14+
private static final String KAFKA_PREFIX = "KAFKA_";
15+
16+
/**
17+
* Converts environment variables into a corresponding property key format.
18+
*
19+
* @param envVar Name of the environment variable to be converted to property key format.
20+
* @return Returns a String which removes a prefix containing '_', converts to lower case and replaces '_' with '.'.
21+
*/
22+
public static String convertEnvVarToPropertyKey(String envVar) {
23+
return envVar.substring(envVar.indexOf("_") + 1).toLowerCase(Locale.ENGLISH).replace("_", ".");
24+
}
25+
26+
/**
27+
* Retrieves Kafka-related properties from environment variables.
28+
* This method scans all environment variables, filters those that start with the prefix "KAFKA_",
29+
* converts them to a property key format, and collects them into a Properties object.
30+
*
31+
* @return Properties object containing Kafka-related properties derived from environment variables.
32+
*/
33+
public static Properties getKafkaPropertiesFromEnv() {
34+
Properties properties = new Properties();
35+
properties.putAll(System.getenv()
36+
.entrySet()
37+
.stream()
38+
.filter(mapEntry -> mapEntry.getKey().startsWith(KAFKA_PREFIX))
39+
.collect(Collectors.toMap(mapEntry -> convertEnvVarToPropertyKey(mapEntry.getKey()), Map.Entry::getValue)));
40+
return properties;
41+
}
42+
}

tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/KafkaCommonProps.java

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,17 @@
22

33
import java.util.Properties;
44

5-
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
65
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
7-
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
86
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
9-
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
10-
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
11-
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
12-
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
137

148
public class KafkaCommonProps {
159
static final int KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000;
1610

1711
public static Properties get(String clientId) {
18-
Properties props = new Properties();
19-
props.put(BOOTSTRAP_SERVERS_CONFIG, System.getenv("KAFKA_BOOTSTRAP_SERVERS"));
12+
Properties props = ConfigUtil.getKafkaPropertiesFromEnv();
2013
props.put(CLIENT_ID_CONFIG, clientId);
21-
props.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG));
22-
23-
String securityProtocol = System.getenv("KAFKA_SECURITY_PROTOCOL");
24-
25-
if (securityProtocol != null && securityProtocol.equals("SSL")) {
26-
props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
27-
props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_LOCATION"));
28-
props.put(SSL_TRUSTSTORE_TYPE_CONFIG, System.getenv("KAFKA_SSL_TRUSTSTORE_TYPE"));
29-
props.put(SSL_KEYSTORE_LOCATION_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_LOCATION"));
30-
props.put(SSL_KEYSTORE_PASSWORD_CONFIG, System.getenv("KAFKA_SSL_KEYSTORE_PASSWORD"));
31-
}
32-
33-
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
14+
props.putIfAbsent(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_CLIENT_REQUEST_TIMEOUT_MS_CONFIG));
15+
props.putIfAbsent("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
3416

3517
return props;
3618
}

0 commit comments

Comments
 (0)