Skip to content

Commit

Permalink
Merge pull request #6 from DataDog/yann/test-thread-safety
Browse files Browse the repository at this point in the history
[threadstats]fix threadstats thread safety + tests
  • Loading branch information
yannmh committed Mar 31, 2015
2 parents ec6b900 + ebfbd5b commit 8a522d7
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 12 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# CHANGELOG

0.2.0 / Unreleased
0.2.0 / 2015.03.31
==================

- Changelog update, [#9][] [@miketheman][]
- Fixes `threadstats` unsafe thread operations, [#6][]
- Add tests to check `statsd` and `threadstats` thread safety [#6][]

0.1.2 / 2015.03.23
==================
Expand All @@ -16,6 +18,7 @@
- First release

<!--- The following link definition list is generated by PimpMyChangelog --->
[#6]: https://github.com/DataDog/datadogpy/issues/6
[#7]: https://github.com/DataDog/datadogpy/issues/7
[#9]: https://github.com/DataDog/datadogpy/issues/9
[@miketheman]: https://github.com/miketheman
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ api.Event.create(title=title, text=text, tags=tags)
from datadog import statsd

statsd.increment('whatever')
statsd.gauge('foo', 2)
statsd.gauge('foo', 42)

# Or ThreadStats, an alternative tool to collect and flush metrics, using Datadog REST API
from datadog import ThreadStats
Expand All @@ -57,3 +57,7 @@ stats.start()
stats.increment('home.page.hits')

```

Threadsafety
------------
`DogStatsd` and `ThreadStats` are threadsafe.
22 changes: 14 additions & 8 deletions datadog/threadstats/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
"""
from collections import defaultdict
import random
import itertools

from datadog.util.compat import iternext


class Metric(object):
Expand Down Expand Up @@ -47,13 +50,14 @@ def __init__(self, name, tags, host):
self.name = name
self.tags = tags
self.host = host
self.count = 0
self.count = []

def add_point(self, value):
self.count += value
self.count.append(value)

def flush(self, timestamp):
return [(timestamp, self.count, self.name, self.tags, self.host)]
count = sum(self.count, 0)
return [(timestamp, count, self.name, self.tags, self.host)]


class Histogram(Metric):
Expand All @@ -67,21 +71,22 @@ def __init__(self, name, tags, host):
self.host = host
self.max = float("-inf")
self.min = float("inf")
self.sum = 0
self.count = 0
self.sum = []
self.iter_counter = itertools.count()
self.count = iternext(self.iter_counter)
self.sample_size = 1000
self.samples = []
self.percentiles = [0.75, 0.85, 0.95, 0.99]

def add_point(self, value):
self.max = self.max if self.max > value else value
self.min = self.min if self.min < value else value
self.sum += value
self.sum.append(value)
if self.count < self.sample_size:
self.samples.append(value)
else:
self.samples[random.randrange(0, self.sample_size)] = value
self.count += 1
self.count = iternext(self.iter_counter)

def flush(self, timestamp):
if not self.count:
Expand All @@ -101,7 +106,8 @@ def flush(self, timestamp):
return metrics

def average(self):
return float(self.sum) / self.count
sum_metrics = sum(self.sum, 0)
return float(sum_metrics) / self.count


class Timing(Histogram):
Expand Down
9 changes: 8 additions & 1 deletion datadog/util/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ def is_p3k():

def iteritems(d):
return iter(d.items())

def iternext(iter):
return next(iter)

else:
get_input = raw_input
import ConfigParser as configparser
Expand All @@ -29,6 +33,10 @@ def iteritems(d):
def iteritems(d):
return d.iteritems()

def iternext(iter):
return iter.next()


try:
from UserDict import IterableUserDict
except ImportError:
Expand All @@ -49,4 +57,3 @@ def iteritems(d):
import simplejson as json
except ImportError:
import json

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

setup(
name="datadog",
version="0.1.2",
version="0.2.0",
install_requires=install_reqs,
tests_require=["tox"],
packages=[
Expand Down
59 changes: 59 additions & 0 deletions tests/performance/test_statsd_thread_safety.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import time
import six
import threading
from collections import deque
from nose import tools as t

from datadog.dogstatsd.base import DogStatsd


class FakeSocket(object):
""" A fake socket for testing. """

def __init__(self):
self.payloads = deque()

def send(self, payload):
assert type(payload) == six.binary_type
self.payloads.append(payload)

def recv(self):
try:
return self.payloads
except IndexError:
return None

def __repr__(self):
return str(self.payloads)


class DogstatsdTest(DogStatsd):
def send_metrics(self):
self.increment('whatever')


class TestDogStatsdThreadSafety(object):

def setUp(self):
self.socket = FakeSocket()

def recv(self):
return self.socket.recv()

def test_send_metrics(self):
statsd = DogstatsdTest()
statsd.socket = self.socket
for _ in range(10000):
threading.Thread(target=statsd.send_metrics).start()
time.sleep(1)
t.assert_equal(10000, len(self.recv()), len(self.recv()))

def test_send_batch_metrics(self):
with DogstatsdTest() as batch:
batch.socket = self.socket
for _ in range(10000):
threading.Thread(target=batch.send_metrics).start()
time.sleep(1)
payload = map(lambda x: x.split("\n"), self.recv())
payload = reduce(lambda prev, ele: prev + ele, payload, [])
t.assert_equal(10001, len(payload), len(payload))
90 changes: 90 additions & 0 deletions tests/performance/test_threadstats_thread_safety.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import re
import time
import threading
from nose import tools as t

from datadog import ThreadStats


class MemoryReporter(object):
""" A reporting class that reports to memory for testing. """

def __init__(self):
self.metrics = []
self.events = []

def flush_metrics(self, metrics):
self.metrics += metrics

def flush_events(self, events):
self.events += events


class ThreadStatsTest(ThreadStats):
def send_metrics_and_event(self, id):
# Counter
self.increment("counter", timestamp=12345)
time.sleep(0.001) # sleep makes the os continue another thread

# Gauge
self.gauge("gauge_" + str(id), 42)
time.sleep(0.001) # sleep makes the os continue another thread

# Histogram
self.histogram("histogram", id, timestamp=12345)
time.sleep(0.001) # sleep makes the os continue another thread

# Event
self.event("title", "content")


class TestThreadStatsThreadSafety(object):

def test_threadstats_thread_safety(self):
stats = ThreadStatsTest()
stats.start(roll_up_interval=10, flush_in_thread=False)
reporter = stats.reporter = MemoryReporter()

for i in range(10000):
threading.Thread(target=stats.send_metrics_and_event, args=[i]).start()
# Wait all threads to finish
time.sleep(10)

# Flush and check
stats.flush()
metrics = reporter.metrics
events = reporter.events

# Overview
t.assert_equal(len(metrics), 10009, len(metrics))

# Sort metrics
counter_metrics = []
gauge_metrics = []
histogram_metrics = []

for m in metrics:
if re.match("gauge_.*", m['metric']):
gauge_metrics.append(m)
elif re.match("histogram.*", m['metric']):
histogram_metrics.append(m)
else:
counter_metrics.append(m)

# Counter
t.assert_equal(len(counter_metrics), 1, len(counter_metrics))
counter = counter_metrics[0]
t.assert_equal(counter['points'][0][1], 10000, counter['points'][0][1])

# Gauge
t.assert_equal(len(gauge_metrics), 10000, len(gauge_metrics))

# Histogram
t.assert_equal(len(histogram_metrics), 8, len(histogram_metrics))
count_histogram = filter(lambda x: x['metric'] == "histogram.count", histogram_metrics)[0]
t.assert_equal(count_histogram['points'][0][1], 10000, count_histogram['points'][0][1])
sum_histogram = filter(lambda x: x['metric'] == "histogram.avg", histogram_metrics)[0]
t.assert_equal(sum_histogram['points'][0][1], 4999.5, sum_histogram['points'][0][1])

# Events
t.assert_equal(10000, len(events), len(events))

0 comments on commit 8a522d7

Please sign in to comment.