Skip to content

Commit 2b3e25c

Browse files
committed
storage part 2
Signed-off-by: Alex Chi <[email protected]>
1 parent 4298877 commit 2b3e25c

File tree

10 files changed

+265
-64
lines changed

10 files changed

+265
-64
lines changed

code/03-00/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ log = "0.4"
1717
prettytable-rs = { version = "0.8", default-features = false }
1818
rustyline = "9"
1919
sqlparser = "0.13"
20+
tempfile = "3"
2021
thiserror = "1"
2122
tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "macros", "fs"] }
2223
tokio-stream = "0.1"

code/03-00/src/db.rs

+3-9
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::executor::{ExecuteError, ExecutorBuilder};
1212
use crate::logical_planner::{LogicalPlanError, LogicalPlanner};
1313
use crate::parser::{parse, ParserError};
1414
use crate::physical_planner::{PhysicalPlanError, PhysicalPlanner};
15-
use crate::storage::DiskStorage;
15+
use crate::storage::{DiskStorage, StorageOptions};
1616

1717
/// The database instance.
1818
pub struct Database {
@@ -21,17 +21,11 @@ pub struct Database {
2121
runtime: Runtime,
2222
}
2323

24-
impl Default for Database {
25-
fn default() -> Self {
26-
Self::new()
27-
}
28-
}
29-
3024
impl Database {
3125
/// Create a new database instance.
32-
pub fn new() -> Self {
26+
pub fn new(options: StorageOptions) -> Self {
3327
let catalog = Arc::new(DatabaseCatalog::new());
34-
let storage = Arc::new(DiskStorage::new());
28+
let storage = Arc::new(DiskStorage::new(options));
3529
let parallel = matches!(std::env::var("LIGHT_PARALLEL"), Ok(s) if s == "1");
3630
let runtime = if parallel {
3731
tokio::runtime::Builder::new_multi_thread()

code/03-00/src/executor/insert.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,18 @@ impl InsertExecutor {
3737
)
3838
.collect_vec();
3939
let mut count = 0;
40+
41+
let mut txn = table.write().await?;
42+
4043
#[for_await]
4144
for chunk in self.child {
4245
let chunk = transform_chunk(chunk?, &output_columns);
4346
count += chunk.cardinality();
44-
table.append(chunk).await?;
47+
txn.append(chunk).await?;
4548
}
49+
50+
txn.commit().await?;
51+
4652
yield DataChunk::single(count as i32);
4753
}
4854
}

code/03-00/src/executor/seq_scan.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@ impl SeqScanExecutor {
1313
#[try_stream(boxed, ok = DataChunk, error = ExecuteError)]
1414
pub async fn execute(self) {
1515
let table = self.storage.get_table(self.table_ref_id)?;
16-
for chunk in table.all_chunks().await? {
16+
let txn = table.read().await?;
17+
18+
for chunk in txn.all_chunks().await? {
1719
yield chunk;
1820
}
21+
22+
txn.commit().await?;
1923
}
2024
}

code/03-00/src/main.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
//! A simple interactive shell of the database.
22
3+
use risinglight_03_00::storage::StorageOptions;
34
use risinglight_03_00::Database;
45
use rustyline::error::ReadlineError;
56
use rustyline::Editor;
67

78
fn main() {
89
env_logger::init();
910

10-
let db = Database::new();
11+
let db = Database::new(StorageOptions {
12+
base_path: "risinglight.db".into(),
13+
});
1114

1215
let mut rl = Editor::<()>::new();
1316
loop {

code/03-00/src/storage/column.rs

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use anyhow::anyhow;
2+
use bytes::{Buf, BufMut};
3+
4+
use super::StorageResult;
5+
use crate::array::{Array, ArrayBuilder, I32Array, I32ArrayBuilder};
6+
7+
/// Encode an `I32Array` into a `Vec<u8>`.
8+
pub fn encode_int32_column(a: &I32Array, mut buffer: impl BufMut) -> StorageResult<()> {
9+
for item in a.iter() {
10+
if let Some(item) = item {
11+
buffer.put_i32_le(*item);
12+
} else {
13+
return Err(anyhow!("nullable encoding not supported!").into());
14+
}
15+
}
16+
Ok(())
17+
}
18+
19+
pub fn decode_int32_column(mut data: impl Buf) -> StorageResult<I32Array> {
20+
let mut builder = I32ArrayBuilder::with_capacity(data.remaining() / 4);
21+
while data.has_remaining() {
22+
builder.push(Some(&data.get_i32_le()));
23+
}
24+
Ok(builder.finish())
25+
}

code/03-00/src/storage/mod.rs

+106-51
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
//! On-disk storage
22
3+
mod column;
4+
mod rowset;
5+
36
use std::collections::HashMap;
47
use std::path::PathBuf;
8+
use std::sync::atomic::AtomicU32;
59
use std::sync::{Arc, RwLock};
610

711
use anyhow::anyhow;
8-
use bytes::{Buf, BufMut};
912

10-
use crate::array::{Array, ArrayBuilder, ArrayImpl, DataChunk, I32Array, I32ArrayBuilder};
13+
use self::rowset::{DiskRowset, RowSetBuilder};
14+
use crate::array::DataChunk;
1115
use crate::catalog::{ColumnDesc, TableRefId};
1216

1317
/// The error type of storage operations.
@@ -26,13 +30,16 @@ pub struct DiskStorage {
2630
/// All tables in the current storage engine.
2731
tables: RwLock<HashMap<TableRefId, StorageTableRef>>,
2832

33+
/// Generator for RowSet id.
34+
rowset_id_generator: Arc<AtomicU32>,
35+
2936
/// The storage options.
3037
options: Arc<StorageOptions>,
3138
}
3239

3340
pub struct StorageOptions {
3441
/// The directory of the storage
35-
base_path: PathBuf,
42+
pub base_path: PathBuf,
3643
}
3744

3845
pub fn err(error: impl Into<anyhow::Error>) -> StorageError {
@@ -49,22 +56,21 @@ pub struct DiskTable {
4956

5057
/// The storage options.
5158
options: Arc<StorageOptions>,
52-
}
5359

54-
impl Default for DiskStorage {
55-
fn default() -> Self {
56-
Self::new()
57-
}
60+
/// Generator for RowSet id.
61+
rowset_id_generator: Arc<AtomicU32>,
62+
63+
/// RowSets in the table
64+
rowsets: RwLock<Vec<DiskRowset>>,
5865
}
5966

6067
impl DiskStorage {
6168
/// Create a new on-disk storage engine rooted at `options.base_path`.
62-
pub fn new() -> Self {
69+
pub fn new(options: StorageOptions) -> Self {
6370
DiskStorage {
6471
tables: RwLock::new(HashMap::new()),
65-
options: Arc::new(StorageOptions {
66-
base_path: "risinglight.db".into(),
67-
}),
72+
options: Arc::new(options),
73+
rowset_id_generator: Arc::new(AtomicU32::new(0)),
6874
}
6975
}
7076

@@ -75,6 +81,8 @@ impl DiskStorage {
7581
id,
7682
options: self.options.clone(),
7783
column_descs: column_descs.into(),
84+
rowsets: RwLock::new(Vec::new()),
85+
rowset_id_generator: self.rowset_id_generator.clone(),
7886
};
7987
let res = tables.insert(id, table.into());
8088
if res.is_some() {
@@ -93,61 +101,108 @@ impl DiskStorage {
93101
}
94102
}
95103

96-
/// Encode an `I32Array` into a `Vec<u8>`.
97-
fn encode_int32_column(a: &I32Array) -> StorageResult<Vec<u8>> {
98-
let mut buffer = Vec::with_capacity(a.len() * 4);
99-
for item in a.iter() {
100-
if let Some(item) = item {
101-
buffer.put_i32_le(*item);
102-
} else {
103-
return Err(anyhow!("nullable encoding not supported!").into());
104-
}
104+
impl DiskTable {
105+
/// Start a transaction which only contains write.
106+
pub async fn write(self: &Arc<Self>) -> StorageResult<DiskTransaction> {
107+
let rowsets = self.rowsets.read().unwrap();
108+
Ok(DiskTransaction {
109+
read_only: false,
110+
table: self.clone(),
111+
rowset_snapshot: rowsets.clone(),
112+
builder: None,
113+
finished: false,
114+
})
105115
}
106-
Ok(buffer)
107-
}
108116

109-
fn decode_int32_column(mut data: &[u8]) -> StorageResult<I32Array> {
110-
let mut builder = I32ArrayBuilder::with_capacity(data.len() / 4);
111-
while data.has_remaining() {
112-
builder.push(Some(&data.get_i32_le()));
117+
/// Start a transaction which only contains read.
118+
pub async fn read(self: &Arc<Self>) -> StorageResult<DiskTransaction> {
119+
let rowsets = self.rowsets.read().unwrap();
120+
Ok(DiskTransaction {
121+
read_only: true,
122+
table: self.clone(),
123+
rowset_snapshot: rowsets.clone(),
124+
builder: None,
125+
finished: false,
126+
})
113127
}
114-
Ok(builder.finish())
115-
}
116128

117-
impl DiskTable {
118-
fn table_path(&self) -> PathBuf {
129+
pub fn table_path(&self) -> PathBuf {
119130
self.options.base_path.join(self.id.table_id.to_string())
120131
}
121132

122-
fn column_path(&self, column_id: usize) -> PathBuf {
123-
self.table_path().join(format!("{}.col", column_id))
133+
pub fn rowset_path_of(&self, rowset_id: u32) -> PathBuf {
134+
self.table_path().join(rowset_id.to_string())
124135
}
136+
}
137+
138+
pub struct DiskTransaction {
139+
/// If this txn is read only.
140+
read_only: bool,
141+
142+
/// Reference to table object
143+
table: Arc<DiskTable>,
144+
145+
/// Current snapshot of RowSets
146+
rowset_snapshot: Vec<DiskRowset>,
147+
148+
/// Builder for the RowSet
149+
builder: Option<RowSetBuilder>,
125150

151+
/// Indicates whether the transaction is committed or aborted. If
152+
/// the [`DiskTransaction`] object is dropped without finishing,
153+
/// a warning will be logged (see the `Drop` impl below).
154+
finished: bool,
155+
}
156+
157+
impl Drop for DiskTransaction {
158+
fn drop(&mut self) {
159+
if !self.finished {
160+
warn!("Transaction dropped without committing or aborting");
161+
}
162+
}
163+
}
164+
165+
impl DiskTransaction {
126166
/// Append a chunk to the table.
127-
pub async fn append(&self, chunk: DataChunk) -> StorageResult<()> {
128-
for (idx, column) in chunk.arrays().iter().enumerate() {
129-
if let ArrayImpl::Int32(column) = column {
130-
let column_path = self.column_path(idx);
131-
let data = encode_int32_column(column)?;
132-
tokio::fs::create_dir_all(column_path.parent().unwrap())
133-
.await
134-
.map_err(err)?;
135-
tokio::fs::write(column_path, data).await.map_err(err)?;
136-
} else {
137-
return Err(anyhow!("unsupported column type").into());
138-
}
167+
pub async fn append(&mut self, chunk: DataChunk) -> StorageResult<()> {
168+
if self.read_only {
169+
return Err(anyhow!("cannot append chunks in read only txn!").into());
170+
}
171+
if self.builder.is_none() {
172+
self.builder = Some(RowSetBuilder::new(self.table.column_descs.clone()));
139173
}
174+
let builder = self.builder.as_mut().unwrap();
175+
176+
builder.append(chunk)?;
177+
178+
Ok(())
179+
}
180+
181+
pub async fn commit(mut self) -> StorageResult<()> {
182+
self.finished = true;
183+
184+
if let Some(builder) = self.builder.take() {
185+
use std::sync::atomic::Ordering::SeqCst;
186+
let rowset_id = self.table.rowset_id_generator.fetch_add(1, SeqCst);
187+
let rowset_path = self
188+
.table
189+
.options
190+
.base_path
191+
.join(self.table.rowset_path_of(rowset_id));
192+
let rowset = builder.flush(rowset_id, rowset_path).await?;
193+
let mut rowsets = self.table.rowsets.write().unwrap();
194+
rowsets.push(rowset);
195+
}
196+
140197
Ok(())
141198
}
142199

143200
/// Get all chunks of the table.
144201
pub async fn all_chunks(&self) -> StorageResult<Vec<DataChunk>> {
145-
let mut columns = vec![];
146-
for (idx, _) in self.column_descs.iter().enumerate() {
147-
let column_path = self.column_path(idx);
148-
let data = tokio::fs::read(column_path).await.map_err(err)?;
149-
columns.push(decode_int32_column(&data)?);
202+
let mut chunks = vec![];
203+
for rowset in &self.rowset_snapshot {
204+
chunks.push(rowset.as_chunk().await?);
150205
}
151-
Ok(vec![columns.into_iter().map(ArrayImpl::Int32).collect()])
206+
Ok(chunks)
152207
}
153208
}

0 commit comments

Comments
 (0)