Skip to content

Commit d6585a7

Browse files
authored
Merge pull request #74 from zzhhhzz/inject
add condition check before injecting ProducerRecord's Headers
2 parents 637936d + 097ff9d commit d6585a7

File tree

6 files changed

+14
-8
lines changed

6 files changed

+14
-8
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<modelVersion>4.0.0</modelVersion>
77
<groupId>com.pinterest.singer</groupId>
88
<artifactId>singer-package</artifactId>
9-
<version>0.8.0.23</version>
9+
<version>0.8.0.24</version>
1010
<packaging>pom</packaging>
1111
<description>Singer Logging Agent modules</description>
1212
<inceptionYear>2013</inceptionYear>

singer-commons/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<parent>
2121
<groupId>com.pinterest.singer</groupId>
2222
<artifactId>singer-package</artifactId>
23-
<version>0.8.0.23</version>
23+
<version>0.8.0.24</version>
2424
<relativePath>../pom.xml</relativePath>
2525
</parent>
2626
<developers>

singer/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>com.pinterest.singer</groupId>
99
<artifactId>singer-package</artifactId>
10-
<version>0.8.0.23</version>
10+
<version>0.8.0.24</version>
1111
<relativePath>../pom.xml</relativePath>
1212
</parent>
1313
<licenses>

singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java

+1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public class SingerMetrics {
104104
public static final String NUMBER_OF_MISSING_DIRS = "singer.missing_dir_checker.num_of_missing_dirs";
105105
public static final String NUMBER_OF_SERIALIZING_HEADERS_ERRORS = "singer.headers_injector.num_of_serializing_headers_errors";
106106
public static final String AUDIT_HEADERS_INJECTED = "singer.audit.num_of_headers_injected";
107+
public static final String CHECKSUM_INJECTED = "singer.audit.num_of_checksum_injected";
107108
public static final String AUDIT_HEADERS_METADATA_COUNT_MISMATCH = "singer.audit.headers_metadata_count_mismatch";
108109
public static final String AUDIT_HEADERS_METADATA_COUNT_MATCH = "singer.audit.headers_metadata_count_match";
109110
public static final String NUM_CORRUPTED_MESSAGES = "singer.audit.num_corrupted_messages";

singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -381,10 +381,15 @@ public boolean checkMessageValidAndInjectHeaders(
381381

382382
protected void injectHeadersForProducerRecord(LogMessage msg, Headers headers) {
383383
try {
384-
byte[] serializedAuditHeaders = SERIALIZER.get().serialize(msg.getLoggingAuditHeaders());
385-
this.headersInjector.addHeaders(headers, LOGGING_AUDIT_HEADER_KEY, serializedAuditHeaders);
386-
this.headersInjector.addHeaders(headers, CRC_HEADER_KEY, Longs.toByteArray(msg.getChecksum()));
387-
OpenTsdbMetricConverter.incr(SingerMetrics.AUDIT_HEADERS_INJECTED, "topic=" + topic, "host=" + HOSTNAME, "logName=" + msg.getLoggingAuditHeaders().getLogName(), "logStreamName=" + logName);
384+
if (msg.isSetLoggingAuditHeaders()) {
385+
byte[] serializedAuditHeaders = SERIALIZER.get().serialize(msg.getLoggingAuditHeaders());
386+
this.headersInjector.addHeaders(headers, LOGGING_AUDIT_HEADER_KEY, serializedAuditHeaders);
387+
OpenTsdbMetricConverter.incr(SingerMetrics.AUDIT_HEADERS_INJECTED, "host=" + HOSTNAME, "logStreamName=" + logName);
388+
}
389+
if (msg.isSetChecksum()) {
390+
this.headersInjector.addHeaders(headers, CRC_HEADER_KEY, Longs.toByteArray(msg.getChecksum()));
391+
OpenTsdbMetricConverter.incr(SingerMetrics.CHECKSUM_INJECTED, "host=" + HOSTNAME, "logStreamName=" + logName);
392+
}
388393
} catch (TException e) {
389394
OpenTsdbMetricConverter.incr(SingerMetrics.NUMBER_OF_SERIALIZING_HEADERS_ERRORS);
390395
LOG.warn("Exception thrown while serializing headers", e);

thrift-logger/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<parent>
55
<groupId>com.pinterest.singer</groupId>
66
<artifactId>singer-package</artifactId>
7-
<version>0.8.0.23</version>
7+
<version>0.8.0.24</version>
88
<relativePath>../pom.xml</relativePath>
99
</parent>
1010
<artifactId>thrift-logger</artifactId>

0 commit comments

Comments
 (0)