diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cbdb9930..78864eb1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,11 @@ # CHANGELOG -0.2.0 / Unreleased +0.2.0 / 2015.03.31 ================== - Changelog update, [#9][] [@miketheman][] +- Fixes `threadstats` unsafe thread operations, [#6][] +- Add tests to check `statsd` and `threadstats` thread safety, [#6][] 0.1.2 / 2015.03.23 ================== @@ -16,6 +18,7 @@ - First release +[#6]: https://github.com/DataDog/datadogpy/issues/6 [#7]: https://github.com/DataDog/datadogpy/issues/7 [#9]: https://github.com/DataDog/datadogpy/issues/9 [@miketheman]: https://github.com/miketheman diff --git a/README.md b/README.md index ab616fe4c..4023ebf02 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ api.Event.create(title=title, text=text, tags=tags) from datadog import statsd statsd.increment('whatever') -statsd.gauge('foo', 2) +statsd.gauge('foo', 42) # Or ThreadStats, an alternative tool to collect and flush metrics, using Datadog REST API from datadog import ThreadStats @@ -57,3 +57,7 @@ stats.start() stats.increment('home.page.hits') ``` + +Thread safety +------------ +`DogStatsd` and `ThreadStats` are thread-safe. 
diff --git a/datadog/threadstats/metrics.py b/datadog/threadstats/metrics.py index dbf5c40ba..330739ab2 100644 --- a/datadog/threadstats/metrics.py +++ b/datadog/threadstats/metrics.py @@ -3,6 +3,9 @@ """ from collections import defaultdict import random +import itertools + +from datadog.util.compat import iternext class Metric(object): @@ -47,13 +50,14 @@ def __init__(self, name, tags, host): self.name = name self.tags = tags self.host = host - self.count = 0 + self.count = [] def add_point(self, value): - self.count += value + self.count.append(value) def flush(self, timestamp): - return [(timestamp, self.count, self.name, self.tags, self.host)] + count = sum(self.count, 0) + return [(timestamp, count, self.name, self.tags, self.host)] class Histogram(Metric): @@ -67,8 +71,9 @@ def __init__(self, name, tags, host): self.host = host self.max = float("-inf") self.min = float("inf") - self.sum = 0 - self.count = 0 + self.sum = [] + self.iter_counter = itertools.count() + self.count = iternext(self.iter_counter) self.sample_size = 1000 self.samples = [] self.percentiles = [0.75, 0.85, 0.95, 0.99] @@ -76,12 +81,12 @@ def __init__(self, name, tags, host): def add_point(self, value): self.max = self.max if self.max > value else value self.min = self.min if self.min < value else value - self.sum += value + self.sum.append(value) if self.count < self.sample_size: self.samples.append(value) else: self.samples[random.randrange(0, self.sample_size)] = value - self.count += 1 + self.count = iternext(self.iter_counter) def flush(self, timestamp): if not self.count: @@ -101,7 +106,8 @@ def flush(self, timestamp): return metrics def average(self): - return float(self.sum) / self.count + sum_metrics = sum(self.sum, 0) + return float(sum_metrics) / self.count class Timing(Histogram): diff --git a/datadog/util/compat.py b/datadog/util/compat.py index fa617dd65..7aac77d90 100644 --- a/datadog/util/compat.py +++ b/datadog/util/compat.py @@ -20,6 +20,10 @@ def is_p3k(): def 
iteritems(d): return iter(d.items()) + + def iternext(it): return next(it) + + else: get_input = raw_input import ConfigParser as configparser @@ -29,6 +33,10 @@ def iteritems(d): def iteritems(d): return d.iteritems() + def iternext(it): return it.next() + + try: from UserDict import IterableUserDict except ImportError: @@ -49,4 +57,3 @@ def iteritems(d): import simplejson as json except ImportError: import json - diff --git a/setup.py b/setup.py index 374f65cc9..d54b31e6d 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ setup( name="datadog", - version="0.1.2", + version="0.2.0", install_requires=install_reqs, tests_require=["tox"], packages=[ diff --git a/tests/performance/test_statsd_thread_safety.py b/tests/performance/test_statsd_thread_safety.py new file mode 100644 index 000000000..6986698e0 --- /dev/null +++ b/tests/performance/test_statsd_thread_safety.py @@ -0,0 +1,59 @@ +import time +import six +import threading +from collections import deque +from nose import tools as t + +from datadog.dogstatsd.base import DogStatsd + + +class FakeSocket(object): + """ A fake socket for testing. 
""" + + def __init__(self): + self.payloads = deque() + + def send(self, payload): + assert type(payload) == six.binary_type + self.payloads.append(payload) + + def recv(self): + try: + return self.payloads + except IndexError: + return None + + def __repr__(self): + return str(self.payloads) + + +class DogstatsdTest(DogStatsd): + def send_metrics(self): + self.increment('whatever') + + +class TestDogStatsdThreadSafety(object): + + def setUp(self): + self.socket = FakeSocket() + + def recv(self): + return self.socket.recv() + + def test_send_metrics(self): + statsd = DogstatsdTest() + statsd.socket = self.socket + for _ in range(10000): + threading.Thread(target=statsd.send_metrics).start() + time.sleep(1) + t.assert_equal(10000, len(self.recv()), len(self.recv())) + + def test_send_batch_metrics(self): + with DogstatsdTest() as batch: + batch.socket = self.socket + for _ in range(10000): + threading.Thread(target=batch.send_metrics).start() + time.sleep(1) + payload = map(lambda x: x.split("\n"), self.recv()) + payload = reduce(lambda prev, ele: prev + ele, payload, []) + t.assert_equal(10001, len(payload), len(payload)) diff --git a/tests/performance/test_threadstats_thread_safety.py b/tests/performance/test_threadstats_thread_safety.py new file mode 100644 index 000000000..6d1e0d0c7 --- /dev/null +++ b/tests/performance/test_threadstats_thread_safety.py @@ -0,0 +1,90 @@ +import re +import time +import threading +from nose import tools as t + +from datadog import ThreadStats + + +class MemoryReporter(object): + """ A reporting class that reports to memory for testing. 
""" + + def __init__(self): + self.metrics = [] + self.events = [] + + def flush_metrics(self, metrics): + self.metrics += metrics + + def flush_events(self, events): + self.events += events + + +class ThreadStatsTest(ThreadStats): + def send_metrics_and_event(self, id): + # Counter + self.increment("counter", timestamp=12345) + time.sleep(0.001) # sleep makes the os continue another thread + + # Gauge + self.gauge("gauge_" + str(id), 42) + time.sleep(0.001) # sleep makes the os continue another thread + + # Histogram + self.histogram("histogram", id, timestamp=12345) + time.sleep(0.001) # sleep makes the os continue another thread + + # Event + self.event("title", "content") + + +class TestThreadStatsThreadSafety(object): + + def test_threadstats_thread_safety(self): + stats = ThreadStatsTest() + stats.start(roll_up_interval=10, flush_in_thread=False) + reporter = stats.reporter = MemoryReporter() + + for i in range(10000): + threading.Thread(target=stats.send_metrics_and_event, args=[i]).start() + # Wait all threads to finish + time.sleep(10) + + # Flush and check + stats.flush() + metrics = reporter.metrics + events = reporter.events + + # Overview + t.assert_equal(len(metrics), 10009, len(metrics)) + + # Sort metrics + counter_metrics = [] + gauge_metrics = [] + histogram_metrics = [] + + for m in metrics: + if re.match("gauge_.*", m['metric']): + gauge_metrics.append(m) + elif re.match("histogram.*", m['metric']): + histogram_metrics.append(m) + else: + counter_metrics.append(m) + + # Counter + t.assert_equal(len(counter_metrics), 1, len(counter_metrics)) + counter = counter_metrics[0] + t.assert_equal(counter['points'][0][1], 10000, counter['points'][0][1]) + + # Gauge + t.assert_equal(len(gauge_metrics), 10000, len(gauge_metrics)) + + # Histogram + t.assert_equal(len(histogram_metrics), 8, len(histogram_metrics)) + count_histogram = filter(lambda x: x['metric'] == "histogram.count", histogram_metrics)[0] + t.assert_equal(count_histogram['points'][0][1], 
10000, count_histogram['points'][0][1]) + avg_histogram = [x for x in histogram_metrics if x['metric'] == "histogram.avg"][0] + t.assert_equal(avg_histogram['points'][0][1], 4999.5, avg_histogram['points'][0][1]) + + # Events + t.assert_equal(10000, len(events), len(events))