Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/GH-314 ingest #317

Merged
merged 14 commits into from
Feb 10, 2025
19 changes: 16 additions & 3 deletions aggregator/pom.xml
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
<skipITs>false</skipITs>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.15.1</quarkus.platform.version>
<quarkus.platform.version>3.18.2</quarkus.platform.version>

<sonar.organization>spoud</sonar.organization>
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
@@ -39,10 +39,23 @@
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.duckdb</groupId>
<artifactId>duckdb_jdbc</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
@@ -81,11 +94,11 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-reactive</artifactId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package io.spoud.kcc.aggregator.olap;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Startup;
import io.quarkus.scheduler.Scheduled;
import io.spoud.kcc.data.AggregatedDataWindowed;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import lombok.RequiredArgsConstructor;
import org.apache.commons.codec.digest.DigestUtils;

import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashSet;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;

@Startup
@RequiredArgsConstructor
@ApplicationScoped
public class AggregatedMetricsRepository {
private final OlapConfigProperties olapConfig;
private final Queue<AggregatedDataWindowed> rowBuffer = new ConcurrentLinkedQueue<>();
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private Connection connection;

private static void ensureIdentifierIsSafe(String identifier) {
if (!identifier.matches("^[a-zA-Z0-9_]+$")) {
throw new IllegalArgumentException("Invalid identifier. Expected only letters, numbers, and underscores");
}
}

@PostConstruct
public void init() {
if (olapConfig.enabled()) {
getConnection().ifPresent((conn) -> {
try {
createTableIfNotExists(conn);
} catch (SQLException e) {
Log.error("Failed to create OLAP table", e);
}
});
} else {
Log.info("OLAP module is disabled.");
}
}

private Optional<Connection> getConnection() {
try {
if (connection == null) {
Class.forName("org.duckdb.DuckDBDriver"); // force load the driver
connection = DriverManager.getConnection(olapConfig.databaseUrl());
}
return Optional.of(connection);
} catch (Exception e) {
Log.warn("Failed to get read-write connection to OLAP database", e);
return Optional.empty();
}
}

private void createTableIfNotExists(Connection connection) throws SQLException {
try (var statement = connection.createStatement()) {
statement.execute("""
CREATE TABLE IF NOT EXISTS main.aggregated_data (
start_time TIMESTAMPTZ NOT NULL,
end_time TIMESTAMPTZ NOT NULL,
initial_metric_name VARCHAR NOT NULL,
entity_type VARCHAR NOT NULL,
name VARCHAR NOT NULL,
tags JSON NOT NULL,
context JSON NOT NULL,
value DOUBLE NOT NULL,
target VARCHAR NOT NULL,
id VARCHAR PRIMARY KEY
)
""");
Log.infof("Created OLAP DB table: main.aggregated_data");
}
}

@Scheduled(every = "${cc.olap.database.flush-interval.seconds}s")
public void flushToDb() {
getConnection().ifPresent((conn) -> {
var skipped = 0;
var count = 0;
var startTime = Instant.now();
try (var stmt = conn.prepareStatement("INSERT OR REPLACE INTO main.aggregated_data VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
for (var metric = rowBuffer.poll(); metric != null; metric = rowBuffer.poll()) {
Log.debugv("Ingesting metric: {0}", metric);
var start = metric.getStartTime();
var end = metric.getEndTime();
var tags = "";
var context = "";
try {
tags = OBJECT_MAPPER.writeValueAsString(metric.getTags());
context = OBJECT_MAPPER.writeValueAsString(metric.getContext());
} catch (JsonProcessingException e) {
Log.warn("Failed to serialize tags or context. Skipping metric...", e);
skipped++;
continue;
}
var target = metric.getContext().getOrDefault("topic", "unknown"); // for now, the only possible target is the topic
var id = DigestUtils.sha1Hex(String.valueOf(start) +
end +
metric.getInitialMetricName() +
metric.getEntityType().name() +
metric.getName() +
tags +
context +
target);
stmt.setObject(1, start.atOffset(ZoneOffset.UTC));
stmt.setObject(2, end.atOffset(ZoneOffset.UTC));
stmt.setString(3, metric.getInitialMetricName());
stmt.setString(4, metric.getEntityType().name());
stmt.setString(5, metric.getName());
stmt.setString(6, tags);
stmt.setString(7, context);
stmt.setDouble(8, metric.getValue());
stmt.setString(9, target);
stmt.setString(10, id);
stmt.addBatch();
count++;
}
stmt.executeBatch();
} catch (SQLException e) {
Log.error("Failed to ingest ALL metrics to OLAP database", e);
return;
}
if (count != 0 || skipped != 0) {
Log.infof("Ingested %d metrics. Skipped %d metrics. Duration: %s", count, skipped, Duration.between(startTime, Instant.now()));
}
});
}

public void insertRow(AggregatedDataWindowed row) {
if (!olapConfig.enabled()) {
return;
}
rowBuffer.add(row);
if (rowBuffer.size() >= olapConfig.databaseMaxBufferedRows()) {
flushToDb();
}
}

public Set<String> getAllTagKeys() {
return getAllJsonKeys("tags");
}

public Set<String> getAllContextKeys() {
return getAllJsonKeys("context");
}

public Set<String> getAllMetrics() {
return getConnection()
.map(conn -> {
try (var statement = conn.prepareStatement("SELECT DISTINCT initial_metric_name FROM main.aggregated_data")) {
var result = statement.executeQuery();
var metrics = new HashSet<String>();
while (result.next()) {
metrics.add(result.getString(1));
}
return metrics;
} catch (SQLException e) {
Log.error("Failed to get all metrics", e);
}
return new HashSet<String>();
})
.orElse(new HashSet<>());
}

private Set<String> getAllJsonKeys(String column) {
return getConnection()
.map(conn -> {
try (var statement = conn.prepareStatement("SELECT unnest(json_keys( " + column + " )) FROM main.aggregated_data")) {
return getStatementResultAsStrings(statement, true);
} catch (Exception e) {
Log.error("Failed to get keys of column: " + column, e);
}
return new HashSet<String>();
})
.orElse(new HashSet<>());
}

private Set<String> getAllJsonKeyValues(String column, String key) {
ensureIdentifierIsSafe(key);
return getConnection()
.map(conn -> {
try (var statement = conn.prepareStatement("SELECT DISTINCT %s->>'%s' FROM main.aggregated_data".formatted(column, key))) {
return getStatementResultAsStrings(statement, false);
} catch (Exception e) {
Log.error("Failed to get keys of column: " + column, e);
}
return new HashSet<String>();
})
.orElse(new HashSet<>());
}

private Set<String> getStatementResultAsStrings(PreparedStatement statement, boolean removeBrackets) throws SQLException {
var result = statement.executeQuery();
var keys = new HashSet<String>();
while (result.next()) {
var keyValue = result.getString(1);
if (keyValue != null) {
keys.add(removeBrackets && keyValue.endsWith("]") && keyValue.startsWith("[") ?
keyValue.substring(1, keyValue.length() - 1) : keyValue);
}
}
return keys;
}

public Set<String> getAllTagValues(String tagKey) {
return getAllJsonKeyValues("tags", tagKey);
}

public Set<String> getAllContextValues(String contextKey) {
return getAllJsonKeyValues("context", contextKey);
}

public Path exportData(Instant startDate, Instant endDate, String format) {
var finalFormat = (format == null ? "csv" : format).toLowerCase();
var finalStartDate = startDate == null ? Instant.now().minus(Duration.ofDays(30)) : startDate;
var finalEndDate = endDate == null ? Instant.now() : endDate;

Log.infof("Generating report for the period from %s to %s", finalStartDate, finalEndDate);

var tmpFileName = Path.of(System.getProperty("java.io.tmpdir"), "olap_export_" + UUID.randomUUID() + "." + finalFormat);
return getConnection().map((conn) -> {
try (var statement = conn.prepareStatement("COPY (SELECT * FROM main.aggregated_data WHERE start_time >= ? AND end_time <= ?) TO '" + tmpFileName + "'"
+ (finalFormat.equals("csv") ? "(HEADER, DELIMITER ',')" : ""))) {
statement.setObject(1, finalStartDate.atOffset(ZoneOffset.UTC));
statement.setObject(2, finalEndDate.atOffset(ZoneOffset.UTC));
statement.execute();
return tmpFileName;
} catch (SQLException e) {
Log.error("Failed to export data", e);
return null;
}
}).orElse(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.spoud.kcc.aggregator.olap;

import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.ws.rs.*;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.StreamingOutput;
import org.jboss.resteasy.reactive.RestQuery;

import java.nio.file.Files;
import java.time.Instant;
import java.util.Optional;

@Path("/olap/export")
public class DataExportResource {
@Inject
AggregatedMetricsRepository aggregatedMetricsRepository;

@GET
@Produces("text/csv")
public Response genCsvExport(@RestQuery DateTimeParameter fromDate, @RestQuery DateTimeParameter toDate) {
return serveExport("csv", fromDate, toDate);
}

@GET
@Produces("application/jsonl")
public Response genJsonLinesExport(@RestQuery DateTimeParameter fromDate, @RestQuery DateTimeParameter toDate) {
return serveExport("json", fromDate, toDate);
}

// Note that this is just an alias for the application/jsonl endpoint. In both cases, json lines are returned.
@GET
@Produces("application/json")
public Response genJsonExport(@RestQuery DateTimeParameter fromDate, @RestQuery DateTimeParameter toDate) {
return serveExport("json", fromDate, toDate);
}

private Response serveExport(String format, DateTimeParameter fromDate, DateTimeParameter toDate) {
var exportPath = aggregatedMetricsRepository.exportData(
Optional.ofNullable(fromDate).map(DateTimeParameter::instant).orElse(null),
Optional.ofNullable(toDate).map(DateTimeParameter::instant).orElse(null), format);
return Optional.ofNullable(exportPath)
.map(path -> Response.ok().entity((StreamingOutput) output -> {
try {
Files.copy(path, output);
} finally {
Log.infof("Export file %s has been downloaded and will be deleted now.", path);
Files.delete(path);
}
}).header("Content-Disposition", "attachment; filename=export." + format).build())
.orElse(Response.status(204).build());
}

public record DateTimeParameter(Instant instant) {
public static DateTimeParameter fromString(String value) {
try {
return new DateTimeParameter(Instant.parse(value));
} catch (Exception e) {
throw new BadRequestException("Invalid date time format. Expected ISO-8601 date time in UTC time zone (e.g. 2020-01-01T00:00:00Z).");
}
}
}
}
Loading