Skip to content

Commit

Permalink
Merge pull request #597 from goodjava/master
Browse files Browse the repository at this point in the history
etl_update
  • Loading branch information
goodjava authored Aug 29, 2023
2 parents 4035fc5 + 9aac85c commit f1a457d
Show file tree
Hide file tree
Showing 12 changed files with 443 additions and 110 deletions.
8 changes: 8 additions & 0 deletions hera-all/trace-etl/trace-etl-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@
<version>1.0.0-SNAPSHOT</version>
</dependency>


<dependency>
<groupId>com.squareup.okio</groupId>
<artifactId>okio</artifactId>
<version>3.5.0</version>
</dependency>


</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* @author dingtao
Expand All @@ -44,6 +47,9 @@ public class ConsumerService {
@Autowired
private ClientMessageQueue clientMessageQueue;

@Autowired
private DataCacheService cacheService;

@PostConstruct
public void takeMessage() throws MQClientException {
// Before initializing rocketmq consumer,
Expand All @@ -62,6 +68,11 @@ public void takeMessage() throws MQClientException {

private class TraceEtlMessageListener implements MessageListenerConcurrently {

private AtomicInteger i = new AtomicInteger();

private AtomicLong time = new AtomicLong(System.currentTimeMillis());


@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (list == null || list.isEmpty()) {
Expand All @@ -79,6 +90,15 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeCo
}
clientMessageQueue.enqueue(traceId, message);
}

long now = System.currentTimeMillis();

i.addAndGet(list.size());
if (i.get() > 10000 || (now - time.get() >= TimeUnit.SECONDS.toMillis(4))) {
i.set(0);
time.set(now);
cacheService.cacheData();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package com.xiaomi.hera.trace.etl.consumer;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import com.xiaomi.hera.trace.etl.constant.LockUtil;
import com.xiaomi.youpin.prometheus.client.Metrics;
import com.xiaomi.youpin.prometheus.client.MetricsManager;
import com.xiaomi.youpin.prometheus.client.Prometheus;
import io.prometheus.client.*;
import io.prometheus.client.exporter.common.TextFormat;
import lombok.extern.slf4j.Slf4j;
import okio.Buffer;
import org.springframework.stereotype.Service;

import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* @author [email protected]
* @date 2023/8/29 10:02
*/
@Service
@Slf4j
public class DataCacheService {

private CopyOnWriteArrayList<byte[]> data = new CopyOnWriteArrayList<>();

public byte[] getData() {
log.info("get data");
Stopwatch sw = Stopwatch.createStarted();
Buffer buffer = new Buffer();
try {
data.forEach(it -> buffer.write(it));
data.clear();
return buffer.readByteArray();
} finally {
log.info("get data use time:{}ms", sw.elapsed(TimeUnit.MILLISECONDS));
buffer.clear();
}
}


public void cacheData() {
log.info("cache data");
Stopwatch sw = Stopwatch.createStarted();
synchronized (LockUtil.lock) {
List<String> list = new ArrayList<>();
CollectorRegistry registry = CollectorRegistry.defaultRegistry;
try {
Field field = registry.getClass().getDeclaredField("namesToCollectors");
field.setAccessible(true);
Map<String, Collector> namesToCollectors = (Map<String, Collector>) field.get(registry);
list = namesToCollectors.keySet().stream()
.filter(it -> !it.endsWith("created"))
.collect(Collectors.toList());
} catch (Exception e) {
log.info("export metrics error : ", e);
}
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamWriter writer = new OutputStreamWriter(baos)) {
TextFormat.writeFormat(TextFormat.CONTENT_TYPE_004, writer, registry.filteredMetricFamilySamples(Sets.newHashSet(list)));
writer.flush();
byte[] bytes = baos.toByteArray();
this.data.add(bytes);
} catch (Throwable ex) {
log.error(ex.getMessage());
} finally {
clearMetrics();
}
}
log.info("cache data use time:{}ms", sw.elapsed(TimeUnit.MILLISECONDS));
}


private void clearMetrics() {
try {
MetricsManager gMetricsMgr = Metrics.getInstance().gMetricsMgr;
if (gMetricsMgr instanceof Prometheus) {
Prometheus prometheus = (Prometheus) gMetricsMgr;
Map<String, Object> prometheusMetrics = prometheus.prometheusMetrics;
clearTypeMetrics(prometheusMetrics);
prometheus.prometheusMetrics.clear();
prometheus.prometheusTypeMetrics.clear();
}
} catch (Exception e) {
log.error("clear metrics error", e);
}
}

private void clearTypeMetrics(Map<String, Object> prometheusMetrics) {
for (String key : prometheusMetrics.keySet()) {
Object o = prometheusMetrics.get(key);
if (o instanceof Counter) {
Counter counter = (Counter) o;
CollectorRegistry.defaultRegistry.unregister(counter);
} else if (o instanceof Gauge) {
Gauge gauge = (Gauge) o;
gauge.clear();
CollectorRegistry.defaultRegistry.unregister(gauge);
} else if (o instanceof Histogram) {
Histogram histogram = (Histogram) o;
histogram.clear();
CollectorRegistry.defaultRegistry.unregister(histogram);
} else {
log.error("metrics : " + key + " Type conversion failed, original type : " + o.getClass().getName());
}
}
}


}
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package com.xiaomi.hera.trace.etl.util.nacos;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.NacosNamingService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -15,13 +13,7 @@

import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Level;
import java.util.*;

@Component
public class NacosClientUtil {
Expand Down
Loading

0 comments on commit f1a457d

Please sign in to comment.