无码人妻精品一区二区三18禁,影音先锋男人AV橹橹色,污污污污污污www网站免费,日韩成人av无码一区二区三区,欧美性受xxxx狂喷水

一文學會如何使用 TDengine 3.0 中的流式計算

TAOS Data

2022-09-22 / , ,

小 T 導讀:TDengine 3.0 引入(ru)了全新(xin)的(de)流式(shi)(shi)計算(suan)引擎,既支持時間驅動(dong)的(de)流式(shi)(shi)計算(suan),也(ye)支持事件驅動(dong)的(de)流式(shi)(shi)計算(suan)。本文將(jiang)對新(xin)的(de)流式(shi)(shi)計算(suan)引擎的(de)語法規則進行詳細介紹,方(fang)便(bian)開發(fa)者(zhe)及(ji)企(qi)業使用。

TDengine 是一款開源、云原生的時序數據庫(Time Series Database,TSDB),專(zhuan)為物聯(lian)(lian)網、工業(ye)互聯(lian)(lian)網、金融、IT 運維監控等場(chang)景設計并優化。近期發布(bu)的 TDengine 3.0,全新的流式(shi)計算引擎是其一大亮(liang)點。

TDengine 3.0 的(de)流式(shi)計算(suan)引擎(qing)提(ti)(ti)供了實(shi)時處理(li)寫(xie)入的(de)數(shu)據流能力,使用 SQL 定(ding)(ding)義(yi)實(shi)時流變換(huan),當(dang)數(shu)據被寫(xie)入流的(de)源表后,數(shu)據會被以定(ding)(ding)義(yi)的(de)方式(shi)自動處理(li),并根據定(ding)(ding)義(yi)的(de)觸(chu)發(fa)模式(shi)向(xiang)目的(de)表推送結果。它提(ti)(ti)供了替代(dai)復雜流處理(li)系統的(de)輕(qing)量級(ji)解(jie)決方案(an),并能夠(gou)在高吞(tun)吐(tu)的(de)數(shu)據寫(xie)入情況下,提(ti)(ti)供毫秒級(ji)的(de)計算(suan)結果延遲。

流式計算(suan)可以(yi)(yi)包含(han)數(shu)(shu)據(ju)過濾,標(biao)(biao)量(liang)函(han)數(shu)(shu)計算(suan)(含(han) UDF),以(yi)(yi)及窗(chuang)口(kou)(kou)聚(ju)合(支(zhi)持滑動窗(chuang)口(kou)(kou)、會(hui)話窗(chuang)口(kou)(kou)與狀態(tai)窗(chuang)口(kou)(kou)),可以(yi)(yi)以(yi)(yi)超級表(biao)、子(zi)表(biao)、普通表(biao)為源表(biao),寫入到目的(de)超級表(biao)。在創(chuang)建流時,目的(de)超級表(biao)將被(bei)自動創(chuang)建,隨后新插入的(de)數(shu)(shu)據(ju)會(hui)被(bei)流定義(yi)的(de)方(fang)式處理并寫入其中,通過 partition by 子(zi)句,可以(yi)(yi)以(yi)(yi)表(biao)名或標(biao)(biao)簽劃分 partition,不(bu)同的(de) partition 將寫入到目的(de)超級表(biao)的(de)不(bu)同子(zi)表(biao)。

TDengine 的流式計算能夠支持分布在多個 vnode 中的超級表聚合;還能夠處理亂序數據的寫入:它提供了 watermark 機制以度量容忍數據亂序的程度,并提供了 ignore expired 配置項以決定亂序數據的處理策略——丟棄或者重新計算。
下面我們就(jiu)一起看一下 TDengine 中流(liu)式計算相關的 SQL 語(yu)法。

流式計算的創建、刪除與展示

創建

CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
stream_options: {
 TRIGGER    [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
 WATERMARK   time
}

其中 subquery 是 select 普通查詢語(yu)法的子集:

subquery: SELECT select_list
    from_clause
    [WHERE condition]
    [PARTITION BY tag_list]
    [window_clause]

支持會(hui)話窗口(kou)、狀態窗口(kou)與滑動窗口(kou),其(qi)中,會(hui)話窗口(kou)與狀態窗口(kou)搭配超(chao)級表時(shi)必(bi)須與 partition by tbname 一起使用:

window_clause: {
    SESSION(ts_col, tol_val)
  | STATE_WINDOW(col)
  | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
}

在上述語句(ju)中,SESSION 是會話窗口(kou),tol_val 是時間(jian)(jian)間(jian)(jian)隔(ge)(ge)的最大范圍。在 tol_val 時間(jian)(jian)間(jian)(jian)隔(ge)(ge)范圍內的數據(ju)都屬于同(tong)一(yi)個(ge)(ge)窗口(kou),如果有(you)連(lian)續兩條數據(ju)的時間(jian)(jian)超過(guo) tol_val,則自動開啟下一(yi)個(ge)(ge)窗口(kou)。窗口(kou)的定(ding)義(yi)與時序數據(ju)特色查(cha)詢中的定(ding)義(yi)完全(quan)相(xiang)同(tong),詳見 。

例如(ru),使(shi)用如(ru)下語句創(chuang)(chuang)(chuang)建流(liu)式計(ji)算,同(tong)時(shi)自(zi)動(dong)創(chuang)(chuang)(chuang)建名為 avg_vol 的超級表(biao)(biao),此流(liu)計(ji)算以一分鐘為時(shi)間窗口(kou)、30 秒(miao)為前向增(zeng)量統計(ji)這(zhe)些電表(biao)(biao)的平均(jun)電壓,并將來(lai)自(zi) meters 表(biao)(biao)的數(shu)據(ju)的計(ji)算結(jie)果寫入(ru) avg_vol 表(biao)(biao),不(bu)同(tong) partition 的數(shu)據(ju)會(hui)分別創(chuang)(chuang)(chuang)建子(zi)表(biao)(biao)并寫入(ru)不(bu)同(tong)子(zi)表(biao)(biao)。

CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);

刪除

DROP STREAM [IF NOT EXISTS] stream_name;

僅刪除(chu)流(liu)式計算任務(wu),由(you)流(liu)式計算寫(xie)入的數據不會(hui)被刪除(chu)。

展示

SHOW STREAMS;

若要(yao)展(zhan)示更(geng)詳(xiang)細(xi)的信息,可以使用:

SELECT * from performance_schema.`perf_streams`;

流式計算的 partition

我們可以(yi)使用(yong) PARTITION BY TBNAME 或 PARTITION BY tag 對一個(ge)流進行多分(fen)區的(de)計算,每個(ge)分(fen)區的(de)時間線與時間窗口是獨(du)立(li)的(de),會各自聚(ju)合,并寫入到目的(de)表中的(de)不(bu)同子表。如果不(bu)帶(dai) PARTITION BY 選項,那所有的(de)數據將寫入到一張子表。

流式計算創(chuang)(chuang)建(jian)的超級表(biao)有唯(wei)一的 tag 列 groupId,每個 partition 會(hui)被(bei)分配唯(wei)一 groupId。與 schemaless 寫入一致,我(wo)們通過 MD5 計算子(zi)表(biao)名(ming),并自(zi)動創(chuang)(chuang)建(jian)它(ta)。

流式計算的觸發模式

在創建(jian)流(liu)時(shi),可以通過 TRIGGER 指令(ling)指定流(liu)式計算的觸發模式。

對于(yu)非窗口(kou)計(ji)(ji)算(suan)(suan),流(liu)式計(ji)(ji)算(suan)(suan)的(de)觸發(fa)是實時的(de);對于(yu)窗口(kou)計(ji)(ji)算(suan)(suan),目前提供如下(xia) 3 種觸發(fa)模式:

  1. AT_ONCE:寫入立即觸發
  2. WINDOW_CLOSE:窗口關閉時觸發(窗口關閉由事件時間決定,可配合 watermark 使用)
  3. MAX_DELAY time:若窗口關閉,則觸發計算。若窗口未關閉,且未關閉時長超過 max delay 指定的時間,則觸發計算。

