Skip to content

Commit

Permalink
Merge pull request #18 from rubrikinc/Grouping-logic
Browse files Browse the repository at this point in the history
Fix Grouping logic
  • Loading branch information
Athishpranav2003 authored Jul 25, 2024
2 parents 8b5da7a + 4bcc611 commit fec3bcb
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 7 deletions.
2 changes: 1 addition & 1 deletion fluent-plugin-quota-throttle.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)

Gem::Specification.new do |spec|
spec.name = "fluent-plugin-quota-throttle"
spec.version = "0.0.2"
spec.version = "0.0.3"
spec.authors = ["Athish Pranav D", "Dipendra Singh", "Rubrik Inc."]
spec.email = ["[email protected]", "[email protected]"]
spec.summary = %q{Fluentd filter for throttling logs based on a configurable quotas.}
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/config_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module ConfigParser
# +action+: (String) The action to take when the quota is reached. Must be one of the predefined actions in @@allowed_actions.
class Quota

attr_accessor :name, :desc, :group_by, :match_by, :bucket_size, :duration, :action
attr_reader :name, :desc, :group_by, :match_by, :bucket_size, :duration, :action

@@allowed_actions = Set["drop", "reemit"]

Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/filter_quota_throttle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def filter(tag, time, record)
# +timestamp+: (Time) The timestamp of the record
def quota_breached(tag, timestamp, record, bucket, quota)
if bucket.last_warning.nil? || Time.now - bucket.last_warning > @warning_delay
log.warn "Quota breached for group #{bucket.group} in quota #{quota.name}"
log.warn "Quota breached for {group: #{bucket.group}, quota: #{quota.name}, total_logs: #{bucket.bucket_count_total}, limit: #{bucket.bucket_limit}, current_rate: #{bucket.approx_rate_per_second}}"
bucket.last_warning = Time.now
end
case quota.action
Expand Down
9 changes: 7 additions & 2 deletions lib/fluent/plugin/rate_limiter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ module RateLimiter
##
# Bucket class, contains the rate limiting logic for each group
# Attributes:
# +group+: Group for which the bucket is created
# +bucket_count+: Number of requests in the bucket
# +bucket_count_total+: Number of requests in the bucket including the dropped requests
# +bucket_last_reset+: Time when the bucket was last reset
# +approx_rate_per_second+: Approximate rate of requests per second
# +rate_last_reset+: Time when the rate was last reset
Expand All @@ -16,12 +18,13 @@ module RateLimiter
# +bucket_period+: Time period for the bucket
# +rate_limit+: Maximum number of requests allowed per second
class Bucket
attr_accessor :bucket_count, :bucket_last_reset, :approx_rate_per_second, :rate_last_reset, :curr_count, :last_warning
attr_accessor :bucket_count, :bucket_count_total, :bucket_last_reset, :approx_rate_per_second, :rate_last_reset, :curr_count, :last_warning
attr_reader :bucket_limit, :bucket_period, :rate_limit, :timeout_s, :group
def initialize( group, bucket_limit, bucket_period)
now = Time.now
@group = group
@bucket_count = 0
@bucket_count_total = 0
@bucket_last_reset = now
@approx_rate_per_second = 0
@rate_last_reset = now
Expand All @@ -43,6 +46,7 @@ def allow
end
now = Time.now
@curr_count += 1
@bucket_count_total += 1
time_lapsed = now - @rate_last_reset

if time_lapsed.to_i >= 1
Expand Down Expand Up @@ -80,6 +84,7 @@ def reset_bucket
now = Time.now
unless @bucket_count == -1 && @approx_rate_per_second > @rate_limit
@bucket_count = 0
@bucket_count_total = 0
@bucket_last_reset = now
end
end
Expand All @@ -99,7 +104,7 @@ def initialize
# +group+: Group for which the bucket is required
# +quota+: Quota object containing the bucket size and duration
def get_bucket(group, quota)
@buckets[group] = @buckets.delete(group) || Bucket.new( group, quota.bucket_size, quota.duration)
@buckets[[group, quota.name]] = @buckets.delete([group, quota.name]) || Bucket.new( group, quota.bucket_size, quota.duration)
end

# Cleans the buckets that have expired
Expand Down
6 changes: 4 additions & 2 deletions test/modules/rate_limiter_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def test_bucket_allow_free
def test_bucket_allow_full
11.times { @bucket.allow }
assert_equal false, @bucket.allow
assert_equal -1, @bucket.bucket_count
assert_equal 12, @bucket.bucket_count_total
end

def test_reset_bucket
Expand Down Expand Up @@ -63,10 +65,10 @@ def test_clean_buckets
group2 = "value2"
@bucket_store.get_bucket(group2, @quota)
lru_group, lru_counter = @bucket_store.instance_variable_get(:@buckets).first
assert_equal group1, lru_group
assert_equal [group1, @quota.name], lru_group
sleep(5)
@bucket_store.clean_buckets
lru_group, lru_counter = @bucket_store.instance_variable_get(:@buckets).first
assert_equal group2, lru_group
assert_equal [group2, @quota.name], lru_group
end
end

0 comments on commit fec3bcb

Please sign in to comment.