Skip to content

Commit 22a66a2

Browse files
authored
Merge pull request #38 from ambud/master
Create monitor for Kafka Producer metrics and expose them as singer metrics
2 parents 4d4cef2 + f6cee83 commit 22a66a2

File tree

11 files changed

+183
-7
lines changed

11 files changed

+183
-7
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<modelVersion>4.0.0</modelVersion>
77
<groupId>com.pinterest.singer</groupId>
88
<artifactId>singer-package</artifactId>
9-
<version>0.8.0.1</version>
9+
<version>0.8.0.2</version>
1010
<packaging>pom</packaging>
1111
<description>Singer Logging Agent modules</description>
1212
<inceptionYear>2013</inceptionYear>

singer-commons/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<parent>
2121
<groupId>com.pinterest.singer</groupId>
2222
<artifactId>singer-package</artifactId>
23-
<version>0.8.0.1</version>
23+
<version>0.8.0.2</version>
2424
<relativePath>../pom.xml</relativePath>
2525
</parent>
2626
<developers>

singer/deb.version

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.8.0.1
1+
0.8.0.2

singer/pom.xml

+7-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>com.pinterest.singer</groupId>
99
<artifactId>singer-package</artifactId>
10-
<version>0.8.0.1</version>
10+
<version>0.8.0.2</version>
1111
<relativePath>../pom.xml</relativePath>
1212
</parent>
1313
<licenses>
@@ -189,6 +189,12 @@
189189
<artifactId>aws-java-sdk-s3</artifactId>
190190
<version>1.11.35</version>
191191
</dependency>
192+
<dependency>
193+
<groupId>com.salesforce.kafka.test</groupId>
194+
<artifactId>kafka-junit4</artifactId>
195+
<version>3.2.0</version>
196+
<scope>test</scope>
197+
</dependency>
192198
</dependencies>
193199
<build>
194200
<resources>

singer/src/main/java/com/pinterest/singer/SingerMain.java

+8
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@ public void run() {
6262
} catch (Throwable t) {
6363
LOG.error("Shutdown failure: heartbeat generator : ", t);
6464
}
65+
66+
try {
67+
if (SingerSettings.getKafkaProducerMonitorThread() != null) {
68+
SingerSettings.getKafkaProducerMonitorThread().interrupt();
69+
}
70+
}catch(Throwable t) {
71+
LOG.error("Shutdown error: kafka producer metrics monitor : ", t);
72+
}
6573

