Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decrement Redis Counter for Errored Sidekiq Jobs #24

Merged
merged 4 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ For each queue, the following metrics will be reported:
2. **shared.sidekiq._queue_.latency**: gauge of how long the oldest job has been in the queue

For each worker, the following metrics and tags will be reported:
1. **sidekiq.worker_metrics.inqueue.#{key}**: number of jobs "in queue" per worker, uses redis to track increment/decrement
1. **sidekiq.worker_metrics.in_queue.#{key}**: number of jobs "in queue" per worker, uses redis to track increment/decrement

## DogStatsD Keys
For each job, the following metrics and tags will be reported:
Expand All @@ -111,7 +111,7 @@ For each queue, the following metrics and tags will be reported:
2. **sidekiq.queue.latency (tags: {queue: _queue_})**: gauge of how long the oldest job has been in the queue

For each worker, the following metrics and tags will be reported:
1. **sidekiq.worker_metrics.inqueue.#{key}**: number of jobs "in queue" per worker, uses redis to track increment/decrement
1. **sidekiq.worker_metrics.in_queue.#{key}**: number of jobs "in queue" per worker, uses redis to track increment/decrement

## Worker
There is a worker, `Sidekiq::Instrument::Worker`, that submits gauges
Expand Down
4 changes: 2 additions & 2 deletions lib/sidekiq/instrument/middleware/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ module Sidekiq::Instrument
class ServerMiddleware
include Sidekiq::Instrument::MetricNames

def call(worker, job, queue, &block)
def call(worker, _job, _queue, &block)
Statter.statsd.increment(metric_name(worker, 'dequeue'))
Statter.dogstatsd&.increment('sidekiq.dequeue', worker_dog_options(worker))

start_time = Time.now
WorkerMetrics.trace_workers_decrement_counter(worker.class.to_s.underscore)
yield block
execution_time_ms = (Time.now - start_time) * 1000
Statter.statsd.measure(metric_name(worker, 'runtime'), execution_time_ms)
Expand All @@ -22,6 +21,7 @@ def call(worker, job, queue, &block)
Statter.dogstatsd&.increment('sidekiq.error', worker_dog_options(worker))
raise e
ensure
WorkerMetrics.trace_workers_decrement_counter(worker.class.to_s.underscore)
Statter.dogstatsd&.flush(sync: true)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/instrument/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Sidekiq
module Instrument
VERSION = '0.6.1'
VERSION = '0.6.2'
end
end
8 changes: 5 additions & 3 deletions lib/sidekiq/instrument/worker.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

require 'sidekiq'
require 'sidekiq/api'

Expand All @@ -13,7 +15,7 @@ class Worker
workers_size: :workers,
enqueued: :pending,
failed: nil
}
}.freeze

def perform
info = Sidekiq::Stats.new
Expand Down Expand Up @@ -64,8 +66,8 @@ def send_worker_metrics
return unless WorkerMetrics.enabled

WorkerMetrics.workers_in_queue.each do |key, value|
Statter.statsd.gauge("shared.sidekiq.worker_metrics.inqueue.#{key}", value)
Statter.dogstatsd&.gauge("shared.sidekiq.worker_metrics.inqueue", value, tags: ["worker:#{key}"])
Statter.statsd.gauge("shared.sidekiq.worker_metrics.in_queue.#{key}", value)
Statter.dogstatsd&.gauge('shared.sidekiq.worker_metrics.in_queue', value, tags: ["worker:#{key}"])
end
end
end
Expand Down
1 change: 1 addition & 0 deletions sidekiq-instrument.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'bundler', '~> 2.0', '>= 2.0.2'
spec.add_development_dependency 'rake', '~> 12.0'
spec.add_development_dependency 'rspec', '~> 3.0'
spec.add_development_dependency 'rubocop', '~> 1.0'
spec.add_development_dependency 'pry-byebug', '~> 3.4'
spec.add_development_dependency 'simplecov'
spec.add_development_dependency 'simplecov-cobertura'
Expand Down
52 changes: 23 additions & 29 deletions spec/sidekiq-instrument/client_middleware_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
# frozen_string_literal: true

require 'sidekiq/instrument/middleware/client'

RSpec.describe Sidekiq::Instrument::ClientMiddleware do
describe '#call' do
let(:worker_metric_name) do
'sidekiq_instrument_trace_workers::in_queue'
end

before(:all) do
Sidekiq.configure_client do |c|
c.client_middleware do |chain|
Expand All @@ -10,6 +16,10 @@
end
end

before(:each) do
Redis.new.flushall
end

after(:all) do
Sidekiq.configure_client do |c|
c.client_middleware do |chain|
Expand All @@ -20,50 +30,34 @@

context 'without statsd_metric_name' do
it 'increments the StatsD enqueue counter' do
expect {
expect do
MyWorker.perform_async
}.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.enqueue')
end.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.enqueue')
end

it 'increments the DogStatsD enqueue counter' do
expect(Sidekiq::Instrument::Statter.dogstatsd).to receive(:increment).with('sidekiq.enqueue', { tags: ['queue:default', 'worker:my_worker'] }).once
expect(
Sidekiq::Instrument::Statter.dogstatsd
).to receive(:increment).with('sidekiq.enqueue', { tags: ['queue:default', 'worker:my_worker'] }).once
MyWorker.perform_async
end
end

context 'with statsd_metric_name' do
it 'increments the enqueue counter' do
expect {
expect do
MyOtherWorker.perform_async
}.to trigger_statsd_increment('my_other_worker.enqueue')
end.to trigger_statsd_increment('my_other_worker.enqueue')
end
end

