
Feature 767 Rule pipeline

rwadowski edited this page Sep 13, 2021 · 3 revisions

Design Proposal

We already have a memory sink that consumes rule results. By adding a memory source that reads from that sink, we can use the pair as a bridge that links separate rules into a pipeline. The memory sink/source can be a global instance in the eKuiper runtime, used to:

  1. Bridge data between rules to realize a rule pipeline
  2. Aggregate the processed data of different rules into a sink to achieve global data aggregation

Usage

The rule pipeline is implicit. Each rule can use an in-memory sink or source, so each step is created separately through the existing API (example below).

curl -X POST --location "http://{{host}}/rules" \
    -H "Content-Type: application/json" \
    -d "{
          \"id\": \"rule1\",
          \"sql\": \"select temp from device where temp > 60 and temp < 90\",
          \"actions\": [{
            \"memory\": {
                \"topic\":\"result1\"
            }
          }]
        }"

curl -X POST --location "http://{{host}}/streams" \
    -H "Content-Type: application/json" \
    -d "{\"sql\" : \"create stream result1Stream () WITH (DATASOURCE=\\\"result1\\\", FORMAT=\\\"JSON\\\", TYPE=\\\"memory\\\")\"}"

curl -X POST --location "http://{{host}}/rules" \
    -H "Content-Type: application/json" \
    -d "{
          \"id\": \"rule2\",
          \"sql\": \"select temp + 1 as newtemp from result1Stream \",
          \"actions\": [{
            \"memory\": {
                 \"topic\":\"result2\"
            }
          }]
        }"

curl -X POST --location "http://{{host}}/streams" \
    -H "Content-Type: application/json" \
    -d "{\"sql\" : \"create stream result2Stream () WITH (DATASOURCE=\\\"result2\\\", FORMAT=\\\"JSON\\\", TYPE=\\\"memory\\\")\"}"

curl -X POST --location "http://{{host}}/rules" \
    -H "Content-Type: application/json" \
    -d "{
          \"id\": \"rule3\",
          \"sql\": \"select avg(newtemp) from result2Stream group by tumblingWindow(ss, 10)\",
          \"actions\": [{
            \"log\": {}
          }]
        }"

These definitions create all the SQL operators and connect them through the memory source/sink elements.
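The second goal of the proposal, aggregating the output of several rules into one sink, could be sketched in the same style. This is only an illustration under assumptions: the rule ids (ruleA, ruleB, ruleC), the topic name (aggregated), the stream device2, the EKUIPER_HOST variable, and the helper functions memory_rule and post are all hypothetical, and it assumes that several rules may publish to the same memory topic. The script only prints the requests unless EKUIPER_HOST is set.

```shell
#!/bin/sh
# Dry run by default: set EKUIPER_HOST (hypothetical variable, "host:port")
# to actually send the requests.

# Print the JSON body of a rule whose single action is a memory sink.
# $1 = rule id, $2 = SQL (must not contain double quotes), $3 = memory topic
memory_rule() {
    printf '{"id": "%s", "sql": "%s", "actions": [{"memory": {"topic": "%s"}}]}\n' "$1" "$2" "$3"
}

# POST a JSON body ($2) to an endpoint ($1), or only print it when
# EKUIPER_HOST is unset.
post() {
    if [ -n "${EKUIPER_HOST:-}" ]; then
        curl -X POST --location "http://${EKUIPER_HOST}$1" \
            -H "Content-Type: application/json" -d "$2"
    else
        printf 'POST %s %s\n' "$1" "$2"
    fi
}

# Two independent rules publish to the same memory topic ("aggregated").
post /rules "$(memory_rule ruleA 'select temp from device where temp > 60' aggregated)"
post /rules "$(memory_rule ruleB 'select temp from device2 where temp > 60' aggregated)"

# One memory stream reads the shared topic ...
post /streams '{"sql": "create stream aggregatedStream () WITH (DATASOURCE=\"aggregated\", FORMAT=\"JSON\", TYPE=\"memory\")"}'

# ... and a final rule consumes the merged data.
post /rules '{"id": "ruleC", "sql": "select temp from aggregatedStream", "actions": [{"log": {}}]}'
```

Because both upstream rules write to one topic, the downstream rule sees a single merged stream and does not need to know how many producers exist.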

Memory sink/source

To connect SQL operators that belong to different rules, we need an in-memory data structure that can serve as a source or a sink.

Considered features

  • Topics in the memory bridge to allow pub/sub of data. Topic handling should be MQTT compliant.
  • Avoid OOM and other memory consumption problems
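One possible reading of "MQTT compliant" is that a memory source may subscribe with MQTT-style topic filters, where + matches exactly one level and # matches all remaining levels. The sketch below shows that matching rule only. The function name topic_matches is hypothetical, it is not eKuiper's implementation, and it ignores other MQTT details such as $-prefixed topics.

```shell
#!/bin/sh
# topic_matches FILTER TOPIC
# Exit status 0 when TOPIC matches the MQTT-style FILTER, 1 otherwise.
#   +  matches exactly one topic level
#   #  matches all remaining levels (including none); valid only as the last level
# The body runs in a subshell, so its variables do not leak to the caller.
topic_matches() (
    filter=$1
    topic=$2
    while :; do
        # Split the first level off the filter and off the topic.
        case $filter in
            */*) f_head=${filter%%/*}; f_rest=${filter#*/}; f_last=0 ;;
            *)   f_head=$filter; f_rest=; f_last=1 ;;
        esac
        case $topic in
            */*) t_head=${topic%%/*}; t_rest=${topic#*/}; t_last=0 ;;
            *)   t_head=$topic; t_rest=; t_last=1 ;;
        esac

        # Multi-level wildcard: matches the rest, but only in last position.
        if [ "$f_head" = "#" ]; then
            if [ "$f_last" -eq 1 ]; then exit 0; else exit 1; fi
        fi

        # The current level must be equal unless the filter level is "+".
        if [ "$f_head" != "+" ] && [ "$f_head" != "$t_head" ]; then
            exit 1
        fi

        # Both exhausted at the same time: full match.
        if [ "$f_last" -eq 1 ] && [ "$t_last" -eq 1 ]; then exit 0; fi
        # Filter exhausted while the topic still has levels: no match.
        if [ "$f_last" -eq 1 ]; then exit 1; fi
        # Topic exhausted: matches only if the filter's remainder is "#".
        if [ "$t_last" -eq 1 ]; then
            if [ "$f_rest" = "#" ]; then exit 0; else exit 1; fi
        fi

        filter=$f_rest
        topic=$t_rest
    done
)
```

For example, `topic_matches "devices/+/temp" "devices/d1/temp" && echo match` prints match, while the filter devices/+ does not match devices/d1/temp because + covers a single level only.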