Skip to content
This repository has been archived by the owner on Nov 25, 2020. It is now read-only.

Split udp packets #15

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
target
*.iml
.idea

# Eclipse files
/.classpath
/.project
/.settings
152 changes: 91 additions & 61 deletions src/main/java/com/bealetech/metrics/reporting/StatsdReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@
*/
package com.bealetech.metrics.reporting;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.*;
import com.yammer.metrics.reporting.AbstractPollingReporter;
import com.yammer.metrics.stats.Snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
Expand All @@ -31,24 +28,49 @@
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Clock;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Metered;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricPredicate;
import com.yammer.metrics.core.MetricProcessor;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.core.Sampling;
import com.yammer.metrics.core.Summarizable;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.VirtualMachineMetrics;
import com.yammer.metrics.reporting.AbstractPollingReporter;
import com.yammer.metrics.stats.Snapshot;

public class StatsdReporter extends AbstractPollingReporter implements MetricProcessor<Long> {

public static enum StatType { COUNTER, TIMER, GAUGE }

private static final Logger LOG = LoggerFactory.getLogger(StatsdReporter.class);

protected final String prefix;
protected final MetricPredicate predicate;
protected final Locale locale = Locale.US;
protected final Clock clock;
protected final UDPSocketProvider socketProvider;
protected final VirtualMachineMetrics vm;
protected Writer writer;
protected ByteArrayOutputStream outputData;

private static final int DEFAULT_MAX_UDP_PACKET_SIZE = 1500;

private final String prefix;
private final MetricPredicate predicate;
private final Locale locale = Locale.US;
private final Clock clock;
private final UDPSocketProvider socketProvider;
private final VirtualMachineMetrics vm;
private ByteArrayOutputStream outputData;
private int udpPacketMaxSize;

private boolean prependNewline = false;
private boolean printVMMetrics = true;

private DatagramSocket socket;

public interface UDPSocketProvider {
DatagramSocket get() throws Exception;
DatagramPacket newPacket(ByteArrayOutputStream out);
Expand Down Expand Up @@ -79,14 +101,19 @@ public StatsdReporter(MetricsRegistry metricsRegistry, String prefix, MetricPred
}

public StatsdReporter(MetricsRegistry metricsRegistry, String prefix, MetricPredicate predicate, UDPSocketProvider socketProvider, Clock clock, VirtualMachineMetrics vm) throws IOException {
this(metricsRegistry, prefix, predicate, socketProvider, clock, vm, "graphite-reporter");
this(metricsRegistry, prefix, predicate, socketProvider, clock, vm, "statsd-reporter");
}

public StatsdReporter(MetricsRegistry metricsRegistry, String prefix, MetricPredicate predicate, UDPSocketProvider socketProvider, Clock clock, VirtualMachineMetrics vm, String name) throws IOException {
this(metricsRegistry, prefix, predicate, socketProvider, clock, vm, name, DEFAULT_MAX_UDP_PACKET_SIZE);
}

public StatsdReporter(MetricsRegistry metricsRegistry, String prefix, MetricPredicate predicate, UDPSocketProvider socketProvider, Clock clock, VirtualMachineMetrics vm, String name, int udpPacketMaxSize) throws IOException {
super(metricsRegistry, name);

this.socketProvider = socketProvider;
this.vm = vm;
this.udpPacketMaxSize = udpPacketMaxSize;

this.clock = clock;

Expand All @@ -110,46 +137,42 @@ public void setPrintVMMetrics(boolean printVMMetrics) {

@Override
public void run() {
DatagramSocket socket = null;
try {
socket = this.socketProvider.get();
outputData.reset();
prependNewline = false;
writer = new BufferedWriter(new OutputStreamWriter(this.outputData));

final long epoch = clock.time() / 1000;
if (this.printVMMetrics) {
printVmMetrics(epoch);
}
printRegularMetrics(epoch);

// Send UDP data
writer.flush();
DatagramPacket packet = this.socketProvider.newPacket(outputData);
packet.setData(outputData.toByteArray());
socket.send(packet);
sendUDPData();
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Error writing to Graphite", e);
LOG.debug("Error writing to Statsd", e);
} else {
LOG.warn("Error writing to Graphite: {}", e.getMessage());
}
if (writer != null) {
try {
writer.flush();
} catch (IOException e1) {
LOG.error("Error while flushing writer:", e1);
}
LOG.warn("Error writing to Statsd: {}", e.getMessage());
}
} finally {
if (socket != null) {
socket.close();
}
writer = null;
}
socket = null;
}

protected void printVmMetrics(long epoch) {
private void sendUDPData() throws IOException {
// Send UDP data
outputData.flush();
DatagramPacket packet = this.socketProvider.newPacket(outputData);
packet.setData(outputData.toByteArray());
socket.send(packet);

outputData.reset();
prependNewline = false;
}

private void printVmMetrics(long epoch) {
// Memory
sendFloat("jvm.memory.totalInit", StatType.GAUGE, vm.totalInit());
sendFloat("jvm.memory.totalUsed", StatType.GAUGE, vm.totalUsed());
Expand Down Expand Up @@ -196,7 +219,7 @@ protected void printVmMetrics(long epoch) {
}
}

protected void printRegularMetrics(long epoch) {
private void printRegularMetrics(long epoch) {
for (Map.Entry<String,SortedMap<MetricName,Metric>> entry : getMetricsRegistry().groupedMetrics(predicate).entrySet()) {
for (Map.Entry<MetricName, Metric> subEntry : entry.getValue().entrySet()) {
final Metric metric = subEntry.getValue();
Expand Down Expand Up @@ -246,14 +269,14 @@ public void processGauge(MetricName name, Gauge<?> gauge, Long epoch) throws Exc
sendObj(sanitizeName(name) + ".count", StatType.GAUGE, gauge.value());
}

protected void sendSummarizable(String sanitizedName, Summarizable metric) throws IOException {
private void sendSummarizable(String sanitizedName, Summarizable metric) throws IOException {
sendFloat(sanitizedName + ".min", StatType.TIMER, metric.min());
sendFloat(sanitizedName + ".max", StatType.TIMER, metric.max());
sendFloat(sanitizedName + ".mean", StatType.TIMER, metric.mean());
sendFloat(sanitizedName + ".stddev", StatType.TIMER, metric.stdDev());
}

protected void sendSampling(String sanitizedName, Sampling metric) throws IOException {
private void sendSampling(String sanitizedName, Sampling metric) throws IOException {
final Snapshot snapshot = metric.getSnapshot();
sendFloat(sanitizedName + ".median", StatType.TIMER, snapshot.getMedian());
sendFloat(sanitizedName + ".75percentile", StatType.TIMER, snapshot.get75thPercentile());
Expand All @@ -264,19 +287,19 @@ protected void sendSampling(String sanitizedName, Sampling metric) throws IOExce
}


protected void sendInt(String name, StatType statType, long value) {
private void sendInt(String name, StatType statType, long value) {
sendData(name, String.format(locale, "%d", value), statType);
}

protected void sendFloat(String name, StatType statType, double value) {
private void sendFloat(String name, StatType statType, double value) {
sendData(name, String.format(locale, "%2.2f", value), statType);
}

protected void sendObj(String name, StatType statType, Object value) {
private void sendObj(String name, StatType statType, Object value) {
sendData(name, String.format(locale, "%s", value), statType);
}

protected String sanitizeName(MetricName name) {
private String sanitizeName(MetricName name) {
final StringBuilder sb = new StringBuilder()
.append(name.getGroup())
.append('.')
Expand All @@ -289,11 +312,11 @@ protected String sanitizeName(MetricName name) {
return sb.append(name.getName()).toString();
}

protected String sanitizeString(String s) {
private static String sanitizeString(String s) {
return s.replace(' ', '-');
}

protected void sendData(String name, String value, StatType statType) {
private void sendData(String name, String value, StatType statType) {
String statTypeStr = "";
switch (statType) {
case COUNTER:
Expand All @@ -306,23 +329,30 @@ protected void sendData(String name, String value, StatType statType) {
statTypeStr = "ms";
break;
}


final StringBuffer toWrite = new StringBuffer();
if (prependNewline) {
toWrite.append("\n");
}
if (!prefix.isEmpty()) {
toWrite.append(prefix);
}
toWrite.append(sanitizeString(name));
toWrite.append(":");
toWrite.append(value);
toWrite.append("|");
toWrite.append(statTypeStr);
prependNewline = true;

final byte[] bytesToWrite = toWrite.toString().getBytes();

try {
if (prependNewline) {
writer.write("\n");
}
if (!prefix.isEmpty()) {
writer.write(prefix);
if (outputData.size() + bytesToWrite.length > this.udpPacketMaxSize) {
sendUDPData();
}
writer.write(sanitizeString(name));
writer.write(":");
writer.write(value);
writer.write("|");
writer.write(statTypeStr);
prependNewline = true;
writer.flush();
outputData.write(bytesToWrite);
} catch (IOException e) {
LOG.error("Error sending to Graphite:", e);
LOG.error("Error sending to Statsd:", e);
}
}

Expand Down
Loading