diff --git a/infrastructure/build.gradle b/infrastructure/build.gradle new file mode 100644 index 00000000..12c62cab --- /dev/null +++ b/infrastructure/build.gradle @@ -0,0 +1 @@ +bootJar.enabled = false \ No newline at end of file diff --git a/infrastructure/kafka/build.gradle b/infrastructure/kafka/build.gradle new file mode 100644 index 00000000..1826a9c8 --- /dev/null +++ b/infrastructure/kafka/build.gradle @@ -0,0 +1,54 @@ +plugins { + id 'java' + id 'org.springframework.boot' version '3.2.4' + id 'io.spring.dependency-management' version '1.1.4' + id 'com.github.davidmc24.gradle.plugin.avro' version '1.3.0' +} + +group = 'org.ecommerce' +version = '0.0.1-SNAPSHOT' + +java { + toolchain { + languageVersion = JavaLanguageVersion.of(17) + } +} + +configurations { + compileOnly { + extendsFrom annotationProcessor + } +} + +repositories { + mavenCentral() + maven { + url 'https://packages.confluent.io/maven/' + } +} + +dependencies { + implementation 'org.springframework.boot:spring-boot-starter' + implementation 'org.springframework.kafka:spring-kafka' + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' + implementation 'org.apache.avro:avro:1.11.3' + implementation "io.confluent:kafka-avro-serializer:7.0.1" +} + +generateAvroJava { + source("src/main/resources/avro") + include("**/*.avsc") + outputDir = file("src/main/generated/model") +} + +avro { + outputCharacterEncoding = 'UTF-8' +} + +tasks.named('test') { + useJUnitPlatform() +} + +bootJar.enabled = false + diff --git a/infrastructure/kafka/src/main/generated/model/org/ecommerce/StockOperationModel.java b/infrastructure/kafka/src/main/generated/model/org/ecommerce/StockOperationModel.java new file mode 100644 index 00000000..f5d36cb3 --- /dev/null +++ b/infrastructure/kafka/src/main/generated/model/org/ecommerce/StockOperationModel.java @@ -0,0 +1,425 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.ecommerce; + +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +@org.apache.avro.specific.AvroGenerated +public class StockOperationModel extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = -8597328948343056404L; + + + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"StockOperationModel\",\"namespace\":\"org.ecommerce\",\"fields\":[{\"name\":\"productId\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"total\",\"type\":[\"null\",\"int\"],\"default\":null}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static final SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder ENCODER = + new BinaryMessageEncoder(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder DECODER = + new BinaryMessageDecoder(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this StockOperationModel to a ByteBuffer. + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a StockOperationModel from a ByteBuffer. + * @param b a byte buffer holding serialized data for an instance of this class + * @return a StockOperationModel instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static StockOperationModel fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + private java.lang.Integer productId; + private java.lang.Integer total; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public StockOperationModel() {} + + /** + * All-args constructor. + * @param productId The new value for productId + * @param total The new value for total + */ + public StockOperationModel(java.lang.Integer productId, java.lang.Integer total) { + this.productId = productId; + this.total = total; + } + + public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; } + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return productId; + case 1: return total; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: productId = (java.lang.Integer)value$; break; + case 1: total = (java.lang.Integer)value$; break; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'productId' field. + * @return The value of the 'productId' field. + */ + public java.lang.Integer getProductId() { + return productId; + } + + + /** + * Sets the value of the 'productId' field. + * @param value the value to set. + */ + public void setProductId(java.lang.Integer value) { + this.productId = value; + } + + /** + * Gets the value of the 'total' field. + * @return The value of the 'total' field. + */ + public java.lang.Integer getTotal() { + return total; + } + + + /** + * Sets the value of the 'total' field. + * @param value the value to set. + */ + public void setTotal(java.lang.Integer value) { + this.total = value; + } + + /** + * Creates a new StockOperationModel RecordBuilder. + * @return A new StockOperationModel RecordBuilder + */ + public static org.ecommerce.StockOperationModel.Builder newBuilder() { + return new org.ecommerce.StockOperationModel.Builder(); + } + + /** + * Creates a new StockOperationModel RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new StockOperationModel RecordBuilder + */ + public static org.ecommerce.StockOperationModel.Builder newBuilder(org.ecommerce.StockOperationModel.Builder other) { + if (other == null) { + return new org.ecommerce.StockOperationModel.Builder(); + } else { + return new org.ecommerce.StockOperationModel.Builder(other); + } + } + + /** + * Creates a new StockOperationModel RecordBuilder by copying an existing StockOperationModel instance. + * @param other The existing instance to copy. + * @return A new StockOperationModel RecordBuilder + */ + public static org.ecommerce.StockOperationModel.Builder newBuilder(org.ecommerce.StockOperationModel other) { + if (other == null) { + return new org.ecommerce.StockOperationModel.Builder(); + } else { + return new org.ecommerce.StockOperationModel.Builder(other); + } + } + + /** + * RecordBuilder for StockOperationModel instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private java.lang.Integer productId; + private java.lang.Integer total; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$, MODEL$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. + */ + private Builder(org.ecommerce.StockOperationModel.Builder other) { + super(other); + if (isValidValue(fields()[0], other.productId)) { + this.productId = data().deepCopy(fields()[0].schema(), other.productId); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.total)) { + this.total = data().deepCopy(fields()[1].schema(), other.total); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + } + + /** + * Creates a Builder by copying an existing StockOperationModel instance + * @param other The existing instance to copy. + */ + private Builder(org.ecommerce.StockOperationModel other) { + super(SCHEMA$, MODEL$); + if (isValidValue(fields()[0], other.productId)) { + this.productId = data().deepCopy(fields()[0].schema(), other.productId); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.total)) { + this.total = data().deepCopy(fields()[1].schema(), other.total); + fieldSetFlags()[1] = true; + } + } + + /** + * Gets the value of the 'productId' field. + * @return The value. + */ + public java.lang.Integer getProductId() { + return productId; + } + + + /** + * Sets the value of the 'productId' field. + * @param value The value of 'productId'. + * @return This builder. + */ + public org.ecommerce.StockOperationModel.Builder setProductId(java.lang.Integer value) { + validate(fields()[0], value); + this.productId = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'productId' field has been set. + * @return True if the 'productId' field has been set, false otherwise. + */ + public boolean hasProductId() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'productId' field. + * @return This builder. + */ + public org.ecommerce.StockOperationModel.Builder clearProductId() { + productId = null; + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'total' field. + * @return The value. + */ + public java.lang.Integer getTotal() { + return total; + } + + + /** + * Sets the value of the 'total' field. + * @param value The value of 'total'. + * @return This builder. + */ + public org.ecommerce.StockOperationModel.Builder setTotal(java.lang.Integer value) { + validate(fields()[1], value); + this.total = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'total' field has been set. + * @return True if the 'total' field has been set, false otherwise. + */ + public boolean hasTotal() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'total' field. + * @return This builder. + */ + public org.ecommerce.StockOperationModel.Builder clearTotal() { + total = null; + fieldSetFlags()[1] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public StockOperationModel build() { + try { + StockOperationModel record = new StockOperationModel(); + record.productId = fieldSetFlags()[0] ? this.productId : (java.lang.Integer) defaultValue(fields()[0]); + record.total = fieldSetFlags()[1] ? this.total : (java.lang.Integer) defaultValue(fields()[1]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter + WRITER$ = (org.apache.avro.io.DatumWriter)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader + READER$ = (org.apache.avro.io.DatumReader)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override protected boolean hasCustomCoders() { return true; } + + @Override public void customEncode(org.apache.avro.io.Encoder out) + throws java.io.IOException + { + if (this.productId == null) { + out.writeIndex(0); + out.writeNull(); + } else { + out.writeIndex(1); + out.writeInt(this.productId); + } + + if (this.total == null) { + out.writeIndex(0); + out.writeNull(); + } else { + out.writeIndex(1); + out.writeInt(this.total); + } + + } + + @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in) + throws java.io.IOException + { + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + if (in.readIndex() != 1) { + in.readNull(); + this.productId = null; + } else { + this.productId = in.readInt(); + } + + if (in.readIndex() != 1) { + in.readNull(); + this.total = null; + } else { + this.total = in.readInt(); + } + + } else { + for (int i = 0; i < 2; i++) { + switch (fieldOrder[i].pos()) { + case 0: + if (in.readIndex() != 1) { + in.readNull(); + this.productId = null; + } else { + this.productId = in.readInt(); + } + break; + + case 1: + if (in.readIndex() != 1) { + in.readNull(); + this.total = null; + } else { + this.total = in.readInt(); + } + break; + + default: + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } +} + + + + + + + + + + diff --git a/infrastructure/kafka/src/main/java/org/ecommerce/kafka/config/KafkaConfigData.java b/infrastructure/kafka/src/main/java/org/ecommerce/kafka/config/KafkaConfigData.java new file mode 100644 index 00000000..a9401b84 --- /dev/null +++ b/infrastructure/kafka/src/main/java/org/ecommerce/kafka/config/KafkaConfigData.java @@ -0,0 +1,17 @@ +package org.ecommerce.kafka.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import lombok.Data; + +@Data +@Configuration +@ConfigurationProperties(prefix = "kafka-config") +public class KafkaConfigData { + private String bootstrapServers; + private String schemaRegistryUrlKey; + private String schemaRegistryUrl; + private Integer numOfPartitions; + private Short replicationFactor; +} diff --git a/infrastructure/kafka/src/main/java/org/ecommerce/kafka/config/KafkaConsumerConfigData.java b/infrastructure/kafka/src/main/java/org/ecommerce/kafka/config/KafkaConsumerConfigData.java new file mode 100644 index 00000000..05ecbbee --- /dev/null +++ b/infrastructure/kafka/src/main/java/org/ecommerce/kafka/config/KafkaConsumerConfigData.java @@ -0,0 +1,27 @@ +package org.ecommerce.kafka.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import lombok.Data; + +@Data +@Configuration +@ConfigurationProperties(prefix = "kafka-consumer-config") +public class KafkaConsumerConfigData { + private String keyDeserializer; + private String valueDeserializer; + private String autoOffsetReset; + private String specificAvroReaderKey; + private String specificAvroReader; + private Boolean batchListener; + private Boolean autoStartup; + private Integer concurrencyLevel; + private Integer sessionTimeoutMs; + private Integer heartbeatIntervalMs; + private Integer maxPollIntervalMs; + private Long pollTimeoutMs; + private Integer maxPollRecords; + private Integer maxPartitionFetchBytesDefault; + private Integer maxPartitionFetchBytesBoostFactor; +} \ No newline at end of file diff --git a/infrastructure/kafka/src/main/java/org/ecommerce/kafka/config/KafkaProducerConfigData.java b/infrastructure/kafka/src/main/java/org/ecommerce/kafka/config/KafkaProducerConfigData.java new file mode 100644 index 00000000..e75784e4 --- /dev/null +++ b/infrastructure/kafka/src/main/java/org/ecommerce/kafka/config/KafkaProducerConfigData.java @@ -0,0 +1,21 @@ +package org.ecommerce.kafka.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import lombok.Data; + +@Data +@Configuration +@ConfigurationProperties(prefix = "kafka-producer-config") +public class KafkaProducerConfigData { + private String keySerializerClass; + private String valueSerializerClass; + private String compressionType; + private String acks; + private Integer batchSize; + private Integer batchSizeBoostFactor; + private Integer lingerMs; + private Integer requestTimeoutMs; + private Integer retryCount; +} diff --git a/infrastructure/kafka/src/main/java/org/ecommerce/kafka/consumer/config/KafkaConsumerConfig.java b/infrastructure/kafka/src/main/java/org/ecommerce/kafka/consumer/config/KafkaConsumerConfig.java new file mode 100644 index 00000000..ebd16f25 --- /dev/null +++ b/infrastructure/kafka/src/main/java/org/ecommerce/kafka/consumer/config/KafkaConsumerConfig.java @@ -0,0 +1,62 @@ +package org.ecommerce.kafka.consumer.config; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.ecommerce.kafka.config.KafkaConfigData; +import org.ecommerce.kafka.config.KafkaConsumerConfigData; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; + +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +public class KafkaConsumerConfig { + + private final KafkaConfigData kafkaConfigData; + private final KafkaConsumerConfigData kafkaConsumerConfigData; + + @Bean + public Map consumerConfigs() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigData.getBootstrapServers()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConsumerConfigData.getKeyDeserializer()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConsumerConfigData.getValueDeserializer()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerConfigData.getAutoOffsetReset()); + props.put(kafkaConfigData.getSchemaRegistryUrlKey(), kafkaConfigData.getSchemaRegistryUrl()); + props.put(kafkaConsumerConfigData.getSpecificAvroReaderKey(), kafkaConsumerConfigData.getSpecificAvroReader()); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConsumerConfigData.getSessionTimeoutMs()); + props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaConsumerConfigData.getHeartbeatIntervalMs()); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerConfigData.getMaxPollIntervalMs()); + props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, + kafkaConsumerConfigData.getMaxPartitionFetchBytesDefault() * + kafkaConsumerConfigData.getMaxPartitionFetchBytesBoostFactor()); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerConfigData.getMaxPollRecords()); + return props; + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setBatchListener(kafkaConsumerConfigData.getBatchListener()); + factory.setConcurrency(kafkaConsumerConfigData.getConcurrencyLevel()); + factory.setAutoStartup(kafkaConsumerConfigData.getAutoStartup()); + factory.getContainerProperties().setPollTimeout(kafkaConsumerConfigData.getPollTimeoutMs()); + return factory; + } +} diff --git a/infrastructure/kafka/src/main/java/org/ecommerce/kafka/producer/config/KafkaProducerConfig.java b/infrastructure/kafka/src/main/java/org/ecommerce/kafka/producer/config/KafkaProducerConfig.java new file mode 100644 index 00000000..2c2b0741 --- /dev/null +++ b/infrastructure/kafka/src/main/java/org/ecommerce/kafka/producer/config/KafkaProducerConfig.java @@ -0,0 +1,52 @@ +package org.ecommerce.kafka.producer.config; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.ecommerce.kafka.config.KafkaConfigData; +import org.ecommerce.kafka.config.KafkaProducerConfigData; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import lombok.RequiredArgsConstructor; + +@Configuration +@RequiredArgsConstructor +public class KafkaProducerConfig { + + private final KafkaConfigData kafkaConfigData; + private final KafkaProducerConfigData kafkaProducerConfigData; + + @Bean + public Map producerConfig() { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigData.getBootstrapServers()); + props.put(kafkaConfigData.getSchemaRegistryUrlKey(), kafkaConfigData.getSchemaRegistryUrl()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaProducerConfigData.getKeySerializerClass()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaProducerConfigData.getValueSerializerClass()); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProducerConfigData.getBatchSize() * + kafkaProducerConfigData.getBatchSizeBoostFactor()); + props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerConfigData.getLingerMs()); + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaProducerConfigData.getCompressionType()); + props.put(ProducerConfig.ACKS_CONFIG, kafkaProducerConfigData.getAcks()); + props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerConfigData.getRequestTimeoutMs()); + props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerConfigData.getRetryCount()); + return props; + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfig()); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/infrastructure/kafka/src/main/java/org/ecommerce/kafka/producer/service/KafkaProducer.java b/infrastructure/kafka/src/main/java/org/ecommerce/kafka/producer/service/KafkaProducer.java new file mode 100644 index 00000000..9ade867c --- /dev/null +++ b/infrastructure/kafka/src/main/java/org/ecommerce/kafka/producer/service/KafkaProducer.java @@ -0,0 +1,32 @@ +package org.ecommerce.kafka.producer.service; + +import java.io.Serializable; + +import org.apache.avro.specific.SpecificRecordBase; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Service +@RequiredArgsConstructor +@Slf4j +public class KafkaProducer { + + private final KafkaTemplate kafkaTemplate; + + // TODO : retry 및 예외처리 + public void send(String topicName, K key, V message) { + kafkaTemplate.send(topicName, key, message); + } + + @PreDestroy + public void close() { + if (kafkaTemplate != null) { + log.info("Closing kafka producer!"); + kafkaTemplate.destroy(); + } + } +} diff --git a/infrastructure/kafka/src/main/resources/avro/stockOperationModel.avsc b/infrastructure/kafka/src/main/resources/avro/stockOperationModel.avsc new file mode 100644 index 00000000..77bd0b69 --- /dev/null +++ b/infrastructure/kafka/src/main/resources/avro/stockOperationModel.avsc @@ -0,0 +1,17 @@ +{ + "type": "record", + "name": "StockOperationModel", + "namespace": "org.ecommerce", + "fields": [ + { + "name": "productId", + "type": ["null", "int"], + "default": null + }, + { + "name": "total", + "type": ["null", "int"], + "default": null + } + ] +} diff --git a/order-api/build.gradle b/order-api/build.gradle index 2ac489f4..eed424c4 100644 --- a/order-api/build.gradle +++ b/order-api/build.gradle @@ -2,10 +2,19 @@ plugins { id 'java' id 'org.springframework.boot' version '3.2.4' id 'io.spring.dependency-management' version '1.1.4' + id 'com.github.davidmc24.gradle.plugin.avro' version '1.3.0' +} + +repositories { + mavenCentral() + maven { + url 'https://packages.confluent.io/maven/' + } } dependencies { implementation(project(':common')) + implementation(project(':infrastructure:kafka')) implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-web' @@ -51,6 +60,7 @@ dependencies { // kafka implementation 'org.springframework.kafka:spring-kafka' + implementation "io.confluent:kafka-avro-serializer:7.0.1" //security testImplementation 'org.springframework.security:spring-security-test' diff --git a/order-api/src/main/java/org/ecommerce/orderapi/OrderApiApplication.java b/order-api/src/main/java/org/ecommerce/orderapi/OrderApiApplication.java index 3f570aad..47c5bee1 100644 --- a/order-api/src/main/java/org/ecommerce/orderapi/OrderApiApplication.java +++ b/order-api/src/main/java/org/ecommerce/orderapi/OrderApiApplication.java @@ -7,7 +7,7 @@ import org.springframework.scheduling.annotation.EnableAsync; @SpringBootApplication(scanBasePackages = - {"org.ecommerce.common", "org.ecommerce.orderapi"}, + {"org.ecommerce.common", "org.ecommerce.kafka", "org.ecommerce.orderapi"}, nameGenerator = FullyQualifiedAnnotationBeanNameGenerator.class) @EnableFeignClients @EnableAsync diff --git a/order-api/src/main/java/org/ecommerce/orderapi/global/config/KafkaProducerConfig.java b/order-api/src/main/java/org/ecommerce/orderapi/global/config/KafkaProducerConfig.java deleted file mode 100644 index 7b5fcf20..00000000 --- a/order-api/src/main/java/org/ecommerce/orderapi/global/config/KafkaProducerConfig.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.ecommerce.orderapi.global.config; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.support.serializer.JsonSerializer; - -import lombok.RequiredArgsConstructor; - -@Configuration -@RequiredArgsConstructor -public class KafkaProducerConfig { - - @Bean - public ProducerFactory producerFactory() { - Map config = new HashMap<>(); - config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092, localhost:29092, localhost:39092"); - config.put("schemaRegistryUrlKey", "localhost:9090"); - config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - config.put(JsonSerializer.TYPE_MAPPINGS, "StockMessage:org.ecommerce.orderapi.stock.dto.StockOperationMessage"); - return new DefaultKafkaProducerFactory<>(config); - } - - @Bean - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); - } -} diff --git a/order-api/src/main/java/org/ecommerce/orderapi/stock/dto/StockMapper.java b/order-api/src/main/java/org/ecommerce/orderapi/stock/dto/StockMapper.java index d934b2f0..a5bc3a60 100644 --- a/order-api/src/main/java/org/ecommerce/orderapi/stock/dto/StockMapper.java +++ b/order-api/src/main/java/org/ecommerce/orderapi/stock/dto/StockMapper.java @@ -1,5 +1,6 @@ package org.ecommerce.orderapi.stock.dto; +import org.ecommerce.StockOperationModel; import org.ecommerce.orderapi.stock.entity.Stock; import org.mapstruct.Mapper; import org.mapstruct.ReportingPolicy; @@ -13,4 +14,6 @@ public interface StockMapper { StockDto toStockDto(Stock stock); StockDto.Response toResponse(StockDto stockDto); + + StockOperationModel toOperationModel(Stock stock); } diff --git a/order-api/src/main/java/org/ecommerce/orderapi/stock/event/publisher/StockDecreasedEventKafkaPublisher.java b/order-api/src/main/java/org/ecommerce/orderapi/stock/event/publisher/StockDecreasedEventKafkaPublisher.java index edc877b1..a98ce743 100644 --- a/order-api/src/main/java/org/ecommerce/orderapi/stock/event/publisher/StockDecreasedEventKafkaPublisher.java +++ b/order-api/src/main/java/org/ecommerce/orderapi/stock/event/publisher/StockDecreasedEventKafkaPublisher.java @@ -2,8 +2,8 @@ import java.util.List; -import org.ecommerce.orderapi.stock.dto.StockOperationMessage; -import org.ecommerce.orderapi.stock.messaging.KafkaProducer; +import org.ecommerce.StockOperationModel; +import org.ecommerce.kafka.producer.service.KafkaProducer; import org.springframework.stereotype.Component; import lombok.RequiredArgsConstructor; @@ -14,11 +14,11 @@ @RequiredArgsConstructor public class StockDecreasedEventKafkaPublisher { - private final KafkaProducer kafkaProducer; + private final KafkaProducer kafkaProducer; - public void publish(final List stockOperationMessages) { - for (StockOperationMessage stockOperationMessage : stockOperationMessages) { - kafkaProducer.send(stockOperationMessage); + public void publish(final List stockOperationModels) { + for (StockOperationModel stockOperationModel : stockOperationModels) { + kafkaProducer.send("topic", "key", stockOperationModel); } } } diff --git a/order-api/src/main/java/org/ecommerce/orderapi/stock/messaging/KafkaProducer.java b/order-api/src/main/java/org/ecommerce/orderapi/stock/messaging/KafkaProducer.java deleted file mode 100644 index 46d0b5c0..00000000 --- a/order-api/src/main/java/org/ecommerce/orderapi/stock/messaging/KafkaProducer.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.ecommerce.orderapi.stock.messaging; - -import org.ecommerce.orderapi.stock.dto.StockOperationMessage; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; - -import lombok.RequiredArgsConstructor; - -@Component -@RequiredArgsConstructor -public class KafkaProducer { - private final KafkaTemplate kafkaTemplate; - - public void send(final StockOperationMessage message) { - kafkaTemplate.send("topic", message); - } -} diff --git a/order-api/src/main/java/org/ecommerce/orderapi/stock/service/StockDomainService.java b/order-api/src/main/java/org/ecommerce/orderapi/stock/service/StockDomainService.java index af91546f..2920149b 100644 --- a/order-api/src/main/java/org/ecommerce/orderapi/stock/service/StockDomainService.java +++ b/order-api/src/main/java/org/ecommerce/orderapi/stock/service/StockDomainService.java @@ -9,6 +9,7 @@ import java.util.Map; import java.util.Set; +import org.ecommerce.StockOperationModel; import org.ecommerce.common.error.CustomException; import org.ecommerce.orderapi.order.entity.Order; import org.ecommerce.orderapi.order.entity.OrderItem; @@ -17,7 +18,6 @@ import org.ecommerce.orderapi.stock.aop.StockLock; import org.ecommerce.orderapi.stock.dto.StockDto; import org.ecommerce.orderapi.stock.dto.StockMapper; -import org.ecommerce.orderapi.stock.dto.StockOperationMessage; import org.ecommerce.orderapi.stock.entity.Stock; import org.ecommerce.orderapi.stock.entity.enumerated.StockOperationResult; import org.ecommerce.orderapi.stock.event.StockDecreasedEvent; @@ -59,7 +59,7 @@ public void decreaseStocks(final Long orderId) { final Map stockMap = getStockMap(productIds); final Set successfulOrderItemIds = new HashSet<>(); - final List stockOperationMessages = new ArrayList<>(); + final List stockOperationMessages = new ArrayList<>(); orderItems.forEach( orderItem -> { Stock stock = stockMap.get(orderItem.getProductId()); @@ -72,10 +72,7 @@ public void decreaseStocks(final Long orderId) { if (result.isSuccess()) { successfulOrderItemIds.add(orderItem.getId()); stockOperationMessages.add( - StockOperationMessage.of( - stock.getProductId(), - stock.getTotal()) - ); + StockMapper.INSTANCE.toOperationModel(stock)); } } ); diff --git a/order-api/src/main/resources/application-dev.yml b/order-api/src/main/resources/application-dev.yml index 4e2ce6c7..e3077ff0 100644 --- a/order-api/src/main/resources/application-dev.yml +++ b/order-api/src/main/resources/application-dev.yml @@ -32,4 +32,39 @@ spring: port: 6379 jwt: - secret: ${SECRET_KEY_SOURCE} \ No newline at end of file + secret: ${SECRET_KEY_SOURCE} + +kafka-config: + bootstrap-servers: localhost:19092, localhost:29092, localhost:39092 + schema-registry-url-key: schema.registry.url + schema-registry-url: http://localhost:9090 + num-of-partitions: 3 + replication-factor: 3 + +kafka-producer-config: + key-serializer-class: org.apache.kafka.common.serialization.StringSerializer + value-serializer-class: io.confluent.kafka.serializers.KafkaAvroSerializer + compression-type: none + acks: all + batch-size: 16384 + batch-size-boost-factor: 100 + linger-ms: 5 + request-timeout-ms: 60000 + retry-count: 5 + +kafka-consumer-config: + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer + auto-offset-reset: earliest + specific-avro-reader-key: specific.avro.reader + specific-avro-reader: true + batch-listener: true + auto-startup: true + concurrency-level: 3 + session-timeout-ms: 10000 + heartbeat-interval-ms: 3000 + max-poll-interval-ms: 300000 + max-poll-records: 500 + max-partition-fetch-bytes-default: 1048576 + max-partition-fetch-bytes-boost-factor: 1 + poll-timeout-ms: 150 \ No newline at end of file diff --git a/order-api/src/main/resources/application-local.yml b/order-api/src/main/resources/application-local.yml index d8dd1225..6bc668ce 100644 --- a/order-api/src/main/resources/application-local.yml +++ b/order-api/src/main/resources/application-local.yml @@ -32,4 +32,39 @@ spring: port: 6379 jwt: - secret: ${SECRET_KEY_SOURCE} \ No newline at end of file + secret: ${SECRET_KEY_SOURCE} + +kafka-config: + bootstrap-servers: localhost:19092, localhost:29092, localhost:39092 + schema-registry-url-key: schema.registry.url + schema-registry-url: http://localhost:9090 + num-of-partitions: 3 + replication-factor: 3 + +kafka-producer-config: + key-serializer-class: org.apache.kafka.common.serialization.StringSerializer + value-serializer-class: io.confluent.kafka.serializers.KafkaAvroSerializer + compression-type: none + acks: all + batch-size: 16384 + batch-size-boost-factor: 100 + linger-ms: 5 + request-timeout-ms: 60000 + retry-count: 5 + +kafka-consumer-config: + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer + auto-offset-reset: earliest + specific-avro-reader-key: specific.avro.reader + specific-avro-reader: true + batch-listener: true + auto-startup: true + concurrency-level: 3 + session-timeout-ms: 10000 + heartbeat-interval-ms: 3000 + max-poll-interval-ms: 300000 + max-poll-records: 500 + max-partition-fetch-bytes-default: 1048576 + max-partition-fetch-bytes-boost-factor: 1 + poll-timeout-ms: 150 \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index fe27aaaa..422b56bb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -4,4 +4,5 @@ include 'order-api' include 'payment-api' include 'product-api' include 'statistic-api' -include 'common' \ No newline at end of file +include 'common' +include 'infrastructure:kafka'