Skip to content

Commit 4d4cef2

Browse files
zzhhhzzyuyang08
authored andcommitted
Fix bug: When sending event to Kafka fails for the first time, audit event needs to be inserted at the beginning of the queue. (#37)
bump version 0.8.0.1
1 parent 555949f commit 4d4cef2

File tree

6 files changed

+23
-18
lines changed

6 files changed

+23
-18
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<modelVersion>4.0.0</modelVersion>
77
<groupId>com.pinterest.singer</groupId>
88
<artifactId>singer-package</artifactId>
9-
<version>0.8.0</version>
9+
<version>0.8.0.1</version>
1010
<packaging>pom</packaging>
1111
<description>Singer Logging Agent modules</description>
1212
<inceptionYear>2013</inceptionYear>

singer-commons/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<parent>
2121
<groupId>com.pinterest.singer</groupId>
2222
<artifactId>singer-package</artifactId>
23-
<version>0.8.0</version>
23+
<version>0.8.0.1</version>
2424
<relativePath>../pom.xml</relativePath>
2525
</parent>
2626
<developers>

singer-commons/src/main/java/com/pinterest/singer/loggingaudit/client/AuditEventKafkaSender.java

+18-13
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ public void checkAndEnqueueWhenSendFailed(RecordMetadata recordMetadata, Excepti
288288
Integer count = eventTriedCount.get(event.getLoggingAuditHeaders());
289289
if (count == null){
290290
eventTriedCount.put(event.getLoggingAuditHeaders(), 1);
291+
insertEvent(event);
291292
OpenTsdbMetricConverter
292293
.gauge(LoggingAuditClientMetrics.AUDIT_CLIENT_SENDER_KAFKA_EVENTS_RETRIED,
293294
eventTriedCount.size(), "host=" + host, "stage=" + stage.toString(),
@@ -301,19 +302,23 @@ public void checkAndEnqueueWhenSendFailed(RecordMetadata recordMetadata, Excepti
301302
eventTriedCount.remove(event.getLoggingAuditHeaders());
302303
} else {
303304
eventTriedCount.put(event.getLoggingAuditHeaders(), count + 1);
304-
try {
305-
boolean success = queue.offerFirst(event, 3, TimeUnit.SECONDS);
306-
if (!success) {
307-
LOG.debug("Failed to enqueue LoggingAuditEvent at head of the queue when executing "
308-
+ "producer send callback. Drop this event.");
309-
eventTriedCount.remove(event.getLoggingAuditHeaders());
310-
}
311-
} catch (InterruptedException ex) {
312-
LOG.debug(
313-
"Enqueuing LoggingAuditEvent at head of the queue was interrupted in callback. "
314-
+ "Drop this event");
315-
eventTriedCount.remove(event.getLoggingAuditHeaders());
316-
}
305+
insertEvent(event);
306+
}
307+
}
308+
309+
public void insertEvent(LoggingAuditEvent event){
310+
try {
311+
boolean success = queue.offerFirst(event, 3, TimeUnit.SECONDS);
312+
if (!success) {
313+
LOG.debug("Failed to enqueue LoggingAuditEvent at head of the queue when executing "
314+
+ "producer send callback. Drop this event.");
315+
eventTriedCount.remove(event.getLoggingAuditHeaders());
316+
}
317+
} catch (InterruptedException ex) {
318+
LOG.debug(
319+
"Enqueuing LoggingAuditEvent at head of the queue was interrupted in callback. "
320+
+ "Drop this event");
321+
eventTriedCount.remove(event.getLoggingAuditHeaders());
317322
}
318323
}
319324

singer/deb.version

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.8.0
1+
0.8.0.1

singer/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>com.pinterest.singer</groupId>
99
<artifactId>singer-package</artifactId>
10-
<version>0.8.0</version>
10+
<version>0.8.0.1</version>
1111
<relativePath>../pom.xml</relativePath>
1212
</parent>
1313
<licenses>

thrift-logger/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<parent>
55
<groupId>com.pinterest.singer</groupId>
66
<artifactId>singer-package</artifactId>
7-
<version>0.8.0</version>
7+
<version>0.8.0.1</version>
88
<relativePath>../pom.xml</relativePath>
99
</parent>
1010
<artifactId>thrift-logger</artifactId>

0 commit comments

Comments
 (0)