Skip to content

Commit

Permalink
Merge branch 'master' into zero_copy_schema
Browse files Browse the repository at this point in the history
  • Loading branch information
numinnex committed Mar 1, 2025
2 parents c6930c0 + ddfa081 commit c272164
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 87 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/backwards_compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ jobs:
clean: false

- name: Build iggy-server (PR)
run: IGGY_CI_BUILD=true cargo build --bin iggy-server
run: IGGY_CI_BUILD=true cargo build

- name: Restore local_data directory (PR)
run: cp -r ../local_data .
Expand Down
52 changes: 26 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ To help us incorporate your changes efficiently, please adhere to the following
## Commit Message Rules

- **Description**: Provide a concise description of the changes.
- **Tags**: Avoid using GitHub tags like `feat:` or `chore:`.
- **Style**: Use an imperative style in the subject line (e.g., "Fix bug" rather than "Fixed bug" or "Fixes bug").
- **Brevity**: Keep the subject line under 80 characters.
- **Rationale**: Explain the 'why' and 'what' of your changes in the summary.
Expand Down
2 changes: 1 addition & 1 deletion bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ toml = "0.8.20"
tracing = { version = "0.1.41" }
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3.19", features = ["fmt", "env-filter"] }
uuid = { version = "1.13.2", features = ["serde"] }
uuid = { version = "1.14.0", features = ["serde"] }

[[bin]]
name = "iggy-bench"
Expand Down
2 changes: 1 addition & 1 deletion bench/report/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sysinfo = "0.33.1"
tracing = "0.1"
uuid = { version = "1.13.2", features = ["serde"] }
uuid = { version = "1.14.0", features = ["serde"] }
2 changes: 1 addition & 1 deletion examples/src/multi-tenant/consumer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
}

print_info("Disconnecting root client");
root_client.disconnect().await?;
root_client.shutdown().await?;

print_info("Creating clients for each tenant");
let mut tenants = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion examples/src/multi-tenant/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async fn main() -> anyhow::Result<(), Box<dyn Error>> {
}

print_info("Disconnecting root client");
root_client.disconnect().await?;
root_client.shutdown().await?;

print_info("Creating clients for each tenant");
let mut tenants = Vec::new();
Expand Down
4 changes: 2 additions & 2 deletions integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ iggy = { path = "../sdk", features = ["iggy-cli"] }
keyring = "3.6.1"
lazy_static = "1.5.0"
libc = "0.2.169"
log = "0.4.25"
log = "0.4.26"
predicates = "3.1.3"
regex = "1.11.1"
serial_test = "3.2.0"
Expand All @@ -29,7 +29,7 @@ test-case = "3.3.1"
tokio = { version = "1.43.0", features = ["full"] }
tracing-subscriber = { version = "0.3.19", features = ["fmt", "env-filter"] }
twox-hash = { version = "2.1.0", features = ["xxhash32"] }
uuid = { version = "1.13.2", features = ["v7", "fast-rng", "zerocopy"] }
uuid = { version = "1.14.0", features = ["v7", "fast-rng", "zerocopy"] }
zip = "2.2.2"

# Some tests are failing in CI due to lack of IPv6 interfaces
Expand Down
2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ tokio-rustls = { version = "0.26.1" }
toml = "0.8.20"
tracing = { version = "0.1.41" }
trait-variant = { version = "0.1.2" }
uuid = { version = "1.13.2", features = ["v7", "fast-rng", "zerocopy"] }
uuid = { version = "1.14.0", features = ["v7", "fast-rng", "zerocopy"] }
webpki-roots = { version = "0.26.8" }