由于(yu)窗(chuang)口關(guan)閉是由事(shi)(shi)件(jian)時(shi)間(jian)(jian)(jian)所決(jue)定的,如果(guo)因(yin)事(shi)(shi)件(jian)流(liu)(liu)中(zhong)斷、或持續延遲導致事(shi)(shi)件(jian)時(shi)間(jian)(jian)(jian)無法更新,可(ke)能(neng)無法得到(dao)最新的計算(suan)結果(guo)。因(yin)此,流(liu)(liu)式計算(suan)提供了(le)以事(shi)(shi)件(jian)時(shi)間(jian)(jian)(jian)結合處理時(shi)間(jian)(jian)(jian)計算(suan)的 MAX_DELAY 觸發(fa)模式,MAX_DELAY 模式在窗(chuang)口關(guan)閉時(shi)會(hui)立(li)即(ji)觸發(fa)計算(suan)。此外,當數據寫入(ru)后(hou),計算(suan)觸發(fa)的時(shi)間(jian)(jian)(jian)超(chao)過 max delay 指定的時(shi)間(jian)(jian)(jian),則(ze)立(li)即(ji)觸發(fa)計算(suan)。

流式計算的窗口關閉

流式計算以事件時間(插入記錄中的時間戳主鍵)為基準計算窗口關閉,而非以 TDengine 服務器的時間,這樣可以避免客戶端與服務器時間不一致帶來的問題,有效解決亂序數據寫入等難題。同時,流式計算還提供了 watermark 來定義容忍的亂序程度。
在創建流時(shi),我們可以在 stream_option 中(zhong)指定(ding) watermark,它定(ding)義了數(shu)據亂序的容(rong)忍(ren)上界。流式計算通過 watermark 來度(du)量(liang)對亂序數(shu)據的容(rong)忍(ren)程度(du),watermark 默(mo)認為 0。

T = 最新(xin)事(shi)件時間 – watermark

每次(ci)寫入的數據都會以(yi)上述(shu)公式更新(xin)窗(chuang)口關閉(bi)時間(jian),并將(jiang)窗(chuang)口結束時間(jian) < T 的所(suo)有打開的窗(chuang)口關閉(bi),若觸發模式為 WINDOW_CLOSE 或(huo) MAX_DELAY,則(ze)推送窗(chuang)口聚(ju)合結果。

TDengine Database

在上(shang)圖中(zhong),縱(zong)軸表(biao)示不同時(shi)刻,對于不同時(shi)刻,我們畫出其對應的(de)(de)(de) TDengine 收到(dao)的(de)(de)(de)數(shu)(shu)(shu)據,即為橫軸。已知橫軸上(shang)的(de)(de)(de)數(shu)(shu)(shu)據點(dian)表(biao)示已經收到(dao)的(de)(de)(de)數(shu)(shu)(shu)據,其中(zhong)藍(lan)色的(de)(de)(de)點(dian)表(biao)示事件時(shi)間(即數(shu)(shu)(shu)據中(zhong)的(de)(de)(de)時(shi)間戳主鍵(jian))最(zui)后的(de)(de)(de)數(shu)(shu)(shu)據,該數(shu)(shu)(shu)據點(dian)減去定義的(de)(de)(de) watermark 時(shi)間,就得(de)到(dao)亂序容忍的(de)(de)(de)上(shang)界(jie) T。所(suo)有結束時(shi)間小于 T 的(de)(de)(de)窗(chuang)口都將被關閉(圖中(zhong)以灰色方框標記)。

在 T2 時(shi)刻(ke),亂(luan)序(xu)數據(黃色的點)到(dao)達(da) TDengine,由于有 watermark 的存在,這些數據進入(ru)的窗(chuang)口并未被(bei)關閉(bi)(bi),因此可以(yi)被(bei)正確(que)處理。在 T3 時(shi)刻(ke),最新事件到(dao)達(da),T 向后推移超(chao)過了第(di)二個窗(chuang)口關閉(bi)(bi)的時(shi)間,該窗(chuang)口被(bei)關閉(bi)(bi),亂(luan)序(xu)數據被(bei)正確(que)處理。

但要注意,在 window_close 或(huo) max_delay 模式(shi)下(xia),窗口(kou)關閉直(zhi)接(jie)影響推送結果(guo)。在 at_once 模式(shi)下(xia),窗口(kou)關閉只(zhi)與內存占(zhan)用(yong)有關。

流式計算的過期數據處理策略

