diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 28a62d896..da9ece563 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -50,7 +50,7 @@ DEFAULT_PORT = 8125 # Buffering-related values (in seconds) -DEFAULT_FLUSH_INTERVAL = 0.3 +DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3 MIN_FLUSH_INTERVAL = 0.0001 # Env var to enable/disable sending the container ID field @@ -145,7 +145,7 @@ def __init__( host=DEFAULT_HOST, # type: Text port=DEFAULT_PORT, # type: int max_buffer_size=None, # type: None - flush_interval=DEFAULT_FLUSH_INTERVAL, # type: float + flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, # type: float disable_aggregation=True, # type: bool disable_buffering=True, # type: bool namespace=None, # type: Optional[Text] @@ -643,7 +643,7 @@ def disable_aggregation(self): self._stop_flush_thread() log.debug("Statsd aggregation is disabled") - def enable_aggregation(self, flush_interval=DEFAULT_FLUSH_INTERVAL): + def enable_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL): with self._config_lock: if not self._disable_aggregation: return @@ -805,6 +805,9 @@ def _reset_buffer(self): self._current_buffer_total_size = 0 self._buffer = [] + def flush(self): + self.flush_buffered_metrics() + def flush_buffered_metrics(self): """ Flush the metrics buffer by sending the data to the server. diff --git a/tests/unit/dogstatsd/test_statsd.py b/tests/unit/dogstatsd/test_statsd.py index 02edcdf4c..b9a24cfc1 100644 --- a/tests/unit/dogstatsd/test_statsd.py +++ b/tests/unit/dogstatsd/test_statsd.py @@ -29,7 +29,7 @@ # Datadog libraries from datadog import initialize, statsd from datadog import __version__ as version -from datadog.dogstatsd.base import DEFAULT_FLUSH_INTERVAL, DogStatsd, MIN_SEND_BUFFER_SIZE, UDP_OPTIMAL_PAYLOAD_LENGTH, UDS_OPTIMAL_PAYLOAD_LENGTH +from datadog.dogstatsd.base import DEFAULT_BUFFERING_FLUSH_INTERVAL, DogStatsd, MIN_SEND_BUFFER_SIZE, UDP_OPTIMAL_PAYLOAD_LENGTH, UDS_OPTIMAL_PAYLOAD_LENGTH from datadog.dogstatsd.context import TimedContextManagerDecorator from datadog.util.compat import is_higher_py35, is_p3k from tests.util.contextmanagers import preserve_environment_variable, EnvVars @@ -41,7 +41,7 @@ class FakeSocket(object): FLUSH_GRACE_PERIOD = 0.2 - def __init__(self, flush_interval=DEFAULT_FLUSH_INTERVAL): + def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL): self.payloads = deque() self._flush_interval = flush_interval @@ -331,42 +331,42 @@ def test_gauge_with_invalid_ts_should_be_ignored(self): def test_counter(self): self.statsd.increment('page.views') - self.statsd.flush_buffered_metrics() + self.statsd.flush() self.assert_equal_telemetry('page.views:1|c\n', self.recv(2)) self.statsd._reset_telemetry() self.statsd.increment('page.views', 11) - self.statsd.flush_buffered_metrics() + self.statsd.flush() self.assert_equal_telemetry('page.views:11|c\n', self.recv(2)) self.statsd._reset_telemetry() self.statsd.decrement('page.views') - self.statsd.flush_buffered_metrics() + self.statsd.flush() self.assert_equal_telemetry('page.views:-1|c\n', self.recv(2)) self.statsd._reset_telemetry() self.statsd.decrement('page.views', 12) - self.statsd.flush_buffered_metrics() + self.statsd.flush() self.assert_equal_telemetry('page.views:-12|c\n', self.recv(2)) def test_count(self): self.statsd.count('page.views', 11) - self.statsd.flush_buffered_metrics() + self.statsd.flush() self.assert_equal_telemetry('page.views:11|c\n', self.recv(2)) def test_count_with_ts(self): self.statsd.count_with_timestamp("page.views", 1, timestamp=1066) - self.statsd.flush_buffered_metrics() + self.statsd.flush() self.assert_equal_telemetry("page.views:1|c|T1066\n", self.recv(2)) self.statsd._reset_telemetry() self.statsd.count_with_timestamp("page.views", 11, timestamp=2121) - self.statsd.flush_buffered_metrics() + self.statsd.flush() self.assert_equal_telemetry("page.views:11|c|T2121\n", self.recv(2)) def test_count_with_invalid_ts_should_be_ignored(self): self.statsd.count_with_timestamp("page.views", 1, timestamp=-1066) - self.statsd.flush_buffered_metrics() + self.statsd.flush() self.assert_equal_telemetry("page.views:1|c\n", self.recv(2)) def test_histogram(self): @@ -399,7 +399,7 @@ def test_sample_rate(self): for _ in range(10000): self.statsd.increment('sampled_counter', sample_rate=0.3) - self.statsd.flush_buffered_metrics() + self.statsd.flush() total_metrics = 0 payload = self.recv() @@ -667,7 +667,7 @@ def test_socket_error(self): self.statsd.socket = BrokenSocket() with mock.patch("datadog.dogstatsd.base.log") as mock_log: self.statsd.gauge('no error', 1) - self.statsd.flush_buffered_metrics() + self.statsd.flush() mock_log.error.assert_not_called() mock_log.warning.assert_called_once_with( @@ -679,7 +679,7 @@ def test_socket_overflown(self): self.statsd.socket = OverflownSocket() with mock.patch("datadog.dogstatsd.base.log") as mock_log: self.statsd.gauge('no error', 1) - self.statsd.flush_buffered_metrics() + self.statsd.flush() mock_log.error.assert_not_called() calls = [call("Socket send would block: %s, dropping the packet", mock.ANY)] @@ -689,7 +689,7 @@ def test_socket_message_too_long(self): self.statsd.socket = BrokenSocket(error_number=errno.EMSGSIZE) with mock.patch("datadog.dogstatsd.base.log") as mock_log: self.statsd.gauge('no error', 1) - self.statsd.flush_buffered_metrics() + self.statsd.flush() mock_log.error.assert_not_called() calls = [ @@ -705,7 +705,7 @@ def test_socket_no_buffer_space(self): self.statsd.socket = BrokenSocket(error_number=errno.ENOBUFS) with mock.patch("datadog.dogstatsd.base.log") as mock_log: self.statsd.gauge('no error', 1) - self.statsd.flush_buffered_metrics() + self.statsd.flush() mock_log.error.assert_not_called() calls = [call("Socket buffer full: %s, dropping the packet", mock.ANY)] @@ -720,7 +720,7 @@ def test_uds_socket_ensures_min_receive_buffer(self, mock_socket_create): datadog = DogStatsd(socket_path="/fake/uds/socket/path") datadog.gauge('some value', 1) - datadog.flush_buffered_metrics() + datadog.flush() # Sanity check mock_socket_create.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM) @@ -740,7 +740,7 @@ def test_udp_socket_ensures_min_receive_buffer(self, mock_socket_create): datadog = DogStatsd() datadog.gauge('some value', 1) - datadog.flush_buffered_metrics() + datadog.flush() # Sanity check mock_socket_create.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) @@ -837,7 +837,7 @@ def func(arg1, arg2, kwarg1=1, kwarg2=1): return (arg1, arg2, kwarg1, kwarg2) func(1, 2, kwarg2=3) - self.statsd.flush_buffered_metrics() + self.statsd.flush() # Ignore telemetry packet packet = self.recv(2).split("\n")[0] @@ -881,7 +881,7 @@ def func(arg1, arg2, kwarg1=1, kwarg2=1): return (arg1, arg2, kwarg1, kwarg2) func(1, 2, kwarg2=3) - self.statsd.flush_buffered_metrics() + self.statsd.flush() packet = self.recv() name_value, type_ = packet.rstrip('\n').split('|') @@ -1068,7 +1068,7 @@ def test_flush(self): dogstatsd.increment('page.views') self.assertIsNone(fake_socket.recv(no_wait=True)) - dogstatsd.flush_buffered_metrics() + dogstatsd.flush() self.assert_equal_telemetry('page.views:1|c\n', fake_socket.recv(2)) def test_flush_interval(self): @@ -1096,7 +1096,7 @@ def test_aggregation_buffering_simultaneously(self): dogstatsd.increment('test.aggregation_and_buffering') self.assertIsNone(fake_socket.recv(no_wait=True)) dogstatsd.flush_aggregated_metrics() - dogstatsd.flush_buffered_metrics() + dogstatsd.flush() self.assert_equal_telemetry('test.aggregation_and_buffering:10|c\n', fake_socket.recv(2)) def test_aggregation_buffering_simultaneously_with_interval(self): @@ -1139,7 +1139,7 @@ def test_flush_disable(self): dogstatsd.increment('page.views') self.assertIsNone(fake_socket.recv(no_wait=True)) - time.sleep(DEFAULT_FLUSH_INTERVAL) + time.sleep(DEFAULT_BUFFERING_FLUSH_INTERVAL) self.assertIsNone(fake_socket.recv(no_wait=True)) time.sleep(0.3) @@ -1697,7 +1697,7 @@ def test_entity_id_and_container_id(self): dogstatsd._container_id = "ci-fake-container-id" dogstatsd.increment("page.views") - dogstatsd.flush_buffered_metrics() + dogstatsd.flush() tags = "dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d" metric = 'page.views:1|c|#' + tags + '|c:ci-fake-container-id\n' self.assertEqual(metric, dogstatsd.socket.recv()) @@ -1712,7 +1712,7 @@ def test_entity_id_and_container_id_and_external_env(self): dogstatsd._container_id = "ci-fake-container-id" dogstatsd.increment("page.views") - dogstatsd.flush_buffered_metrics() + dogstatsd.flush() tags = "dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d" metric = 'page.views:1|c|#' + tags + '|c:ci-fake-container-id' + '|e:it-false,cn-container-name,pu-04652bb7-19b7-11e9-9cc6-42010a9c016d' + '\n' self.assertEqual(metric, dogstatsd.socket.recv()) @@ -1795,7 +1795,7 @@ def test_dogstatsd_initialization_with_dd_env_service_version(self): # Make call with no tags passed; only the globally configured tags will be used. global_tags_str = ','.join([t for t in global_tags]) dogstatsd.gauge('gt', 123.4) - dogstatsd.flush_buffered_metrics() + dogstatsd.flush() # Protect against the no tags case. metric = 'gt:123.4|g|#{}\n'.format(global_tags_str) if global_tags_str else 'gt:123.4|g\n' @@ -1813,7 +1813,7 @@ def test_dogstatsd_initialization_with_dd_env_service_version(self): passed_tags = ['env:prod', 'version:def456', 'custom_tag:toad'] all_tags_str = ','.join([t for t in passed_tags + global_tags]) dogstatsd.gauge('gt', 123.4, tags=passed_tags) - dogstatsd.flush_buffered_metrics() + dogstatsd.flush() metric = 'gt:123.4|g|#{}\n'.format(all_tags_str) self.assertEqual(metric, dogstatsd.socket.recv()) @@ -1919,22 +1919,22 @@ def test_counter_with_container_field(self): self.statsd._container_id = "ci-fake-container-id" self.statsd.increment("page.views") - self.statsd.flush_buffered_metrics() + self.statsd.flush() self.assert_equal_telemetry("page.views:1|c|c:ci-fake-container-id\n", self.recv(2)) self.statsd._reset_telemetry() self.statsd.increment("page.views", 11) - self.statsd.flush_buffered_metrics() + self.statsd.flush() self.assert_equal_telemetry("page.views:11|c|c:ci-fake-container-id\n", self.recv(2)) self.statsd._reset_telemetry() self.statsd.decrement("page.views") - self.statsd.flush_buffered_metrics() + self.statsd.flush() self.assert_equal_telemetry("page.views:-1|c|c:ci-fake-container-id\n", self.recv(2)) self.statsd._reset_telemetry() self.statsd.decrement("page.views", 12) - self.statsd.flush_buffered_metrics() + self.statsd.flush() self.assert_equal_telemetry("page.views:-12|c|c:ci-fake-container-id\n", self.recv(2)) self.statsd._container_id = None