[build-dependencies]
Expand Down
5 changes: 4 additions & 1 deletion sdk/src/models/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,14 +424,17 @@ impl<'msg> Iterator for IggyMessageIterator<'msg> {
pos += 4;
let id = u128::from_le_bytes(data[pos..pos + 16].try_into().ok()?);
pos += 16;
/*
let total_length = u64::from_le_bytes(data[pos..pos + 8].try_into().ok()?);
// Subtract the id and payload_length field
let msg_len = total_length - 16 - 8;
pos += 8;
let payload_length = u64::from_le_bytes(data[pos..pos + 8].try_into().ok()?);
pos += 8;
let headers_length = msg_len - payload_length;

*/
let payload_length = decode_var(&data[pos..], &mut pos);
let headers_length = decode_var(&data[pos..], &mut pos);
let payload = &data[pos..pos + payload_length as usize];
pos += payload_length as usize;
let headers = &data[pos..pos + headers_length as usize];
Expand Down
9 changes: 6 additions & 3 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.213"
version = "0.4.214"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down Expand Up @@ -64,6 +64,9 @@ opentelemetry_sdk = { version = "0.28.0", features = [
"logs",
"trace",
"tokio",
"experimental_async_runtime",
"experimental_logs_batch_log_processor_with_async_runtime",
"experimental_trace_batch_span_processor_with_async_runtime"
] }
prometheus-client = "0.23.1"
quinn = { version = "0.11.6" }
Expand All @@ -72,7 +75,7 @@ reqwest = { version = "0.12.12", features = [
"rustls-tls",
"rustls-tls-no-provider",
] }
ring = "0.17.9"
ring = "0.17.10"
rust-s3 = { version = "0.35.1", features = ["default"] }
rustls = { version = "0.23.23" }
rustls-pemfile = "2.2.0"
Expand All @@ -99,7 +102,7 @@ tracing-opentelemetry = { version = "0.29.0" }
tracing-subscriber = { version = "0.3.19", features = ["fmt", "env-filter"] }
twox-hash = { version = "2.1.0", features = ["xxhash32"] }
ulid = "1.2.0"
uuid = { version = "1.13.2", features = ["v7", "fast-rng", "zerocopy"] }
uuid = { version = "1.14.0", features = ["v7", "fast-rng", "zerocopy"] }

[dev-dependencies]
mockall = "0.13.1"
Expand Down
65 changes: 41 additions & 24 deletions server/src/log/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use opentelemetry::trace::TracerProvider;
use opentelemetry::KeyValue;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
use opentelemetry_sdk::logs::log_processor_with_async_runtime;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::runtime;
use opentelemetry_sdk::trace::span_processor_with_async_runtime;
use opentelemetry_sdk::Resource;
use std::io::{self, Write};
use std::path::PathBuf;
Expand Down Expand Up @@ -183,18 +186,25 @@ impl Logging {
.expect("Failed to initialize gRPC logger."),
)
.build(),
TelemetryTransport::HTTP => opentelemetry_sdk::logs::SdkLoggerProvider::builder()
.with_resource(resource.clone())
.with_batch_exporter(
opentelemetry_otlp::LogExporter::builder()
.with_http()
.with_http_client(reqwest::Client::new())
.with_endpoint(self.telemetry_config.logs.endpoint.clone())
.with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
.build()
.expect("Failed to initialize HTTP logger."),
)
.build(),
TelemetryTransport::HTTP => {
let log_exporter = opentelemetry_otlp::LogExporter::builder()
.with_http()
.with_http_client(reqwest::Client::new())
.with_endpoint(self.telemetry_config.logs.endpoint.clone())
.with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
.build()
.expect("Failed to initialize HTTP logger.");
opentelemetry_sdk::logs::SdkLoggerProvider::builder()
.with_resource(resource.clone())
.with_log_processor(
log_processor_with_async_runtime::BatchLogProcessor::builder(
log_exporter,
runtime::Tokio,
)
.build(),
)
.build()
}
};

let tracer_provider = match self.telemetry_config.traces.transport {
Expand All @@ -208,18 +218,25 @@ impl Logging {
.expect("Failed to initialize gRPC tracer."),
)
.build(),
TelemetryTransport::HTTP => opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_resource(resource.clone())
.with_batch_exporter(
opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_http_client(reqwest::Client::new())
.with_endpoint(self.telemetry_config.traces.endpoint.clone())
.with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
.build()
.expect("Failed to initialize HTTP tracer."),
)
.build(),
TelemetryTransport::HTTP => {
let trace_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_http_client(reqwest::Client::new())
.with_endpoint(self.telemetry_config.traces.endpoint.clone())
.with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
.build()
.expect("Failed to initialize HTTP tracer.");
opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_resource(resource.clone())
.with_span_processor(
span_processor_with_async_runtime::BatchSpanProcessor::builder(
trace_exporter,
runtime::Tokio,
)
.build(),
)
.build()
}
};

let tracer = tracer_provider.tracer(service_name);
Expand Down
Loading

0 comments on commit c272164

Please sign in to comment.