docs(io): source/sink split (#3207)
Signed-off-by: Jiyong Huang <[email protected]>
ngjaying authored Sep 20, 2024
1 parent ca32f2c commit 31a28a1
Showing 9 changed files with 135 additions and 266 deletions.
8 changes: 0 additions & 8 deletions docs/directory.json
@@ -329,10 +329,6 @@
"title": "InfluxDBV2 Sink",
"path": "guide/sinks/plugin/influx2"
},
{
"title": "TDengine Sink",
"path": "guide/sinks/plugin/tdengine"
},
{
"title": "Image Sink",
"path": "guide/sinks/plugin/image"
@@ -1105,10 +1101,6 @@
"title": "InfluxDBV2 Sink",
"path": "guide/sinks/plugin/influx2"
},
{
"title": "TDengine Sink",
"path": "guide/sinks/plugin/tdengine"
},
{
"title": "Image Sink",
"path": "guide/sinks/plugin/image"
1 change: 0 additions & 1 deletion docs/en_US/guide/connector.md
@@ -64,7 +64,6 @@ For specialized data dispatch requirements or integrations with particular platf

- [InfluxDB sink](./sinks/plugin/influx.md): A sink to InfluxDB `v1.x`.
- [InfluxDBV2 sink](./sinks/plugin/influx2.md): A sink to InfluxDB `v2.x`.
- [TDengine sink](./sinks/plugin/tdengine.md): A sink to TDengine.
- [Image sink](./sinks/plugin/image.md): A sink to an image file. Only used to handle binary results.
- [Zero MQ sink](./sinks/plugin/zmq.md): A sink to Zero MQ.
- [Kafka sink](./sinks/plugin/kafka.md): A sink to Kafka.
Expand Down
49 changes: 45 additions & 4 deletions docs/en_US/guide/sinks/overview.md
@@ -27,7 +27,6 @@ The list of predefined sink plugins:

- [InfluxDB sink](./plugin/influx.md): sink to InfluxDB `v1.x`.
- [InfluxDBV2 sink](./plugin/influx2.md): sink to InfluxDB `v2.x`.
- [TDengine sink](./plugin/tdengine.md): sink to TDengine.
- [Image sink](./plugin/image.md): sink to an image file. Only used to handle binary results.
- [Zero MQ sink](./plugin/zmq.md): sink to Zero MQ.
- [Kafka sink](./plugin/kafka.md): sink to Kafka.
@@ -68,7 +67,6 @@ Each action can define its own properties. There are several common properties:

| Property Name | Type & Default Value | Description |
|----------------------|--------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| concurrency | int: 1 | Specify how many instances of the sink will be run. If the value is bigger than 1, the order of the messages may not be retained. |
| bufferLength | int: 1024 | Specify how many messages can be buffered in memory. If the buffered messages exceed the limit, the sink will block message receiving until the buffered messages have been sent out so that the buffered size is less than the limit. |
| omitIfEmpty | bool: false | If set to true, an empty SELECT result will not be fed to the sink operator. |
| sendSingle | bool: false | The output messages are received as an array. This indicates whether to send the results one by one. If false, the output message will be `{"result":"${the string of received message}"}`. For example, `{"result":"[{\"count\":30},{\"count\":20}]"}`. Otherwise, the result messages will be sent one by one with the actual field names. For the same example as above, it will send `{"count":30}`, then `{"count":20}`, to the RESTful endpoint. Defaults to false. |
@@ -88,8 +86,10 @@ Each action can define its own properties. There are several common properties:
| resendPriority | int: default to global definition | Priority for resending cached data. -1 means real-time data is sent first; 0 means equal priority; 1 means cached data is resent first. |
| resendIndicatorField | string: default to global definition | Field name used to mark resent data; the field type must be bool. If configured, the field will be set to true when resending. E.g., if resendIndicatorField is `resend`, the `resend` field will be set to true when resending the cache. |
| resendDestination | string: default "" | The destination to resend the cache to, which may have different meanings or support depending on the sink. For example, the MQTT sink can send the resend data to a different topic. The supported sinks are listed in [sinks with resend destination support](#sinks-with-resend-destination-support). |
| batchSize | int: 0 | Specify the number of messages to buffer before sending. The sink will hold messages until the number of buffered messages reaches this value, then send them all at once. batchSize treats []map data as multiple messages. |
| lingerInterval | int: 0 | Specify the time in milliseconds to buffer messages before sending. The sink will hold messages until the buffering interval reaches this value. lingerInterval can be used together with batchSize to trigger sending when either condition is met. |
| compression | string: "" | Sets the data compression algorithm. Only effective when the sink sends raw bytes. Supported compression methods are "zlib", "gzip", "flate", "zstd". |
| encryption | string: "" | Sets the data encryption algorithm. Only effective when the sink sends raw bytes. Currently, only the AES algorithm is supported. |
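
For example, the following MQTT action combines the batching, compression, and caching properties above (an illustrative sketch; the `server` and `topic` values are placeholders):

```json
"actions": [
  {
    "mqtt": {
      "server": "tcp://127.0.0.1:1883",
      "topic": "result/compressed",
      "batchSize": 100,
      "lingerInterval": 1000,
      "compression": "gzip",
      "enableCache": true,
      "resendDestination": "result/resend"
    }
  }
]
```

With this configuration, a send is triggered once 100 results are buffered or 1000 ms have elapsed, whichever comes first; failed sends are cached and later resent to the `result/resend` topic.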

### Dynamic properties
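
A dynamic property is written in data template syntax, so its value is computed from each output message. As a hedged illustration (the `deviceId` field is hypothetical), an MQTT action could route each result to a per-device topic:

```json
"actions": [
  {
    "mqtt": {
      "server": "tcp://127.0.0.1:1883",
      "topic": "devices/{{.deviceId}}/result",
      "sendSingle": true
    }
  }
]
```

Note `sendSingle: true`: the template is evaluated against each individual result, so each message can resolve to a different topic.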

@@ -248,3 +248,44 @@ Can also use the `resourceId` reference form with the following configuration
}
}
```

## Runtime Nodes

When a user creates a rule, the Sink is a logical node. Depending on the Sink type and the user's configuration, each Sink may expand at runtime into an execution plan consisting of multiple nodes. Sink configuration properties are numerous and the actual runtime logic is complex; breaking the execution plan into multiple nodes brings the following benefits:

- Many Sinks share properties and implementation logic, such as data format encoding. Splitting the shared property handling into independent runtime nodes facilitates node reuse, simplifies the implementation of Sink nodes (Single Responsibility Principle), and improves maintainability.
- Some Sink properties involve time-consuming computation, such as compression and encoding. With a single node's metrics, it is hard to tell how the individual sub-tasks actually executed. After splitting, finer-grained runtime metrics expose the status and latency of each sub-task.
- Once sub-tasks are split into separate nodes, they can be computed in parallel, improving the overall efficiency of rule execution.

### Execution Plan Splitting

The physical execution plan of the Sink node can be split into:

Batch --> Transform --> Encode --> Compress --> Encrypt --> Cache --> Connect

The rules for splitting are as follows (an annotated example follows the list):

- **Batch**: Configured with `batchSize` and/or `lingerInterval`. This node accumulates batches, sending the received data to subsequent nodes according to the batch configuration.
- **Transform**: Configured with `dataTemplate`, `dataField`, `fields`, or other shared properties that require data format conversion. This node implements the various transformation properties.
- **Encode**: Applies when the Sink sends raw bytes (such as MQTT, which can send arbitrary bytes; SQL sinks with their own formats are not of this type) and the `format` property is configured. This node serializes the data based on the format and related schema configuration.
- **Compress**: Applies when the Sink sends raw bytes and the `compression` property is configured. This node compresses the data with the configured compression algorithm.
- **Encrypt**: Applies when the Sink sends raw bytes and the `encryption` property is configured. This node encrypts the data with the configured encryption algorithm.
- **Cache**: Configured with `enableCache`. This node implements data caching and resending. For details, see [Caching](#caching).
- **Connect**: Present for every Sink. This node connects to the external system and sends the data.
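
As an illustration (a hypothetical configuration, not part of this commit), consider an MQTT action with the following properties:

```json
"actions": [
  {
    "mqtt": {
      "server": "tcp://127.0.0.1:1883",
      "topic": "result",
      "dataTemplate": "{{json .}}",
      "format": "json",
      "batchSize": 50,
      "compression": "zstd",
      "enableCache": true
    }
  }
]
```

Under the rules above, this single logical Sink expands into Batch (`batchSize`) --> Transform (`dataTemplate`) --> Encode (`format`) --> Compress (`compression`) --> Cache (`enableCache`) --> Connect; the Encrypt node is omitted because no `encryption` property is configured.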