6674
try {
6775
KafkaProducerManager.shutdown();

singer/src/main/java/com/pinterest/singer/common/SingerSettings.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import com.pinterest.singer.monitor.LogStreamManager;
2727
import com.pinterest.singer.thrift.configuration.SingerConfig;
2828
import com.pinterest.singer.thrift.configuration.SingerLogConfig;
29-
29+
import com.pinterest.singer.writer.KafkaProducerMetricsMonitor;
3030
import com.twitter.ostrich.stats.Stats;
3131
import com.google.common.annotations.VisibleForTesting;
3232
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -98,6 +98,8 @@ public final class SingerSettings {
9898
// loggingAuditClient is used to send LoggingAuditEvent if LoggingAudit feature is enabled and
9999
// a TopicAuditConfig is set for a given logStream.
100100
private static LoggingAuditClient loggingAuditClient = null;
101+
102+
private static Thread kafkaProducerMonitorThread;
101103

102104
private SingerSettings() {
103105
}
@@ -155,6 +157,10 @@ public static void initialize(SingerConfig config)
155157

156158
logWritingExecutors.put(clusterSig, threadPool);
157159
}
160+
161+
kafkaProducerMonitorThread = new Thread(new KafkaProducerMetricsMonitor());
162+
kafkaProducerMonitorThread.setDaemon(true);
163+
kafkaProducerMonitorThread.start();
158164

159165
if (loggingAuditClient != null && logConfig.isEnableLoggingAudit() &&
160166
logConfig.getAuditConfig() != null){
@@ -352,4 +358,8 @@ public static LoggingAuditClient getLoggingAuditClient() {
352358
public static void setLoggingAuditClient(LoggingAuditClient loggingAuditClient) {
353359
SingerSettings.loggingAuditClient = loggingAuditClient;
354360
}
361+
362+
public static Thread getKafkaProducerMonitorThread() {
363+
return kafkaProducerMonitorThread;
364+
}
355365
}

singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@
119119
public class LogConfigUtils {
120120

121121
private static final Logger LOG = LoggerFactory.getLogger(LogConfigUtils.class);
122-
private static final String DEFAULT_SERVERSET_DIR = "/var/serverset";
122+
public static final String DEFAULT_SERVERSET_DIR = "/var/serverset";
123123
private static final String DEFAULT_ACKS = "1";
124124
private static final String ACKS_ALL = "all";
125125
private static final long MaximumProcessingTimeSliceInMilliseconds = 864000000L;

singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java

+5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626

27+
import java.util.Map;
2728
import java.util.concurrent.ConcurrentHashMap;
2829

2930
/**
@@ -125,4 +126,8 @@ private void shutdownInternal() {
125126
}
126127
}
127128
}
129+
130+
public Map<KafkaProducerConfig, KafkaProducer<byte[], byte[]>> getProducers() {
131+
return producers;
132+
}
128133
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/**
2+
* Copyright 2020 Pinterest, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.pinterest.singer.writer;
17+
18+
import java.util.Arrays;
19+
import java.util.HashSet;
20+
import java.util.Map;
21+
import java.util.Map.Entry;
22+
import java.util.Set;
23+
24+
import org.apache.kafka.clients.producer.KafkaProducer;
25+
import org.apache.kafka.common.Metric;
26+
import org.apache.kafka.common.MetricName;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
31+
import com.pinterest.singer.thrift.configuration.KafkaProducerConfig;
32+
import com.pinterest.singer.utils.LogConfigUtils;
33+
34+
/**
35+
* Responsible for pulling metrics from {@link KafkaProducer} and copying them
36+
* to Ostrich so they can be accessed and forwarded. This helps provide
37+
* additional instrumentation on Singer and how it's performing.
38+
*/
39+
public class KafkaProducerMetricsMonitor implements Runnable {
40+
41+
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerMetricsMonitor.class);
42+
public static final Set<String> PRODUCER_METRICS_WHITELIST = new HashSet<>(
43+
Arrays.asList("buffer-total-bytes", "buffer-available-bytes"));
44+
// sample every 60seconds
45+
private static final int SAMPLING_INTERVAL = 60_000;
46+
47+
@Override
48+
public void run() {
49+
while (true) {
50+
try {
51+
publishKafkaProducerMetricsToOstrich();
52+
} catch (Exception e) {
53+
LOG.warn("Error publishing KafkaProducer metrics", e);
54+
}
55+
try {
56+
Thread.sleep(SAMPLING_INTERVAL);
57+
} catch (InterruptedException e) {
58+
LOG.warn("KafkaProducerMetricsMonitor thread interrupted, exiting");
59+
break;
60+
}
61+
}
62+
}
63+
64+
@SuppressWarnings({ "deprecation" })
65+
protected void publishKafkaProducerMetricsToOstrich() {
66+
Map<KafkaProducerConfig, KafkaProducer<byte[], byte[]>> producers = KafkaProducerManager
67+
.getInstance().getProducers();
68+
for (Entry<KafkaProducerConfig, KafkaProducer<byte[], byte[]>> kafkaProducerEntry : producers
69+
.entrySet()) {
70+
KafkaProducerConfig key = kafkaProducerEntry.getKey();
71+
String signature = convertSignatureToTag(key);
72+
Map<MetricName, ? extends Metric> metrics = kafkaProducerEntry.getValue().metrics();
73+
for (Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
74+
if (PRODUCER_METRICS_WHITELIST.contains(entry.getKey().name())) {
75+
OpenTsdbMetricConverter.gauge("kafkaproducer." + entry.getKey().name(),
76+
entry.getValue().value(), "cluster=" + signature);
77+
}
78+
}
79+
}
80+
}
81+
82+
public static String convertSignatureToTag(KafkaProducerConfig key) {
83+
return key.getKafkaClusterSignature()
84+
.replaceAll("(" + LogConfigUtils.DEFAULT_SERVERSET_DIR + "|discovery|/|prod|\\.)", "");
85+
}
86+
87+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Copyright 2020 Pinterest, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.pinterest.singer.writer;
17+
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertNotNull;
20+
21+
import java.util.Arrays;
22+
23+
import org.apache.kafka.clients.producer.KafkaProducer;
24+
import org.junit.Test;
25+
26+
import com.pinterest.singer.thrift.configuration.KafkaProducerConfig;
27+
import com.twitter.ostrich.stats.Stats;
28+
29+
public class TestKafkaProducerMetricsMonitor {
30+
31+
@Test
32+
public void testSignatureTagExtractionTLS() {
33+
KafkaProducerConfig config = new KafkaProducerConfig("/var/serverset/discovery/kafka_tls/prod",
34+
Arrays.asList(), "-1");
35+
assertEquals("kafka_tls", KafkaProducerMetricsMonitor.convertSignatureToTag(config));
36+
}
37+
38+
@Test
39+
public void testSignatureTagExtraction() {
40+
KafkaProducerConfig config = new KafkaProducerConfig("/var/serverset/discovery/kafka/prod",
41+
Arrays.asList(), "-1");
42+
assertEquals("kafka", KafkaProducerMetricsMonitor.convertSignatureToTag(config));
43+
}
44+
45+
@Test
46+
public void testPublishMetrics() {
47+
KafkaProducerConfig config = new KafkaProducerConfig("/var/serverset/discovery.kafka.prod",
48+
Arrays.asList("localhost:9092"), "-1");
49+
KafkaProducerManager.getInstance().getProducers().clear();
50+
KafkaProducer<byte[], byte[]> producer = KafkaProducerManager.getProducer(config);
51+
KafkaProducerMetricsMonitor monitor = new KafkaProducerMetricsMonitor();
52+
monitor.publishKafkaProducerMetricsToOstrich();
53+
producer.close();
54+
for (String metricName : KafkaProducerMetricsMonitor.PRODUCER_METRICS_WHITELIST) {
55+
Object gauge = Stats.getGauge("kafkaproducer." + metricName + " cluster=kafka").get();
56+
assertNotNull(gauge);
57+
}
58+
}
59+
60+
}

thrift-logger/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<parent>
55
<groupId>com.pinterest.singer</groupId>
66
<artifactId>singer-package</artifactId>
7-
<version>0.8.0.1</version>
7+
<version>0.8.0.2</version>
88
<relativePath>../pom.xml</relativePath>
99
</parent>
1010
<artifactId>thrift-logger</artifactId>

0 commit comments

Comments
 (0)