Skip to content

Latest commit

 

History

History
289 lines (230 loc) · 14.6 KB

overview.md

File metadata and controls

289 lines (230 loc) · 14.6 KB

规则

规则由 JSON 定义,下面是一个示例。

{
  "id": "rule1",
  "sql": "SELECT demo.temperature, demo1.temp FROM demo left join demo1 on demo.timestamp = demo1.timestamp where demo.temperature > demo1.temp GROUP BY demo.temperature, HOPPINGWINDOW(ss, 20, 10)",
  "actions": [
    {
      "log": {}
    },
    {
      "mqtt": {
        "server": "tcp://47.52.67.87:1883",
        "topic": "demoSink"
      }
    }
  ]
}

创建规则需要以下参数。

参数

参数名 是否可选 说明
id 规则 id, 规则 id 在同一 eKuiper 实例中必须唯一。
name 规则显示的名字或者描述。
sql 如果 graph 未定义,则该属性必须定义 为规则运行的 sql 查询
actions 如果 graph 未定义,则该属性必须定义 Sink 动作数组
graph 如果 sql 未定义,则该属性必须定义 规则有向无环图的 JSON 表示
options 选项列表
triggerd 布尔值,设置是否创建完规则后立刻运行,默认是 true

规则逻辑

一个规则代表了一个流处理流程,定义了从将数据输入流的数据源到各种处理逻辑,再到将数据输入到外部系统的动作。

有两种方法来定义规则的业务逻辑。要么使用SQL/动作组合,要么使用新增加的图API。

SQL 规则

通过指定 sqlactions 属性,我们可以以声明的方式定义规则的业务逻辑。其中,sql 定义了针对预定义流运行的 SQL 查询,这将转换数据。然后,输出的数据可以通过 action 路由到多个位置。

SQL

最简单的规则 SQL 如 SELECT * FROM demo。它有类似于 ANSI SQL 的语法,并且可以利用 eKuiper 运行时提供的丰富的运算符和函数。参见SQL获取更多 eKuiper SQL 的信息。

大部分的 SQL 子句都是定义逻辑的,除了负责指定流的 FROM 子句。在这个例子中,demo 是一个流。通过使用连接子句,可以有多个流或流/表。作为一个流引擎,一个规则中必须至少有一个流。

因此,这里的 SQL 查询语句实际上定义了两个部分。

  • 要处理的流或表。
  • 如何处理。

在使用SQL规则之前,必须事先定义流。请查看 streams 了解详情。

动作

动作部分定义了一个规则的输出行为。每个规则可以有多个动作。一个动作是一个 sink 连接器的实例。当定义动作时,键是 sink 连接器的类型名称,而值是其属性。

eKuiper 已经内置了丰富的 sink connector 类型,如 mqtt、rest 和 file 。用户也可以扩展更多的 sink 类型来用于规则动作中。每种sink类型都有自己的属性集。更多细节,请查看 sink

图规则

从 eKuiper 1.6.0 开始, eKuiper 在规则模型中提供了图规则 API 作为创建规则的另一种方式。该属性以 JSON 格式定义了一个规则的 DAG。它很容易直接映射到 GUI 编辑器中的图形,并适合作为拖放用户界面的后端。下面是一个图形规则定义的例子。

{
  "id": "rule1",
  "name": "Test Condition",
  "graph": {
    "nodes": {
      "demo": {
        "type": "source",
        "nodeType": "mqtt",
        "props": {
          "datasource": "devices/+/messages"
        }
      },
      "humidityFilter": {
        "type": "operator",
        "nodeType": "filter",
        "props": {
          "expr": "humidity > 30"
        }
      },
      "logfunc": {
        "type": "operator",
        "nodeType": "function",
        "props": {
          "expr": "log(temperature) as log_temperature"
        }
      },
      "tempFilter": {
        "type": "operator",
        "nodeType": "filter",
        "props": {
          "expr": "log_temperature < 1.6"
        }
      },
      "pick": {
        "type": "operator",
        "nodeType": "pick",
        "props": {
          "fields": ["log_temperature as temp", "humidity"]
        }
      },
      "mqttout": {
        "type": "sink",
        "nodeType": "mqtt",
        "props": {
          "server": "tcp://${mqtt_srv}:1883",
          "topic": "devices/result"
        }
      }
    },
    "topo": {
      "sources": ["demo"],
      "edges": {
        "demo": ["humidityFilter"],
        "humidityFilter": ["logfunc"],
        "logfunc": ["tempFilter"],
        "tempFilter": ["pick"],
        "pick": ["mqttout"]
      }
    }
  }
}

