无码人妻精品一区二区三18禁,影音先锋男人AV橹橹色,污污污污污污www网站免费,日韩成人av无码一区二区三区,欧美性受xxxx狂喷水

TDengine 3.0.4.0 重要特性之 Python UDF 實戰分享

Bo Ding

2023-06-01 / ,

TDengine 3.0.4.0 發(fa)布了(le)一個重要特(te)性: 支(zhi)持(chi)用 Python 語言編寫的(de)自定義函數(UDF)。這個特(te)性極(ji)大節省了(le) UDF 開發(fa)的(de)時(shi)間成本。作為(wei)時(shi)序(xu)大數據(ju)處理(li)平臺(tai),不支(zhi)持(chi) Python UDF 顯然是(shi)不完(wan)整的(de)。UDF 在實(shi)現自己(ji)業務中特(te)有的(de)邏(luo)輯(ji)時(shi)非(fei)常有用,比如量化(hua)交(jiao)易場景計算(suan)自研的(de)交(jiao)易信(xin)號。本文內(nei)容由淺入深包括 4 個示(shi)例程序(xu):

  1. 定義一個只接收一個整數的標量函數: 輸入 n, 輸出 ln(n^2 + 1)。
  2. 定義一個接收 n 個整數的標量函數, 輸入 (x1, x2, …, xn), 輸出每個值和它們的序號的乘積的和: x1 + 2 * x2 + … + n * xn。
  3. 定義一個標量函數,輸入一個時間戳,輸出距離這個時間最近的下一個周日。完成這個函數要用到第三方庫 moment。我們在這個示例中講解使用第三方庫的注意事項。
  4. 定義一個聚合函數,計算某一列最大值和最小值的差, 也就是實現 TDengien 內置的 spread 函數。

同時也包(bao)含大(da)量實用的(de) debug 技巧。

本文假設你用的(de)是(shi) Linux 系統,且已安(an)裝(zhuang)好了 TDengine 3.0.4.0+ 和(he) Python 3.x。

示例一: 最簡單的 UDF

編(bian)寫(xie)一個只(zhi)接收一個整數(shu)的(de) UDF 函(han)數(shu): 輸入 n, 輸出 ln(n^2 + 1)。

首先編寫一個 Python 文件,存在系統某個目錄,比如 /root/udf/myfun.py 內容如下:

from math import log

def init():
    pass

def destroy():
    pass

def process(block):
    rows, _ = block.shape()
    return [log(block.data(i, 0) ** 2 + 1) for i in range(rows)]

這個文件(jian)包(bao)含 3 個函數(shu), init 和 destroy 都是(shi)空函數(shu),它(ta)們(men)是(shi) UDF 的生命周期函數(shu),即使什么都不做也要定義。最關鍵的是(shi) process 函數(shu), 它(ta)接受一(yi)個數(shu)據塊(kuai),這個數(shu)據塊(kuai)對(dui)象有兩個方法:

  1. shape() 返回數據塊的行數和列數
  2. data(i, j) 返回 i 行 j 列的數據

標量函數(shu)(shu)的(de) process 方法傳入的(de)數(shu)(shu)據(ju)塊有多少(shao)行,就需要(yao)返回多少(shao)個數(shu)(shu)據(ju)。上述(shu)代碼中我(wo)們(men)忽略的(de)列數(shu)(shu),因為我(wo)們(men)只想對(dui)每行的(de)第(di)一(yi)個數(shu)(shu)做計(ji)算。

接下來我們在時序數據庫(Time Series Database) TDengine 中創(chuang)建對應的(de) UDF 函數,執行下面語句:

create function myfun as '/root/udf/myfun.py' outputtype double language 'Python'
 taos> create function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
Create OK, 0 row(s) affected (0.005202s)

看起來很順利,接下(xia)來 show 一(yi)下(xia)系(xi)統中(zhong)所有(you)的(de)自定義函(han)數,確認創建成(cheng)功:

taos> show functions;
              name              |
=================================
 myfun                          |
Query OK, 1 row(s) in set (0.005767s)

接下(xia)來(lai)就來(lai)測(ce)試(shi)一下(xia)這個函(han)數,測(ce)試(shi)之前先執行下(xia)面的 SQL 命(ming)令,制造(zao)些測(ce)試(shi)數據:

create database test;
create table t(ts timestamp, v1 int, v2 int, v3 int);
insert into t values('2023-05-01 12:13:14', 1, 2, 3);
insert into t values('2023-05-03 08:09:10', 2, 3, 4);
insert into t values('2023-05-10 07:06:05', 3, 4, 5);

測試 myfun 函數:

