Skip to content

Commit

Permalink
fix: orchestrate adapter health check such that monitoring of adapter…
Browse files Browse the repository at this point in the history
…s works as expected (#2335)

* fix: orchestrate adapter health check such that monitoring of adapters works fine

* style: fix formatting
  • Loading branch information
bossenti authored Dec 15, 2023
1 parent 2246efd commit 7d4c1bc
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -66,28 +65,32 @@ public void run() {
* {@link org.apache.streampipes.manager.health.PipelineHealthCheck}).
*/
public void checkAndRestoreAdapters() {
// Get all running adapters
Map<String, AdapterDescription> allRunningInstancesAdapterDescriptions =
this.getAllRunningInstancesAdapterDescriptions();
// Get all adapters that are supposed to run according to the backend storage
Map<String, AdapterDescription> adapterInstancesSupposedToRun =
this.getAllAdaptersSupposedToRun();

// Get all worker containers that run adapters
// group all adapter instances supposed to run by their worker service URL
Map<String, List<AdapterDescription>> groupByWorker =
this.getAllWorkersWithAdapters(allRunningInstancesAdapterDescriptions);
this.getAllWorkersWithAdapters(adapterInstancesSupposedToRun);

// Get adapters that are not running anymore
Map<String, AdapterDescription> allAdaptersToRecover =
this.getAdaptersToRecover(groupByWorker, allRunningInstancesAdapterDescriptions);
this.getAdaptersToRecover(groupByWorker, adapterInstancesSupposedToRun);

try {
if (!allRunningInstancesAdapterDescriptions.isEmpty()) {
if (!adapterInstancesSupposedToRun.isEmpty()) {
// Filter adapters so that only healthy and running adapters are updated in the metrics endpoint
updateMonitoringMetrics(
allRunningInstancesAdapterDescriptions
.entrySet()
.stream()
.filter((entry -> !allAdaptersToRecover.containsKey(entry.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
);
var adaptersToMonitor = adapterInstancesSupposedToRun
.entrySet()
.stream()
.filter((entry -> !allAdaptersToRecover.containsKey(entry.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (!adaptersToMonitor.isEmpty()) {
updateMonitoringMetrics(adaptersToMonitor);
} else {
LOG.info("No running adapter instances to monitor.");
}
}
} catch (NoSuchElementException e) {
LOG.error("Could not update adapter metrics due to an invalid state. ({})", e.getMessage());
Expand Down Expand Up @@ -118,6 +121,15 @@ protected void updateMonitoringMetrics(Map<String, AdapterDescription> runningAd
}

private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, String adapterId, String adapterName) {

// Check if the adapter is already registered; if not, register it first.
// This step is crucial, especially when the StreamPipes Core service is restarted,
// and there are existing running adapters that need proper registration.
// Note: Proper registration is usually handled during the initial start of the adapter.
if (!adapterMetrics.contains(adapterId)) {
adapterMetrics.register(adapterId, adapterName);
}

adapterMetrics.updateTotalEventsPublished(
adapterId,
adapterName,
Expand All @@ -128,7 +140,18 @@ private void updateTotalEventsPublished(AdapterMetrics adapterMetrics, String ad
}


public Map<String, AdapterDescription> getAllRunningInstancesAdapterDescriptions() {
/**
* Retrieves a map of all adapter instances that are supposed to be running according to the backend storage.
* <p>
* This method queries the adapter storage to obtain information about all adapters
* and filters the running instances. The resulting map is keyed by the element ID
* of each running adapter, and the corresponding values are the respective
* {@link AdapterDescription} objects.
*
* @return A map containing all adapter instances supposed to be running according to the backend storage.
* The keys are element IDs, and the values are the corresponding adapter descriptions.
*/
public Map<String, AdapterDescription> getAllAdaptersSupposedToRun() {
Map<String, AdapterDescription> result = new HashMap<>();
List<AdapterDescription> allRunningInstancesAdapterDescription = this.adapterStorage.getAllAdapters();
allRunningInstancesAdapterDescription
Expand All @@ -144,41 +167,69 @@ public Map<String, AdapterDescription> getAllRunningInstancesAdapterDescriptions
}

public Map<String, List<AdapterDescription>> getAllWorkersWithAdapters(
Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
Map<String, AdapterDescription> adapterInstancesSupposedToRun
) {

Map<String, List<AdapterDescription>> groupByWorker = new HashMap<>();
allRunningInstancesAdapterDescription.values().forEach(ad -> {
String selectedEndpointUrl = ad.getSelectedEndpointUrl();
if (selectedEndpointUrl != null) {
if (groupByWorker.containsKey(selectedEndpointUrl)) {
groupByWorker.get(selectedEndpointUrl).add(ad);
} else {
List<AdapterDescription> tmp = new ArrayList<>();
tmp.add(ad);
groupByWorker.put(selectedEndpointUrl, tmp);
}
}
});
adapterInstancesSupposedToRun.values()
.forEach(ad -> {
String selectedEndpointUrl = ad.getSelectedEndpointUrl();
if (selectedEndpointUrl != null) {
if (groupByWorker.containsKey(selectedEndpointUrl)) {
groupByWorker.get(selectedEndpointUrl)
.add(ad);
} else {
groupByWorker.put(selectedEndpointUrl, List.of(ad));
}
}
});

return groupByWorker;
}

/**
* Retrieves a map of adapters to recover by comparing the provided groupings of adapter instances
* with the instances supposed to run according to the storage.
* For every adapter instance it is verified that it actually runs on a worker node.
* If this is not the case, it is added to the output of adapters to recover.
*
* @param adapterInstancesGroupedByWorker A map grouping adapter instances by worker.
* @param adapterInstancesSupposedToRun The map containing all adapter instances supposed to be running.
* @return A new map containing adapter instances to recover, filtered based on running instances.
*/
public Map<String, AdapterDescription> getAdaptersToRecover(
Map<String, List<AdapterDescription>> groupByWorker,
Map<String, AdapterDescription> allRunningInstancesAdapterDescription) {
groupByWorker.keySet().forEach(adapterEndpointUrl -> {
try {
List<AdapterDescription> allRunningInstancesOfOneWorker =
WorkerRestClient.getAllRunningAdapterInstanceDescriptions(
adapterEndpointUrl + WorkerPaths.getRunningAdaptersPath());
allRunningInstancesOfOneWorker.forEach(adapterDescription ->
allRunningInstancesAdapterDescription.remove(adapterDescription.getElementId()));
} catch (AdapterException e) {
LOG.info("Could not recover adapter at endpoint {} due to {}", adapterEndpointUrl, e.getMessage());
}
});
Map<String, List<AdapterDescription>> adapterInstancesGroupedByWorker,
Map<String, AdapterDescription> adapterInstancesSupposedToRun
) {

return allRunningInstancesAdapterDescription;
// NOTE: This line is added to prevent modifying the existing map of instances supposed to run
// It looks like the parameter `adapterInstancesSupposedToRun` is not required at all,
// but this should be checked more carefully.
Map<String, AdapterDescription> adaptersToRecover = new HashMap<>(adapterInstancesSupposedToRun);

adapterInstancesGroupedByWorker.keySet()
.forEach(adapterEndpointUrl -> {
try {
List<AdapterDescription> allRunningInstancesOfOneWorker =
WorkerRestClient.getAllRunningAdapterInstanceDescriptions(
adapterEndpointUrl + WorkerPaths.getRunningAdaptersPath());

// only keep adapters where there is no running adapter instance
// therefore, all others are removed
allRunningInstancesOfOneWorker.forEach(
adapterDescription ->
adaptersToRecover.remove(
adapterDescription.getElementId()));
} catch (AdapterException e) {
LOG.info(
"Could not recover adapter at endpoint {} due to {}",
adapterEndpointUrl,
e.getMessage()
);
}
});

return adaptersToRecover;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -51,14 +50,13 @@ public void getAllRunningInstancesAdapterDescriptionsEmpty() {
var healthCheck = new AdapterHealthCheck(
adapterInstanceStorageMock,
new AdapterMasterManagement(
StorageDispatcher.INSTANCE.getNoSqlStore()
.getAdapterInstanceStorage(),
adapterInstanceStorageMock,
new SpResourceManager().manageAdapters(),
new SpResourceManager().manageDataStreams(),
AdapterMetricsManager.INSTANCE.getAdapterMetrics()
)
);
var result = healthCheck.getAllRunningInstancesAdapterDescriptions();
var result = healthCheck.getAllAdaptersSupposedToRun();

assertTrue(result.isEmpty());
}
Expand All @@ -82,14 +80,13 @@ public void getAllRunningInstancesAdapterDescriptionsMixed() {
var healthCheck = new AdapterHealthCheck(
adapterInstanceStorageMock,
new AdapterMasterManagement(
StorageDispatcher.INSTANCE.getNoSqlStore()
.getAdapterInstanceStorage(),
adapterInstanceStorageMock,
new SpResourceManager().manageAdapters(),
new SpResourceManager().manageDataStreams(),
AdapterMetricsManager.INSTANCE.getAdapterMetrics()
)
);
var result = healthCheck.getAllRunningInstancesAdapterDescriptions();
var result = healthCheck.getAllAdaptersSupposedToRun();

assertEquals(1, result.size());
assertTrue(result.containsKey(nameRunningAdapter));
Expand Down

0 comments on commit 7d4c1bc

Please sign in to comment.