對于已關(guan)閉(bi)的窗口(kou),再次落入該窗口(kou)中的數(shu)(shu)據(ju)就會(hui)被標記為(wei)過(guo)期數(shu)(shu)據(ju)。TDengine 對于過(guo)期數(shu)(shu)據(ju)提供兩種處理方式,由(you) IGNORE EXPIRED 選(xuan)項(xiang)指(zhi)定:

  1. 重新計算,即 IGNORE EXPIRED 0:默認配置,從 TSDB 中重新查找對應窗口的所有數據并重新計算得到最新結果
  2. 直接丟棄,即 IGNORE EXPIRED 1:忽略過期數據

無(wu)論在哪(na)種模(mo)式下,watermark 都應該(gai)被妥(tuo)善設置,來得到正確(que)結果(直接丟棄模(mo)式)或(huo)避免頻繁(fan)觸發(fa)重算帶來的性(xing)能開銷(xiao)(重新計算模(mo)式)。

示例

企業電(dian)表(biao)的數(shu)據(ju)經常都是成(cheng)百上千億(yi)條的,想要將(jiang)這些(xie)分散、凌亂的數(shu)據(ju)清(qing)洗(xi)或轉換(huan)都需要比較長(chang)的時間,很難做(zuo)到高效性和實時性。在如下例子中,通過 TDengine 流計算可以將(jiang)電(dian)表(biao)電(dian)壓大(da)于 220V 的數(shu)據(ju)清(qing)洗(xi)掉,然后以 5 秒(miao)為窗口(kou)整(zheng)合并計算出每個窗口(kou)中電(dian)流的最大(da)值(zhi),最后將(jiang)結果輸(shu)出到指(zhi)定的數(shu)據(ju)表(biao)中。

創建 Database 和原始數據表

首先準備數據,完成(cheng)建庫、建一張超(chao)級表和多張子(zi)表操作:

DROP DATABASE IF EXISTS power;
CREATE DATABASE power;
USE power;

CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);

CREATE TABLE d1001 USING meters TAGS ("California.SanFrancisco", 2);
CREATE TABLE d1002 USING meters TAGS ("California.SanFrancisco", 3);
CREATE TABLE d1003 USING meters TAGS ("California.LosAngeles", 2);
CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3);

創建流

create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 interval (5s);

寫入數據

insert into d1001 values("2018-10-03 14:38:05.000", 10.30000, 219, 0.31000);
insert into d1001 values("2018-10-03 14:38:15.000", 12.60000, 218, 0.33000);
insert into d1001 values("2018-10-03 14:38:16.800", 12.30000, 221, 0.31000);
insert into d1002 values("2018-10-03 14:38:16.650", 10.30000, 218, 0.25000);
insert into d1003 values("2018-10-03 14:38:05.500", 11.80000, 221, 0.28000);
insert into d1003 values("2018-10-03 14:38:16.600", 13.40000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:05.000", 10.80000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:06.500", 11.50000, 221, 0.35000);

查詢以觀察結果

taos> select start, end, max_current from current_stream_output_stb;
          start          |           end           |     max_current      |
===========================================================================
 2018-10-03 14:38:05.000 | 2018-10-03 14:38:10.000 |             10.30000 |
 2018-10-03 14:38:15.000 | 2018-10-03 14:38:20.000 |             12.60000 |
Query OK, 2 rows in database (0.018762s)

寫在最后

如(ru)果大(da)家能夠運(yun)用(yong)好 TDengine 3.0 提供(gong)的流(liu)計算引(yin)擎,就不需要再(zai)部署其(qi)他的第三方流(liu)處理系統,這(zhe)樣一來(lai),不僅降低了(le)系統的復雜(za)度(du),還大(da)大(da)減少了(le)研發(fa)和運(yun)維成本。在實際(ji)操(cao)作中(zhong)應用(yong) TDengine 流(liu)計算引(yin)擎時(shi),上述的詳(xiang)細(xi)語法會帶給(gei)你很(hen)多幫助,如(ru)果還產(chan)生了(le)其(qi)他更為復雜(za)的應用(yong)問題,你也可以(yi)進入 TDengine 社區(qu)向技術(shu)人(ren)員(yuan)尋求幫助。