
Agents

LangStream Version: 0.6.2


Compute chat completions (ai-chat-completions)

Sends the messages to the AI Service to compute chat completions. The result is stored in the specified field.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `ai-service` | In case of multiple AI services configured, specify the id of the AI service to use. | string |  |  |
| `completion-field` | Field to use to store the completion results in the output topic. Use `value` to write the result without a structured schema. Use `value.<field>` to write the result in a specific field. | string |  |  |
| `composable` | Whether this step can be composed with other steps. | boolean |  | true |
| `frequency-penalty` | Parameter for the completion request, passed to the AI Service as is. | number |  |  |
| `log-field` | Field to use to store the log of the completion results in the output topic. Use `value` to write the result without a structured schema. Use `value.<field>` to write the result in a specific field. The log contains useful information for debugging the completion prompts. | string |  |  |
| `logit-bias` | Parameter for the completion request, passed to the AI Service as is. | object |  |  |
| `max-tokens` | Parameter for the completion request, passed to the AI Service as is. | integer |  |  |
| `messages` | Messages to use for chat completions. You can use the Mustache syntax. | array of object |  |  |
| `min-chunks-per-message` | Minimum number of chunks to send to the `stream-to-topic` topic. The chunks are sent as soon as they are available, in the order they are received from the AI Service. To improve the TTFB (Time-To-First-Byte), the chunk size starts at 1 and doubles until it reaches this value. | integer |  | 20 |
| `model` | The model to use for chat completions. The model must be available in the AI Service. | string |  |  |
| `options` | Additional options for the model configuration. The structure depends on the model and AI provider. | object |  |  |
| `presence-penalty` | Parameter for the completion request, passed to the AI Service as is. | number |  |  |
| `stop` | Parameter for the completion request, passed to the AI Service as is. | array of string |  |  |
| `stream` | Enable streaming of the results. Use in conjunction with the `stream-to-topic` parameter. | boolean |  | true |
| `stream-response-completion-field` | Field to use to store the completion results in the `stream-to-topic` topic. Use `value` to write the result without a structured schema. Use `value.<field>` to write the result in a specific field. | string |  |  |
| `stream-to-topic` | Topic to stream the results to. If set, the results are streamed to this topic in small chunks, while the complete message is still sent to the output topic. | string |  |  |
| `temperature` | Parameter for the completion request, passed to the AI Service as is. | number |  |  |
| `top-p` | Parameter for the completion request, passed to the AI Service as is. | number |  |  |
| `user` | Parameter for the completion request, passed to the AI Service as is. | string |  |  |
| `when` | Execute the step only when the condition is met. You can use the expression language to reference the message. Example: `when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"` | string |  |  |


Compute chat completions (ai-chat-completions).messages

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `role` | Role of the message. The role is used to identify the speaker in the chat. | string |  |  |
| `content` | Content of the message. You can use the Mustache syntax. | string |  |  |
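
For reference, a minimal pipeline snippet using this agent might look like the following sketch. The topic names, the model id, and the `value.question` field are illustrative assumptions, not part of the specification above.

```yaml
pipeline:
  - name: "answer-question"
    type: "ai-chat-completions"
    input: "input-topic"    # assumed topic name
    output: "output-topic"  # assumed topic name
    configuration:
      model: "gpt-3.5-turbo"            # must be available in the configured AI service
      completion-field: "value.answer"  # store the completion in the "answer" field
      log-field: "value.prompt-log"     # keep the rendered prompt for debugging
      messages:
        - role: user
          content: "Answer concisely: {{{ value.question }}}"
```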


Compute text completions (ai-text-completions)

Sends the text to the AI Service to compute text completions. The result is stored in the specified field.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `ai-service` | In case of multiple AI services configured, specify the id of the AI service to use. | string |  |  |
| `completion-field` | Field to use to store the completion results in the output topic. Use `value` to write the result without a structured schema. Use `value.<field>` to write the result in a specific field. | string |  |  |
| `composable` | Whether this step can be composed with other steps. | boolean |  | true |
| `frequency-penalty` | Parameter for the completion request, passed to the AI Service as is. | number |  |  |
| `log-field` | Field to use to store the log of the completion results in the output topic. Use `value` to write the result without a structured schema. Use `value.<field>` to write the result in a specific field. The log contains useful information for debugging the completion prompts. | string |  |  |
| `logit-bias` | Parameter for the completion request, passed to the AI Service as is. | object |  |  |
| `logprobs` | Logprobs parameter (only valid for OpenAI). | string |  |  |
| `logprobs-field` | Field where to store the log probabilities. | string |  |  |
| `max-tokens` | Parameter for the completion request, passed to the AI Service as is. | integer |  |  |
| `min-chunks-per-message` | Minimum number of chunks to send to the `stream-to-topic` topic. The chunks are sent as soon as they are available, in the order they are received from the AI Service. To improve the TTFB (Time-To-First-Byte), the chunk size starts at 1 and doubles until it reaches this value. | integer |  | 20 |
| `model` | The model to use for text completions. The model must be available in the AI Service. | string |  |  |
| `options` | Additional options for the model configuration. The structure depends on the model and AI provider. | object |  |  |
| `presence-penalty` | Parameter for the completion request, passed to the AI Service as is. | number |  |  |
| `prompt` | Prompt to use for text completions. You can use the Mustache syntax. | array of string |  |  |
| `stop` | Parameter for the completion request, passed to the AI Service as is. | array of string |  |  |
| `stream` | Enable streaming of the results. Use in conjunction with the `stream-to-topic` parameter. | boolean |  | true |
| `stream-response-completion-field` | Field to use to store the completion results in the `stream-to-topic` topic. Use `value` to write the result without a structured schema. Use `value.<field>` to write the result in a specific field. | string |  |  |
| `stream-to-topic` | Topic to stream the results to. If set, the results are streamed to this topic in small chunks, while the complete message is still sent to the output topic. | string |  |  |
| `temperature` | Parameter for the completion request, passed to the AI Service as is. | number |  |  |
| `top-p` | Parameter for the completion request, passed to the AI Service as is. | number |  |  |
| `user` | Parameter for the completion request, passed to the AI Service as is. | string |  |  |
| `when` | Execute the step only when the condition is met. You can use the expression language to reference the message. Example: `when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"` | string |  |  |
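
A minimal sketch, under the same caveats as the chat-completions example above (the topic names, model id, and `value.text` field are assumptions):

```yaml
pipeline:
  - name: "complete-text"
    type: "ai-text-completions"
    input: "input-topic"    # assumed
    output: "output-topic"  # assumed
    configuration:
      model: "gpt-3.5-turbo-instruct"   # assumed model id; use any completions model your AI service exposes
      completion-field: "value.completion"
      prompt:
        - "Continue the following text: {{{ value.text }}}"
```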


Azure Blob Storage Source (azure-blob-storage-source)

Reads data from Azure blobs. There are three supported ways to authenticate:
- SAS token
- Storage account name and key
- Storage account connection string

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `container` | The name of the Azure container to read from. | string |  | langstream-azure-source |
| `endpoint` | Endpoint to connect to. Usually it's `https://<storage-account>.blob.core.windows.net`. | string |  |  |
| `file-extensions` | Comma separated list of file extensions to filter by. | string |  | pdf,docx,html,htm,md,txt |
| `idle-time` | Time in seconds to sleep after polling for new files. | integer |  | 5 |
| `sas-token` | Azure SAS token. If not provided, storage account name and key must be provided. | string |  |  |
| `storage-account-connection-string` | Azure storage account connection string. If not provided, SAS token must be provided. | string |  |  |
| `storage-account-key` | Azure storage account key. If not provided, SAS token must be provided. | string |  |  |
| `storage-account-name` | Azure storage account name. If not provided, SAS token must be provided. | string |  |  |
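
A sketch of a source pipeline, assuming a hypothetical storage account and a secret named `azure` holding the SAS token:

```yaml
pipeline:
  - name: "read-documents"
    type: "azure-blob-storage-source"
    output: "documents-topic"   # assumed topic name
    configuration:
      container: "my-container"                                   # assumed container
      endpoint: "https://mystorageaccount.blob.core.windows.net"  # assumed storage account
      sas-token: "${secrets.azure.sas-token}"                     # assumed secret reference
      file-extensions: "pdf,md,txt"
      idle-time: 5
```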


Apache Camel Source (camel-source)

Use Apache Camel components as a source.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `component-options` | Additional parameters to pass to the Camel component, in query string format. The values are automatically encoded. | object |  |  |
| `component-uri` | The Camel URI of the component to use as source. | string |  |  |
| `key-header` | Header to use as the key of the record. | string |  |  |
| `max-buffered-records` | Maximum number of records to buffer. | integer |  | 100 |


Cast record to another schema (cast)

Transforms the data to a target compatible schema.
Some step operations, like cast or compute, involve conversions from one type to another. When this happens, the rules are:
- timestamp, date and time related object conversions assume UTC time zone if it is not explicit.
- date and time related object conversions to/from STRING use the RFC3339 format.
- timestamp related object conversions to/from LONG and DOUBLE are done using the number of milliseconds since EPOCH (1970-01-01T00:00:00Z).
- date related object conversions to/from INTEGER, LONG, FLOAT and DOUBLE are done using the number of days since EPOCH (1970-01-01).
- time related object conversions to/from INTEGER, LONG and DOUBLE are done using the number of milliseconds since midnight (00:00:00).

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `composable` | Whether this step can be composed with other steps. | boolean |  | true |
| `part` | When used with KeyValue data, defines if the transformation is done on the key or on the value. If empty, the transformation applies to both the key and the value. | string |  |  |
| `schema-type` | The target schema type. | string |  |  |
| `when` | Execute the step only when the condition is met. You can use the expression language to reference the message. Example: `when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"` | string |  |  |
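
A minimal sketch (the topic names and the target type are illustrative assumptions):

```yaml
pipeline:
  - name: "cast-value-to-string"
    type: "cast"
    input: "input-topic"    # assumed
    output: "output-topic"  # assumed
    configuration:
      schema-type: "string"   # assumed target schema type
      part: "value"           # leave empty to cast both key and value
```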


Compute values from the record (compute)

Computes new properties, values or field values based on an expression evaluated at runtime. If the field already exists, it will be overwritten.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `composable` | Whether this step can be composed with other steps. | boolean |  | true |
| `fields` | An array of objects describing how to calculate the field values. | array of object |  |  |
| `when` | Execute the step only when the condition is met. You can use the expression language to reference the message. Example: `when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"` | string |  |  |


Compute values from the record (compute).fields

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `expression` | It is evaluated at runtime and the result of the evaluation is assigned to the field. Refer to the expression language documentation for more information on the expression syntax. | string |  |  |
| `name` | The name of the field to be computed. Prefix with `key.` or `value.` to compute the fields in the key or value parts of the message. In addition, you can compute values on the following message headers: `destinationTopic`, `messageKey`, `properties.`. Note that `properties` is a map of key/value pairs referenced by dot notation, for example `properties.key0`. | string |  |  |
| `optional` | If true, it marks the field as optional in the schema of the transformed message. This is useful when null is a possible value of the compute expression. | boolean |  | false |
| `type` | The type of the computed field. This will translate to the schema type of the new field in the transformed message. The following types are currently supported: STRING, INT8, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, DATE, TIME, TIMESTAMP, LOCAL_DATE_TIME, LOCAL_TIME, LOCAL_DATE, INSTANT. The type field is not required for the message headers (`destinationTopic`, `messageKey`, `properties.`), where STRING is always used. For the value and key, if it is not provided, the type is inferred from the result of the expression evaluation. | string |  |  |
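
A sketch reusing the `value.first`/`value.last` fields from the `when` example above (topic names are assumptions):

```yaml
pipeline:
  - name: "compute-fields"
    type: "compute"
    input: "input-topic"    # assumed
    output: "output-topic"  # assumed
    configuration:
      fields:
        - name: "value.first_upper"
          expression: "value.first.toUpperCase()"
          type: "STRING"
        - name: "properties.processed"
          expression: "'true'"   # headers are always STRING, so no type is needed
```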


Compute embeddings of the record (compute-ai-embeddings)

Compute embeddings of the record. The embeddings are stored in the record under a specific field.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `ai-service` | In case of multiple AI services configured, specify the id of the AI service to use. | string |  |  |
| `arguments` | Additional arguments to pass to the AI Service (HuggingFace only). | object |  |  |
| `batch-size` | Batch size for submitting the embeddings requests. | integer |  | 10 |
| `composable` | Whether this step can be composed with other steps. | boolean |  | true |
| `concurrency` | Max number of concurrent requests to the AI Service. | integer |  | 4 |
| `embeddings-field` | Field where to store the embeddings. | string |  |  |
| `flush-interval` | Flushing is disabled by default in order to avoid latency spikes. You should enable this feature in the case of background processing. | integer |  | 0 |
| `loop-over` | Execute the agent over a list of documents. | string |  |  |
| `model` | Model to use for the embeddings. The model must be available in the configured AI Service. | string |  | text-embedding-ada-002 |
| `model-url` | URL of the model to use (HuggingFace only). The default is computed from the model: `djl://ai.djl.huggingface.pytorch/{model}` | string |  |  |
| `options` | Additional options to pass to the AI Service (HuggingFace only). | object |  |  |
| `text` | Text to create embeddings from. You can use Mustache syntax to compose multiple fields into a single text. Example: `text: "{{{ value.field1 }}} {{{ value.field2 }}}"` | string |  |  |
| `when` | Execute the step only when the condition is met. You can use the expression language to reference the message. Example: `when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"` | string |  |  |
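
A sketch of a typical embeddings step (the topic names and the `value.text` field are assumptions):

```yaml
pipeline:
  - name: "compute-embeddings"
    type: "compute-ai-embeddings"
    input: "chunks-topic"       # assumed
    output: "embeddings-topic"  # assumed
    configuration:
      model: "text-embedding-ada-002"
      embeddings-field: "value.embeddings"
      text: "{{{ value.text }}}"   # assumes the text to embed lives in value.text
      batch-size: 10
      concurrency: 4
```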


Dispatch agent (dispatch)

Dispatches messages to different destinations based on conditions.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `routes` | Routes. | array of object |  |  |


Dispatch agent (dispatch).routes

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `destination` | Destination of the message. | string |  |  |
| `action` | Action on the message. Possible values are "dispatch" or "drop". | string |  | dispatch |
| `when` | Condition to activate the route. This is a standard EL expression. | string |  |  |
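
A sketch routing on a hypothetical `properties.language` header (for instance, one set by the language-detector agent documented below); the topic names are assumptions:

```yaml
pipeline:
  - name: "route-by-language"
    type: "dispatch"
    input: "input-topic"   # assumed
    configuration:
      routes:
        - when: "properties.language == 'en'"
          destination: "english-topic"   # assumed topic
        - when: "properties.language == 'fr'"
          destination: "french-topic"    # assumed topic
        - when: "true"
          action: "drop"                 # drop everything else
```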


Document to JSON (document-to-json)

Convert raw text document to JSON. The result will be a JSON object with the text content in the specified field.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `copy-properties` | Whether to copy the message properties/headers in the output message. | boolean |  | true |
| `text-field` | Field name to write the text content to. | string |  | text |


Drop the record (drop)

Drops the record from further processing. Use in conjunction with when to selectively drop records.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `composable` | Whether this step can be composed with other steps. | boolean |  | true |
| `when` | Execute the step only when the condition is met. You can use the expression language to reference the message. Example: `when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"` | string |  |  |
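
A sketch that selectively drops records, assuming a `value.text` field and illustrative topic names:

```yaml
pipeline:
  - name: "drop-empty-records"
    type: "drop"
    input: "input-topic"    # assumed
    output: "output-topic"  # assumed
    configuration:
      when: "value.text == null || value.text == ''"   # drop records with no text
```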


Drop fields (drop-fields)

Drops the record fields.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `composable` | Whether this step can be composed with other steps. | boolean |  | true |
| `fields` | Fields to drop from the input record. | array of string |  |  |
| `part` | Part to drop (value or key). | string |  |  |
| `when` | Execute the step only when the condition is met. You can use the expression language to reference the message. Example: `when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"` | string |  |  |


Flare Controller (flare-controller)

Applies the FLARE pattern to enhance the quality of text completion results.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `logprobs-field` | The field that contains the logprobs of tokens returned by the ai-text-completion agent. | string |  |  |
| `loop-topic` | Name of the topic to forward the message to in case more documents are requested. | string |  |  |
| `retrieve-documents-field` | Name of the field to set in order to request the retrieval of more documents. | string |  |  |
| `tokens-field` | The field that contains the list of tokens returned by the ai-text-completion agent. | string |  |  |


Flatten record fields (flatten)

Converts structured nested data into a new single-hierarchy-level structured data. The names of the new fields are built by concatenating the intermediate level field names.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `composable` | Whether this step can be composed with other steps. | boolean |  | true |
| `delimiter` | The delimiter to use when concatenating the field names. | string |  | _ |
| `part` | When used with KeyValue data, defines if the transformation is done on the key or on the value. If empty, the transformation applies to both the key and the value. | string |  |  |
| `when` | Execute the step only when the condition is met. You can use the expression language to reference the message. Example: `when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"` | string |  |  |


Http Request (http-request)

Agent for enriching data with an HTTP request.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `allow-redirects` | Whether or not to follow redirects. | boolean |  | true |
| `body` | Body to send with the request. You can use the Mustache syntax to inject values from the context. | string |  |  |
| `handle-cookies` | Whether or not to handle cookies during the redirects. | boolean |  | true |
| `headers` | Headers to send with the request. You can use the Mustache syntax to inject values from the context. | object |  |  |
| `method` | HTTP method to use for the request. | string |  | GET |
| `output-field` | The field that will hold the results. | string |  |  |
| `query-string` | Query string parameters to append to the URL. You can use the Mustache syntax to inject values from the context. Note that the values are automatically escaped. | object |  |  |
| `url` | URL to send the request to. For adding query string parameters, use the `query-string` field. | string |  |  |
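
A sketch of an enrichment step (the endpoint, topic names, and record fields are assumptions):

```yaml
pipeline:
  - name: "enrich-with-api"
    type: "http-request"
    input: "input-topic"    # assumed
    output: "output-topic"  # assumed
    configuration:
      url: "https://api.example.com/lookup"   # assumed endpoint
      method: "GET"
      query-string:
        id: "{{{ value.id }}}"   # values are escaped automatically
      output-field: "value.api_response"
```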


Identity function (identity)

Simple agent to move data from the input to the output. Could be used for testing or sample applications.

This agent has no configuration properties.


Invoke LangServe (langserve-invoke)

Agent for invoking LangServe-based applications.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `allow-redirects` | Whether or not to follow redirects. | boolean |  | true |
| `content-field` | Field in the response that will be used as the content of the record. | string |  | content |
| `debug` | Whether to enable debug mode. | boolean |  |  |
| `fields` | Fields of the generated records. | array of object |  |  |
| `handle-cookies` | Whether or not to handle cookies during the redirects. | boolean |  | true |
| `headers` | Headers to send with the request. You can use the Mustache syntax to inject values from the context. | object |  |  |
| `method` | HTTP method to use for the request. | string |  | POST |
| `min-chunks-per-message` | Minimum number of chunks to send to the `stream-to-topic` topic. The chunks are sent as soon as they are available, in the order they are received from the AI Service. To improve the TTFB (Time-To-First-Byte), the chunk size starts at 1 and doubles until it reaches this value. | integer |  | 20 |
| `output-field` | The field that will hold the results. | string |  | value |
| `stream-response-field` | Field to use to store the completion results in the `stream-to-topic` topic. Use `value` to write the result without a structured schema. Use `value.<field>` to write the result in a specific field. | string |  |  |
| `stream-to-topic` | Topic to stream the results to. If set, the results are streamed to this topic in small chunks, while the complete message is still sent to the output topic. | string |  |  |
| `url` | URL to send the request to. | string |  |  |


Invoke LangServe (langserve-invoke).fields

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `expression` | Expression to compute the value of the field. This is a standard EL expression. | string |  |  |
| `name` | Name of the field, like `value.xx`, `key.xxx`, `properties.xxx`. | string |  |  |
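
A sketch, assuming a LangServe chain exposed at `/chain/invoke` that expects an `input` payload field; the endpoint, topic names, and field names here are all illustrative assumptions:

```yaml
pipeline:
  - name: "invoke-chain"
    type: "langserve-invoke"
    input: "input-topic"    # assumed
    output: "output-topic"  # assumed
    configuration:
      url: "http://localhost:8000/chain/invoke"   # assumed LangServe endpoint
      output-field: "value.answer"
      fields:
        - name: "input"                  # assumed payload field expected by the chain
          expression: "value.question"   # assumed record field
```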


Language detector (language-detector)

Detect the language of a message’s data and limit further processing based on language codes.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `allowedLanguages` | Define a list of allowed language codes. If the message language is not in this list, the message is dropped. | array of string |  |  |
| `property` | The name of the message header to write the language code to. | string |  | language |


Log an event (log-event)

Log a line in the agent logs when a record is received.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `fields` | Fields to log. | array of object |  |  |
| `message` | Template for a log message to print (Mustache). | string |  |  |
| `when` | Condition to trigger the operation. This is a standard EL expression. | string |  | true |


Log an event (log-event).fields

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `expression` | Expression to compute the value of the field. This is a standard EL expression. | string |  |  |
| `name` | Name of the field, like `value.xx`, `key.xxx`, `properties.xxx`. | string |  |  |


Merge key-value format (merge-key-value)

Merges the fields of KeyValue records where both the key and value are structured types of the same schema type. Only AVRO and JSON are supported.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `composable` | Whether this step can be composed with other steps. | boolean |  | true |
| `when` | Execute the step only when the condition is met. You can use the expression language to reference the message. Example: `when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"` | string |  |  |


Python custom processor (python-function)

Run your own Python processor.
All the configuration properties are available in the class init method.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `className` | Python class name to instantiate. This class must be present in the application's "python" files. | string |  |  |


Python custom processor (python-processor)

Run your own Python processor.
All the configuration properties are available in the class init method.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `className` | Python class name to instantiate. This class must be present in the application's "python" files. | string |  |  |
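
A sketch of how a custom processor is declared; the class name and the extra property are hypothetical, and any extra configuration properties are handed to the class init method:

```yaml
pipeline:
  - name: "my-python-step"
    type: "python-processor"
    input: "input-topic"    # assumed
    output: "output-topic"  # assumed
    configuration:
      className: "my_module.MyProcessor"   # hypothetical class in the application's "python" files
      my-custom-option: "some-value"       # hypothetical property, available in the class init method
```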


Python custom service (python-service)

Run your own Python service.
All the configuration properties are available in the class init method.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `className` | Python class name to instantiate. This class must be present in the application's "python" files. | string |  |  |


Python custom sink (python-sink)

Run your own Python sink.
All the configuration properties are available in the class init method.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `className` | Python class name to instantiate. This class must be present in the application's "python" files. | string |  |  |


Python custom source (python-source)

Run your own Python source.
All the configuration properties are available in the class init method.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `className` | Python class name to instantiate. This class must be present in the application's "python" files. | string |  |  |


Query (query)

Perform a vector search or simple query against a datasource.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `composable` | Whether this step can be composed with other steps. | boolean |  | true |
| `datasource` | Reference to a datasource id configured in the application. | string |  |  |
| `fields` | Fields of the record to use as input parameters for the query. | array of string |  |  |
| `generated-keys` | List of fields to use as generated keys. The generated keys are returned in the output, depending on the database. | array of string |  |  |
| `loop-over` | Loop over a list of items taken from the record, for instance `value.documents`. It must refer to a list of maps. In this case the `output-field` parameter must be like `record.fieldname` in order to replace or set a field in each record with the results of the query. In the query parameters you can refer to the record fields using `record.field`. | string |  |  |
| `mode` | Execution mode: `query` or `execute`. In query mode, the query is executed and the results are returned. In execute mode, the query is executed and the result is the number of rows affected (depending on the database). | string |  | query |
| `only-first` | If true, only the first result of the query is stored in the output field. | boolean |  | false |
| `output-field` | The name of the field to use to store the query result. | string |  |  |
| `query` | The query to use to extract the data. | string |  |  |
| `when` | Execute the step only when the condition is met. You can use the expression language to reference the message. Example: `when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"` | string |  |  |
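
A sketch of a lookup against a JDBC datasource (the datasource id, table, topic names, and record fields are assumptions):

```yaml
pipeline:
  - name: "lookup-user"
    type: "query"
    input: "input-topic"    # assumed
    output: "output-topic"  # assumed
    configuration:
      datasource: "my-jdbc-datasource"   # assumed datasource id
      query: "SELECT name, email FROM users WHERE id = ?"   # assumed table
      fields:
        - "value.user_id"   # bound to the first ? placeholder
      output-field: "value.user"
      only-first: true
```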


Query a vector database (query-vector-db)

Query a vector database using Vector Search capabilities.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `composable` | Whether this step can be composed with other steps. | boolean |  | true |
| `datasource` | Reference to a datasource id configured in the application. | string |  |  |
| `fields` | Fields of the record to use as input parameters for the query. | array of string |  |  |
| `generated-keys` | List of fields to use as generated keys. The generated keys are returned in the output, depending on the database. | array of string |  |  |
| `loop-over` | Loop over a list of items taken from the record, for instance `value.documents`. It must refer to a list of maps. In this case the `output-field` parameter must be like `record.fieldname` in order to replace or set a field in each record with the results of the query. In the query parameters you can refer to the record fields using `record.field`. | string |  |  |
| `mode` | Execution mode: `query` or `execute`. In query mode, the query is executed and the results are returned. In execute mode, the query is executed and the result is the number of rows affected (depending on the database). | string |  | query |
| `only-first` | If true, only the first result of the query is stored in the output field. | boolean |  | false |
| `output-field` | The name of the field to use to store the query result. | string |  |  |
| `query` | The query to use to extract the data. | string |  |  |
| `when` | Execute the step only when the condition is met. You can use the expression language to reference the message. Example: `when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"` | string |  |  |
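
A sketch of a similarity search; the ANN query shown is Cassandra/Astra-style and purely illustrative, since the exact syntax depends on the database behind the datasource id:

```yaml
pipeline:
  - name: "find-similar-documents"
    type: "query-vector-db"
    input: "input-topic"    # assumed
    output: "output-topic"  # assumed
    configuration:
      datasource: "my-vector-database"   # assumed datasource id
      query: "SELECT text FROM documents ORDER BY embeddings ANN OF ? LIMIT 5"   # assumed, database-specific
      fields:
        - "value.embeddings"   # assumes embeddings were computed upstream
      output-field: "value.related_documents"
```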


Re-rank (re-rank)

Agent for re-ranking documents based on a query.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `algorithm` | Algorithm to use for re-ranking: "none" or "MMR". | string |  | none |
| `b` | Parameter for the BM25 algorithm. | number |  | 0.75 |
| `embeddings-field` | Result field for the embeddings. | string |  |  |
| `field` | The field that contains the documents to sort. | string |  |  |
| `k1` | Parameter for the BM25 algorithm. | number |  | 1.5 |
| `lambda` | Parameter for the MMR algorithm. | number |  | 0.5 |
| `max` | Maximum number of documents to keep. | integer |  | 100 |
| `output-field` | The field that will hold the results; it can be the same as "field" to override it. | string |  |  |
| `query-embeddings` | Field that contains the embeddings of the documents to sort. | string |  |  |
| `query-text` | Field that already contains the text that has been embedded. | string |  |  |
| `text-field` | Result field for the text. | string |  |  |
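
A sketch of MMR re-ranking; the `value.*` and `record.*` field names are assumptions about where the documents, texts, and embeddings live in the record:

```yaml
pipeline:
  - name: "rerank-documents"
    type: "re-rank"
    input: "input-topic"    # assumed
    output: "output-topic"  # assumed
    configuration:
      algorithm: "MMR"
      field: "value.related_documents"         # documents to sort
      output-field: "value.related_documents"  # overwrite the same field
      query-text: "value.question"             # assumed
      query-embeddings: "value.embeddings"     # assumed
      text-field: "record.text"                # assumed per-document field
      embeddings-field: "record.embeddings"    # assumed per-document field
      max: 10
```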


S3 Source (s3-source)

Reads data from an S3 bucket.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `access-key` | Access key for the S3 server. | string |  | minioadmin |
| `bucketName` | The name of the bucket to read from. | string |  | langstream-source |
| `endpoint` | The endpoint of the S3 server. | string |  | http://minio-endpoint.-not-set:9090 |
| `file-extensions` | Comma separated list of file extensions to filter by. | string |  | pdf,docx,html,htm,md,txt |
| `idle-time` | Time in seconds to sleep after polling for new files. | integer |  | 5 |
| `region` | Region for the S3 server. | string |  |  |
| `secret-key` | Secret key for the S3 server. | string |  | minioadmin |


Kafka Connect Sink agent (sink)

Run any Kafka Connect Sink.
All the configuration properties are passed to the Kafka Connect Sink.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `connector.class` | Java main class for the Kafka Sink connector. | string |  |  |


Kafka Connect Source agent (source)

Run any Kafka Connect Source.
All the configuration properties are passed to the Kafka Connect Source.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `connector.class` | Java main class for the Kafka Source connector. | string |  |  |


Text extractor (text-extractor)

Extracts text content from different document formats like PDF, JSON, XML, ODF, HTML and many others.

This agent has no configuration properties.


Text normaliser (text-normaliser)

Apply normalisation to the text.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `make-lowercase` | Whether to make the text lowercase. | boolean |  | true |
| `trim-spaces` | Whether to trim spaces from the text. | boolean |  | true |


Text splitter (text-splitter)

Split message content in chunks.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `chunk_overlap` | RecursiveCharacterTextSplitter option: overlap between consecutive chunks. See https://github.com/knuddelsgmbh/jtokkit for more details. | integer |  | 100 |
| `chunk_size` | RecursiveCharacterTextSplitter option: size of each chunk. See https://github.com/knuddelsgmbh/jtokkit for more details. | integer |  | 200 |
| `keep_separator` | RecursiveCharacterTextSplitter option: whether or not to keep separators. See https://github.com/knuddelsgmbh/jtokkit for more details. | boolean |  | false |
| `length_function` | RecursiveCharacterTextSplitter option. Options are: r50k_base, p50k_base, p50k_edit and cl100k_base. See https://github.com/knuddelsgmbh/jtokkit for more details. | string |  | cl100k_base |
| `separators` | RecursiveCharacterTextSplitter option: the separators to use for splitting. See https://github.com/knuddelsgmbh/jtokkit for more details. | array of string |  | "\n\n", "\n", " ", "" |
| `splitter_type` | Splitter implementation to use. Currently supported: RecursiveCharacterTextSplitter. | string |  | RecursiveCharacterTextSplitter |
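
A sketch of a chunking step for RAG-style pipelines (topic names are assumptions; the option values are illustrative):

```yaml
pipeline:
  - name: "split-documents"
    type: "text-splitter"
    input: "documents-topic"   # assumed
    output: "chunks-topic"     # assumed
    configuration:
      splitter_type: "RecursiveCharacterTextSplitter"
      chunk_size: 400
      chunk_overlap: 100
      length_function: "cl100k_base"   # measure length in tokens
      separators: ["\n\n", "\n", " ", ""]
```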


Timer source (timer-source)

Periodically emits records to trigger the execution of pipelines.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `fields` | Fields of the generated records. | array of object |  |  |
| `period-seconds` | Period of the timer in seconds. | integer |  | 60 |


Timer source (timer-source).fields

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `expression` | Expression to compute the value of the field. This is a standard EL expression. | string |  |  |
| `name` | Name of the field, like `value.xx`, `key.xxx`, `properties.xxx`. | string |  |  |
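
A sketch emitting a constant record every minute (the topic and field names are illustrative):

```yaml
pipeline:
  - name: "tick"
    type: "timer-source"
    output: "ticks-topic"   # assumed
    configuration:
      period-seconds: 60
      fields:
        - name: "value.message"
          expression: "'tick'"   # any EL expression works here
```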


Trigger event (trigger-event)

Emits a record on a side destination when a record is received.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `continue-processing` | Whether to continue processing the record downstream after emitting the event. If the `when` condition is false, the record is passed downstream anyway. This flag allows you to stop processing system events and trigger a different pipeline. | boolean |  | true |
| `destination` | Destination of the message. | string |  |  |
| `fields` | Fields of the generated records. | array of object |  |  |
| `when` | Condition to trigger the event. This is a standard EL expression. | string |  | true |


Trigger event (trigger-event).fields

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `expression` | Expression to compute the value of the field. This is a standard EL expression. | string |  |  |
| `name` | Name of the field, like `value.xx`, `key.xxx`, `properties.xxx`. | string |  |  |


Unwrap key-value format (unwrap-key-value)

If the record value is in KeyValue format, extracts the KeyValue's key or value and makes it the record value.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `composable` | Whether this step can be composed with other steps. | boolean |  | true |
| `unwrapKey` | Whether to unwrap the key instead of the value. | boolean |  | false |
| `when` | Execute the step only when the condition is met. You can use the expression language to reference the message. Example: `when: "value.first == 'f1' && value.last.toUpperCase() == 'L1'"` | string |  |  |


Astra (vector-db-sink)

Writes data to DataStax Astra service.
All the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `datasource` | Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'astra'. | string |  |  |
| `keyspace` | The keyspace of the table to write to. | string |  |  |
| `mapping` | Comma separated list of mappings between table columns and record fields, e.g. `my_column_id=key, my_column_name=value.name`. | string |  |  |
| `table-name` | The name of the table to write to. The table must already exist. | string |  |  |


Astra Vector DB (vector-db-sink)

Writes data to DataStax Astra Vector DB.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `collection-name` | The name of the collection to write to. The collection must already exist. | string |  |  |
| `datasource` | Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'astra-vector-db'. | string |  |  |
| `fields` | Fields of the collection to write to. | array of object |  |  |


Astra Vector DB (vector-db-sink).fields

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `expression` | JSTL Expression for computing the field value. | string |  |  |
| `name` | Field name. | string |  |  |


Cassandra (vector-db-sink)

Writes data to Apache Cassandra.
All the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `datasource` | Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'cassandra'. | string |  |  |
| `keyspace` | The keyspace of the table to write to. | string |  |  |
| `mapping` | Comma separated list of mappings between table columns and record fields, e.g. `my_column_id=key, my_column_name=value.name`. | string |  |  |
| `table-name` | The name of the table to write to. The table must already exist. | string |  |  |


JDBC (vector-db-sink)

Writes data to any JDBC compatible database.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `datasource` | Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'jdbc'. | string |  |  |
| `fields` | Fields of the table to write to. | array of object |  |  |
| `table-name` | The name of the table to write to. The table must already exist. | string |  |  |


JDBC (vector-db-sink).fields

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `expression` | JSTL Expression for computing the field value. | string |  |  |
| `name` | Field name. | string |  |  |
| `primary-key` | Is this field part of the primary key? | boolean |  | false |
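
A sketch of writing embeddings to a JDBC table; the datasource id, table, topic name, and the `fn:toString`/`fn:toListOfFloat` helpers are assumptions about the expression language:

```yaml
pipeline:
  - name: "write-to-jdbc"
    type: "vector-db-sink"
    input: "embeddings-topic"   # assumed
    configuration:
      datasource: "my-jdbc-datasource"   # assumed datasource id with service: 'jdbc'
      table-name: "documents"            # the table must already exist
      fields:
        - name: "id"
          expression: "fn:toString(key)"                     # assumed helper
          primary-key: true
        - name: "text"
          expression: "value.text"                           # assumed record field
        - name: "embeddings"
          expression: "fn:toListOfFloat(value.embeddings)"   # assumed helper
```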


Milvus (vector-db-sink)

Writes data to the Milvus/Zilliz service.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `collection-name` | Collection name. | string |  |  |
| `database-name` | Database name. | string |  |  |
| `datasource` | Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'milvus'. | string |  |  |
| `fields` | Fields definition. | array of object |  |  |


Milvus (vector-db-sink).fields

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `expression` | JSTL Expression for computing the field value. | string |  |  |
| `name` | Field name. | string |  |  |


OpenSearch (vector-db-sink)

Writes data to OpenSearch or AWS OpenSearch serverless.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `batch-size` | Batch size for bulk operations. Hitting the batch size will trigger a flush. | integer |  | 10 |
| `bulk-parameters` | OpenSearch bulk URL parameters. | object |  |  |
| `datasource` | Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'opensearch'. | string |  |  |
| `fields` | Index fields definition. | array of object |  |  |
| `flush-interval` | Flush interval in milliseconds. | integer |  | 1000 |
| `id` | JSTL Expression to compute the index `_id` field. Leave it empty to let OpenSearch auto-generate the `_id` field. | string |  |  |


OpenSearch (vector-db-sink).bulk-parameters

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `pipeline` | The pipeline ID for preprocessing documents. Refer to the OpenSearch documentation for more details. | string |  |  |
| `routing` | Routes the request to the specified shard. Refer to the OpenSearch documentation for more details. | string |  |  |
| `require_alias` | Set to true to require that all actions target an index alias rather than an index. Refer to the OpenSearch documentation for more details. | boolean |  |  |
| `refresh` | Whether to refresh the affected shards after performing the indexing operations. Default is false. true makes the changes show up in search results immediately, but hurts cluster performance. wait_for waits for a refresh; requests take longer to return, but cluster performance doesn't suffer. Note that AWS OpenSearch supports only false. Refer to the OpenSearch documentation for more details. | string |  |  |
| `wait_for_active_shards` | Specifies the number of active shards that must be available before OpenSearch processes the bulk request. Default is 1 (only the primary shard). Set to all or a positive integer. Values greater than 1 require replicas. For example, if you specify a value of 3, the index must have two replicas distributed across two additional nodes for the request to succeed. Refer to the OpenSearch documentation for more details. | string |  |  |
| `timeout` | How long to wait for the request to return. Refer to the OpenSearch documentation for more details. | string |  |  |


OpenSearch (vector-db-sink).fields

DescriptionTypeRequiredDefault Value
expressionJSTL Expression for computing the field value.string
nameField namestring


Pinecone (vector-db-sink)

Writes data to Pinecone service.
To add metadata fields you can add vector.metadata.my-field: "value.my-field". The value is a JSTL Expression to compute the actual value.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `datasource` | Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'pinecone'. | string |  |  |
| `vector.id` | JSTL Expression to compute the id. | string |  |  |
| `vector.metadata` | Metadata to append. The key is the metadata name and the value the JSTL Expression to compute the actual value. | object |  |  |
| `vector.namespace` | JSTL Expression to compute the namespace. | string |  |  |
| `vector.vector` | JSTL Expression to compute the vector. | string |  |  |
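
A sketch following the metadata convention described above (the datasource id, topic name, and record fields are assumptions):

```yaml
pipeline:
  - name: "write-to-pinecone"
    type: "vector-db-sink"
    input: "embeddings-topic"   # assumed
    configuration:
      datasource: "my-pinecone-datasource"   # assumed datasource id with service: 'pinecone'
      vector.id: "value.id"                  # assumed record field
      vector.vector: "value.embeddings"      # assumed record field
      vector.metadata.genre: "value.genre"   # metadata field computed via a JSTL expression
```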


Apache Solr (vector-db-sink)

Writes data to Apache Solr service.
The collection-name is configured at datasource level.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `commit-within` | Commit-within option, in milliseconds. | integer |  | 1000 |
| `datasource` | Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'solr'. | string |  |  |
| `fields` | Fields definition. | array of object |  |  |


Apache Solr (vector-db-sink).fields

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `expression` | JSTL Expression for computing the field value. | string |  |  |
| `name` | Field name. | string |  |  |


Web crawler source (webcrawler-source)

Crawl a website and extract the content of the pages.

| Field | Description | Type | Required | Default |
| --- | --- | --- | --- | --- |
| `access-key` | Configuration for handling the agent status. Access key for the S3 server. | string |  | minioadmin |
| `allow-non-html-contents` | Whether to emit non-HTML documents to the pipeline (e.g. PDF files). | boolean |  | false |
| `allowed-domains` | Domains that the crawler is allowed to access. | array of string |  |  |
| `bucketName` | Configuration for handling the agent status. The name of the bucket. | string |  | langstream-source |
| `endpoint` | Configuration for handling the agent status. The S3 endpoint. | string |  | http://minio-endpoint.-not-set:9090 |
| `forbidden-paths` | Paths that the crawler is not allowed to access. | array of string |  |  |
| `handle-cookies` | Whether to handle cookies. | boolean |  | true |
| `handle-robots-file` | Whether to honor the website's robots.txt file. | boolean |  | true |
| `http-timeout` | Timeout for HTTP requests, in milliseconds. | integer |  | 10000 |
| `max-depth` | Maximum depth of the crawl. | integer |  | 50 |
| `max-error-count` | Maximum number of errors allowed before stopping. | integer |  | 5 |
| `max-unflushed-pages` | Maximum number of unflushed pages before the agent persists the crawl data. | integer |  | 100 |
| `max-urls` | Maximum number of URLs that can be crawled. | integer |  | 1000 |
| `min-time-between-requests` | Minimum time between two requests to the same domain, in milliseconds. | integer |  | 500 |
| `region` | Configuration for handling the agent status. Region for the S3 server. | string |  |  |
| `reindex-interval-seconds` | Time interval between reindexing of the pages, in seconds. | integer |  | 86400 |
| `scan-html-documents` | Whether to scan HTML documents to find links to other pages. | boolean |  | true |
| `secret-key` | Configuration for handling the agent status. Secret key for the S3 server. | string |  | minioadmin |
| `seed-urls` | The starting URLs for the crawl. | array of string |  |  |
| `state-storage` | State storage configuration: "s3" or "disk". | string |  | s3 |
| `user-agent` | User agent to use for the requests. | string |  | Mozilla/5.0 (compatible; LangStream.ai/0.1; +https://langstream.ai) |
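
A sketch of a crawl configuration (the site, topic name, and limits are illustrative assumptions):

```yaml
pipeline:
  - name: "crawl-site"
    type: "webcrawler-source"
    output: "crawled-pages-topic"   # assumed
    configuration:
      seed-urls:
        - "https://example.com/"    # assumed site
      allowed-domains:
        - "https://example.com"
      max-urls: 1000
      min-time-between-requests: 500
      state-storage: "disk"   # keep the crawl status on local disk instead of S3
```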