Skip to content

Commit 6fd75a7

Browse files
committed
feat: add redis cursor (#5)
1 parent 1a168a4 commit 6fd75a7

File tree

3 files changed

+131
-14
lines changed

3 files changed

+131
-14
lines changed

Cargo.lock

+80-13
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "oura"
33
description = "The tail of Cardano"
4-
version = "1.10.1"
4+
version = "1.11.0"
55
edition = "2021"
66
repository = "https://github.com/txpipe/oura"
77
homepage = "https://github.com/txpipe/oura"
@@ -37,6 +37,8 @@ strum_macros = "0.26.4"
3737
prometheus_exporter = { version = "0.8.5", default-features = false }
3838
unicode-truncate = "0.2.0"
3939
time = "0.3.36"
40+
#TODO: put this under a feature
41+
r2d2_redis = { version = "0.14.0" }
4042

4143
# feature logs
4244
file-rotate = { version = "0.7.1", optional = true }
@@ -76,6 +78,7 @@ google-cloud-googleapis = { version = "0.15.0", optional = true }
7678
# features: rabbitmqsink
7779
lapin = { version = "2.1.1", optional = true }
7880

81+
7982
[features]
8083
default = []
8184
web = ["reqwest"]
@@ -88,3 +91,4 @@ aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"]
8891
redissink = ["redis", "tokio"]
8992
gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web", "google-cloud-gax"]
9093
rabbitmqsink = ["lapin", "tokio"]
94+

src/utils/cursor.rs

+46
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
//! initial point from where it should start reading. A sink should use this
66
//! utility to persist the position once a block has been processed.
77
8+
use r2d2_redis::{
9+
r2d2::{self, Pool},
10+
redis::Commands,
11+
RedisConnectionManager,
12+
};
813
use std::{
914
sync::RwLock,
1015
time::{Duration, Instant},
@@ -27,29 +32,40 @@ pub struct FileConfig {
2732
pub path: String,
2833
}
2934

35+
#[derive(Debug, Deserialize)]
36+
pub struct RedisConfig {
37+
pub url: String,
38+
pub key: String,
39+
}
40+
3041
/// A cursor provider that uses the file system as the source for persistence
3142
pub(crate) struct FileStorage(FileConfig);
3243

3344
/// An ephemeral cursor that lives only in memory
3445
pub(crate) struct MemoryStorage(PointArg);
3546

47+
pub(crate) struct RedisStorage(RedisConfig);
48+
3649
enum Storage {
3750
File(FileStorage),
3851
Memory(MemoryStorage),
52+
Redis(RedisStorage),
3953
}
4054

4155
impl CanStore for Storage {
4256
fn read_cursor(&self) -> Result<PointArg, Error> {
4357
match self {
4458
Storage::File(x) => x.read_cursor(),
4559
Storage::Memory(x) => x.read_cursor(),
60+
Storage::Redis(x) => x.read_cursor(),
4661
}
4762
}
4863

4964
fn write_cursor(&self, point: PointArg) -> Result<(), Error> {
5065
match self {
5166
Storage::File(x) => x.write_cursor(point),
5267
Storage::Memory(x) => x.write_cursor(point),
68+
Storage::Redis(x) => x.write_cursor(point),
5369
}
5470
}
5571
}
@@ -59,6 +75,7 @@ impl CanStore for Storage {
5975
pub enum Config {
6076
File(FileConfig),
6177
Memory(PointArg),
78+
Redis(RedisConfig),
6279
}
6380

6481
#[derive(Clone)]
@@ -80,6 +97,7 @@ impl Provider {
8097
storage: match config {
8198
Config::File(x) => Storage::File(FileStorage(x)),
8299
Config::Memory(x) => Storage::Memory(MemoryStorage(x)),
100+
Config::Redis(x) => Storage::Redis(RedisStorage(x)),
83101
},
84102
}
85103
}
@@ -168,3 +186,31 @@ impl CanStore for MemoryStorage {
168186
Ok(())
169187
}
170188
}
189+
190+
impl RedisStorage {
191+
pub fn get_pool(&self) -> Result<Pool<RedisConnectionManager>, Error> {
192+
let manager = RedisConnectionManager::new(self.0.url.clone())?;
193+
let pool = r2d2::Pool::builder().build(manager)?;
194+
Ok(pool)
195+
}
196+
}
197+
198+
impl CanStore for RedisStorage {
199+
fn read_cursor(&self) -> Result<PointArg, Error> {
200+
let pool = self.get_pool()?;
201+
let mut conn = pool.get()?;
202+
// let data: String = conn.get("oura-cursor")?;
203+
let data: String = conn.get(self.0.key.clone())?;
204+
let point: PointArg = serde_json::from_str(&data)?;
205+
Ok(point)
206+
}
207+
208+
fn write_cursor(&self, point: PointArg) -> Result<(), Error> {
209+
let pool = self.get_pool()?;
210+
let mut conn = pool.get()?;
211+
let data_to_write = serde_json::to_string(&point)?;
212+
// conn.set("oura-cursor", data_to_write)?;
213+
conn.set(self.0.key.clone(), data_to_write)?;
214+
Ok(())
215+
}
216+
}

0 commit comments

Comments
 (0)