Skip to content

Commit de9c79c

Browse files
Dynamic watches setup: wait until the first subscriber added (#232) (#234)
Signed-off-by: Danil-Grigorev <[email protected]>
1 parent 90ac3bb commit de9c79c

File tree

5 files changed

+51
-18
lines changed

5 files changed

+51
-18
lines changed

src/api/capi_cluster.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use std::collections::BTreeMap;
22

33
use cluster_api_rs::capi_cluster::{ClusterSpec, ClusterStatus};
4-
use fleet_api_rs::{fleet_bundle_namespace_mapping::BundleNamespaceMappingNamespaceSelector, fleet_clustergroup::{ClusterGroupSelector, ClusterGroupSpec}};
4+
use fleet_api_rs::{
5+
fleet_bundle_namespace_mapping::BundleNamespaceMappingNamespaceSelector,
6+
fleet_clustergroup::{ClusterGroupSelector, ClusterGroupSpec},
7+
};
58
use kube::{
69
api::{ObjectMeta, TypeMeta},
710
Resource, ResourceExt as _,

src/controller.rs

-6
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use clap::Parser;
1515
use futures::stream::SelectAll;
1616
use futures::{Stream, StreamExt};
1717

18-
use k8s_openapi::api::core::v1::Namespace;
1918
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time};
2019
use kube::api::{DynamicObject, Patch, PatchParams};
2120
use kube::core::DeserializeGuard;
@@ -33,7 +32,6 @@ use kube::{
3332
use kube::{Resource, ResourceExt};
3433

3534
use std::collections::BTreeMap;
36-
use std::future;
3735

3836
use std::ops::Deref;
3937
use std::pin::Pin;
@@ -280,10 +278,6 @@ pub async fn run_cluster_controller(state: State) {
280278
.expect("failed to create kube Client");
281279

282280
let (sub, reader) = state.dispatcher.subscribe();
283-
let sub = sub
284-
.map(|n: Arc<Namespace>| Ok(n.deref().clone()))
285-
.predicate_filter(predicates::labels)
286-
.filter_map(|n| future::ready(n.ok().map(Arc::new)));
287281
let ns_controller = Controller::for_shared_stream(sub, reader)
288282
.shutdown_on_signal()
289283
.run(

src/controllers/controller.rs

+28-10
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ where
7676
.map_err(GetOrCreateError::Create)?;
7777

7878
info!("Created fleet object");
79-
ctx.diagnostics
79+
match ctx
80+
.diagnostics
8081
.read()
8182
.await
8283
.recorder(ctx.client.clone())
@@ -95,7 +96,12 @@ where
9596
},
9697
&res.object_ref(&()),
9798
)
98-
.await?;
99+
.await
100+
{
101+
// Ignore forbidden errors on namespace deletion
102+
Err(kube::Error::Api(e)) if &e.reason == "Forbidden" => (),
103+
e => e?,
104+
};
99105

100106
Ok(Action::await_change())
101107
}
@@ -120,7 +126,8 @@ where
120126
.map_err(PatchError::Patch)?;
121127

122128
info!("Updated fleet object");
123-
ctx.diagnostics
129+
match ctx
130+
.diagnostics
124131
.read()
125132
.await
126133
.recorder(ctx.client.clone())
@@ -139,7 +146,12 @@ where
139146
},
140147
&res.object_ref(&()),
141148
)
142-
.await?;
149+
.await
150+
{
151+
// Ignore forbidden errors on namespace deletion
152+
Err(kube::Error::Api(e)) if &e.reason == "Forbidden" => (),
153+
e => e?,
154+
};
143155

144156
Ok(Action::await_change())
145157
}
@@ -193,7 +205,12 @@ where
193205
}
194206

195207
async fn cleanup(&self, ctx: Arc<Context>) -> crate::Result<Action> {
196-
ctx.diagnostics
208+
if let Some(mut bundle) = self.to_bundle(ctx.clone()).await? {
209+
return Ok(bundle.cleanup(ctx).await?);
210+
}
211+
212+
match ctx
213+
.diagnostics
197214
.read()
198215
.await
199216
.recorder(ctx.client.clone())
@@ -208,11 +225,12 @@ where
208225
},
209226
&self.object_ref(&()),
210227
)
211-
.await?;
212-
213-
if let Some(mut bundle) = self.to_bundle(ctx.clone()).await? {
214-
return Ok(bundle.cleanup(ctx).await?);
215-
}
228+
.await
229+
{
230+
// Ignore forbidden errors on namespace deletion
231+
Err(kube::Error::Api(e)) if &e.reason == "Forbidden" => (),
232+
e => e?,
233+
};
216234

217235
Ok(Action::await_change())
218236
}

src/controllers/helm/install.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,14 @@ impl FleetOptions {
189189
pub fn patch_fleet(&self, version: &str) -> FleetPatchResult<Child> {
190190
let mut upgrade = Command::new("helm");
191191

192-
upgrade.args(["upgrade", "fleet", "fleet/fleet", "--reuse-values", "--version", version]);
192+
upgrade.args([
193+
"upgrade",
194+
"fleet",
195+
"fleet/fleet",
196+
"--reuse-values",
197+
"--version",
198+
version,
199+
]);
193200

194201
if !self.namespace.is_empty() {
195202
upgrade.args(["--namespace", &self.namespace]);

src/multi_dispatcher.rs

+11
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{
33
pin::Pin,
44
sync::Arc,
55
task::{Context, Poll},
6+
time::Duration,
67
};
78

89
use async_broadcast::{InactiveReceiver, Receiver, Sender};
@@ -18,6 +19,7 @@ use kube::{
1819
};
1920
use pin_project::pin_project;
2021
use serde::de::DeserializeOwned;
22+
use tokio::time::sleep;
2123

2224
#[derive(Clone)]
2325
pub struct MultiDispatcher {
@@ -70,6 +72,11 @@ impl MultiDispatcher {
7072
}
7173
}
7274
}
75+
76+
// Subscribers count returns the number of current receiving streams
77+
pub(crate) fn subscribers_count(&self) -> usize {
78+
self.dispatch_tx.receiver_count()
79+
}
7380
}
7481

7582
/// `BroadcastStream` allows to stream shared list of dynamic objects,
@@ -223,6 +230,10 @@ where
223230
W: Stream<Item = Result<Event<DynamicObject>>> + Unpin,
224231
{
225232
stream! {
233+
while writer.subscribers_count() == 0 {
234+
sleep(Duration::from_millis(100)).await;
235+
}
236+
226237
while let Some(event) = broadcast.next().await {
227238
match event {
228239
Ok(ev) => {

0 commit comments

Comments
 (0)