
Commit ce3becc

Authored by arov00 (Alexander Rovner) and gaetancollaud (Gaétan Collaud)
Feature/GH-314 ingest (#317)
* add olap DB ingestion functionality
* remove redundant functionality, add tests
* add export endpoint
* add endpoints for json export
* test data export
* force-load the driver
* increase flush interval
* fix error message
* pr feedback
* document database url param
* add graphql + getHistory query
* Revert "add graphql + getHistory query". This reverts commit 2d4a5e2.
* simplify config
* update versions

Co-authored-by: Alexander Rovner <[email protected]>
Co-authored-by: Gaétan Collaud <[email protected]>
1 parent 136d8a6 commit ce3becc

File tree

8 files changed: +570 −4 lines


aggregator/pom.xml (+16 −3)
@@ -13,7 +13,7 @@
         <skipITs>false</skipITs>
         <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
         <quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
-        <quarkus.platform.version>3.15.1</quarkus.platform.version>
+        <quarkus.platform.version>3.18.2</quarkus.platform.version>

         <sonar.organization>spoud</sonar.organization>
         <sonar.host.url>https://sonarcloud.io</sonar.host.url>
@@ -39,10 +39,23 @@
         </dependencies>
     </dependencyManagement>
     <dependencies>
+        <dependency>
+            <groupId>org.duckdb</groupId>
+            <artifactId>duckdb_jdbc</artifactId>
+            <version>1.2.0</version>
+        </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-micrometer-registry-prometheus</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-scheduler</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-rest</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-arc</artifactId>
@@ -81,11 +94,11 @@
         </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
-            <artifactId>quarkus-rest-client-reactive</artifactId>
+            <artifactId>quarkus-rest</artifactId>
         </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
-            <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
+            <artifactId>quarkus-messaging-kafka</artifactId>
         </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
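
The three new dependencies back the OLAP feature introduced below: duckdb_jdbc embeds the DuckDB database, quarkus-scheduler drives the periodic flush, and quarkus-rest serves the export endpoints. A rough configuration sketch follows; only cc.olap.database.flush-interval.seconds is confirmed by the @Scheduled annotation in the repository below, and the remaining key names are guesses based on the OlapConfigProperties accessor names (enabled(), databaseUrl(), databaseMaxBufferedRows()):

# Hypothetical application.properties sketch; only the flush-interval key
# is confirmed by this commit, the other key names are assumptions.
cc.olap.enabled=true
cc.olap.database.url=jdbc:duckdb:/data/olap.duckdb
cc.olap.database.flush-interval.seconds=60
cc.olap.database.max-buffered-rows=10000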
AggregatedMetricsRepository.java (+251 −0, new file in package io.spoud.kcc.aggregator.olap)

@@ -0,0 +1,251 @@
package io.spoud.kcc.aggregator.olap;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Startup;
import io.quarkus.scheduler.Scheduled;
import io.spoud.kcc.data.AggregatedDataWindowed;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import lombok.RequiredArgsConstructor;
import org.apache.commons.codec.digest.DigestUtils;

import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashSet;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;

@Startup
@RequiredArgsConstructor
@ApplicationScoped
public class AggregatedMetricsRepository {
    private final OlapConfigProperties olapConfig;
    private final Queue<AggregatedDataWindowed> rowBuffer = new ConcurrentLinkedQueue<>();
    private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private Connection connection;

    private static void ensureIdentifierIsSafe(String identifier) {
        if (!identifier.matches("^[a-zA-Z0-9_]+$")) {
            throw new IllegalArgumentException("Invalid identifier. Expected only letters, numbers, and underscores");
        }
    }

    @PostConstruct
    public void init() {
        if (olapConfig.enabled()) {
            getConnection().ifPresent((conn) -> {
                try {
                    createTableIfNotExists(conn);
                } catch (SQLException e) {
                    Log.error("Failed to create OLAP table", e);
                }
            });
        } else {
            Log.info("OLAP module is disabled.");
        }
    }

    private Optional<Connection> getConnection() {
        try {
            if (connection == null) {
                Class.forName("org.duckdb.DuckDBDriver"); // force load the driver
                connection = DriverManager.getConnection(olapConfig.databaseUrl());
            }
            return Optional.of(connection);
        } catch (Exception e) {
            Log.warn("Failed to get read-write connection to OLAP database", e);
            return Optional.empty();
        }
    }

    private void createTableIfNotExists(Connection connection) throws SQLException {
        try (var statement = connection.createStatement()) {
            statement.execute("""
                    CREATE TABLE IF NOT EXISTS main.aggregated_data (
                        start_time TIMESTAMPTZ NOT NULL,
                        end_time TIMESTAMPTZ NOT NULL,
                        initial_metric_name VARCHAR NOT NULL,
                        entity_type VARCHAR NOT NULL,
                        name VARCHAR NOT NULL,
                        tags JSON NOT NULL,
                        context JSON NOT NULL,
                        value DOUBLE NOT NULL,
                        target VARCHAR NOT NULL,
                        id VARCHAR PRIMARY KEY
                    )
                    """);
            Log.infof("Created OLAP DB table: main.aggregated_data");
        }
    }

    @Scheduled(every = "${cc.olap.database.flush-interval.seconds}s")
    public void flushToDb() {
        getConnection().ifPresent((conn) -> {
            var skipped = 0;
            var count = 0;
            var startTime = Instant.now();
            try (var stmt = conn.prepareStatement("INSERT OR REPLACE INTO main.aggregated_data VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
                for (var metric = rowBuffer.poll(); metric != null; metric = rowBuffer.poll()) {
                    Log.debugv("Ingesting metric: {0}", metric);
                    var start = metric.getStartTime();
                    var end = metric.getEndTime();
                    var tags = "";
                    var context = "";
                    try {
                        tags = OBJECT_MAPPER.writeValueAsString(metric.getTags());
                        context = OBJECT_MAPPER.writeValueAsString(metric.getContext());
                    } catch (JsonProcessingException e) {
                        Log.warn("Failed to serialize tags or context. Skipping metric...", e);
                        skipped++;
                        continue;
                    }
                    var target = metric.getContext().getOrDefault("topic", "unknown"); // for now, the only possible target is the topic
                    var id = DigestUtils.sha1Hex(String.valueOf(start) +
                            end +
                            metric.getInitialMetricName() +
                            metric.getEntityType().name() +
                            metric.getName() +
                            tags +
                            context +
                            target);
                    stmt.setObject(1, start.atOffset(ZoneOffset.UTC));
                    stmt.setObject(2, end.atOffset(ZoneOffset.UTC));
                    stmt.setString(3, metric.getInitialMetricName());
                    stmt.setString(4, metric.getEntityType().name());
                    stmt.setString(5, metric.getName());
                    stmt.setString(6, tags);
                    stmt.setString(7, context);
                    stmt.setDouble(8, metric.getValue());
                    stmt.setString(9, target);
                    stmt.setString(10, id);
                    stmt.addBatch();
                    count++;
                }
                stmt.executeBatch();
            } catch (SQLException e) {
                Log.error("Failed to ingest ALL metrics to OLAP database", e);
                return;
            }
            if (count != 0 || skipped != 0) {
                Log.infof("Ingested %d metrics. Skipped %d metrics. Duration: %s", count, skipped, Duration.between(startTime, Instant.now()));
            }
        });
    }

    public void insertRow(AggregatedDataWindowed row) {
        if (!olapConfig.enabled()) {
            return;
        }
        rowBuffer.add(row);
        if (rowBuffer.size() >= olapConfig.databaseMaxBufferedRows()) {
            flushToDb();
        }
    }

    public Set<String> getAllTagKeys() {
        return getAllJsonKeys("tags");
    }

    public Set<String> getAllContextKeys() {
        return getAllJsonKeys("context");
    }

    public Set<String> getAllMetrics() {
        return getConnection()
                .map(conn -> {
                    try (var statement = conn.prepareStatement("SELECT DISTINCT initial_metric_name FROM main.aggregated_data")) {
                        var result = statement.executeQuery();
                        var metrics = new HashSet<String>();
                        while (result.next()) {
                            metrics.add(result.getString(1));
                        }
                        return metrics;
                    } catch (SQLException e) {
                        Log.error("Failed to get all metrics", e);
                    }
                    return new HashSet<String>();
                })
                .orElse(new HashSet<>());
    }

    private Set<String> getAllJsonKeys(String column) {
        return getConnection()
                .map(conn -> {
                    try (var statement = conn.prepareStatement("SELECT unnest(json_keys( " + column + " )) FROM main.aggregated_data")) {
                        return getStatementResultAsStrings(statement, true);
                    } catch (Exception e) {
                        Log.error("Failed to get keys of column: " + column, e);
                    }
                    return new HashSet<String>();
                })
                .orElse(new HashSet<>());
    }

    private Set<String> getAllJsonKeyValues(String column, String key) {
        ensureIdentifierIsSafe(key);
        return getConnection()
                .map(conn -> {
                    try (var statement = conn.prepareStatement("SELECT DISTINCT %s->>'%s' FROM main.aggregated_data".formatted(column, key))) {
                        return getStatementResultAsStrings(statement, false);
                    } catch (Exception e) {
                        Log.error("Failed to get keys of column: " + column, e);
                    }
                    return new HashSet<String>();
                })
                .orElse(new HashSet<>());
    }

    private Set<String> getStatementResultAsStrings(PreparedStatement statement, boolean removeBrackets) throws SQLException {
        var result = statement.executeQuery();
        var keys = new HashSet<String>();
        while (result.next()) {
            var keyValue = result.getString(1);
            if (keyValue != null) {
                keys.add(removeBrackets && keyValue.endsWith("]") && keyValue.startsWith("[") ?
                        keyValue.substring(1, keyValue.length() - 1) : keyValue);
            }
        }
        return keys;
    }

    public Set<String> getAllTagValues(String tagKey) {
        return getAllJsonKeyValues("tags", tagKey);
    }

    public Set<String> getAllContextValues(String contextKey) {
        return getAllJsonKeyValues("context", contextKey);
    }

    public Path exportData(Instant startDate, Instant endDate, String format) {
        var finalFormat = (format == null ? "csv" : format).toLowerCase();
        var finalStartDate = startDate == null ? Instant.now().minus(Duration.ofDays(30)) : startDate;
        var finalEndDate = endDate == null ? Instant.now() : endDate;

        Log.infof("Generating report for the period from %s to %s", finalStartDate, finalEndDate);

        var tmpFileName = Path.of(System.getProperty("java.io.tmpdir"), "olap_export_" + UUID.randomUUID() + "." + finalFormat);
        return getConnection().map((conn) -> {
            try (var statement = conn.prepareStatement("COPY (SELECT * FROM main.aggregated_data WHERE start_time >= ? AND end_time <= ?) TO '" + tmpFileName + "'"
                    + (finalFormat.equals("csv") ? "(HEADER, DELIMITER ',')" : ""))) {
                statement.setObject(1, finalStartDate.atOffset(ZoneOffset.UTC));
                statement.setObject(2, finalEndDate.atOffset(ZoneOffset.UTC));
                statement.execute();
                return tmpFileName;
            } catch (SQLException e) {
                Log.error("Failed to export data", e);
                return null;
            }
        }).orElse(null);
    }
}
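
For reference, the JSON-key helpers above boil down to DuckDB SQL along these lines (a sketch against the main.aggregated_data table created above; the 'team' tag key is a hypothetical example, not something this commit defines):

-- Distinct keys present in the tags JSON column (what getAllTagKeys() runs)
SELECT unnest(json_keys(tags)) FROM main.aggregated_data;

-- Distinct values of one tag key (what getAllTagValues("team") generates);
-- 'team' is illustrative only
SELECT DISTINCT tags->>'team' FROM main.aggregated_data;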
DataExportResource.java (+63 −0, new file in package io.spoud.kcc.aggregator.olap)

@@ -0,0 +1,63 @@
package io.spoud.kcc.aggregator.olap;

import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.ws.rs.*;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.StreamingOutput;
import org.jboss.resteasy.reactive.RestQuery;

import java.nio.file.Files;
import java.time.Instant;
import java.util.Optional;

@Path("/olap/export")
public class DataExportResource {
    @Inject
    AggregatedMetricsRepository aggregatedMetricsRepository;

    @GET
    @Produces("text/csv")
    public Response genCsvExport(@RestQuery DateTimeParameter fromDate, @RestQuery DateTimeParameter toDate) {
        return serveExport("csv", fromDate, toDate);
    }

    @GET
    @Produces("application/jsonl")
    public Response genJsonLinesExport(@RestQuery DateTimeParameter fromDate, @RestQuery DateTimeParameter toDate) {
        return serveExport("json", fromDate, toDate);
    }

    // Note that this is just an alias for the application/jsonl endpoint. In both cases, json lines are returned.
    @GET
    @Produces("application/json")
    public Response genJsonExport(@RestQuery DateTimeParameter fromDate, @RestQuery DateTimeParameter toDate) {
        return serveExport("json", fromDate, toDate);
    }

    private Response serveExport(String format, DateTimeParameter fromDate, DateTimeParameter toDate) {
        var exportPath = aggregatedMetricsRepository.exportData(
                Optional.ofNullable(fromDate).map(DateTimeParameter::instant).orElse(null),
                Optional.ofNullable(toDate).map(DateTimeParameter::instant).orElse(null), format);
        return Optional.ofNullable(exportPath)
                .map(path -> Response.ok().entity((StreamingOutput) output -> {
                    try {
                        Files.copy(path, output);
                    } finally {
                        Log.infof("Export file %s has been downloaded and will be deleted now.", path);
                        Files.delete(path);
                    }
                }).header("Content-Disposition", "attachment; filename=export." + format).build())
                .orElse(Response.status(204).build());
    }

    public record DateTimeParameter(Instant instant) {
        public static DateTimeParameter fromString(String value) {
            try {
                return new DateTimeParameter(Instant.parse(value));
            } catch (Exception e) {
                throw new BadRequestException("Invalid date time format. Expected ISO-8601 date time in UTC time zone (e.g. 2020-01-01T00:00:00Z).");
            }
        }
    }
}
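
A minimal client sketch for the export endpoints, not part of this commit and assuming the aggregator listens on Quarkus' default port 8080: the format is negotiated via the Accept header (text/csv, application/json, or application/jsonl), fromDate and toDate are optional ISO-8601 instants (the repository defaults to the last 30 days), and a 204 response means no export was produced.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class ExportClient {
    public static void main(String[] args) throws Exception {
        // Hypothetical host/port; the query parameters may also be omitted.
        var request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/olap/export"
                        + "?fromDate=2025-01-01T00:00:00Z&toDate=2025-02-01T00:00:00Z"))
                .header("Accept", "text/csv") // or application/json / application/jsonl
                .GET()
                .build();
        // Streams the attachment to a local file; a 204 yields an empty file,
        // meaning the OLAP module is disabled or the export failed server-side.
        var response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofFile(Path.of("export.csv")));
        System.out.println("HTTP " + response.statusCode() + " -> " + response.body());
    }
}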
