Skip to content

Commit

Permalink
Merge branch 'XiaoMi:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
wtt40122 authored Sep 4, 2023
2 parents f9f7b9f + b988cb2 commit 0ec3b27
Show file tree
Hide file tree
Showing 23 changed files with 211 additions and 110 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@

**committers:**
- 张平
- 曹宝玉
- 高一波
- 董振兴
- 王志东
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ mq.rocketmq.nameseraddr=${hera.rocketmq.nameserver}
spring.redis.cluster.nodes=${hera.redis.url}
spring.redis.password=${hera.redis.password}
tpc.token.parse.url=http://mi-tpclogin:8098/login/token/parse
prometheus.token=adqSWsad19E8D4
prometheus.token=adqSWsad19E8D4
prometheus.pull.header=application/openmetrics-text; version=1.0.0; charset=utf-8
2 changes: 1 addition & 1 deletion hera-all/trace-etl/trace-etl-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<dependency>
<groupId>run.mone</groupId>
<artifactId>prometheus-trace-etl</artifactId>
<version>0.0.5-SNAPSHOT</version>
<version>0.0.6-SNAPSHOT</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.xiaomi.hera.trace.etl.mq.rocketmq.ClientMessageQueue;
import com.xiaomi.hera.trace.etl.mq.rocketmq.RocketMqProducer;
import com.xiaomi.hera.trace.etl.util.ThriftUtil;
import com.xiaomi.hera.tspandata.TSpanData;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.thrift.TDeserializer;
Expand All @@ -17,9 +18,6 @@

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* @author dingtao
Expand Down Expand Up @@ -67,12 +65,12 @@ public void takeMessage() throws MQClientException {
log.info("init consumer end ...");
}

private class TraceEtlMessageListener implements MessageListenerOrderly {
private class TraceEtlMessageListener implements MessageListenerConcurrently {

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, final ConsumeOrderlyContext context) {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (list == null || list.isEmpty()) {
return ConsumeOrderlyStatus.SUCCESS;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
enterManager.enter();
enterManager.getProcessNum().incrementAndGet();
Expand All @@ -89,7 +87,7 @@ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, final ConsumeO
}
clientMessageQueue.enqueue(traceId, message);
}
return ConsumeOrderlyStatus.SUCCESS;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} finally {
enterManager.getProcessNum().decrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package com.xiaomi.hera.trace.etl.consumer;

import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import com.xiaomi.youpin.prometheus.client.MetricsManager;
import com.xiaomi.youpin.prometheus.client.Prometheus;
import com.xiaomi.youpin.prometheus.client.multi.MutiMetrics;
import io.prometheus.client.*;
import com.xiaomi.youpin.prometheus.client.multi.MutiPrometheus;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import io.prometheus.client.exporter.common.TextFormat;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

Expand All @@ -33,9 +39,23 @@ public class DataCacheService {

private CopyOnWriteArrayList<byte[]> cacheData = new CopyOnWriteArrayList<>();

/**
* Start caching after pulling data from Prometheus.
* Prevent the risk of having too much data in cacheData and the metrics being cleared due to Prometheus being slow to discover during service startup.
*/
@Getter
@Setter
private boolean startCache = false;

@Resource
private MutiMetricsCall call;

@Resource
private EnterManager enterManager;

@NacosValue(value = "${prometheus.pull.header}", autoRefreshed = true)
private String prometheusPullHeader;

public int dataSize() {
return cacheData.size();
}
Expand All @@ -54,33 +74,29 @@ public void init() {
}, 0, 60, TimeUnit.SECONDS);

Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
try {
Stopwatch sw = Stopwatch.createStarted();
enterManager.getMonitor().enter();
if (startCache) {
try {
while (enterManager.getProcessNum().get() >= 0) {
TimeUnit.MILLISECONDS.sleep(200);
Stopwatch sw = Stopwatch.createStarted();
enterManager.getMonitor().enter();
try {
while (enterManager.getProcessNum().get() > 0) {
TimeUnit.MILLISECONDS.sleep(200);
}
call.change();
} finally {
enterManager.getMonitor().leave();
log.info("change use time:{}ms", sw.elapsed(TimeUnit.MILLISECONDS));
}
call.change();
} finally {
enterManager.getMonitor().leave();
log.info("change use time:{}ms", sw.elapsed(TimeUnit.MILLISECONDS));

cacheData();

} catch (Throwable ex) {
log.error(ex.getMessage(), ex);
}
cacheData();
} catch (Throwable ex) {
log.error(ex.getMessage(), ex);
} finally {
enterManager.getMonitor().leave();
}
}, 0, 15, TimeUnit.SECONDS);
}