context 'with WorkerMetrics.enabled true' do
let(:worker_metric_name) do
"sidekiq_instrument_trace_workers::in_queue"
end
it 'increments the enqueue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel worker_metric_name ,'my_other_worker'
MyOtherWorker.perform_async
expect(
Redis.new.hget worker_metric_name ,'my_other_worker'
).to eq('1')
end
end

context 'with WorkerMetrics.enabled true and redis_config not provided' do
hkim3162 marked this conversation as resolved.
Show resolved Hide resolved
let(:worker_metric_name) do
"sidekiq_instrument_trace_workers::in_queue"
end
it 'increments the enqueue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel worker_metric_name ,'my_other_worker'
MyOtherWorker.perform_async
expect(
Redis.new.hget worker_metric_name ,'my_other_worker'
).to eq('1')
it 'increments the in_queue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('1')
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('2')
end
end

Expand Down
94 changes: 62 additions & 32 deletions spec/sidekiq-instrument/server_middleware_spec.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
# frozen_string_literal: true

require 'sidekiq/instrument/middleware/server'

RSpec.describe Sidekiq::Instrument::ServerMiddleware do
describe '#call' do
let(:expected_dog_options) { { tags: ['queue:default', 'worker:my_worker'] } }
let(:worker_metric_name) do
'sidekiq_instrument_trace_workers::in_queue'
end

before(:all) do
Sidekiq::Testing.server_middleware do |chain|
chain.add described_class
end
end

before(:each) do
Redis.new.flushall
end

after(:all) do
Sidekiq::Testing.server_middleware do |chain|
chain.remove described_class
Expand All @@ -18,20 +27,22 @@

context 'when a job succeeds' do
it 'increments StatsD dequeue counter' do
expect {
expect do
MyWorker.perform_async
}.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.dequeue')
end.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.dequeue')
end

it 'increments DogStatsD dequeue counter' do
expect(Sidekiq::Instrument::Statter.dogstatsd).to receive(:increment).with('sidekiq.dequeue', expected_dog_options).once
expect(
Sidekiq::Instrument::Statter.dogstatsd
).to receive(:increment).with('sidekiq.dequeue', expected_dog_options).once
MyWorker.perform_async
end

it 'measures StatsD job runtime' do
expect {
expect do
MyWorker.perform_async
}.to trigger_statsd_measure('shared.sidekiq.default.MyWorker.runtime')
end.to trigger_statsd_measure('shared.sidekiq.default.MyWorker.runtime')
end

it 'measures DogStatsD job runtime' do
Expand All @@ -40,53 +51,72 @@
end

context 'with WorkerMetrics.enabled true' do
let(:worker_metric_name) do
"sidekiq_instrument_trace_workers::in_queue"
end
it 'increments the enqueue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel worker_metric_name ,'my_other_worker'
MyOtherWorker.perform_async
expect(
Redis.new.hget worker_metric_name ,'my_other_worker'
).to eq('-1')
it 'decrements the in_queue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel(worker_metric_name, 'my_other_worker')
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('-1')
hkim3162 marked this conversation as resolved.
Show resolved Hide resolved
end
end

context 'with WorkerMetrics.enabled true, and redis_config not given' do
let(:worker_metric_name) do
"sidekiq_instrument_trace_workers::in_queue"
end
it 'increments the enqueue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
Redis.new.hdel worker_metric_name ,'my_other_worker'
context 'with WorkerMetrics.enabled true and an errored job' do
it 'decrements the in_queue counter' do
Sidekiq::Instrument::WorkerMetrics.enabled = true
MyOtherWorker.perform_async
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('-1')
begin
MyOtherWorker.perform_async
expect(
Redis.new.hget worker_metric_name ,'my_other_worker'
).to eq('-1')
rescue StandardError
nil
end
expect(Redis.new.hget(worker_metric_name, 'my_other_worker')).to eq('-2')
end
end
end

context 'when a job fails' do
before { allow_any_instance_of(MyWorker).to receive(:perform).and_raise('foo') }
before do
allow_any_instance_of(MyWorker).to receive(:perform).and_raise('foo')
end

it 'increments the StatsD failure counter' do
expect {
MyWorker.perform_async rescue nil
}.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.error')
expect do
MyWorker.perform_async
rescue StandardError
nil
end.to trigger_statsd_increment('shared.sidekiq.default.MyWorker.error')
end

it 'increments the DogStatsD failure counter' do
expect(Sidekiq::Instrument::Statter.dogstatsd).to receive(:increment).with('sidekiq.dequeue', expected_dog_options).once
expect(
Sidekiq::Instrument::Statter.dogstatsd
).to receive(:increment).with('sidekiq.dequeue', expected_dog_options).once
expect(Sidekiq::Instrument::Statter.dogstatsd).not_to receive(:time)
expect(Sidekiq::Instrument::Statter.dogstatsd).to receive(:increment).with('sidekiq.error', expected_dog_options).once
MyWorker.perform_async rescue nil
expect(
Sidekiq::Instrument::Statter.dogstatsd
).to receive(:increment).with('sidekiq.error', expected_dog_options).once

begin
MyWorker.perform_async
rescue StandardError
nil
end
end

it 're-raises the error' do
expect { MyWorker.perform_async }.to raise_error 'foo'
end

it 'calls the decrement counter' do
expect(
Sidekiq::Instrument::WorkerMetrics
).to receive(:trace_workers_decrement_counter).with('my_worker').once
begin
MyWorker.perform_async
rescue StandardError
nil
end
end
end

context 'without optional DogStatsD client' do
Expand Down
Loading