5
5
//! initial point from where it should start reading. A sink should use this
6
6
//! utility to persist the position once a block has been processed.
7
7
8
+ use r2d2_redis:: {
9
+ r2d2:: { self , Pool } ,
10
+ redis:: Commands ,
11
+ RedisConnectionManager ,
12
+ } ;
8
13
use std:: {
9
14
sync:: RwLock ,
10
15
time:: { Duration , Instant } ,
@@ -27,29 +32,40 @@ pub struct FileConfig {
27
32
pub path : String ,
28
33
}
29
34
35
+ #[ derive( Debug , Deserialize ) ]
36
+ pub struct RedisConfig {
37
+ pub url : String ,
38
+ pub key : String ,
39
+ }
40
+
30
41
/// A cursor provider that uses the file system as the source for persistence
31
42
pub ( crate ) struct FileStorage ( FileConfig ) ;
32
43
33
44
/// An ephemeral cursor that lives only in memory
34
45
pub ( crate ) struct MemoryStorage ( PointArg ) ;
35
46
47
+ pub ( crate ) struct RedisStorage ( RedisConfig ) ;
48
+
36
49
enum Storage {
37
50
File ( FileStorage ) ,
38
51
Memory ( MemoryStorage ) ,
52
+ Redis ( RedisStorage ) ,
39
53
}
40
54
41
55
impl CanStore for Storage {
42
56
fn read_cursor ( & self ) -> Result < PointArg , Error > {
43
57
match self {
44
58
Storage :: File ( x) => x. read_cursor ( ) ,
45
59
Storage :: Memory ( x) => x. read_cursor ( ) ,
60
+ Storage :: Redis ( x) => x. read_cursor ( ) ,
46
61
}
47
62
}
48
63
49
64
fn write_cursor ( & self , point : PointArg ) -> Result < ( ) , Error > {
50
65
match self {
51
66
Storage :: File ( x) => x. write_cursor ( point) ,
52
67
Storage :: Memory ( x) => x. write_cursor ( point) ,
68
+ Storage :: Redis ( x) => x. write_cursor ( point) ,
53
69
}
54
70
}
55
71
}
@@ -59,6 +75,7 @@ impl CanStore for Storage {
59
75
pub enum Config {
60
76
File ( FileConfig ) ,
61
77
Memory ( PointArg ) ,
78
+ Redis ( RedisConfig ) ,
62
79
}
63
80
64
81
#[ derive( Clone ) ]
@@ -80,6 +97,7 @@ impl Provider {
80
97
storage : match config {
81
98
Config :: File ( x) => Storage :: File ( FileStorage ( x) ) ,
82
99
Config :: Memory ( x) => Storage :: Memory ( MemoryStorage ( x) ) ,
100
+ Config :: Redis ( x) => Storage :: Redis ( RedisStorage ( x) ) ,
83
101
} ,
84
102
}
85
103
}
@@ -168,3 +186,31 @@ impl CanStore for MemoryStorage {
168
186
Ok ( ( ) )
169
187
}
170
188
}
189
+
190
+ impl RedisStorage {
191
+ pub fn get_pool ( & self ) -> Result < Pool < RedisConnectionManager > , Error > {
192
+ let manager = RedisConnectionManager :: new ( self . 0 . url . clone ( ) ) ?;
193
+ let pool = r2d2:: Pool :: builder ( ) . build ( manager) ?;
194
+ Ok ( pool)
195
+ }
196
+ }
197
+
198
+ impl CanStore for RedisStorage {
199
+ fn read_cursor ( & self ) -> Result < PointArg , Error > {
200
+ let pool = self . get_pool ( ) ?;
201
+ let mut conn = pool. get ( ) ?;
202
+ // let data: String = conn.get("oura-cursor")?;
203
+ let data: String = conn. get ( self . 0 . key . clone ( ) ) ?;
204
+ let point: PointArg = serde_json:: from_str ( & data) ?;
205
+ Ok ( point)
206
+ }
207
+
208
+ fn write_cursor ( & self , point : PointArg ) -> Result < ( ) , Error > {
209
+ let pool = self . get_pool ( ) ?;
210
+ let mut conn = pool. get ( ) ?;
211
+ let data_to_write = serde_json:: to_string ( & point) ?;
212
+ // conn.set("oura-cursor", data_to_write)?;
213
+ conn. set ( self . 0 . key . clone ( ) , data_to_write) ?;
214
+ Ok ( ( ) )
215
+ }
216
+ }
0 commit comments