graph 属性是一个 json 结构,其中 nodes 定义图形中呈现的节点,topo 定义节点之间的边缘。节点类型可以是内置的节点类型,如窗口节点和过滤器节点等。它也可以是来自插件的用户定义的节点。请参考 graph rule 了解更多细节。

选项

当前的选项包括:

选项名 类型和默认值 说明
debug bool:false 指定该条规则是否开启 Debug Level 的日志水平,缺省情况下会继承全局配置中的 Debug 配置参数。
logFilename string: "" 指定该条规则的单独的日志文件名称,日志将保存在全局日志文件夹中,缺省情况下会延用全局配置中的日志配置参数。
isEventTime bool:false 使用事件时间还是将时间用作事件的时间戳。 如果使用事件时间,则将从有效负载中提取时间戳。 必须通过 stream 定义指定时间戳记。
lateTolerance int64:0 在使用事件时间窗口时,可能会出现元素延迟到达的情况。 LateTolerance 可以指定在删除元素之前可以延迟多少时间(单位为 ms)。 默认情况下,该值为0,表示后期元素将被删除。
concurrency int: 1 一条规则运行时会根据 sql 语句分解成多个 plan 运行。该参数设置每个 plan 运行的线程数。该参数值大于1时,消息处理顺序可能无法保证。
bufferLength int: 1024 指定每个 plan 可缓存消息数。若缓存消息数超过此限制,plan 将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。此选项值越大,则消息吞吐能力越强,但是内存占用也会越多。
sendMetaToSink bool:false 指定是否将事件的元数据发送到目标。 如果为 true,则目标可以获取元数据信息。
sendError bool: false 指定是否将运行时错误发送到目标。如果为 true,则错误会在整个流中传递直到目标。否则,错误会被忽略,仅打印到日志中。
qos int:0 指定流的 qos。 值为0对应最多一次; 1对应至少一次,2对应恰好一次。 如果 qos 大于0,将激活检查点机制以定期保存状态,以便可以从错误中恢复规则。
checkpointInterval int:300000 指定触发检查点的时间间隔(单位为 ms)。 仅当 qos 大于0时才有效。
restartStrategy 结构 指定规则运行失败后自动重新启动规则的策略。这可以帮助从可恢复的故障中回复,而无需手动操作。请查看规则重启策略了解详细的配置项目。
cron string: "" 指定规则的周期性触发策略,该周期通过 cron 表达式 进行描述。
duration string: "" 指定规则的运行持续时间,只有当指定了 cron 后才有效。duration 不应该超过两次 cron 周期之间的时间间隔,否则会引起非预期的行为。
cronDatetimeRange 结构体数组 指定周期性规则的生效时间段。当指定了该参数后,周期性规则只有在这个参数所制定的时间范围内才生效。请查看 周期性规则 了解详细的配置项目
enableRuleTracer bool: false 指定规则是否开启规则级别的数据追踪

有关 qoscheckpointInterval 的详细信息,请查看状态和容错

可以在 rules 下属的 etc/kuiper.yaml 中全局定义规则选项。 规则 json 中定义的选项将覆盖全局设置。

规则重启策略

规则重启策略的配置项包括:

选项名 类型和默认值 说明
attempts int: 0 最大重试次数。如果设置为0,该规则将立即失败,不会进行重试。
delay int: 1000 默认的重试间隔时间,以毫秒为单位。如果没有设置 multiplier,重试的时间间隔将固定为这个值。
maxDelay int: 30000 重试的最大间隔时间,单位是毫秒。只有当 multiplier 有设置时,从而使得每次重试的延迟都会增加时才会生效。
multiplier float: 2 重试间隔时间的乘数。
jitterFactor float: 0.1 添加或减去延迟的随机值系数,防止在同一时间重新启动多个规则。