taos> select myfun(v1, v2) from t;

DB error: udf function execution failure (0.011088s)

不幸的是執(zhi)行失敗了(le),什么原因呢?

查看 udfd 進程的日志: /var/log/taos/udfd.log 發現以(yi)下(xia)錯誤信息:

05/24 22:46:28.733545 01665799 UDF ERROR can not load library libtaospyudf.so. error: operation not permitted
05/24 22:46:28.733561 01665799 UDF ERROR can not load python plugin. lib path libtaospyudf.so

錯誤很明確(que):沒有加載到 Python 插件(jian) libtaospyudf.so, 看官(guan)方文(wen)檔原來是(shi)要先安裝 taospyudf 這個 Python 包。 于(yu)是(shi):

pip3 install taospyudf

安裝(zhuang)(zhuang)過(guo)程會(hui)編譯(yi)(yi) C++ 源碼,因此系統(tong)上要有 cmake 和 gcc。編譯(yi)(yi)生(sheng)成的 libtaospyudf.so 文件自(zi)動會(hui)被復(fu)制到 /usr/local/lib/ 目錄,因此如果是非 root 用戶,安裝(zhuang)(zhuang)時需加 sudo。安裝(zhuang)(zhuang)完可(ke)以(yi)檢查這個(ge)目錄是否有了(le)這個(ge)文件:

root@slave11 ~/udf $ ls -l /usr/local/lib/libtaos*
-rw-r--r-- 1 root root 671344 May 24 22:54 /usr/local/lib/libtaospyudf.so

這時再去執行 SQL 測試 UDF,會發現報同樣的錯誤,原因是新安裝的共享庫還未生效,還需執行命令:

ldconfig

此時再去測(ce)試(shi) UDF,終于成功了:

taos> select myfun(v1) from t;
         myfun(v1)         |
============================
               0.693147181 |
               1.609437912 |
               2.302585093 |

至(zhi)此,我們(men)完成了第一(yi)個 UDF ??,并學會了簡(jian)單(dan)的(de) debug 方法。

示例一改進:異常處理

上(shang)面的(de) myfun 雖然測(ce)試測(ce)試通過了,但是有兩個缺點:

  1. 這個標量函數只接受 1 列數據作為輸入,如果用戶傳入了多列也不會拋異常。我們期望改成:如果用戶輸入多列,則提醒用戶輸入錯誤,這個函數只接收 1 個參數。
taos> select myfun(v1, v2) from t;
       myfun(v1, v2)       |
============================
               0.693147181 |
               1.609437912 |
               2.302585093 |
  1. 沒有處理 null 值, 如果用戶輸入了 null 值則會拋異常終止執行。我們期望改成:如果輸入是 null,則輸出也是 null, 不影響后續執行。

因此 process 函數改進如下:

def process(block):
    rows, cols = block.shape()
    if cols > 1:
        raise Exception(f"require 1 parameter but given {cols}")
    return [ None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1) for i in range(rows)]

然后執(zhi)行下面的(de)語句更新已有的(de) UDF:

create or replace function myfun as '/root/udf/myfun.py' outputtype double language 'Python';

再(zai)傳入 myfun 兩個參數,就會執行失敗了(le),

taos> select myfun(v1, v2) from t;

DB error: udf function execution failure (0.014643s)

但遺憾的是我們自定義的異常信息沒有展示給用戶,而是在插件的日志文件 /var/log/taos/taospyudf.log 中:

2023-05-24 23:21:06.790 ERROR [1666188] [doPyUdfScalarProc@507] call pyUdfScalar proc function. context 0x7faade26d180. error: Exception: require 1 parameter but given 2

At:
  /var/lib/taos//.udf/myfun_3_1884e1281d9.py(12): process

至(zhi)此,我們學(xue)會了如何更新 UDF,并(bing)查看(kan) UDF 輸出的錯誤日志。

(注:如果 UDF 更(geng)新后(hou)未(wei)生效,可以重啟(qi) taosd 試試,TDengine 3.0.5.0 及(ji)以后(hou)的(de)版本會確保不(bu)重啟(qi) UDF 更(geng)新就能生效)

示例二:接收 n 個參數的 UDF

編(bian)寫一(yi)個 UDF:輸入(ru)(x1, x2, …, xn), 輸出每(mei)個值和它們的序號的乘積的和: 1 * x1 + 2 * x2 + … + n * xn。如(ru)果(guo) x1 至 xn 中包含 null,則結果(guo)為(wei) null。

這個示例與(yu)示例一的區別是,可以接受任意(yi)多列(lie)作為輸入,且要處理每一列(lie)的值(zhi)。編寫(xie) UDF 文件 /root/udf/nsum.py:

def init():
    pass


def destroy():
    pass


def process(block):
    rows, cols = block.shape()
    result = []
    for i in range(rows):
        total = 0
        for j in range(cols):
            v = block.data(i, j)
            if v is None:
                total = None
                break
            total += (j + 1) * block.data(i, j)
        result.append(total)
    return result

創建 UDF:

create function nsum as '/root/udf/nsum.py' outputtype double language 'Python';

測試:

taos> insert into t values('2023-05-25 09:09:15', 6, null, 8);
Insert OK, 1 row(s) affected (0.003675s)

taos> select ts, v1, v2, v3,  nsum(v1, v2, v3) from t;
           ts            |     v1      |     v2      |     v3      |     nsum(v1, v2, v3)      |
================================================================================================
 2023-05-01 12:13:14.000 |           1 |           2 |           3 |              14.000000000 |
 2023-05-03 08:09:10.000 |           2 |           3 |           4 |              20.000000000 |
 2023-05-10 07:06:05.000 |           3 |           4 |           5 |              26.000000000 |
 2023-05-25 09:09:15.000 |           6 |        NULL |           8 |                      NULL |
Query OK, 4 row(s) in set (0.010653s)

示例三: 使用第三方庫

編寫(xie)一(yi)(yi)個 UDF,輸入一(yi)(yi)個時間戳,輸出距離(li)這個時間最(zui)近的下(xia)一(yi)(yi)個周日(ri)。比如今天是 2023-05-25, 則(ze)下(xia)一(yi)(yi)個周日(ri)是 2023-05-28。

完成這(zhe)個函數(shu)要用到第三方(fang)庫(ku) momen。先安裝這(zhe)個庫(ku):

pip3 install moment

然后編寫 UDF 文件 /root/udf/nextsunday.py

import moment


def init():
    pass


def destroy():
    pass


def process(block):
    rows, cols = block.shape()
    if cols > 1:
        raise Exception("require only 1 parameter")
    if not type(block.data(0, 0)) is int:
        raise Exception("type error")
    return [moment.unix(block.data(i, 0)).replace(weekday=7).format('YYYY-MM-DD')
            for i in range(rows)]

UDF 框架會將 TDengine 的 timestamp 類(lei)型映射為(wei) Python 的 int 類(lei)型,所(suo)以這(zhe)(zhe)個函(han)數只接受一(yi)個表示毫秒數的整數。process 方法先做(zuo)參數檢查,然后用 moment 包(bao)替換時間的星期(qi)(qi)為(wei)星期(qi)(qi)日,最后格式(shi)化(hua)輸出。輸出的字(zi)符串長度是固定的10個字(zi)符長,因(yin)此可以這(zhe)(zhe)樣創建 UDF 函(han)數:

create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python';

此時測試函數,如(ru)果你是用 systemctl 啟動(dong)的 taosd,肯定會遇(yu)到(dao)錯誤(wu):

taos> select ts, nextsunday(ts) from t;

DB error: udf function execution failure (1.123615s)
 tail -20 taospyudf.log  
2023-05-25 11:42:34.541 ERROR [1679419] [PyUdf::PyUdf@217] py udf load module failure. error ModuleNotFoundError: No module named 'moment'

這是因(yin)為(wei) “moment” 所在(zai)位置不在(zai) python udf 插件默認(ren)的庫搜(sou)索路徑中。怎么確認(ren)這一點呢?通過(guo)以下命令搜(sou)索 taospyudf.log:

grep 'sys path' taospyudf.log  | tail -1
2023-05-25 10:58:48.554 INFO  [1679419] [doPyOpen@592] python sys path: ['', '/lib/python38.zip', '/lib/python3.8', '/lib/python3.8/lib-dynload', '/lib/python3/dist-packages', '/var/lib/taos//.udf']

發現 python udf 插件默認搜索的第三方庫安裝路徑是: /lib/python3/dist-packages,而 moment 默認安裝到了 /usr/local/lib/python3.8/dist-packages。下面我(wo)們(men)修改 python udf 插(cha)件默認的(de)庫搜索路徑,把當前 python 解釋器默認使用的(de)庫路徑全部加進去。

先打開 python3 命(ming)令行,查看當前的 sys.path

>>> import sys
>>> ":".join(sys.path)
'/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages'

復制上面腳(jiao)本的(de)(de)輸出的(de)(de)字符串(chuan),然后(hou)編輯 /var/taos/taos.cfg 加入以下配置:

UdfdLdLibPath /usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages

