The reference Java client that allows you to query, write and manage (buckets, organizations, users) InfluxDB 2.x.
- Querying data using Flux language
- Querying data using InfluxQL
- Writing data using InfluxDB Line Protocol, Data Point or POJO
- InfluxDB 2.x Management API
  - sources, buckets
  - tasks
  - authorizations
  - health check
- Advanced Usage
For querying data we use QueryApi, which allows synchronous and asynchronous queries as well as working with the raw query response.
For POJO mapping, snake_case column names are mapped to camelCase field names if an exact match is not found.
The synchronous query is not intended for large query results because the Flux response can be potentially unbounded.
package example;
import java.util.List;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
public class SynchronousQuery {
private static char[] token = "my-token".toCharArray();
private static String org = "my-org";
public static void main(final String[] args) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org);
String flux = "from(bucket:\"my-bucket\") |> range(start: 0)";
QueryApi queryApi = influxDBClient.getQueryApi();
//
// Query data
//
List<FluxTable> tables = queryApi.query(flux);
for (FluxTable fluxTable : tables) {
List<FluxRecord> records = fluxTable.getRecords();
for (FluxRecord fluxRecord : records) {
System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));
}
}
influxDBClient.close();
}
}
The synchronous query also offers the possibility to map FluxRecords to POJOs:
package example;
import java.time.Instant;
import java.util.List;
import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
public class SynchronousQueryPojo {
private static char[] token = "my-token".toCharArray();
private static String org = "my-org";
public static void main(final String[] args) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org);
//
// Query data
//
String flux = "from(bucket:\"my-bucket\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"temperature\")";
QueryApi queryApi = influxDBClient.getQueryApi();
//
// Map to POJO
//
List<Temperature> temperatures = queryApi.query(flux, Temperature.class);
for (Temperature temperature : temperatures) {
System.out.println(temperature.location + ": " + temperature.value + " at " + temperature.time);
}
influxDBClient.close();
}
@Measurement(name = "temperature")
public static class Temperature {
@Column(tag = true)
String location;
@Column
Double value;
@Column(timestamp = true)
Instant time;
}
}
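The Temperature fields above match the Flux columns exactly; when they do not, the snake_case fallback mentioned earlier applies. A minimal, purely illustrative sketch (the sensor_id column and the Sensor class are hypothetical, not part of the original examples):
@Measurement(name = "sensor")
public static class Sensor {
    // the Flux column "sensor_id" has no exact field match,
    // so the client falls back to the camelCase field "sensorId"
    @Column(tag = true)
    String sensorId;
    @Column
    Double value;
    @Column(timestamp = true)
    Instant time;
}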
The asynchronous query offers the possibility to process an unbounded query and allows the user to handle exceptions, stop receiving results and be notified when all data has arrived.
package example;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
public class AsynchronousQuery {
private static char[] token = "my-token".toCharArray();
private static String org = "my-org";
public static void main(final String[] args) throws InterruptedException {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org);
//
// Query data
//
String flux = "from(bucket:\"my-bucket\") |> range(start: 0)";
QueryApi queryApi = influxDBClient.getQueryApi();
queryApi.query(flux, (cancellable, fluxRecord) -> {
//
// The callback to consume a FluxRecord.
//
// cancellable - object with a cancel method to stop the asynchronous query
//
System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));
}, throwable -> {
//
// The callback to consume any error notification.
//
System.out.println("Error occurred: " + throwable.getMessage());
}, () -> {
//
// The callback to consume a notification about the successful end of the stream.
//
System.out.println("Query completed");
});
Thread.sleep(5_000);
influxDBClient.close();
}
}
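The first callback also receives a Cancellable, so the consumer can stop receiving more results once it has seen enough. A sketch using java.util.concurrent.atomic.AtomicInteger (the limit of 100 records is illustrative):
AtomicInteger received = new AtomicInteger();

queryApi.query(flux, (cancellable, fluxRecord) -> {
    System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));
    // stop the asynchronous query after 100 records (illustrative limit)
    if (received.incrementAndGet() >= 100 && !cancellable.isCancelled()) {
        cancellable.cancel();
    }
});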
And there is also the possibility to map FluxRecords to POJOs:
package example;
import java.time.Instant;
import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
public class AsynchronousQueryPojo {
private static char[] token = "my-token".toCharArray();
private static String org = "my-org";
public static void main(final String[] args) throws InterruptedException {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org);
//
// Query data
//
String flux = "from(bucket:\"my-bucket\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"temperature\")";
QueryApi queryApi = influxDBClient.getQueryApi();
//
// Map to POJO
//
queryApi.query(flux, Temperature.class, (cancellable, temperature) -> {
//
// The callback to consume a FluxRecord mapped to POJO.
//
// cancellable - object with a cancel method to stop the asynchronous query
//
System.out.println(temperature.location + ": " + temperature.value + " at " + temperature.time);
});
Thread.sleep(5_000);
influxDBClient.close();
}
@Measurement(name = "temperature")
public static class Temperature {
@Column(tag = true)
String location;
@Column
Double value;
@Column(timestamp = true)
Instant time;
}
}
The raw query allows direct processing of the original CSV response:
package example;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
public class RawQuery {
private static char[] token = "my-token".toCharArray();
private static String org = "my-org";
public static void main(final String[] args) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org);
//
// Query data
//
String flux = "from(bucket:\"my-bucket\") |> range(start: 0)";
QueryApi queryApi = influxDBClient.getQueryApi();
String csv = queryApi.queryRaw(flux);
System.out.println("CSV response: " + csv);
influxDBClient.close();
}
}
The asynchronous version allows processing the response line by line:
package example;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
public class RawQueryAsynchronous {
private static char[] token = "my-token".toCharArray();
private static String org = "my-org";
public static void main(final String[] args) throws Exception {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org);
//
// Query data
//
String flux = "from(bucket:\"my-bucket\") |> range(start: 0)";
QueryApi queryApi = influxDBClient.getQueryApi();
queryApi.queryRaw(flux, (cancellable, line) -> {
//
// The callback to consume a line of CSV response
//
// cancellable - object with a cancel method to stop the asynchronous query
//
System.out.println("Response: " + line);
});
Thread.sleep(5_000);
influxDBClient.close();
}
}
InfluxDB Cloud supports Parameterized Queries that let you dynamically change values in a query using the InfluxDB API. Parameterized queries make Flux queries more reusable and can also be used to help prevent injection attacks.
InfluxDB Cloud inserts the params object into the Flux query as a Flux record named params. Use dot or bracket notation to access parameters in the params record in your Flux query. Parameterized Flux queries support only int, float, and string data types. To convert the supported data types into other Flux basic data types, use Flux type conversion functions.
Parameterized query example:
⚠️ Parameterized Queries are supported only in InfluxDB Cloud, currently there is no support in InfluxDB OSS.
package example;
import java.time.Instant;
import java.time.Period;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxTable;
public class ParameterizedQuery {
public static void main(String[] args) {
String url = "https://us-west-2-1.aws.cloud2.influxdata.com";
String token = "my-token";
String org = "my-org";
String bucket = "my-bucket";
try (InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket)) {
QueryApi queryApi = client.getQueryApi();
Instant yesterday = Instant.now().minus(Period.ofDays(1));
Point p = Point.measurement("temperature")
.addTag("location", "north")
.addField("value", 60.0)
.time(yesterday, WritePrecision.NS);
WriteApiBlocking writeApi = client.getWriteApiBlocking();
writeApi.writePoint(p);
//
// Query range start parameter using Instant
//
Map<String, Object> params = new HashMap<>();
params.put("bucketParam", bucket);
params.put("startParam", yesterday.toString());
String parametrizedQuery = "from(bucket: params.bucketParam) |> range(start: time(v: params.startParam))";
List<FluxTable> query = queryApi.query(parametrizedQuery, org, params);
query.forEach(fluxTable -> fluxTable.getRecords()
.forEach(r -> System.out.println(r.getTime() + ": " + r.getValueByKey("_value"))));
//
// Query range start parameter using duration
//
params.put("startParam", "-1d10s");
parametrizedQuery = "from(bucket: params.bucketParam) |> range(start: duration(v: params.startParam))";
query = queryApi.query(parametrizedQuery, org, params);
query.forEach(fluxTable -> fluxTable.getRecords()
.forEach(r -> System.out.println(r.getTime() + ": " + r.getValueByKey("_value"))));
}
}
}
InfluxQL can be used with the /query compatibility endpoint, which uses the database and retention policy specified in the query request to map the request to an InfluxDB bucket. For more information, see: .
This is an example of how to use this library to run a query with InfluxQL:
package example;
import java.math.BigDecimal;
import java.time.Instant;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.InfluxQLQueryApi;
import com.influxdb.client.domain.InfluxQLQuery;
import com.influxdb.query.InfluxQLQueryResult;
public class InfluxQLExample {
private static char[] token = "my-token".toCharArray();
private static String org = "my-org";
private static String database = "my-org";
public static void main(final String[] args) {
try (InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org)) {
//
// Query data
//
String influxQL = "SELECT FIRST(\"free\") FROM \"influxql\"";
InfluxQLQueryApi queryApi = influxDBClient.getInfluxQLQueryApi();
// send request
InfluxQLQueryResult result = queryApi.query(new InfluxQLQuery(influxQL, database).setPrecision(InfluxQLQuery.InfluxQLPrecision.SECONDS),
(columnName, rawValue, resultIndex, seriesName) -> {
// convert columns
switch (columnName) {
case "time":
return Instant.ofEpochSecond(Long.parseLong(rawValue));
case "first":
return new BigDecimal(rawValue);
default:
throw new IllegalArgumentException("unexpected column " + columnName);
}
});
for (InfluxQLQueryResult.Result resultResult : result.getResults()) {
for (InfluxQLQueryResult.Series series : resultResult.getSeries()) {
for (InfluxQLQueryResult.Series.Record record : series.getValues()) {
System.out.println(record.getValueByKey("time") + ": " + record.getValueByKey("first"));
}
}
}
}
}
}
When the data are grouped by tag(s) using the GROUP BY clause, series tags are accessible via the InfluxQLQueryResult.Series.getTags() method, e.g.:
...
for (InfluxQLQueryResult.Result resultResult : result.getResults()) {
for (InfluxQLQueryResult.Series series : resultResult.getSeries()) {
for (Map.Entry<String, String> tag : series.getTags().entrySet()) {
System.out.println(tag.getKey() + "=" + tag.getValue());
}
}
}
...
The client offers two types of API for ingesting data:
- Synchronous blocking API
- Asynchronous non-blocking API which supports batching, retrying and jittering
The WriteApiBlocking provides a synchronous blocking API for writing data using InfluxDB Line Protocol, Data Points and POJOs.
It is up to the user to handle server or HTTP exceptions.
package example;
import java.time.Instant;
import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.exceptions.InfluxException;
public class WriteDataBlocking {
private static char[] token = "my-token".toCharArray();
private static String org = "my-org";
private static String bucket = "my-bucket";
public static void main(final String[] args) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org, bucket);
WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
try {
//
// Write by LineProtocol
//
String record = "temperature,location=north value=60.0";
writeApi.writeRecord(WritePrecision.NS, record);
//
// Write by Data Point
//
Point point = Point.measurement("temperature")
.addTag("location", "west")
.addField("value", 55D)
.time(Instant.now().toEpochMilli(), WritePrecision.MS);
writeApi.writePoint(point);
//
// Write by POJO
//
Temperature temperature = new Temperature();
temperature.location = "south";
temperature.value = 62D;
temperature.time = Instant.now();
writeApi.writeMeasurement(WritePrecision.NS, temperature);
} catch (InfluxException ie) {
System.out.println("InfluxException: " + ie);
}
influxDBClient.close();
}
@Measurement(name = "temperature")
private static class Temperature {
@Column(tag = true)
String location;
@Column
Double value;
@Column(timestamp = true)
Instant time;
}
}
⚠️ The WriteApi is supposed to be used as a singleton. Don't create a new instance for every write!
For writing data we use WriteApi, an asynchronous non-blocking API that supports:
- writing data using InfluxDB Line Protocol, Data Point, POJO
- batching of writes
- a client backpressure strategy
- events that allow the user to be notified and react to them:
  - WriteSuccessEvent - published when the success response arrives from the server
  - BackpressureEvent - published when client backpressure is applied
  - WriteErrorEvent - published when an unhandled exception occurs
  - WriteRetriableErrorEvent - published when a retriable error occurs
- GZIP compression of data
The writes are processed in batches which are configurable by WriteOptions:
Property | Description | Default Value |
---|---|---|
batchSize | the number of data points to collect in a batch | 1000 |
flushInterval | the number of milliseconds before the batch is written | 1000 |
jitterInterval | the number of milliseconds to increase the batch flush interval by a random amount | 0 |
retryInterval | the number of milliseconds to wait before retrying an unsuccessful write. The retry interval is used when the InfluxDB server does not specify a "Retry-After" header. | 5000 |
maxRetries | the number of max retries when write fails | 5 |
maxRetryDelay | the maximum delay between each retry attempt in milliseconds | 125_000 |
maxRetryTime | maximum total retry timeout in milliseconds | 180_000 |
exponentialBase | the base for the exponential retry delay; the next delay is computed using random exponential backoff as a random value within the interval retryInterval * exponentialBase^(attempts-1) and retryInterval * exponentialBase^(attempts). Example for retryInterval=5_000, exponentialBase=2, maxRetryDelay=125_000, total=5: retry delays are randomly distributed values within the ranges [5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000] | 2 |
bufferLimit | the maximum number of unwritten stored points | 10000 |
backpressureStrategy | the strategy to deal with buffer overflow | DROP_OLDEST |
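A custom WriteOptions instance can be built with its builder and passed when the WriteApi is created. A minimal sketch, using illustrative values:
WriteOptions writeOptions = WriteOptions.builder()
    .batchSize(5_000)        // collect up to 5 000 points per batch
    .flushInterval(10_000)   // flush at least every 10 seconds
    .jitterInterval(1_000)   // add up to 1 second of random jitter
    .build();

try (WriteApi writeApi = influxDBClient.makeWriteApi(writeOptions)) {
    writeApi.writeRecord(WritePrecision.NS, "temperature,location=north value=60.0");
}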
Backpressure addresses the problem of what to do with a growing backlog of unconsumed data points. Its key feature is to avoid consuming an unexpected amount of system resources. This situation is not common and can be caused by several problems: generating too many measurements in a short interval, long-term unavailability of the InfluxDB server, or network issues.
The size of the backlog is configured by WriteOptions.bufferLimit and the backpressure strategy by WriteOptions.backpressureStrategy.
- DROP_OLDEST - Drop the oldest data points from the backlog
- DROP_LATEST - Drop the latest data points from the backlog
- ERROR - Signal an exception
- BLOCK - (not implemented yet) Wait a specified time for space in the buffer to become available
  - timeout - how long to wait before giving up
  - unit - TimeUnit of the timeout
If the DROP_OLDEST or DROP_LATEST strategy is used, it is possible to react to the backpressure event and slow down the production of new measurements:
WriteApi writeApi = influxDBClient.makeWriteApi(writeOptions);
writeApi.listenEvents(BackpressureEvent.class, value -> {
//
// slowdown producers
//...
});
There is also a synchronous blocking version of WriteApi - WriteApiBlocking.
Write Measurement into specified bucket:
package example;
import java.time.Instant;
import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
public class WritePojo {
private static char[] token = "my-token".toCharArray();
private static String org = "my-org";
private static String bucket = "my-bucket";
public static void main(final String[] args) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org, bucket);
//
// Write data
//
try (WriteApi writeApi = influxDBClient.makeWriteApi()) {
//
// Write by POJO
//
Temperature temperature = new Temperature();
temperature.location = "south";
temperature.value = 62D;
temperature.time = Instant.now();
writeApi.writeMeasurement(WritePrecision.NS, temperature);
}
influxDBClient.close();
}
@Measurement(name = "temperature")
private static class Temperature {
@Column(tag = true)
String location;
@Column
Double value;
@Column(timestamp = true)
Instant time;
}
}
Write Data point into specified bucket:
package example;
import java.time.Instant;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
public class WriteDataPoint {
private static char[] token = "my-token".toCharArray();
private static String org = "my-org";
private static String bucket = "my-bucket";
public static void main(final String[] args) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org, bucket);
//
// Write data
//
try (WriteApi writeApi = influxDBClient.makeWriteApi()) {
//
// Write by Data Point
//
Point point = Point.measurement("temperature")
.addTag("location", "west")
.addField("value", 55D)
.time(Instant.now().toEpochMilli(), WritePrecision.MS);
writeApi.writePoint(point);
}
influxDBClient.close();
}
}
Write Line Protocol record into specified bucket:
package example;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
public class WriteLineProtocol {
private static char[] token = "my-token".toCharArray();
private static String org = "my-org";
private static String bucket = "my-bucket";
public static void main(final String[] args) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org, bucket);
//
// Write data
//
try (WriteApi writeApi = influxDBClient.makeWriteApi()) {
//
// Write by LineProtocol
//
String record = "temperature,location=north value=60.0";
writeApi.writeRecord(WritePrecision.NS, record);
}
influxDBClient.close();
}
}
Sometimes it is useful to store the same information in every measurement, e.g. hostname, location, customer. The client is able to use a static value, a system property or an environment property as a tag value.
The expressions:
- California Miner - static value
- ${version} - system property
- ${env.hostname} - environment property
In a configuration file you are able to specify default tags with the influx2.tags prefix.
influx2.tags.id = 132-987-655
influx2.tags.customer = California Miner
influx2.tags.hostname = ${env.hostname}
influx2.tags.sensor-version = ${version}
InfluxDBClientOptions options = InfluxDBClientOptions.builder()
.url(url)
.authenticateToken(token)
.addDefaultTag("id", "132-987-655")
.addDefaultTag("customer", "California Miner")
.addDefaultTag("hostnamer", "${env.hostname}")
.addDefaultTag("sensor-version", "${version}")
.build();
Both configurations will produce the Line protocol:
mine-sensor,id=132-987-655,customer="California Miner",hostname=example.com,sensor-version=v1.00 altitude=10
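The measurement itself then only needs to carry its own fields; the configured default tags are appended by the client. A sketch of a write that could produce a line like the one above (the Point below is illustrative, not part of the original):
try (WriteApi writeApi = influxDBClient.makeWriteApi()) {
    // id, customer, hostname and sensor-version come from the configured default tags
    Point point = Point.measurement("mine-sensor")
        .addField("altitude", 10);
    writeApi.writePoint(point);
}
The asynchronous WriteApi also publishes the events listed earlier; listeners can be registered for them: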
WriteApi writeApi = influxDBClient.makeWriteApi();
writeApi.listenEvents(WriteSuccessEvent.class, event -> {
String data = event.getLineProtocol();
//
// handle success
//
});
WriteApi writeApi = influxDBClient.makeWriteApi();
writeApi.listenEvents(WriteErrorEvent.class, event -> {
Throwable exception = event.getThrowable();
//
// handle error
//
});
The client has the following management API:
API endpoint | Description | Javadoc |
---|---|---|
/api/v2/authorizations | Managing authorization data | AuthorizationsApi |
/api/v2/buckets | Managing bucket data | BucketsApi |
/api/v2/orgs | Managing organization data | OrganizationsApi |
/api/v2/users | Managing user data | UsersApi |
/api/v2/sources | Managing sources | SourcesApi |
/api/v2/tasks | Managing one-off and recurring tasks | TasksApi |
/api/v2/scrapers | Managing ScraperTarget data | ScraperTargetsApi |
/api/v2/labels | Managing resource labels | LabelsApi |
/api/v2/telegrafs | Managing telegraf config data | TelegrafsApi |
/api/v2/setup | Managing onboarding setup | InfluxDBClient#onBoarding() |
/ready | Get the readiness of an instance at startup | InfluxDBClient#ready() |
/health | Get the health of an instance anytime during execution | InfluxDBClient#health() |
The following example demonstrates how to use the InfluxDB 2.x Management API. For further information see the endpoints' Javadoc.
package example;
import java.util.Arrays;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.domain.Authorization;
import com.influxdb.client.domain.Bucket;
import com.influxdb.client.domain.Permission;
import com.influxdb.client.domain.PermissionResource;
import com.influxdb.client.domain.BucketRetentionRules;
public class InfluxDB2ManagementExample {
private static char[] token = "my-token".toCharArray();
public static void main(final String[] args) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token);
//
// Create bucket "iot-bucket" with data retention set to 3,600 seconds
//
BucketRetentionRules retention = new BucketRetentionRules();
retention.setEverySeconds(3600);
Bucket bucket = influxDBClient.getBucketsApi().createBucket("iot-bucket", retention, "12bdc4164c2e8141");
//
// Create access token to "iot-bucket"
//
PermissionResource resource = new PermissionResource();
resource.setId(bucket.getId());
resource.setOrgID("12bdc4164c2e8141");
resource.setType(PermissionResource.TYPE_BUCKETS);
// Read permission
Permission read = new Permission();
read.setResource(resource);
read.setAction(Permission.ActionEnum.READ);
// Write permission
Permission write = new Permission();
write.setResource(resource);
write.setAction(Permission.ActionEnum.WRITE);
Authorization authorization = influxDBClient.getAuthorizationsApi()
.createAuthorization("12bdc4164c2e8141", Arrays.asList(read, write));
//
// Created token that can be used for writes to "iot-bucket"
//
String token = authorization.getToken();
System.out.println("Token: " + token);
influxDBClient.close();
}
}
The example below shows how to create a check for monitoring a stock price. A Slack notification is created if the price is less than 35.
The check sets the status to Critical if the current value for the stock measurement is less than 35.
Organization org = ...;
String query = "from(bucket: \"my-bucket\") "
+ "|> range(start: v.timeRangeStart, stop: v.timeRangeStop) "
+ "|> filter(fn: (r) => r._measurement == \"stock\") "
+ "|> filter(fn: (r) => r.company == \"zyz\") "
+ "|> aggregateWindow(every: 5s, fn: mean) "
+ "|> filter(fn: (r) => r._field == \"current\") "
+ "|> yield(name: \"mean\")";
LesserThreshold threshold = new LesserThreshold();
threshold.setLevel(CheckStatusLevel.CRIT);
threshold.setValue(35F);
String message = "The Stock price for XYZ is on: ${ r._level } level!";
influxDBClient
.getChecksApi()
.createThresholdCheck("XYZ Stock value", query, "5s", message, threshold, org.getId());
String url = "https://hooks.slack.com/services/x/y/z";
SlackNotificationEndpoint endpoint = influxDBClient
.getNotificationEndpointsApi()
.createSlackEndpoint("Slack Endpoint", url, org.getId());
influxDBClient
.getNotificationRulesApi()
.createSlackRule("Critical status to Slack", "10s", "${ r._message }", RuleStatusLevel.CRIT, endpoint, org.getId());
The following example demonstrates how to delete data from InfluxDB 2.x.
package example;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import com.influxdb.client.DeleteApi;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.exceptions.InfluxException;
public class DeleteData {
private static char[] token = "my-token".toCharArray();
public static void main(final String[] args) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token);
DeleteApi deleteApi = influxDBClient.getDeleteApi();
try {
OffsetDateTime start = OffsetDateTime.now().minus(1, ChronoUnit.HOURS);
OffsetDateTime stop = OffsetDateTime.now();
deleteApi.delete(start, stop, "", "my-bucket", "my-org");
} catch (InfluxException ie) {
System.out.println("InfluxException: " + ie);
}
influxDBClient.close();
}
}
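The empty string passed as the third argument above is the delete predicate; an empty predicate deletes all series in the given time range. To restrict the delete, a predicate expression can be supplied instead, for example (illustrative measurement name):
deleteApi.delete(start, stop, "_measurement=\"temperature\"", "my-bucket", "my-org");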
A client can be configured via a configuration file. The configuration file has to be named influx2.properties and has to be in the root of the classpath.
The following options are supported:
Property name | default | description |
---|---|---|
influx2.url | - | the url to connect to InfluxDB |
influx2.org | - | default destination organization for writes and queries |
influx2.bucket | - | default destination bucket for writes |
influx2.token | - | the token to use for the authorization |
influx2.logLevel | NONE | rest client verbosity level |
influx2.readTimeout | 10000 ms | read timeout |
influx2.writeTimeout | 10000 ms | write timeout |
influx2.connectTimeout | 10000 ms | socket timeout |
influx2.precision | NS | default precision for unix timestamps in the line protocol |
influx2.clientType | - | to customize the User-Agent HTTP header |
The influx2.readTimeout, influx2.writeTimeout and influx2.connectTimeout support ms, s and m as units. Default is milliseconds.
influx2.url=http://localhost:8086
influx2.org=my-org
influx2.bucket=my-bucket
influx2.token=my-token
influx2.logLevel=BODY
influx2.readTimeout=5s
influx2.writeTimeout=10s
influx2.connectTimeout=5s
and then:
InfluxDBClient influxDBClient = InfluxDBClientFactory.create();
A client can be constructed using a connection string that can contain the InfluxDBClientOptions parameters encoded into the URL.
InfluxDBClient influxDBClient = InfluxDBClientFactory
    .create("http://localhost:8086?readTimeout=5000&connectTimeout=5000&logLevel=BASIC", token);
The following options are supported:
Property name | default | description |
---|---|---|
org | - | default destination organization for writes and queries |
bucket | - | default destination bucket for writes |
token | - | the token to use for the authorization |
logLevel | NONE | rest client verbosity level |
readTimeout | 10000 ms | read timeout |
writeTimeout | 10000 ms | write timeout |
connectTimeout | 10000 ms | socket timeout |
precision | NS | default precision for unix timestamps in the line protocol |
clientType | - | to customize the User-Agent HTTP header |
The readTimeout, writeTimeout and connectTimeout support ms, s and m as units. Default is milliseconds.
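For example, an illustrative connection string mixing units:
InfluxDBClient influxDBClient = InfluxDBClientFactory
    .create("http://localhost:8086?readTimeout=30s&writeTimeout=1m&connectTimeout=5000", token);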
InfluxDBClient does not enable gzip compression for HTTP requests by default. If you want to enable gzip to reduce the size of transferred data, you can call:
influxDBClient.enableGzip();
You can configure the client to tunnel requests through an HTTP proxy. To configure the proxy use an okHttpClient configuration:
OkHttpClient.Builder okHttpBuilder = new OkHttpClient.Builder()
.proxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxy", 8088)));
InfluxDBClientOptions options = InfluxDBClientOptions.builder()
.url("http://localhost:9999")
.authenticateToken("my-token".toCharArray())
.okHttpClient(okHttpBuilder)
.build();
InfluxDBClient client = InfluxDBClientFactory.create(options);
If you need to use proxy authentication then use something like:
OkHttpClient.Builder okHttpBuilder = new OkHttpClient.Builder()
.proxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxy", 8088)))
.proxyAuthenticator(new Authenticator() {
@Override
public Request authenticate(@Nullable final Route route, @Nonnull final Response response) {
return response.request().newBuilder()
.header("Proxy-Authorization", "Token proxy-token")
.build();
}
});
InfluxDBClientOptions options = InfluxDBClientOptions.builder()
.url("http://localhost:9999")
.authenticateToken("my-token".toCharArray())
.okHttpClient(okHttpBuilder)
.build();
InfluxDBClient client = InfluxDBClientFactory.create(options);
⚠️ If your proxy notifies the client with a permanent redirect (HTTP 301) to a different host, the client removes the Authorization header, because otherwise the contents of Authorization would be sent to third parties, which is a security vulnerability.
You can bypass this behaviour by:
String token = "my-token";
OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder()
.addNetworkInterceptor(new Interceptor() {
@Nonnull
@Override
public Response intercept(@Nonnull final Chain chain) throws IOException {
Request authorization = chain.request().newBuilder()
.header("Authorization", "Token " + token)
.build();
return chain.proceed(authorization);
}
});
InfluxDBClientOptions options = InfluxDBClientOptions.builder()
.url("http://localhost:9999")
.authenticateToken(token.toCharArray())
.okHttpClient(okHttpClient)
.build();
InfluxDBClient client = InfluxDBClientFactory.create(options);
The requests and responses can be logged by changing the LogLevel. LogLevel values are NONE, BASIC, HEADERS, BODY. Note that applying the BODY LogLevel will disable chunking while streaming and will load the whole response into memory.
influxDBClient.setLogLevel(LogLevel.HEADERS)
Server availability can be checked using the influxDBClient.health() endpoint.
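For example, a minimal sketch:
HealthCheck health = influxDBClient.health();
System.out.println("Health: " + health.getStatus());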
Construct queries using the flux-dsl query builder:
package example;
import java.time.temporal.ChronoUnit;
import java.util.List;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import com.influxdb.query.dsl.Flux;
import com.influxdb.query.dsl.functions.restriction.Restrictions;
public class SynchronousQueryDSL {
private static char[] token = "my-token".toCharArray();
private static String org = "my-org";
public static void main(final String[] args) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token, org);
Flux flux = Flux.from("my-bucket")
.range(-30L, ChronoUnit.MINUTES)
.filter(Restrictions.and(Restrictions.measurement().equal("cpu")))
.limit(10);
QueryApi queryApi = influxDBClient.getQueryApi();
//
// Query data
//
List<FluxTable> tables = queryApi.query(flux.toString());
for (FluxTable fluxTable : tables) {
List<FluxRecord> records = fluxTable.getRecords();
for (FluxRecord fluxRecord : records) {
System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));
}
}
influxDBClient.close();
}
}
The latest version for Maven dependency:
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>7.2.0</version>
</dependency>
Or when using Gradle:
dependencies {
implementation "com.influxdb:influxdb-client-java:7.2.0"
}
The snapshots are deployed into the OSS Snapshot repository.
<repository>
<id>ossrh</id>
<name>OSS Snapshot repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
repositories {
maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
}