From e697d038f7552b1378e753c0d63a30c21485d4e1 Mon Sep 17 00:00:00 2001
From: xiaotietietie <145418862+xiaotietietie@users.noreply.github.com>
Date: Wed, 29 Nov 2023 14:08:56 +0800
Subject: [PATCH] apply kafka (#760)
---
ozhera-all/ozhera-log/log-agent/pom.xml | 6 +-
.../log/agent/extension/KafkaExporter.java | 62 ++++++++++++
.../log/agent/extension/KafkaMQService.java | 99 +++++++++++++++++++
.../mone/log/agent/extension/KafkaOutput.java | 45 +++++++++
4 files changed, 211 insertions(+), 1 deletion(-)
create mode 100644 ozhera-all/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/extension/KafkaExporter.java
create mode 100644 ozhera-all/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/extension/KafkaMQService.java
create mode 100644 ozhera-all/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/extension/KafkaOutput.java
diff --git a/ozhera-all/ozhera-log/log-agent/pom.xml b/ozhera-all/ozhera-log/log-agent/pom.xml
index c0da5f15e..6c1f8356f 100644
--- a/ozhera-all/ozhera-log/log-agent/pom.xml
+++ b/ozhera-all/ozhera-log/log-agent/pom.xml
@@ -85,7 +85,11 @@
-
+
+ org.apache.kafka
+ kafka-clients
+ 3.5.1
+
org.projectlombok
lombok
diff --git a/ozhera-all/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/extension/KafkaExporter.java b/ozhera-all/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/extension/KafkaExporter.java
new file mode 100644
index 000000000..2f191f9cc
--- /dev/null
+++ b/ozhera-all/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/extension/KafkaExporter.java
@@ -0,0 +1,62 @@
+package com.xiaomi.mone.log.agent.extension;
+
+import com.google.common.collect.Lists;
+import com.xiaomi.mone.log.agent.export.MsgExporter;
+import com.xiaomi.mone.log.api.model.msg.LineMessage;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.List;
+
+
+public class KafkaExporter implements MsgExporter {
+
+ private Producer mqProducer;
+
+ private String rmqTopic;
+
+ private Integer batchSize;
+
+ public KafkaExporter(Producer mqProducer) {
+ this.mqProducer = mqProducer;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void export(LineMessage message) {
+ this.export(Lists.newArrayList(message));
+ }
+
+ @Override
+ public void export(List messageList) {
+ if (messageList.isEmpty()) {
+ return;
+ }
+
+ for (LineMessage lineMessage : messageList) {
+ ProducerRecord record = new ProducerRecord<>(rmqTopic, "message", gson.toJson(lineMessage));
+ mqProducer.send(record);
+ }
+
+ }
+
+ public String getRmqTopic() {
+ return rmqTopic;
+ }
+
+ public void setRmqTopic(String rmqTopic) {
+ this.rmqTopic = rmqTopic;
+ }
+
+ public Integer getBatchSize() {
+ return batchSize;
+ }
+
+ public void setBatchSize(Integer batchSize) {
+ this.batchSize = batchSize;
+ }
+}
diff --git a/ozhera-all/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/extension/KafkaMQService.java b/ozhera-all/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/extension/KafkaMQService.java
new file mode 100644
index 000000000..0db62462a
--- /dev/null
+++ b/ozhera-all/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/extension/KafkaMQService.java
@@ -0,0 +1,99 @@
+package com.xiaomi.mone.log.agent.extension;
+
+import com.google.common.base.Preconditions;
+import com.xiaomi.mone.log.agent.export.MsgExporter;
+import com.xiaomi.mone.log.agent.output.Output;
+import com.xiaomi.mone.log.agent.service.OutPutService;
+import com.xiaomi.mone.log.api.model.meta.LogPattern;
+import com.xiaomi.mone.log.api.model.meta.MQConfig;
+import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.xiaomi.mone.log.common.Constant.DEFAULT_CONSUMER_GROUP;
+
+@Service(name = "KafkaMQService")
+@Slf4j
+public class KafkaMQService implements OutPutService {
+
+ private ConcurrentHashMap producerMap;
+
+ public void init() {
+ producerMap = new ConcurrentHashMap<>(128);
+ }
+
+ @Override
+ public boolean compare(Output oldOutPut, Output newOutPut) {
+
+ return false;
+ }
+
+ @Override
+ public void preCheckOutput(Output output) {
+ KafkaOutput rmqOutput = (KafkaOutput) output;
+ Preconditions.checkArgument(null != rmqOutput.getClusterInfo(), "rmqOutput.getClusterInfo can not be null");
+ Preconditions.checkArgument(null != rmqOutput.getTopic(), "rmqOutput.getTopic can not be null");
+ }
+
+ @Override
+ public MsgExporter exporterTrans(Output output) throws Exception {
+ KafkaOutput kafkaOutput = (KafkaOutput) output;
+ String nameSrvAddr = kafkaOutput.getClusterInfo();
+ Producer mqProducer = producerMap.get(nameSrvAddr);
+ if (null == mqProducer) {
+ mqProducer = initMqProducer(kafkaOutput);
+ producerMap.put(String.valueOf(nameSrvAddr), mqProducer);
+ }
+
+ KafkaExporter rmqExporter = new KafkaExporter(mqProducer);
+ rmqExporter.setRmqTopic(kafkaOutput.getTopic());
+ rmqExporter.setBatchSize(kafkaOutput.getBatchExportSize());
+
+ return rmqExporter;
+ }
+
+ private Producer initMqProducer(KafkaOutput output) {
+ Properties properties = new Properties();
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, output.getClusterInfo());
+ properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ if (StringUtils.isNotEmpty(output.getAk()) && StringUtils.isNotEmpty(output.getSk())) {
+ properties.setProperty("security.protocol", "SASL_SSL");
+ properties.setProperty("sasl.mechanism", "PLAIN");
+ properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + output.getAk() + "\" password=\"" + output.getSk() + "\";");
+ }
+ Producer producer = new KafkaProducer<>(properties);
+ return producer;
+ }
+
+ @Override
+ public void removeMQ(Output output) {
+ KafkaOutput kafkaOutput = (KafkaOutput) output;
+ if (null != producerMap.get(kafkaOutput.getClusterInfo())) {
+ producerMap.get(kafkaOutput.getClusterInfo()).close();
+ }
+ }
+
+ @Override
+ public Output configOutPut(LogPattern logPattern) {
+
+ MQConfig mqConfig = logPattern.getMQConfig();
+ KafkaOutput output = new KafkaOutput();
+ output.setOutputType(KafkaOutput.OUTPUT_KAFKAMQ);
+ output.setClusterInfo(mqConfig.getClusterInfo());
+ output.setProducerGroup(mqConfig.getProducerGroup());
+ output.setAk(mqConfig.getAk());
+ output.setSk(mqConfig.getSk());
+ output.setTopic(mqConfig.getTopic());
+ output.setPartitionCnt(mqConfig.getPartitionCnt());
+ output.setTag(mqConfig.getTag());
+ output.setProducerGroup(DEFAULT_CONSUMER_GROUP + (null == logPattern.getPatternCode() ? "" : logPattern.getPatternCode()));
+ return output;
+ }
+}
diff --git a/ozhera-all/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/extension/KafkaOutput.java b/ozhera-all/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/extension/KafkaOutput.java
new file mode 100644
index 000000000..58f9128bc
--- /dev/null
+++ b/ozhera-all/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/extension/KafkaOutput.java
@@ -0,0 +1,45 @@
+package com.xiaomi.mone.log.agent.extension;
+
+import com.xiaomi.mone.log.agent.output.Output;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.io.Serializable;
+
+@Data
+@EqualsAndHashCode
+public class KafkaOutput extends Output implements Serializable {
+
+ public static final String OUTPUT_KAFKAMQ = "kafkamq";
+
+ private String serviceName = "KafkaMQService";
+
+ /**
+ * mq fill:namesrv_addr
+ */
+ private String clusterInfo;
+
+ private String producerGroup;
+
+ private String orgId;
+
+ private String ak;
+
+ private String sk;
+
+ private String topic;
+
+ private Integer partitionCnt;
+
+ private Integer batchExportSize;
+
+ @Override
+ public String getEndpoint() {
+ return clusterInfo;
+ }
+
+ @Override
+ public String getServiceName() {
+ return serviceName;
+ }
+}