保存后執行 systemctl restart taosd, 再測試(shi)就不報錯了(le):

taos> select ts, nextsunday(ts) from t;
           ts            | nextsunday(ts) |
===========================================
 2023-05-01 12:13:14.000 | 2023-05-07     |
 2023-05-03 08:09:10.000 | 2023-05-07     |
 2023-05-10 07:06:05.000 | 2023-05-14     |
 2023-05-25 09:09:15.000 | 2023-05-28     |
Query OK, 4 row(s) in set (1.011474s)

示例四:定義聚合函數

編(bian)寫一個聚合函數(shu),計算某一列最大值(zhi)和最小值(zhi)的差。

聚合函數與標量函數的區別是:標量函數是多行輸入對應多個輸出,聚合函數是多行輸入對應一個輸出。聚合函數的執行過程有點像經典的 map-reduce 框架的執行過程,框架把數據分成若干塊,每個 mapper 處理一個塊,reducer 再把 mapper 的結果做聚合。不一樣的地方在于,對于 TDengine Python UDF 中的 reduce 函數既有 map 的功能又有 reduce 的功能。reduce 函數接受兩個參數:一個是自己要處理的數據,一個是別的任務執行 reduce 函數的處理結果。如下面的示例 /root/udf/myspread.py:

import io
import math
import pickle

LOG_FILE: io.TextIOBase = None


def init():
    global LOG_FILE
    LOG_FILE = open("/var/log/taos/spread.log", "wt")
    log("init function myspead success")


def log(o):
    LOG_FILE.write(str(o) + '\n')


def destroy():
    log("close log file: spread.log")
    LOG_FILE.close()


def start():
    return pickle.dumps((-math.inf, math.inf))


def reduce(block, buf):
    max_number, min_number = pickle.loads(buf)
    log(f"initial max_number={max_number}, min_number={min_number}")
    rows, _ = block.shape()
    for i in range(rows):
        v = block.data(i, 0)
        if v > max_number:
            log(f"max_number={v}")
            max_number = v
        if v < min_number:
            log(f"min_number={v}")
            min_number = v
    return pickle.dumps((max_number, min_number))


def finish(buf):
    max_number, min_number = pickle.loads(buf)
    return max_number - min_number

在這個示例中我們不(bu)光定義了一(yi)個聚合函數(shu),還添加記錄執行日(ri)志的功能,講解如下:

  1. init 函數不再是空函數,而是打開了一個文件用于寫執行日志
  2. log 函數是記錄日志的工具,自動將傳入的對象轉成字符串,加換行符輸出
  3. destroy 函數用來在執行結束關閉文件
  4. start 返回了初始的 buffer,用來存聚合函數的中間結果,我們把最大值初始化為負無窮大,最小值初始化為正無窮大
  5. reduce 處理每個數據塊并聚合結果
  6. finish 函數將最終的 buffer 轉換成最終的輸出

執行下(xia)面(mian)的(de) SQL語(yu)句(ju)創建對應的(de) UDF:

create or replace aggregate function myspread as '/root/udf/myspread.py' outputtype double bufsize 128 language 'Python';

這個 SQL 語句與創建標量函數的(de) SQL 語句有兩個重要區別(bie):

  1. 增加了 aggregate 關鍵字
  2. 增加了 bufsize 關鍵字,用來指定存儲中間結果的內存大小,這個數值可以大于實際使用的數值。本例中間結果是兩個浮點數組成的 tuple,序列化后實際占用大小只有 32 個字節,但指定的 bufsize 是128,可以用 python 命令行打印實際占用的字節數
>>> len(pickle.dumps((12345.6789, 23456789.9877)))
32

測試這個函(han)數(shu),可以(yi)看到 myspread 的(de)輸(shu)(shu)出(chu)結果和內置的(de) spread 函(han)數(shu)的(de)輸(shu)(shu)出(chu)結果是一致的(de)。

taos> select myspread(v1) from t;
       myspread(v1)        |
============================
               5.000000000 |
Query OK, 1 row(s) in set (0.013486s)

taos> select spread(v1) from t;
        spread(v1)         |
============================
               5.000000000 |
Query OK, 1 row(s) in set (0.005501s)

最后,查看我們自己打印的執行日志(zhi),從日志(zhi)可以看出,reduce 函數(shu)被執行了(le) 3 次(ci)。執行過(guo)程中 max 值被更新(xin)了(le) 4 次(ci), min 值只(zhi)被更新(xin) 1 次(ci)。

