influxdb_client_3
is a Python module that provides a simple and convenient way to interact with InfluxDB 3.0. This module supports both writing data to InfluxDB and querying data using the Flight client, which allows you to execute SQL and InfluxQL queries on InfluxDB 3.0.
We offer a "Getting Started: InfluxDB 3.0 Python Client Library" video that goes over how to use the library and goes over the examples.
pyarrow
(automatically installed)pandas
(optional)
You can install 'influxdb3-python' using pip
:
pip install influxdb3-python
Note: This does not include Pandas support. If you would like to use key features such as to_pandas()
and write_file()
you will need to install pandas
separately.
Note: Please make sure you are using 3.6 or above. For the best performance use 3.11+
One of the easiest ways to get started is to checkout the "Pokemon Trainer Cookbook". This scenario takes you through the basics of both the client library and Pyarrow.
from influxdb_client_3 import InfluxDBClient3, Point
If you are using InfluxDB Cloud, then you should note that:
- You will need to supply your org id, this is not necessary for InfluxDB Dedicated.
- Use bucket name for the
database
argument.
client = InfluxDBClient3(token="your-token",
host="your-host",
org="your-org",
database="your-database")
You can write data using the Point class, or supplying line protocol.
point = Point("measurement").tag("location", "london").field("temperature", 42)
client.write(point)
point = "measurement fieldname=0"
client.write(point)
Users can import data from CSV, JSON, Feather, ORC, Parquet
import influxdb_client_3 as InfluxDBClient3
import pandas as pd
import numpy as np
from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError
class BatchingCallback(object):
def __init__(self):
self.write_count = 0
def success(self, conf, data: str):
self.write_count += 1
print(f"Written batch: {conf}, data: {data}")
def error(self, conf, data: str, exception: InfluxDBError):
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
def retry(self, conf, data: str, exception: InfluxDBError):
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
callback = BatchingCallback()
write_options = WriteOptions(batch_size=100,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)
wco = write_client_options(success_callback=callback.success,
error_callback=callback.error,
retry_callback=callback.retry,
write_options=write_options
)
with InfluxDBClient3.InfluxDBClient3(
token="INSERT_TOKEN",
host="eu-central-1-1.aws.cloud2.influxdata.com",
org="6a841c0c08328fb1",
database="python", write_client_options=wco) as client:
client.write_file(
file='./out.csv',
timestamp_column='time', tag_columns=["provider", "machineID"])
print(f'DONE writing from csv in {callback.write_count} batch(es)')
client._write_api.write(bucket="pokemon-codex", record=pd_df, data_frame_measurement_name='caught', data_frame_tag_columns=['trainer', 'id', 'num'], data_frame_timestamp_column='timestamp')
client._write_api.write(bucket="pokemon-codex", record=pl_df, data_frame_measurement_name='caught', data_frame_tag_columns=['trainer', 'id', 'num'], data_frame_timestamp_column='timestamp')
query = "select * from measurement"
reader = client.query(query=query, language="sql")
table = reader.read_all()
print(table.to_pandas().to_markdown())
query = "select * from measurement"
reader = client.query(query=query, language="influxql")
table = reader.read_all()
print(table.to_pandas().to_markdown())
Currently, Windows users require an extra installation when querying via Flight natively. This is due to the fact gRPC cannot locate Windows root certificates. To work around this please follow these steps:
Install certifi
pip install certifi
Next include certifi within the flight client options:
import influxdb_client_3 as InfluxDBClient3
import pandas as pd
import numpy as np
from influxdb_client_3 import flight_client_options
import certifi
fh = open(certifi.where(), "r")
cert = fh.read()
fh.close()
client = InfluxDBClient3.InfluxDBClient3(
token="",
host="b0c7cce5-8dbc-428e-98c6-7f996fb96467.a.influxdb.io",
org="6a841c0c08328fb1",
database="flightdemo",
flight_client_options=flight_client_options(
tls_root_certs=cert))
table = client.query(
query="SELECT * FROM flight WHERE time > now() - 4h",
language="influxql")
print(table.to_pandas())
You may also include your own root certificate via this manor aswell.