Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 32263e3

Browse files
committedFeb 20, 2022
storage part 2
Signed-off-by: Alex Chi <[email protected]>
1 parent 64347c0 commit 32263e3

File tree

10 files changed

+265
-64
lines changed

10 files changed

+265
-64
lines changed
 

‎code/03-00/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ log = "0.4"
1717
prettytable-rs = { version = "0.8", default-features = false }
1818
rustyline = "9"
1919
sqlparser = "0.13"
20+
tempfile = "3"
2021
thiserror = "1"
2122
tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync", "macros", "fs"] }
2223
tokio-stream = "0.1"

‎code/03-00/src/db.rs

+3-9
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::executor::{ExecuteError, ExecutorBuilder};
1212
use crate::logical_planner::{LogicalPlanError, LogicalPlanner};
1313
use crate::parser::{parse, ParserError};
1414
use crate::physical_planner::{PhysicalPlanError, PhysicalPlanner};
15-
use crate::storage::DiskStorage;
15+
use crate::storage::{DiskStorage, StorageOptions};
1616

1717
/// The database instance.
1818
pub struct Database {
@@ -21,17 +21,11 @@ pub struct Database {
2121
runtime: Runtime,
2222
}
2323

24-
impl Default for Database {
25-
fn default() -> Self {
26-
Self::new()
27-
}
28-
}
29-
3024
impl Database {
3125
/// Create a new database instance.
32-
pub fn new() -> Self {
26+
pub fn new(options: StorageOptions) -> Self {
3327
let catalog = Arc::new(DatabaseCatalog::new());
34-
let storage = Arc::new(DiskStorage::new());
28+
let storage = Arc::new(DiskStorage::new(options));
3529
let parallel = matches!(std::env::var("LIGHT_PARALLEL"), Ok(s) if s == "1");
3630
let runtime = if parallel {
3731
tokio::runtime::Builder::new_multi_thread()

‎code/03-00/src/executor/insert.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,18 @@ impl InsertExecutor {
3737
)
3838
.collect_vec();
3939
let mut count = 0;
40+
41+
let mut txn = table.write().await?;
42+
4043
#[for_await]
4144
for chunk in self.child {
4245
let chunk = transform_chunk(chunk?, &output_columns);
4346
count += chunk.cardinality();
44-
table.append(chunk).await?;
47+
txn.append(chunk).await?;
4548
}
49+
50+
txn.commit().await?;
51+
4652
yield DataChunk::single(count as i32);
4753
}
4854
}

‎code/03-00/src/executor/seq_scan.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@ impl SeqScanExecutor {
1313
#[try_stream(boxed, ok = DataChunk, error = ExecuteError)]
1414
pub async fn execute(self) {
1515
let table = self.storage.get_table(self.table_ref_id)?;
16-
for chunk in table.all_chunks().await? {
16+
let txn = table.read().await?;
17+
18+
for chunk in txn.all_chunks().await? {
1719
yield chunk;
1820
}
21+
22+
txn.commit().await?;
1923
}
2024
}

‎code/03-00/src/main.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
//! A simple interactive shell of the database.
22
3+
use risinglight_03_00::storage::StorageOptions;
34
use risinglight_03_00::Database;
45
use rustyline::error::ReadlineError;
56
use rustyline::Editor;
67

78
fn main() {
89
env_logger::init();
910

10-
let db = Database::new();
11+
let db = Database::new(StorageOptions {
12+
base_path: "risinglight.db".into(),
13+
});
1114

1215
let mut rl = Editor::<()>::new();
1316
loop {

‎code/03-00/src/storage/column.rs

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use anyhow::anyhow;
2+
use bytes::{Buf, BufMut};
3+
4+
use super::StorageResult;
5+
use crate::array::{Array, ArrayBuilder, I32Array, I32ArrayBuilder};
6+
7+
/// Encode an `I32Array` into a `Vec<u8>`.
8+
pub fn encode_int32_column(a: &I32Array, mut buffer: impl BufMut) -> StorageResult<()> {
9+
for item in a.iter() {
10+
if let Some(item) = item {
11+
buffer.put_i32_le(*item);
12+
} else {
13+
return Err(anyhow!("nullable encoding not supported!").into());
14+
}
15+
}
16+
Ok(())
17+
}
18+
19+
pub fn decode_int32_column(mut data: impl Buf) -> StorageResult<I32Array> {
20+
let mut builder = I32ArrayBuilder::with_capacity(data.remaining() / 4);
21+
while data.has_remaining() {
22+
builder.push(Some(&data.get_i32_le()));
23+
}
24+
Ok(builder.finish())
25+
}

‎code/03-00/src/storage/mod.rs

+106-51
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
//! On-disk storage
22
3+
mod column;
4+
mod rowset;
5+
36
use std::collections::HashMap;
47
use std::path::PathBuf;
8+
use std::sync::atomic::AtomicU32;
59
use std::sync::{Arc, RwLock};
610

711
use anyhow::anyhow;
8-
use bytes::{Buf, BufMut};
912

10-
use crate::array::{Array, ArrayBuilder, ArrayImpl, DataChunk, I32Array, I32ArrayBuilder};
13+
use self::rowset::{DiskRowset, RowSetBuilder};
14+
use crate::array::DataChunk;
1115
use crate::catalog::{ColumnDesc, TableRefId};
1216

1317
/// The error type of storage operations.
@@ -26,13 +30,16 @@ pub struct DiskStorage {
2630
/// All tables in the current storage engine.
2731
tables: RwLock<HashMap<TableRefId, StorageTableRef>>,
2832

33+
/// Generator for RowSet id.
34+
rowset_id_generator: Arc<AtomicU32>,
35+
2936
/// The storage options.
3037
options: Arc<StorageOptions>,
3138
}
3239

3340
pub struct StorageOptions {
3441
/// The directory of the storage
35-
base_path: PathBuf,
42+
pub base_path: PathBuf,
3643
}
3744

3845
pub fn err(error: impl Into<anyhow::Error>) -> StorageError {
@@ -49,22 +56,21 @@ pub struct DiskTable {
4956

5057
/// The storage options.
5158
options: Arc<StorageOptions>,
52-
}
5359

54-
impl Default for DiskStorage {
55-
fn default() -> Self {
56-
Self::new()
57-
}
60+
/// Generator for RowSet id.
61+
rowset_id_generator: Arc<AtomicU32>,
62+
63+
/// RowSets in the table
64+
rowsets: RwLock<Vec<DiskRowset>>,
5865
}
5966

6067
impl DiskStorage {
6168
/// Create a new on-disk storage.
62-
pub fn new() -> Self {
69+
pub fn new(options: StorageOptions) -> Self {
6370
DiskStorage {
6471
tables: RwLock::new(HashMap::new()),
65-
options: Arc::new(StorageOptions {
66-
base_path: "risinglight.db".into(),
67-
}),
72+
options: Arc::new(options),
73+
rowset_id_generator: Arc::new(AtomicU32::new(0)),
6874
}
6975
}
7076

@@ -75,6 +81,8 @@ impl DiskStorage {
7581
id,
7682
options: self.options.clone(),
7783
column_descs: column_descs.into(),
84+
rowsets: RwLock::new(Vec::new()),
85+
rowset_id_generator: self.rowset_id_generator.clone(),
7886
};
7987
let res = tables.insert(id, table.into());
8088
if res.is_some() {
@@ -93,61 +101,108 @@ impl DiskStorage {
93101
}
94102
}
95103

96-
/// Encode an `I32Array` into a `Vec<u8>`.
97-
fn encode_int32_column(a: &I32Array) -> StorageResult<Vec<u8>> {
98-
let mut buffer = Vec::with_capacity(a.len() * 4);
99-
for item in a.iter() {
100-
if let Some(item) = item {
101-
buffer.put_i32_le(*item);
102-
} else {
103-
return Err(anyhow!("nullable encoding not supported!").into());
104-
}
104+
impl DiskTable {
105+
/// Start a write-only transaction.
106+
pub async fn write(self: &Arc<Self>) -> StorageResult<DiskTransaction> {
107+
let rowsets = self.rowsets.read().unwrap();
108+
Ok(DiskTransaction {
109+
read_only: false,
110+
table: self.clone(),
111+
rowset_snapshot: rowsets.clone(),
112+
builder: None,
113+
finished: false,
114+
})
105115
}
106-
Ok(buffer)
107-
}
108116

109-
fn decode_int32_column(mut data: &[u8]) -> StorageResult<I32Array> {
110-
let mut builder = I32ArrayBuilder::with_capacity(data.len() / 4);
111-
while data.has_remaining() {
112-
builder.push(Some(&data.get_i32_le()));
117+
/// Start a read-only transaction.
118+
pub async fn read(self: &Arc<Self>) -> StorageResult<DiskTransaction> {
119+
let rowsets = self.rowsets.read().unwrap();
120+
Ok(DiskTransaction {
121+
read_only: true,
122+
table: self.clone(),
123+
rowset_snapshot: rowsets.clone(),
124+
builder: None,
125+
finished: false,
126+
})
113127
}
114-
Ok(builder.finish())
115-
}
116128

117-
impl DiskTable {
118-
fn table_path(&self) -> PathBuf {
129+
pub fn table_path(&self) -> PathBuf {
119130
self.options.base_path.join(self.id.table_id.to_string())
120131
}
121132

122-
fn column_path(&self, column_id: usize) -> PathBuf {
123-
self.table_path().join(format!("{}.col", column_id))
133+
pub fn rowset_path_of(&self, rowset_id: u32) -> PathBuf {
134+
self.table_path().join(rowset_id.to_string())
124135
}
136+
}
137+
138+
pub struct DiskTransaction {
139+
/// If this txn is read only.
140+
read_only: bool,
141+
142+
/// Reference to table object
143+
table: Arc<DiskTable>,
144+
145+
/// Current snapshot of RowSets
146+
rowset_snapshot: Vec<DiskRowset>,
147+
148+
/// Builder for the RowSet
149+
builder: Option<RowSetBuilder>,
125150

151+
/// Indicates whether the transaction is committed or aborted. If
152+
/// the [`DiskTransaction`] object is dropped without finishing,
153+
/// a warning will be logged.
154+
finished: bool,
155+
}
156+
157+
impl Drop for DiskTransaction {
158+
fn drop(&mut self) {
159+
if !self.finished {
160+
warn!("Transaction dropped without committing or aborting");
161+
}
162+
}
163+
}
164+
165+
impl DiskTransaction {
126166
/// Append a chunk to the table.
127-
pub async fn append(&self, chunk: DataChunk) -> StorageResult<()> {
128-
for (idx, column) in chunk.arrays().iter().enumerate() {
129-
if let ArrayImpl::Int32(column) = column {
130-
let column_path = self.column_path(idx);
131-
let data = encode_int32_column(column)?;
132-
tokio::fs::create_dir_all(column_path.parent().unwrap())
133-
.await
134-
.map_err(err)?;
135-
tokio::fs::write(column_path, data).await.map_err(err)?;
136-
} else {
137-
return Err(anyhow!("unsupported column type").into());
138-
}
167+
pub async fn append(&mut self, chunk: DataChunk) -> StorageResult<()> {
168+
if self.read_only {
169+
return Err(anyhow!("cannot append chunks in read only txn!").into());
170+
}
171+
if self.builder.is_none() {
172+
self.builder = Some(RowSetBuilder::new(self.table.column_descs.clone()));
139173
}
174+
let builder = self.builder.as_mut().unwrap();
175+
176+
builder.append(chunk)?;
177+
178+
Ok(())
179+
}
180+
181+
pub async fn commit(mut self) -> StorageResult<()> {
182+
self.finished = true;
183+
184+
if let Some(builder) = self.builder.take() {
185+
use std::sync::atomic::Ordering::SeqCst;
186+
let rowset_id = self.table.rowset_id_generator.fetch_add(1, SeqCst);
187+
let rowset_path = self
188+
.table
189+
.options
190+
.base_path
191+
.join(self.table.rowset_path_of(rowset_id));
192+
let rowset = builder.flush(rowset_id, rowset_path).await?;
193+
let mut rowsets = self.table.rowsets.write().unwrap();
194+
rowsets.push(rowset);
195+
}
196+
140197
Ok(())
141198
}
142199

143200
/// Get all chunks of the table.
144201
pub async fn all_chunks(&self) -> StorageResult<Vec<DataChunk>> {
145-
let mut columns = vec![];
146-
for (idx, _) in self.column_descs.iter().enumerate() {
147-
let column_path = self.column_path(idx);
148-
let data = tokio::fs::read(column_path).await.map_err(err)?;
149-
columns.push(decode_int32_column(&data)?);
202+
let mut chunks = vec![];
203+
for rowset in &self.rowset_snapshot {
204+
chunks.push(rowset.as_chunk().await?);
150205
}
151-
Ok(vec![columns.into_iter().map(ArrayImpl::Int32).collect()])
206+
Ok(chunks)
152207
}
153208
}

‎code/03-00/src/storage/rowset.rs

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
use std::path::{Path, PathBuf};
2+
use std::sync::Arc;
3+
4+
use anyhow::anyhow;
5+
use itertools::Itertools;
6+
7+
use super::column::{decode_int32_column, encode_int32_column};
8+
use super::{err, StorageResult};
9+
use crate::array::{ArrayImpl, DataChunk};
10+
use crate::catalog::ColumnDesc;
11+
12+
fn column_path(rowset_path: impl AsRef<Path>, column_id: usize) -> PathBuf {
13+
rowset_path.as_ref().join(format!("{}.col", column_id))
14+
}
15+
16+
#[derive(Clone)]
17+
pub struct DiskRowset {
18+
/// Columns of the current RowSet.
19+
column_descs: Arc<[ColumnDesc]>,
20+
21+
/// Id of the current rowset within the table.
22+
#[allow(dead_code)]
23+
rowset_id: u32,
24+
25+
/// Base path of the RowSet
26+
rowset_path: PathBuf,
27+
}
28+
29+
impl DiskRowset {
30+
pub async fn as_chunk(&self) -> StorageResult<DataChunk> {
31+
let mut columns = vec![];
32+
for (idx, _) in self.column_descs.iter().enumerate() {
33+
let column_path = column_path(&self.rowset_path, idx);
34+
let data = tokio::fs::read(column_path).await.map_err(err)?;
35+
columns.push(decode_int32_column(&data[..])?);
36+
}
37+
Ok(columns.into_iter().map(ArrayImpl::Int32).collect())
38+
}
39+
}
40+
41+
pub struct RowSetBuilder {
42+
/// Columns of the current RowSet.
43+
column_descs: Arc<[ColumnDesc]>,
44+
45+
/// Buffer of all column data
46+
buffer: Vec<Vec<u8>>,
47+
}
48+
49+
impl RowSetBuilder {
50+
pub fn new(column_descs: Arc<[ColumnDesc]>) -> Self {
51+
RowSetBuilder {
52+
buffer: (0..column_descs.len()).map(|_| vec![]).collect_vec(),
53+
column_descs,
54+
}
55+
}
56+
57+
pub fn append(&mut self, chunk: DataChunk) -> StorageResult<()> {
58+
for (idx, column) in chunk.arrays().iter().enumerate() {
59+
if let ArrayImpl::Int32(column) = column {
60+
encode_int32_column(column, &mut self.buffer[idx])?;
61+
} else {
62+
return Err(anyhow!("unsupported column type").into());
63+
}
64+
}
65+
Ok(())
66+
}
67+
68+
pub async fn flush(
69+
self,
70+
rowset_id: u32,
71+
rowset_path: impl AsRef<Path>,
72+
) -> StorageResult<DiskRowset> {
73+
let rowset_path = rowset_path.as_ref();
74+
75+
tokio::fs::create_dir_all(rowset_path).await.map_err(err)?;
76+
77+
for (idx, _) in self.column_descs.iter().enumerate() {
78+
let column_path = column_path(rowset_path, idx);
79+
tokio::fs::write(column_path, &self.buffer[idx])
80+
.await
81+
.map_err(err)?;
82+
}
83+
84+
Ok(DiskRowset {
85+
column_descs: self.column_descs,
86+
rowset_id,
87+
rowset_path: rowset_path.into(),
88+
})
89+
}
90+
}

‎code/03-00/src/test.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,22 @@
11
use std::path::Path;
22

3+
use tempfile::tempdir;
34
use test_case::test_case;
45

56
use crate::array::DataChunk;
7+
use crate::storage::StorageOptions;
68
use crate::types::DataValue;
79
use crate::{Database, Error};
810

911
#[test_case("03-01.slt")]
12+
#[test_case("03-02.slt")]
1013
fn test(name: &str) {
1114
init_logger();
1215
let script = std::fs::read_to_string(Path::new("../sql").join(name)).unwrap();
13-
let mut tester = sqllogictest::Runner::new(Database::new());
16+
let tempdir = tempdir().unwrap();
17+
let mut tester = sqllogictest::Runner::new(Database::new(StorageOptions {
18+
base_path: tempdir.path().into(),
19+
}));
1420
if let Err(err) = tester.run_script(&script) {
1521
panic!("{}", err);
1622
}

‎code/sql/03-02.slt

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# 03-02: RowSet tests
2+
3+
statement ok
4+
CREATE TABLE t (a INT NOT NULL, b INT NOT NULL, c INT NOT NULL)
5+
6+
statement ok
7+
INSERT INTO t VALUES (1,10,100)
8+
9+
statement ok
10+
INSERT INTO t VALUES (2,20,200), (3,30,300)
11+
12+
query III rowsort
13+
SELECT * FROM t
14+
----
15+
1 10 100
16+
2 20 200
17+
3 30 300

0 commit comments

Comments
 (0)
Please sign in to comment.