root@slave11 /var/log/taos $ cat spread.log
init function myspead success
initial max_number=-inf, min_number=inf
max_number=1
min_number=1
initial max_number=1, min_number=1
max_number=2
max_number=3
initial max_number=3, min_number=1
max_number=6
close log file: spread.log

通過這個示例,我們學會(hui)了(le)如何定義(yi)聚合(he)函(han)數,并打(da)印(yin)自定義(yi)的日志信息。

要點總結

1.創建標量函數的語(yu)法

CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'Python';

OUTPUTTYPE 對應的是 TDengine 的數據類型,如 TIMESTAMP, BIGINT, VARCHAR(64), 類型映射關系見官方(fang)文檔(dang)://docs.yakult-sh.com.cn/develop/udf/。

2.創建聚(ju)合函(han)數的語(yu)法

CREATE AGGREGATE FUNCTION function_name library_path OUTPUTTYPE output_type LANGUAGE 'Python';

3.更新 UDF 的語法

更新標量函數

CREATE OR REPLACE FUNCTION function_name AS OUTPUTTYPE int LANGUAGE 'Python';

更新聚合函數

CREATE OR REPLACE AGGREGATE FUNCTION function_name AS OUTPUTTYPE BUFSIZE buf_size int LANGUAGE 'Python';

注意:如果加(jia)了 “AGGREGATE” 關鍵字,更新之后(hou)函(han)(han)數(shu)將被當(dang)作聚(ju)合函(han)(han)數(shu),無論之前(qian)(qian)是什么類型的(de)函(han)(han)數(shu)。相(xiang)反,如果沒有加(jia) “AGGREGATE” 關鍵字,更新之后(hou)的(de)函(han)(han)數(shu)將被當(dang)作標(biao)量函(han)(han)數(shu),無論之前(qian)(qian)是什么類型的(de)函(han)(han)數(shu)。

4.同(tong)名的(de) UDF 每更(geng)新(xin)一次(ci),版本號會增(zeng)加 1。 用

select * from ins_functions \G;     

可查(cha)看(kan) UDF 的完整信息,包括(kuo) UDF 的源碼。

5.查看和刪除已(yi)有的 UDF

SHOW functions;
DROP FUNCTION function_name;

6.安裝 taospyudf 動(dong)態(tai)庫

sudo pip3 install taospyudf

安裝過程會從源碼編譯出共享庫 libtaospyudf.so,因此系統上要有 cmake 和 gcc,編譯后這個庫會被安裝到 /usr/local/lib。安裝完別忘了執行命令 ldconfig 更新系統動態鏈接庫。

7.調試 Python UDF 的(de)兩個重要日志(zhi)文件

  • /var/log/taos/udfdlog.* 這個文件是 UDF 框架的日志。框架負責加載各語言 UDF 的插件,執行 UDF 的生命周期函數
  • /var/log/taos/taospyudf.log 這個文件是 libtaospyudf.so 輸出的日志,每個文件最大 50M,最多保留 5 個。

8.定義標量函數最重要是要實現 process 函數,同時必須定義 init 和 destroy 函數即使什么都不做

def init():
  pass
  
def process(block: datablock) -> tuple[output_type]:
    rows, cols = block.shape()
    result = []
    for i in range(rows):
        for j in range(cols):
            cell_data = block.data(i, j)
            # your logic here
    return result

def destroy():
  pass

9.定(ding)義聚合函數(shu)最重要是(shi)要實現 start, reduce 和 finish,同樣必須定(ding)義 init 和 destroy 函數(shu)。

def init():
def destroy():
def start() -> bytes:
def reduce(inputs: datablock, buf: bytes) -> bytes
def finish(buf: bytes) -> output_type:

start 生(sheng)成最初結(jie)(jie)(jie)果(guo) buffer,然后輸(shu)入數(shu)據會被(bei)分為(wei)多個行數(shu)據塊,對每(mei)個數(shu)據塊 inputs 和當前中間結(jie)(jie)(jie)果(guo) buf 調(diao)用 reduce,得到新的中間結(jie)(jie)(jie)果(guo),最后再(zai)調(diao)用 finish 從中間結(jie)(jie)(jie)果(guo) buf 產生(sheng)最終輸(shu)出。

10.使用(yong)第三方(fang) python 庫(ku)。

使用第三方庫需要檢查這個庫是否安裝到了 Python UDF 插件默認的庫搜索路徑,如果沒有需要修改 taos.cfg, 添加 UdfdLdLibPath 配(pei)置,庫路(lu)徑用(yong)冒號分隔。

11.UDF 內無法通過 print 函(han)數輸(shu)出(chu)日志,需要自己寫文件或用 python 內置的 logging 庫(ku)寫文件。