Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalize counter values #199

Merged
merged 2 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions limitador/src/storage/disk/rocksdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ impl CounterStorage for RocksDbStorage {
Ok(counter.max_value() >= value.value() + delta)
}

fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> {
Ok(())
}

fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
let key = key_for_counter(counter);
self.insert_or_update(&key, counter, delta)?;
Expand Down
23 changes: 13 additions & 10 deletions limitador/src/storage/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ impl CounterStorage for InMemoryStorage {
Ok(counter.max_value() >= value + delta)
}

fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr> {
if limit.variables().is_empty() {
let mut limits_by_namespace = self.limits_for_namespace.write().unwrap();
limits_by_namespace
.entry(limit.namespace().clone())
.or_insert_with(HashMap::new)
.entry(limit.clone())
.or_insert_with(AtomicExpiringValue::default);
}
Ok(())
}

fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
let mut limits_by_namespace = self.limits_for_namespace.write().unwrap();
let now = SystemTime::now();
Expand Down Expand Up @@ -84,7 +96,7 @@ impl CounterStorage for InMemoryStorage {
delta: i64,
load_counters: bool,
) -> Result<Authorization, StorageErr> {
let mut limits_by_namespace = self.limits_for_namespace.write().unwrap();
let limits_by_namespace = self.limits_for_namespace.write().unwrap();
let mut first_limited = None;
let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new();
let mut qualified_counter_values_to_updated: Vec<(Arc<AtomicExpiringValue>, u64)> =
Expand All @@ -110,15 +122,6 @@ impl CounterStorage for InMemoryStorage {
None
};

// Normalize counters and values
for counter in counters.iter().filter(|c| !c.is_qualified()) {
limits_by_namespace
.entry(counter.limit().namespace().clone())
.or_insert_with(HashMap::new)
.entry(counter.limit().clone())
.or_insert_with(AtomicExpiringValue::default);
}

// Process simple counters
for counter in counters.iter_mut().filter(|c| !c.is_qualified()) {
let atomic_expiring_value: &AtomicExpiringValue = limits_by_namespace
Expand Down
10 changes: 4 additions & 6 deletions limitador/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,9 @@ impl Storage {

pub fn add_limit(&self, limit: Limit) -> bool {
let namespace = limit.namespace().clone();
self.limits
.write()
.unwrap()
.entry(namespace)
.or_default()
.insert(limit)
let mut limits = self.limits.write().unwrap();
self.counters.add_counter(&limit).unwrap();
limits.entry(namespace).or_default().insert(limit)
}

pub fn update_limit(&self, update: &Limit) -> bool {
Expand Down Expand Up @@ -263,6 +260,7 @@ impl AsyncStorage {

pub trait CounterStorage: Sync + Send {
fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result<bool, StorageErr>;
fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr>;
fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr>;
fn check_and_update(
&self,
Expand Down
4 changes: 4 additions & 0 deletions limitador/src/storage/redis/redis_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ impl CounterStorage for RedisStorage {
}
}

fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> {
Ok(())
}

fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
let mut con = self.conn_pool.get()?;

Expand Down
4 changes: 4 additions & 0 deletions limitador/src/storage/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ impl CounterStorage for WasmStorage {
Ok(self.counter_is_within_limits(counter, stored_counters.get(counter), delta))
}

fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> {
Ok(())
}

fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
let mut counters = self.counters.write().unwrap();
self.insert_or_update_counter(&mut counters, counter, delta);
Expand Down