Skip to content

Commit

Permalink
Extend topic with compression_algorithm field. (#858)
Browse files Browse the repository at this point in the history
BREAKING_CHANGE Added new field (compression_algorithm) to the topic
struct.
  • Loading branch information
numinnex authored Mar 31, 2024
1 parent 051b1b2 commit 7efdf99
Show file tree
Hide file tree
Showing 69 changed files with 309 additions and 47 deletions.
14 changes: 13 additions & 1 deletion .github/workflows/backwards_compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
check_commit_message:
runs-on: ubuntu-latest
outputs:
should_skip: ${{ steps.check_commits.outputs.skip || steps.check_pr_body.outputs.skip }}
should_skip: ${{ steps.check_skip.outputs.skip }}
steps:
- uses: actions/checkout@v4
with:
Expand Down Expand Up @@ -61,6 +61,18 @@ jobs:
echo "skip=false" >> $GITHUB_OUTPUT
echo "'BREAKING_CHANGE' not found in pull request body, setting skip=false"
fi
- name: Print commits output
run: echo "${{steps.check_commits.outputs.skip}}"
- name: Print pr_body output
run: echo "${{steps.check_pr_body.outputs.skip}}"
- name: Check For Skip Condition
id: check_skip
run: |
if [[ ${{ steps.check_commits.outputs.skip }} || ${{ steps.check_pr_body.outputs.skip }} ]]; then
echo "skip=true" >> $GITHUB_OUTPUT
else
echo "skip=false" >> $GITHUB_OUTPUT
fi
build_and_test:
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bench/src/benchmarks/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
};
use async_trait::async_trait;
use futures::Future;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::{
client::{StreamClient, TopicClient},
clients::client::{IggyClient, IggyClientBackgroundConfig},
Expand Down Expand Up @@ -94,6 +95,7 @@ pub trait Benchmarkable {
stream_id: Identifier::numeric(stream_id)?,
topic_id: Some(topic_id),
partitions_count,
compression_algorithm: CompressionAlgorithm::None,
name,
message_expiry: None,
max_topic_size: None,
Expand Down
7 changes: 7 additions & 0 deletions cli/src/args/topic.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::args::common::ListMode;
use clap::{Args, Subcommand};
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::identifier::Identifier;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
Expand Down Expand Up @@ -94,6 +95,9 @@ pub(crate) struct TopicCreateArgs {
pub(crate) topic_id: Option<u32>,
/// Number of partitions inside the topic
pub(crate) partitions_count: u32,
/// Compression algorithm for the topic, set to "none" for no compression
#[arg(value_parser = clap::value_parser!(CompressionAlgorithm), verbatim_doc_comment)]
pub(crate) compression_algorithm: CompressionAlgorithm,
/// Max topic size
///
/// ("unlimited" or skipping parameter disables max topic size functionality in topic)
Expand Down Expand Up @@ -138,6 +142,9 @@ pub(crate) struct TopicUpdateArgs {
pub(crate) topic_id: Identifier,
/// New name for the topic
pub(crate) name: String,
/// Compression algorithm for the topic, set to "none" for no compression
#[arg(value_parser = clap::value_parser!(CompressionAlgorithm), verbatim_doc_comment)]
pub(crate) compression_algorithm: CompressionAlgorithm,
/// New max topic size
///
/// ("unlimited" or skipping parameter causes removal of max topic size parameter in topic)
Expand Down
2 changes: 2 additions & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ fn get_command(
args.stream_id.clone(),
args.topic_id,
args.partitions_count,
args.compression_algorithm,
args.name.clone(),
args.message_expiry.clone().into(),
args.max_topic_size,
Expand All @@ -104,6 +105,7 @@ fn get_command(
TopicAction::Update(args) => Box::new(UpdateTopicCmd::new(
args.stream_id.clone(),
args.topic_id.clone(),
args.compression_algorithm,
args.name.clone(),
args.message_expiry.clone().into(),
args.max_topic_size,
Expand Down
2 changes: 2 additions & 0 deletions examples/src/getting-started/producer/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use iggy::clients::next_builder::IggyClientNextBuilder;
use iggy::clients::next_client::IggyClientNext;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::messages::send_messages::{Message, Partitioning};
use iggy::next_client::{ClientNext, StreamClientNext, TopicClientNext, UserClientNext};
use iggy::users::defaults::*;
Expand Down Expand Up @@ -48,6 +49,7 @@ async fn init_system(client: &IggyClientNext) {
&STREAM_ID.try_into().unwrap(),
"sample-topic",
1,
CompressionAlgorithm::default(),
None,
Some(TOPIC_ID),
IggyExpiry::NeverExpire,
Expand Down
3 changes: 3 additions & 0 deletions examples/src/shared/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub struct Args {
#[arg(long, default_value = "1")]
pub partitions_count: u32,

#[arg(long, default_value = "1")]
pub compression_algorithm: u8,

#[arg(long, default_value = "1")]
pub consumer_kind: u8,

Expand Down
2 changes: 2 additions & 0 deletions examples/src/shared/system.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::shared::args::Args;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::consumer::{Consumer, ConsumerKind};
use iggy::error::IggyError;
use iggy::identifier::Identifier;
Expand Down Expand Up @@ -69,6 +70,7 @@ pub async fn init_by_producer(args: &Args, client: &dyn ClientNext) -> Result<()
&args.stream_id.try_into()?,
"orders",
args.partitions_count,
CompressionAlgorithm::from_code(args.compression_algorithm)?,
None,
Some(args.topic_id),
IggyExpiry::NeverExpire,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl IggyCmdTestCase for TestConsumerGroupCreateCmd {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Some(self.topic_id),
partitions_count: 0,
compression_algorithm: Default::default(),
name: self.topic_name.clone(),
message_expiry: None,
max_topic_size: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl IggyCmdTestCase for TestConsumerGroupDeleteCmd {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Some(self.topic_id),
partitions_count: 0,
compression_algorithm: Default::default(),
name: self.topic_name.clone(),
message_expiry: None,
max_topic_size: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl IggyCmdTestCase for TestConsumerGroupGetCmd {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Some(self.topic_id),
partitions_count: 0,
compression_algorithm: Default::default(),
name: self.topic_name.clone(),
message_expiry: None,
max_topic_size: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl IggyCmdTestCase for TestConsumerGroupListCmd {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Some(self.topic_id),
partitions_count: 0,
compression_algorithm: Default::default(),
name: self.topic_name.clone(),
message_expiry: None,
max_topic_size: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl IggyCmdTestCase for TestConsumerOffsetGetCmd {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Some(self.topic_id),
partitions_count: 1,
compression_algorithm: Default::default(),
name: self.topic_name.clone(),
message_expiry: None,
max_topic_size: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl IggyCmdTestCase for TestConsumerOffsetSetCmd {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Some(self.topic_id),
partitions_count: 1,
compression_algorithm: Default::default(),
name: self.topic_name.clone(),
message_expiry: None,
max_topic_size: None,
Expand Down
1 change: 1 addition & 0 deletions integration/tests/cli/message/test_message_poll_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl IggyCmdTestCase for TestMessagePollCmd {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Some(self.topic_id),
partitions_count: self.partitions_count,
compression_algorithm: Default::default(),
name: self.topic_name.clone(),
message_expiry: None,
max_topic_size: None,
Expand Down
1 change: 1 addition & 0 deletions integration/tests/cli/message/test_message_send_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ impl IggyCmdTestCase for TestMessageSendCmd {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Some(self.topic_id),
partitions_count: self.partitions_count,
compression_algorithm: Default::default(),
name: self.topic_name.clone(),
message_expiry: None,
max_topic_size: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::cli::common::{
};
use assert_cmd::assert::Assert;
use async_trait::async_trait;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::streams::create_stream::CreateStream;
use iggy::streams::delete_stream::DeleteStream;
use iggy::topics::create_topic::CreateTopic;
Expand All @@ -19,6 +20,7 @@ struct TestPartitionCreateCmd {
topic_id: u32,
topic_name: String,
partitions_count: u32,
compression_algorithm: CompressionAlgorithm,
new_partitions: u32,
using_stream_id: TestStreamId,
using_topic_id: TestTopicId,
Expand All @@ -32,6 +34,7 @@ impl TestPartitionCreateCmd {
topic_id: u32,
topic_name: String,
partitions_count: u32,
compression_algorithm: CompressionAlgorithm,
new_partitions: u32,
using_stream_id: TestStreamId,
using_topic_id: TestTopicId,
Expand All @@ -42,6 +45,7 @@ impl TestPartitionCreateCmd {
topic_id,
topic_name,
partitions_count,
compression_algorithm,
new_partitions,
using_stream_id,
using_topic_id,
Expand Down Expand Up @@ -81,6 +85,7 @@ impl IggyCmdTestCase for TestPartitionCreateCmd {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Some(self.topic_id),
partitions_count: self.partitions_count,
compression_algorithm: self.compression_algorithm,
name: self.topic_name.clone(),
message_expiry: None,
max_topic_size: None,
Expand Down Expand Up @@ -167,6 +172,7 @@ pub async fn should_be_successful() {
1,
String::from("sync"),
1,
Default::default(),
1,
TestStreamId::Numeric,
TestTopicId::Numeric,
Expand All @@ -179,6 +185,7 @@ pub async fn should_be_successful() {
3,
String::from("topic"),
3,
Default::default(),
2,
TestStreamId::Named,
TestTopicId::Numeric,
Expand All @@ -191,6 +198,7 @@ pub async fn should_be_successful() {
1,
String::from("probe"),
0,
Default::default(),
4,
TestStreamId::Numeric,
TestTopicId::Named,
Expand All @@ -203,6 +211,7 @@ pub async fn should_be_successful() {
5,
String::from("test"),
4,
Default::default(),
1,
TestStreamId::Named,
TestTopicId::Named,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl IggyCmdTestCase for TestPartitionDeleteCmd {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Some(self.topic_id),
partitions_count: self.partitions_count,
compression_algorithm: Default::default(),
name: self.topic_name.clone(),
message_expiry: None,
max_topic_size: None,
Expand Down
1 change: 1 addition & 0 deletions integration/tests/cli/stream/test_stream_purge_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl IggyCmdTestCase for TestStreamPurgeCmd {
stream_id: Identifier::numeric(self.stream_id).unwrap(),
topic_id: Some(self.topic_id),
partitions_count: 10,
compression_algorithm: Default::default(),
message_expiry: None,
max_topic_size: None,
replication_factor: 1,
Expand Down
1 change: 1 addition & 0 deletions integration/tests/cli/system/test_stats_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl IggyCmdTestCase for TestStatsCmd {
topic_id: Some(1),
stream_id,
partitions_count: 5,
compression_algorithm: Default::default(),
message_expiry: None,
max_topic_size: None,
replication_factor: 1,
Expand Down
Loading

0 comments on commit 7efdf99

Please sign in to comment.