From ba747ae1ed16c7386da901adf6475a8916a5192b Mon Sep 17 00:00:00 2001 From: Alissa Tung Date: Wed, 1 Feb 2023 15:18:22 +0800 Subject: [PATCH 1/5] tests: init utils --- src/hstreamdb/src/producer.rs | 8 ++ src/hstreamdb/tests/common.rs | 108 ++++++++++++++++++++++++ src/hstreamdb/tests/integration_test.rs | 23 +++++ src/hstreamdb/tests/tls_test.rs | 59 +++++++------ 4 files changed, 172 insertions(+), 26 deletions(-) create mode 100644 src/hstreamdb/tests/common.rs create mode 100644 src/hstreamdb/tests/integration_test.rs diff --git a/src/hstreamdb/src/producer.rs b/src/hstreamdb/src/producer.rs index 9efeed1..c5dc79f 100644 --- a/src/hstreamdb/src/producer.rs +++ b/src/hstreamdb/src/producer.rs @@ -71,6 +71,14 @@ impl FlushSettings { pub fn builder() -> FlushSettingsBuilder { default() } + + pub fn sync() -> Self { + Self { + len: 0, + size: 0, + deadline: None, + } + } } #[derive(Default)] diff --git a/src/hstreamdb/tests/common.rs b/src/hstreamdb/tests/common.rs new file mode 100644 index 0000000..02463ba --- /dev/null +++ b/src/hstreamdb/tests/common.rs @@ -0,0 +1,108 @@ +use std::env; + +use hstreamdb::appender::Appender; +use hstreamdb::common::{CompressionType, SpecialOffset, Stream}; +use hstreamdb::producer::{FlushSettings, Producer}; +use hstreamdb::{ChannelProviderSettings, Record, Subscription}; +use hstreamdb_test_utils::rand_alphanumeric; + +pub async fn init_client() -> anyhow::Result { + let server_url = env::var("TEST_SERVER_ADDR").unwrap(); + let channel_provider_settings = ChannelProviderSettings::default(); + let client = hstreamdb::Client::new(server_url, channel_provider_settings).await?; + Ok(Client(client)) +} + +pub struct Client(hstreamdb::Client); + +impl Client { + pub async fn new_stream(&self) -> anyhow::Result { + let stream_name = rand_alphanumeric(20); + let stream = self + .0 + .create_stream(Stream { + stream_name, + replication_factor: 3, + backlog_duration: 60 * 60 * 24, + shard_count: 20, + creation_time: None, + }) + .await?; + Ok(stream) + } + + pub async fn new_subscription>( + &self, + stream_name: T, + ) -> anyhow::Result { + let subscription = self + .0 + .create_subscription(Subscription { + subscription_id: rand_alphanumeric(20), + stream_name: stream_name.into(), + ack_timeout_seconds: 60 * 15, + max_unacked_records: 1000, + offset: SpecialOffset::Earliest, + creation_time: None, + }) + .await?; + Ok(subscription) + } + + pub async fn new_sync_producer>( + &self, + stream_name: T, + ) -> anyhow::Result<(Appender, Producer)> { + let producer = self + .0 + .new_producer( + stream_name.into(), + CompressionType::None, + None, + FlushSettings::sync(), + ChannelProviderSettings::default(), + None, + ) + .await?; + Ok(producer) + } + + pub async fn write_rand>( + &self, + stream_name: T, + appender_num: usize, + record_num: usize, + payload_size: usize, + ) -> anyhow::Result<()> { + let (appender, producer) = self.new_sync_producer(stream_name).await?; + + for _ in 0..appender_num { + let appender = appender.clone(); + tokio::spawn(async move { + for _ in 0..record_num { + let payload = rand_alphanumeric(payload_size); + match appender + .append(Record { + partition_key: "".to_string(), + payload: hstreamdb::Payload::RawRecord(payload.into_bytes()), + }) + .await + { + Ok(_) => (), + Err(err) => log::error!("{}", err), + }; + } + }); + } + drop(appender); + producer.start().await; + Ok(()) + } + + pub async fn new_stream_subscription(&self) -> anyhow::Result<(Stream, Subscription)> { + let stream = self.new_stream().await?; + let stream_name = stream.stream_name.clone(); + let subscription = self.new_subscription(stream_name).await?; + Ok((stream, subscription)) + } +} diff --git a/src/hstreamdb/tests/integration_test.rs b/src/hstreamdb/tests/integration_test.rs new file mode 100644 index 0000000..6701c09 --- /dev/null +++ b/src/hstreamdb/tests/integration_test.rs @@ -0,0 +1,23 @@ +mod common; + +use common::init_client; + +#[tokio::test(flavor = "multi_thread")] +async fn utils_base_test() { + let client = init_client().await.unwrap(); + + let stream = client.new_stream().await.unwrap(); + let stream_name = &stream.stream_name; + let _subscription = client.new_subscription(stream_name).await.unwrap(); + let _producer = client.new_sync_producer(stream_name).await.unwrap(); + + let appender_num = 5; + let record_num = 50; + let payload_size = 2000; + client + .write_rand(stream_name, appender_num, record_num, payload_size) + .await + .unwrap(); + + client.new_stream_subscription().await.unwrap(); +} diff --git a/src/hstreamdb/tests/tls_test.rs b/src/hstreamdb/tests/tls_test.rs index 3177294..5c708ed 100644 --- a/src/hstreamdb/tests/tls_test.rs +++ b/src/hstreamdb/tests/tls_test.rs @@ -1,32 +1,39 @@ -// use std::{env, fs}; +use std::env; -// use hstreamdb::tls::{Certificate, ClientTlsConfig, Identity}; -// use hstreamdb::{ChannelProviderSettings, Client}; +use hstreamdb::tls::{Certificate, ClientTlsConfig, Identity}; +use hstreamdb::{ChannelProviderSettings, Client}; -// async fn _test_tls_impl() { -// env::set_var("RUST_LOG", "DEBUG"); -// env_logger::init(); +#[tokio::test(flavor = "multi_thread")] +async fn test_tls() { + if let Ok(_) = env::var("ENDPOINT") { + test_tls_impl().await + } else { + log::warn!("cloud endpoint is not presented"); + log::warn!("ignore tls tests"); + } +} -// let server_url: &str = todo!(); -// let tls_dir: &str = todo!(); +async fn test_tls_impl() { + env::set_var("RUST_LOG", "DEBUG"); + env_logger::init(); -// let ca_certificate = -// Certificate::from_pem(fs::read(format!("{tls_dir}/root_ca.crt")).unwrap()); -// let cert = fs::read(format!("{tls_dir}/client.crt")).unwrap(); -// let key = fs::read(format!("{tls_dir}/client.key")).unwrap(); + let server_url: String = env::var("ENDPOINT").unwrap(); + let ca_certificate: String = env::var("ROOT_CA").unwrap(); + let cert = env::var("CLIENT_CRT").unwrap(); + let key = env::var("CLIENT_KEY").unwrap(); -// let client = Client::new( -// server_url, -// ChannelProviderSettings::builder() -// .set_tls_config( -// ClientTlsConfig::new() -// .ca_certificate(ca_certificate) -// .identity(Identity::from_pem(cert, key)), -// ) -// .build(), -// ) -// .await -// .unwrap(); + let client = Client::new( + server_url, + ChannelProviderSettings::builder() + .set_tls_config( + ClientTlsConfig::new() + .ca_certificate(Certificate::from_pem(ca_certificate)) + .identity(Identity::from_pem(cert, key)), + ) + .build(), + ) + .await + .unwrap(); -// log::info!("{:?}", client.list_streams().await.unwrap()); -// } + log::info!("{:?}", client.list_streams().await.unwrap()); +} From bbaf9a08b77bb817515e675c299e6a53788758f6 Mon Sep 17 00:00:00 2001 From: Alissa Tung Date: Wed, 1 Feb 2023 15:24:22 +0800 Subject: [PATCH 2/5] refine --- src/hstreamdb/tests/common.rs | 2 +- src/hstreamdb/tests/tls_test.rs | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/hstreamdb/tests/common.rs b/src/hstreamdb/tests/common.rs index 02463ba..1dc107f 100644 --- a/src/hstreamdb/tests/common.rs +++ b/src/hstreamdb/tests/common.rs @@ -13,7 +13,7 @@ pub async fn init_client() -> anyhow::Result { Ok(Client(client)) } -pub struct Client(hstreamdb::Client); +pub struct Client(pub hstreamdb::Client); impl Client { pub async fn new_stream(&self) -> anyhow::Result { diff --git a/src/hstreamdb/tests/tls_test.rs b/src/hstreamdb/tests/tls_test.rs index 5c708ed..f004eb7 100644 --- a/src/hstreamdb/tests/tls_test.rs +++ b/src/hstreamdb/tests/tls_test.rs @@ -1,3 +1,5 @@ +mod common; + use std::env; use hstreamdb::tls::{Certificate, ClientTlsConfig, Identity}; @@ -36,4 +38,12 @@ async fn test_tls_impl() { .unwrap(); log::info!("{:?}", client.list_streams().await.unwrap()); + + let client = common::Client(client); + + let (stream, _sub) = client.new_stream_subscription().await.unwrap(); + client + .write_rand(stream.stream_name, 10, 2000, 1000) + .await + .unwrap(); } From 9388ae769c9d39e9138de1effcdc905fefcef26e Mon Sep 17 00:00:00 2001 From: Alissa Tung Date: Wed, 1 Feb 2023 15:31:05 +0800 Subject: [PATCH 3/5] fix --- src/hstreamdb/tests/common.rs | 5 +++++ src/hstreamdb/tests/tls_test.rs | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/hstreamdb/tests/common.rs b/src/hstreamdb/tests/common.rs index 1dc107f..b5a8a42 100644 --- a/src/hstreamdb/tests/common.rs +++ b/src/hstreamdb/tests/common.rs @@ -13,6 +13,11 @@ pub async fn init_client() -> anyhow::Result { Ok(Client(client)) } +#[tokio::test(flavor = "multi_thread")] +async fn make_ci_happy() { + init_client().await.unwrap().0.list_streams().await.unwrap(); +} + pub struct Client(pub hstreamdb::Client); impl Client { diff --git a/src/hstreamdb/tests/tls_test.rs b/src/hstreamdb/tests/tls_test.rs index f004eb7..83205ff 100644 --- a/src/hstreamdb/tests/tls_test.rs +++ b/src/hstreamdb/tests/tls_test.rs @@ -7,7 +7,7 @@ use hstreamdb::{ChannelProviderSettings, Client}; #[tokio::test(flavor = "multi_thread")] async fn test_tls() { - if let Ok(_) = env::var("ENDPOINT") { + if env::var("ENDPOINT").is_ok() { test_tls_impl().await } else { log::warn!("cloud endpoint is not presented"); From a42c5428db1023c6a2f3575b1c4e8275e423b7a7 Mon Sep 17 00:00:00 2001 From: Alissa Tung Date: Wed, 1 Feb 2023 16:02:01 +0800 Subject: [PATCH 4/5] more tests --- src/hstreamdb/src/common.rs | 4 +++- src/hstreamdb/src/consumer.rs | 13 +++++++++-- src/hstreamdb/src/lib.rs | 3 ++- src/hstreamdb/tests/common.rs | 31 +++++++++++++++++++++++-- src/hstreamdb/tests/consumer_test.rs | 1 - src/hstreamdb/tests/integration_test.rs | 20 +++++++++++++++- src/x/hstreamdb-erl-nifs/src/lib.rs | 1 - 7 files changed, 64 insertions(+), 9 deletions(-) diff --git a/src/hstreamdb/src/common.rs b/src/hstreamdb/src/common.rs index 3705f26..bddd295 100644 --- a/src/hstreamdb/src/common.rs +++ b/src/hstreamdb/src/common.rs @@ -9,9 +9,11 @@ use tonic::transport; use crate::producer; +pub type SubscriptionId = String; + #[derive(Debug)] pub struct Subscription { - pub subscription_id: String, + pub subscription_id: SubscriptionId, pub stream_name: String, pub ack_timeout_seconds: i32, pub max_unacked_records: i32, diff --git a/src/hstreamdb/src/consumer.rs b/src/hstreamdb/src/consumer.rs index ba30200..f42d4f8 100644 --- a/src/hstreamdb/src/consumer.rs +++ b/src/hstreamdb/src/consumer.rs @@ -4,18 +4,27 @@ use prost_types::Struct; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::StreamExt; use tonic::{Request, Streaming}; use crate::client::Client; use crate::common::{self, Payload}; use crate::utils::decode_received_records; +pub struct ConsumerStream(UnboundedReceiverStream<(Payload, Responder)>); + +impl ConsumerStream { + pub async fn next(&mut self) -> Option<(Payload, Responder)> { + self.0.next().await + } +} + impl Client { pub async fn streaming_fetch( &self, consumer_name: String, subscription_id: String, - ) -> common::Result> { + ) -> common::Result { let url = self.lookup_subscription(subscription_id.clone()).await?; log::debug!("lookup subscription for {subscription_id}, url = {url}"); let mut channel = self.channels.channel_at(url).await?; @@ -47,7 +56,7 @@ impl Client { sender, )); - Ok(UnboundedReceiverStream::new(receiver)) + Ok(ConsumerStream(UnboundedReceiverStream::new(receiver))) } } diff --git a/src/hstreamdb/src/lib.rs b/src/hstreamdb/src/lib.rs index 75c2e71..e669340 100644 --- a/src/hstreamdb/src/lib.rs +++ b/src/hstreamdb/src/lib.rs @@ -16,5 +16,6 @@ pub use channel_provider::ChannelProviderSettings; pub use client::Client; pub use common::{ CompressionType, Consumer, Error, ListValue, Payload, Record, RecordId, Result, ShardId, - SpecialOffset, Stream, StreamShardOffset, Struct, Subscription, Timestamp, + SpecialOffset, Stream, StreamShardOffset, Struct, Subscription, SubscriptionId, Timestamp, }; +pub use consumer::ConsumerStream; diff --git a/src/hstreamdb/tests/common.rs b/src/hstreamdb/tests/common.rs index b5a8a42..fc2c9d6 100644 --- a/src/hstreamdb/tests/common.rs +++ b/src/hstreamdb/tests/common.rs @@ -3,7 +3,7 @@ use std::env; use hstreamdb::appender::Appender; use hstreamdb::common::{CompressionType, SpecialOffset, Stream}; use hstreamdb::producer::{FlushSettings, Producer}; -use hstreamdb::{ChannelProviderSettings, Record, Subscription}; +use hstreamdb::{ChannelProviderSettings, ConsumerStream, Record, Subscription, SubscriptionId}; use hstreamdb_test_utils::rand_alphanumeric; pub async fn init_client() -> anyhow::Result { @@ -15,7 +15,15 @@ pub async fn init_client() -> anyhow::Result { #[tokio::test(flavor = "multi_thread")] async fn make_ci_happy() { - init_client().await.unwrap().0.list_streams().await.unwrap(); + let client = init_client().await.unwrap(); + let (stream, sub) = client.new_stream_subscription().await.unwrap(); + let mut consumer = client.new_consumer(sub.subscription_id).await.unwrap(); + let (appender, producer) = client.new_sync_producer(stream.stream_name).await.unwrap(); + appender.append(rand_raw_record(4500)).await.unwrap(); + producer.start().await; + while let Some(x) = consumer.next().await { + x.1.ack().unwrap(); + } } pub struct Client(pub hstreamdb::Client); @@ -110,4 +118,23 @@ impl Client { let subscription = self.new_subscription(stream_name).await?; Ok((stream, subscription)) } + + pub async fn new_consumer( + &self, + subscription_id: SubscriptionId, + ) -> anyhow::Result { + let fetching_stream = self + .0 + .streaming_fetch(rand_alphanumeric(20), subscription_id) + .await + .unwrap(); + Ok(fetching_stream) + } +} + +pub fn rand_raw_record(len: usize) -> Record { + Record { + partition_key: rand_alphanumeric(10), + payload: hstreamdb::Payload::RawRecord(rand_alphanumeric(len).into_bytes()), + } } diff --git a/src/hstreamdb/tests/consumer_test.rs b/src/hstreamdb/tests/consumer_test.rs index e65a4c6..3ffec7d 100644 --- a/src/hstreamdb/tests/consumer_test.rs +++ b/src/hstreamdb/tests/consumer_test.rs @@ -6,7 +6,6 @@ use hstreamdb::producer::FlushSettings; use hstreamdb::{ChannelProviderSettings, Subscription}; use hstreamdb_pb::{SpecialOffset, Stream}; use hstreamdb_test_utils::rand_alphanumeric; -use tokio_stream::StreamExt; #[tokio::test(flavor = "multi_thread")] async fn test_consumer() { diff --git a/src/hstreamdb/tests/integration_test.rs b/src/hstreamdb/tests/integration_test.rs index 6701c09..ff79870 100644 --- a/src/hstreamdb/tests/integration_test.rs +++ b/src/hstreamdb/tests/integration_test.rs @@ -1,6 +1,6 @@ mod common; -use common::init_client; +use common::{init_client, rand_raw_record}; #[tokio::test(flavor = "multi_thread")] async fn utils_base_test() { @@ -21,3 +21,21 @@ async fn utils_base_test() { client.new_stream_subscription().await.unwrap(); } + +#[tokio::test(flavor = "multi_thread")] +async fn sync_producer_should_be_sync() { + let client = init_client().await.unwrap(); + + let (stream, sub) = client.new_stream_subscription().await.unwrap(); + let stream_name = &stream.stream_name; + let (appender, producer) = client.new_sync_producer(stream_name).await.unwrap(); + let mut fetching_stream = client.new_consumer(sub.subscription_id).await.unwrap(); + + tokio::spawn(producer.start()); + + for _ in 0..50 { + appender.append(rand_raw_record(200)).await.unwrap(); + let (_, responder) = fetching_stream.next().await.unwrap(); + responder.ack().unwrap() + } +} diff --git a/src/x/hstreamdb-erl-nifs/src/lib.rs b/src/x/hstreamdb-erl-nifs/src/lib.rs index 52cd3dd..e365fbe 100644 --- a/src/x/hstreamdb-erl-nifs/src/lib.rs +++ b/src/x/hstreamdb-erl-nifs/src/lib.rs @@ -20,7 +20,6 @@ use rustler::{ ResourceArc, Term, }; use tokio::sync::{oneshot, Mutex, MutexGuard}; -use tokio_stream::StreamExt; use tonic::transport::{Certificate, ClientTlsConfig, Identity}; mod runtime; From 4e82196768edba7f303889545417b56b5110e0dd Mon Sep 17 00:00:00 2001 From: alissa-tung Date: Fri, 17 Feb 2023 19:01:14 +0800 Subject: [PATCH 5/5] fix --- src/hstreamdb/src/consumer.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/hstreamdb/src/consumer.rs b/src/hstreamdb/src/consumer.rs index f42d4f8..97d8391 100644 --- a/src/hstreamdb/src/consumer.rs +++ b/src/hstreamdb/src/consumer.rs @@ -10,6 +10,7 @@ use tonic::{Request, Streaming}; use crate::client::Client; use crate::common::{self, Payload}; use crate::utils::decode_received_records; +use crate::SubscriptionId; pub struct ConsumerStream(UnboundedReceiverStream<(Payload, Responder)>); @@ -23,7 +24,7 @@ impl Client { pub async fn streaming_fetch( &self, consumer_name: String, - subscription_id: String, + subscription_id: SubscriptionId, ) -> common::Result { let url = self.lookup_subscription(subscription_id.clone()).await?; log::debug!("lookup subscription for {subscription_id}, url = {url}");