|
3 | 3 | import java.util.ArrayList;
|
4 | 4 | import java.util.List;
|
5 | 5 | import java.util.stream.Collectors;
|
| 6 | +import org.slf4j.Logger; |
| 7 | +import org.slf4j.LoggerFactory; |
6 | 8 | import org.springframework.boot.context.properties.ConfigurationProperties;
|
7 | 9 | import pl.allegro.tech.hermes.common.kafka.KafkaParameters;
|
8 | 10 | import pl.allegro.tech.hermes.infrastructure.dc.DatacenterNameProvider;
|
9 | 11 |
|
10 | 12 | @ConfigurationProperties(prefix = "frontend.kafka")
|
11 | 13 | public class KafkaClustersProperties {
|
12 | 14 |
|
| 15 | + Logger logger = LoggerFactory.getLogger(KafkaClustersProperties.class); |
| 16 | + |
13 | 17 | private List<KafkaProperties> clusters = new ArrayList<>();
|
14 | 18 |
|
15 | 19 | private String namespace = "";
|
@@ -66,11 +70,19 @@ public List<KafkaParameters> toRemoteKafkaProperties(
|
66 | 70 | "No properties for datacenter: " + currentDatacenterName + " defined."))
|
67 | 71 | .getRemoteDatacenters();
|
68 | 72 |
|
69 |
| - return this.clusters.stream() |
70 |
| - .filter( |
71 |
| - cluster -> |
72 |
| - remoteDatacenters.contains(cluster.getDatacenter()) |
73 |
| - && !cluster.getDatacenter().equals(currentDatacenterName)) |
74 |
| - .collect(Collectors.toList()); |
| 73 | + List<KafkaParameters> filteredClusters = |
| 74 | + this.clusters.stream() |
| 75 | + .filter( |
| 76 | + cluster -> |
| 77 | + remoteDatacenters.contains(cluster.getDatacenter()) |
| 78 | + && !cluster.getDatacenter().equals(currentDatacenterName)) |
| 79 | + .collect(Collectors.toList()); |
| 80 | + |
| 81 | + List<String> filteredRemoteDatacenters = |
| 82 | + filteredClusters.stream().map(KafkaParameters::getDatacenter).collect(Collectors.toList()); |
| 83 | + |
| 84 | + logger.info("Remote datacenters for {}: {}", currentDatacenterName, filteredRemoteDatacenters); |
| 85 | + |
| 86 | + return filteredClusters; |
75 | 87 | }
|
76 | 88 | }
|
0 commit comments