@Resource
private EnterManager enterManager;




public byte[] getData() {
log.info("get data");
Stopwatch sw = Stopwatch.createStarted();
Expand Down Expand Up @@ -114,7 +130,7 @@ public void cacheData() {
log.info("export metrics error : ", e);
}
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamWriter writer = new OutputStreamWriter(baos)) {
TextFormat.writeFormat(TextFormat.CONTENT_TYPE_004, writer, registry.filteredMetricFamilySamples(Sets.newHashSet(list)));
TextFormat.writeFormat(prometheusPullHeader, writer, registry.filteredMetricFamilySamples(Sets.newHashSet(list)));
writer.flush();
byte[] bytes = baos.toByteArray();
this.cacheData.add(bytes);
Expand All @@ -127,16 +143,44 @@ public void cacheData() {
});
}

public byte[] cacheDataSync() {
call.change();
log.info("cache data sync start");
Stopwatch sw = Stopwatch.createStarted();
List<String> list = new ArrayList<>();
MutiMetrics old = call.old();
CollectorRegistry registry = old.getRegistry();
try {
Field field = registry.getClass().getDeclaredField("namesToCollectors");
field.setAccessible(true);
Map<String, Collector> namesToCollectors = (Map<String, Collector>) field.get(registry);
list = namesToCollectors.keySet().stream()
.filter(it -> !it.endsWith("created"))
.collect(Collectors.toList());
} catch (Exception e) {
log.info("sync cache data error : ", e);
}
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputStreamWriter writer = new OutputStreamWriter(baos)) {
TextFormat.writeFormat(TextFormat.CONTENT_TYPE_004, writer, registry.filteredMetricFamilySamples(Sets.newHashSet(list)));
writer.flush();
return baos.toByteArray();
} catch (Throwable ex) {
log.error(ex.getMessage());
} finally {
clearMetrics(old);
log.info("sync cache data use time:{} ms", sw.elapsed(TimeUnit.MILLISECONDS));
}
return null;
}