这些选项的默认值定义于 etc/kuiper.yaml 配置文件,可通过修改该文件更改默认值。

周期性规则

规则支持周期性的启动、运行和暂停。在 options 中,cron 表达了周期性规则的启动策略,如每 1 小时启动一次,而 duration 则表达了每次启动规则时的运行时间,如运行 30 分钟。

cron 是每 1 小时一次,而 duration 是 30 分钟时,那么该规则会每隔 1 小时启动一次,每次运行 30 分钟后便被暂停,等待下一次的启动运行。

通过 停止规则 停止一个周期性规则时,便会将该规则从周期性调度器中移除,从而不再被调度运行。如果该周期性规则正在运行,那么该运行也会被暂停。

cronDatetimeRange 的配置项如下:

选项名 类型和默认值 说明
begin string 周期性规则生效时间段的起始时间,格式为 `YYYY-MM-DD hh:mm:ss"
end string 周期性规则生效时间段的结束时间,格式为 `YYYY-MM-DD hh:mm:ss"
beginTimestamp int 周期性规则生效时间段的起始 unix 时间戳,单位为 ms
endTimestamp int 周期性规则生效时间段的结束 unix 时间戳,单位为 ms

cronDatetimeRange 支持结构体数组,你可以声明一组时间段来表达周期性规则生效的多个时间段:

{
    "cronDatetimeRange": [
        {
            "begin": "2023-06-26 10:00:00",
            "end": "2023-06-26 20:00:00"
        },
        {
            "beginTimestamp": 1701401478000,
            "endTimestamp": 1701401578000
        }
    ]
}

阶段运行规则

cronDatetimeRange 配置了但是 cronduration 为空时,则该规则会按照 cronDatetimeRange 所指定的时间阶段内一直运行,直到超出该时间阶段。

查看规则状态

当一条规则被部署到 eKuiper 中后,我们可以通过规则指标来了解到当前的规则运行状态。

查看规则状态

我们可以通过 rest api 来得到所有规则的运行状态,与单条规则详细状况。

获得所有规则的状态可以通过 展示规则 了解,而获取单条规则状态可以通过 获取规则的状态

了解规则运行的状态指标

对于以下规则:

{
  "id": "rule",
  "sql": "select * from demo",
  "actions": [
     {
      "mqtt": {
        "server": "tcp://broker.emqx.io:1883",
        "topic": "devices/+/messages",
        "qos": 1,
        "clientId": "demo_001",
        "retained": false
      }
    }
  ]
}

我们可以通过上述 获取规则的状态 获取到以下内容:

{
  "status": "running",
  "source_demo_0_records_in_total": 0,
  "source_demo_0_records_out_total": 0,
  ......
  "op_2_project_0_records_in_total": 0,
  "op_2_project_0_records_out_total": 0,
  ......
  "sink_mqtt_0_0_records_in_total": 0,
  "sink_mqtt_0_0_records_out_total": 0,
  ......
}

其中 status 代表了 rule 当前的运行状态,running 代表规则正在运行。

而之后的监控项则代表了规则运行过程中,各个算子的运行情况,其监控项构成则为 算子类型_算子信息_算子索引_具体监控项

source_demo_0_records_in_total 为例,其中 source 代表了读数据算子,demo 为对应的 stream,0 代表了该算子实例在并发度中的索引,而 records_in_total 则诠释了实际的监控项,即该算子接收了多少条记录。

当我们尝试往该 stream 发送了一条数据后,再次获取该规则状态如下:

{
  "status": "running",
  "source_demo_0_records_in_total": 1,
  "source_demo_0_records_out_total": 1,
  ......
  "op_2_project_0_records_in_total": 1,
  "op_2_project_0_records_out_total": 1,
  ......
  "sink_mqtt_0_0_records_in_total": 1,
  "sink_mqtt_0_0_records_out_total": 1,
  ......
}

可以看到每个算子的 records_in_totalrecords_out_total 都由 0 变为了 1,代表了该算子接收到了一条记录,并向下一个算子传递了一条记录,最终在 sink 端向 sink 写入了 1 条记录。

若开启 Prometheus 配置,这些指标也会收集到 Prometheus 中。全部的运行指标列表请查看指标列表