private void clearMetrics(MutiMetrics old) {
try {
MetricsManager gMetricsMgr = old.gMetricsMgr;
if (gMetricsMgr instanceof Prometheus) {
Prometheus prometheus = (Prometheus) gMetricsMgr;
Map<String, Object> prometheusMetrics = prometheus.prometheusMetrics;
MutiPrometheus prometheus = old.gMetricsMgr;
if (prometheus != null) {
Map<String, Object> prometheusMetrics = prometheus.getPrometheusMetrics();
clearTypeMetrics(prometheusMetrics, old.getRegistry());
prometheus.prometheusMetrics.clear();
prometheus.prometheusTypeMetrics.clear();
prometheus.getPrometheusMetrics().clear();
prometheus.getPrometheusTypeMetrics().clear();
}
} catch (Exception e) {
log.error("clear metrics error", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ public class EnterManager {
@Getter
private Monitor monitor = new Monitor();

@Getter
private Monitor processMonitor = new Monitor();

@Getter
private AtomicInteger processNum = new AtomicInteger();

Expand All @@ -36,11 +33,4 @@ public void enter() {
monitor.leave();
}

public void processEnter() {
processMonitor.enter();
processMonitor.leave();
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ public void parse(TSpanData tSpanData) {
log.error("serviceName is empty : " + tSpanData);
return;
}
// 统计span来源、qps、日总量等信息
// Collect information such as span source, QPS, daily total, etc.
traceStatistics(serviceName);
// 解析TSpanData,转换为指标类
// Parse TSpanData and convert it into an indicator class.
MetricsParseResult metricsParseResult = metricsParse(tSpanData);
if (metricsParseResult.isIgnore()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import com.xiaomi.hera.trace.etl.consumer.DataCacheService;
import com.xiaomi.hera.trace.etl.consumer.EnterManager;
import com.xiaomi.youpin.prometheus.client.binder.ClassLoaderMetricsReduced;
import com.xiaomi.youpin.prometheus.client.binder.JvmGcMetricsReduced;
import com.xiaomi.youpin.prometheus.client.binder.JvmMemoryMetricsReduced;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class HTTPServer {

@Resource
private DataCacheService dataCacheService;
@Resource
private EnterManager enterManager;

private HttpServer server;
private ExecutorService executorService;
Expand Down Expand Up @@ -164,14 +167,20 @@ public void handle(HttpExchange exchange) throws IOException {
os.write(HEALTHY_RESPONSE);
os.flush();
} else {
String contentType = TextFormat.chooseContentType(exchange.getRequestHeaders().getFirst("Accept"));
String acceptHeader = exchange.getRequestHeaders().getFirst("Accept");
log.info("prometheus pull header is : " + acceptHeader);
String contentType = TextFormat.chooseContentType(acceptHeader);
exchange.getResponseHeaders().set("Content-Type", contentType);
if ("/jvm".equals(path)) {
CollectorRegistry registry = this.registryMap.get("jvm");
Map<String, Object> dataMap = getData(contentType, registry, path, hostString);
data = (byte[]) dataMap.get("data");
} else {
data = dataCacheService.getData();
if (dataCacheService.isStartCache()) {
data = dataCacheService.getData();
} else {
data = firstPull();
}
}
exchange.sendResponseHeaders(200, data.length);
os.write(data);
Expand All @@ -190,6 +199,30 @@ public void handle(HttpExchange exchange) throws IOException {
}
}

/**
* In order to solve the problem that consumer messages have been coming in for a long time when the service starts, but Prometheus has not started pulling yet,
* causing the risk of clearing the metrics when the cacheData length in DataCacheService exceeds 4.
* When Prometheus pulls for the first time, it pulls directly from CollectRegister.
* The cacheData operation in ConsumerService will not be executed until 15 seconds after the first pull of Prometheus.
*
* @return
*/
private byte[] firstPull() {
try {
enterManager.getMonitor().enter();
while (enterManager.getProcessNum().get() > 0) {
TimeUnit.MILLISECONDS.sleep(200);
}
return dataCacheService.cacheDataSync();
} catch (Throwable ex) {
log.error("first pull error", ex);
} finally {
dataCacheService.setStartCache(true);
enterManager.getMonitor().leave();
}
return null;
}

private boolean filterRequest(HttpExchange exchange) {
if (StringUtils.isEmpty(ua)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public class ClientMessageQueue {

private static final int CLIENT_QUEUE_SIZE = 2000;


private final static int FETCH_ROCKETMQ_QUEUE_GAP = 10;

public ClientMessageQueue(RocketMqProducer producer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public ClientMessageQueueWrapper(MessageQueue rocketMQMessageQueue, BlockingQueu
this.clientMessageQueue = clientMessageQueue;
this.producer = producer;
startExport();
log.info(rocketMQMessageQueue.getBrokerName() + " - "+rocketMQMessageQueue.getQueueId()+" start");
log.info(rocketMQMessageQueue.getBrokerName() + " - " + rocketMQMessageQueue.getQueueId() + " start");
}

public BlockingQueue<MessageExt> getClientMessageQueue() {
Expand All @@ -57,13 +57,13 @@ public void setRocketMQMessageQueue(MessageQueue rocketMQMessageQueue) {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null) return false;
if(o instanceof ClientMessageQueueWrapper) {
if (o instanceof ClientMessageQueueWrapper) {
ClientMessageQueueWrapper that = (ClientMessageQueueWrapper) o;
return Objects.equals(rocketMQMessageQueue, that.rocketMQMessageQueue);
}else if (o instanceof MessageQueue){
} else if (o instanceof MessageQueue) {
MessageQueue queue = (MessageQueue) o;
return Objects.equals(rocketMQMessageQueue, queue);
}else{
} else {
return false;
}
}
Expand Down Expand Up @@ -96,7 +96,7 @@ public void stopExport() {
running = false;
// Destroy thread pool
executor.shutdown();
log.info(rocketMQMessageQueue.getBrokerName()+" - "+rocketMQMessageQueue.getQueueId()+" stopped");
log.info(rocketMQMessageQueue.getBrokerName() + " - " + rocketMQMessageQueue.getQueueId() + " stopped");
}

private class ClientQueueExporter implements Runnable {
Expand Down
Loading

0 comments on commit 0ec3b27

Please sign in to comment.