本文介紹了一個用于操作 TDengine 的 Python ORM 庫。本文的預期讀者是,需要使用 Python 語言操作 TDengine 數據庫的開發人員。
項目地址:
一、什么是 ORM ?
ORM 就是對象關系映射(Object Relational Mapping),是一種程序設計技術,用于實現面向對象編程語言里不同類型系統的數據之間的轉換。從效果上說,它其實是創建了一個可在編程語言里使用的“虛擬對象數據庫”。簡單說來就是,通過建立類與數據庫表,對象與數據庫數據條目的對應關系。從而可以通過編程語言的數據類型操作數據庫。
二、為 TDengine 開發 ORM 庫的動因
作為一個使用 Python 作為主力編程語言的開發者,筆者經常要編寫操作各種數據庫的代碼。對于鍵值對類型(如:Redis)或者文檔類型(如:MongoDB)的數據庫,Python 生態都提供了很好的第三方連接庫。而對于最常使用的關系型數據( 如:MySQL、PostgreSQL),Python 則提供了SQLAlchemy、Peewee 等 ORM 第三方庫的解決方案。所以,筆者日常工作中,需要手工拼接 SQL 查詢語句的場景非常少 ( 甚至慢慢忘記了這項技能)。
近來,筆者需要帶領團隊完成一個智能電力系統的項目。在技術選型過程中發現了優秀的物聯網大數據平臺 TDengine 。經過測試和評估發現,無論從超高性能和穩定性、還是簡潔的設計、開源的理念。TDengine 都非常適合作為智能電力系統的基礎平臺使用。但是,在使用過程中,我們發現了一個比較棘手的問題。那就是:由于 TDengine 誕生不久,相比較其他已經發展很多年的其他數據庫平臺,周邊的相關生態軟件還略少一些。特別是,蘋果操作系統 OS X 下暫時沒有原生連接器可用,寫好的程序需要拿到 Linux 上去調試。這對于被“寵壞”的 Python 程序員來講真得沒法適應。而且考慮到筆者團隊中其他程序員都習慣了 ORM 的操作方式,對原始 SQL 并不熟悉。所以,筆者意識到:如果使用原生的連接器進行開發,將會遇到很多困難。于是就開始了 TDengine 的開源 ORM 庫的開發。一方面,可以幫助團隊更高效的完成系統開發工作。另外一方面,也可以為幫助 TDengine 更好的完善生態工具鏈。

三、如何安裝和使用
3.1 簡介
- 需要 Python 3.0版本以上
- 在 TDengine 2.0.8 版本測試通過
- 解決 Mac 操作系統下沒有原生 Python 連接器的問題
- 極大的降低了 Python 程序員使用 TDengine 技術門檻
- 可以方便的將數據轉換到 numpy 與 pandas
由于目前 TDengine 沒有提供 Mac 操作系統下的原生 client , 為保證庫的兼容性,目前 crown 庫底層使用的 RESTful 接口進行連接。以后的版本中,筆者將提供在 Windows 和 Linux 下的原生連接器接口可供配置使用。
項目地址:
3.2 安裝
crown 庫像其他 Python 第三方庫一樣,可以通過 pip ,輕松安裝最新版本:
pip install crown
還可以通過git安裝,使用方法:
git clone //github.com/machine-w/crown.gitcd crowmpython setup.py install
3.3 簡單使用
1. 連接數據庫
使用 crown 連接 TDengine ,只需要提供 taos RESTful 服務的地址、端口號、以及需要操作的數據庫名。然后即可使用 TdEngineDatabase 類新建一個數據庫對象。以下為連接數據庫的例子代碼:
from crown import * #導入庫
DATABASENAME = 'taos_test' #數據庫名
HOST = 'localhost'
PORT = 6041
db = TdEngineDatabase(DATABASENAME) # 默認端口 6041,默認用戶名:root,默認密碼:taosdata
#如不使用默認參數,可以使用下面的方法提供參數
# db = TdEngineDatabase(DATABASENAME,host=HOST,port=PORT,user='yourusername',passwd='yourpassword')
# 一般情況我們使用connect方法嘗試連接數據庫,如果當前數據庫不存在,則會自動建庫。
db.connect()
# 連接數據庫后,db對象后會自動獲取全部數據庫信息,以字典的形式保存在屬性databases中。
print(db.databases)
#當然也可以使用手動建庫方法建立數據庫。
db.create_database(safe=True) #參數safe為True表示:如果庫存在,則跳過建庫指令。
#可選字段:建庫時配置數據庫參數,具體字段含義請參考tdengine文檔。
# db.create_database(safe=True,keep= 100,comp=0,replica=1,quorum=2,blocks=115)
#可以通過調用alter_database方法修改數據庫參數。
db.alter_database(keep= 120,comp=1,replica=1,quorum=1,blocks=156)
#刪除當前數據庫方法drop_database
db.drop_database(safe=True) #參數safe:如果庫不存在,則跳過刪庫指令。
理論上使用 crown 庫操作 TDengine ,所有的數據庫操作都無需手工拼裝 SQL 語句,但是為了應對比較特殊的應用場景, crown 庫也提供了執行原始 SQL 語句的功能。
#通過數據庫對象的raw_sql方法直接執行sql語句,語句規則與TDengine restful接口要求一致。
res = db.raw_sql('select c1,c2 from taos_test.member1')
print(res)
print(res.head)
print(res.rowcount)
#返回的對象為二維數據。res.head屬性為數組對象,保存每一行數據的代表的列名。res.rowcount屬性保存返回行數。
# res: [[1.2,2.2],[1.3,2.1],[1.5,2.0],[1.6,2.1]]
# res.head: ['c1','c2']
# res.rowcount: 4
2. 建表刪表操作
建好數據庫對象后,就可以通過為 model 類建立子類的方式定義并新建數據庫表(使用方法和 Python 中常用的 ORM 庫類似),新建的一個類對應數據庫中的一張表,類的一個對象對應表中一條數據。以下示例創建一個簡單的數據庫表:
# 表模型類繼承自Model類,每個模型類對應數據庫中的一張表,模型類中定義的每個Field,對應表中的一列,
# 如果不明確定義主鍵類型字段, 會默認添加一個主鍵,主鍵名為 “ts”
class Meter1(Model):
cur = FloatField() #如果省略列名參數,則使用屬性名作為列名
curInt = IntegerField(db_column='c2')
curDouble = DoubleField(db_column='c3')
desc = BinaryField(db_column='des')
# custom_ts = PrimaryKeyField() # 如果定義了主鍵列,則使用主鍵列名作為主鍵
class Meta: # Meta子類中定義模型類的配置信息
database = db # 指定之前建的數據庫對象
db_table = 'meter1' # 指定表名
crown 支持的字段類型與 TDengine 字段類型的對應關系:

定義好表模型后,即可調用類方法 create_table 進行建表操作:
# create_table運行成功返回True,失敗則raise錯誤
Meter1.create_table(safe=True) #safe:如果表存在,則跳過建表指令
#也可以通過數據庫對象的create_table方法進行建表。
# db.create_table(Meter1,safe=True)
# drop_table方法進行刪除表操作,運行成功返回True,失敗則raise錯誤
Meter1.drop_table(safe=True) #safe:如果表不存在,則跳過刪表指令
#同樣可以通過數據庫對象刪表,功能同上
# db.drop_table(Meter1,safe=True)
#table_exists方法查看表是否存在,存在返回True,不存在返回:False
Meter1.table_exists()
3. 數據插入
可以通過新建的數據表類 Meter1 新建數據對象并傳入具體字段的數值,然后使用對象的 save 方法插入數據。
也可以直接使用 Meter1 類的類方法 insert 直接插入數據。下面的例子分別演示了這兩種方法:
import time
#方法一
for i in range(1,101):
#使用模型類實例化的每個對象對應數據表中的每一行,可以通過傳入屬性參數的方式給每一列賦值
m = Meter1(cur = 1/i,curInt=i,curDouble=1/i+10,desc='g1',ts= datetime.datetime.now())
time.sleep(1)
#使用對象的save方法將數據存入數據庫
m.save()
print(Meter1.select().count()) # 結果:100
#方法二
for i in range(1,101):
#也可以直接使用模型類的insert方法插入數據。
Meter1.insert(cur = 1/i,curInt=i,curDouble=1/i+10,desc='g1',ts= datetime.datetime.now() - datetime.timedelta(seconds=(102-i)))
print(Meter1.select().count()) # 結果:101
如果不傳入時間屬性 ts ,則會以當前時刻為默認值傳入
Meter1.insert(cur = 1/i,curInt=i,curDouble=1/i+10,desc='g1')
m = Meter1(cur = 1/i,curInt=i,curDouble=1/i+10,desc='g1')
4. 數據查詢
crown 提供了豐富的數據查詢功能,由于篇幅的原因,這里只介紹筆者項目中比較常用的幾種查詢。了解更多的查詢使用方法,請查看項目文檔:
單條數據查詢:使用 Meter1 類的 select() 方法可以獲取表的查詢對象,查詢對象的 one 方法可以獲取滿足條件的第一條數據。
#獲取一條數據:使用select()類方法獲取查詢字段(參數留空表示取全部字段),然后可以鏈式使用one方法獲取第一條數據
res = Meter1.select().one()
print(res.desc,res.curDouble,res.curInt,res.cur,res.ts)
#select函數中可以選擇要讀取的字段
res = Meter1.select(Meter1.cur,Meter1.desc).one()
print(res.desc,res.curDouble,res.curInt,res.cur,res.ts)
多條數據查詢:使用 Meter1 類的 select() 方法可以獲取表的查詢對象,查詢對象的 all 方法可以獲取滿足條件的全部數據。
#使用select()類方法獲取查詢字段(參數留空表示取全部字段),然后可以鏈式使用all方法獲取全部數據
res_all = Meter1.select().all()
for res in res_all:
print(res.desc,res.curDouble,res.curInt,res.cur,res.ts)
#select函數中可以選擇要讀取的字段
res_all = Meter1.select(Meter1.cur,Meter1.desc).all()
for res in res_all:
print(res.desc,res.curDouble,res.curInt,res.cur,res.ts)
讀取數據導入 numpy 和 pandas :雖然 TDengine 提供了很多聚合和統計函數,但是把時序數據導入 numpy 或 pandas 等數據分析組件中進行處理的情況也是很常見的操作。
下面演示如何通過 crown 把結果數據導入 numpy 和 pandas :
#導入numpy
#通過all_raw函數可以獲取二維數組格式的數據查詢結果。結果每列代表的標題保存在結果對象的head屬性中。
raw_results = Meter1.select(Meter1.cur,Meter1.curInt,Meter1.curDouble).all_raw()
#可以很方便的將結果轉換為numpy數組對象
np_data = np.array(raw_results)
print(np_data)
print(raw_results.head)
#導入pandas
raw_results = Meter1.select().all_raw()
#使用以下方法,可以輕松的將數據導入pandas,并且使用時間點作為index,使用返回的數據標題作為列名。
pd_data = pd.DataFrame(raw_results,columns=raw_results.head).set_index('ts')
print(pd_data)
選擇列四則運算:
#使用select()類方法獲取查詢字段時,可以返回某列或多列間的值加、減、乘、除、取余計算結果(+ - * / %)
res_all = Meter1.select((Meter1.curDouble+Meter1.cur),Meter1.ts).all()
for res in res_all:
#返回的結果對象可以用get方法獲取原始計算式結果
print(res.get(Meter1.curDouble+Meter1.cur),res.ts)
#字段別名
#給運算式起別名(不僅運算式,其他放在select函數中的任何屬性都可以使用別名)
res_all = Meter1.select(((Meter1.curDouble+Meter1.cur)*Meter1.curDouble).alias('new_name'),Meter1.ts).all()
for res in res_all:
#使用別名獲取運算結果
print(res.new_name,res.ts)
where函數:
#可以在select函數后鏈式調用where函數進行條件限
one_time =datetime.datetime.now() - datetime.timedelta(hours=10)
ress = Meter1.select().where(Meter1.ts > one_time).all()
#限定條件可以使用 > < == >= <= != and or ! 等。字符類型的字段可以使用 % 作為模糊查詢(相當于like)
ress = Meter1.select().where(Meter1.cur > 0 or Meter1.desc % 'g%').all()
#where函數可以接收任意多參數,每個參數為一個限定條件,參數條件之間為"與"的關系。
ress = Meter1.select().where(Meter1.cur > 0, Meter1.ts > one_time, Meter1.desc % '%1').all()
分頁與limit:
#可以在select函數后鏈式調用paginate函數進行分頁操作,以下例子為取第6頁 每頁5條數據。
ress_1 = Meter1.select().paginate(6,page_size=5).all()
ress_2 = Meter1.select().paginate(6).all() #默認page_size為20
#可以在select函數后鏈式調用limit函數和offset函數條數限制和定位操作。
ress_3 = Meter1.select().limit(2).offset(5).all()
ress_4 = Meter1.select().limit(2).all()
排序:目前 TDengine 只支持主鍵排序
#可以在select函數后鏈式調用desc或者asc函數進行時間軸的正序或者倒序查詢
res = Meter1.select().desc().one()
聚合函數:TDengine 提供了許多聚合函數可供使用,可以直接返回聚合結果,極大的提高數據聚合效率。crown 幾乎完全兼容 TDengine 全部的聚合函數,只需要調用對應的方法即可使用。
以下是使用的例子。
#count
count = Meter1.select().count() #統計行數
print(count) # 結果:100
count = Meter1.select().count(Meter1.desc) #統計指定列非空行數
print(count) # 結果:90
#avg(sum,stddev,min,max,first,last,last_row,spread使用方法與avg相同)
avg1 = Meter1.select().avg(Meter1.cur,Meter1.curDouble.alias('aa')) #可以同時獲取多列,并且可以使用別名
print(avg1.get(Meter1.cur.avg()),avg1.aa) #打印統計結果
#twa 必須配合where函數,且必須選擇時間段
twa1 = Meter1.select().where(Meter1.ts > datetime.datetime(2020, 11, 19, 15, 9, 12, 946118),Meter1.ts < datetime.datetime.now()).twa(Meter1.cur,Meter1.curDouble.alias('aa'))
print(twa1.get(Meter1.cur.twa()),avg1.aa) #打印統計結果
#diff
diffs = Meter1.select().diff(Meter1.curInt.alias('aa')) #diff目前只可以聚合一個屬性。
for diff1 in diffs:
print(diff1.aa,diff1.ts) # 時間點數據同時返回
#top(bottom函數使用方式相同)
# top函數需要提供要統計的屬性,行數,以及別名
tops = Meter1.select().top(Meter1.cur,3,alias='aa')
for top1 in tops:
print(top1.aa,top1.ts) # 時間點數據同時返回
tops = Meter1.select().top(Meter1.cur,3) # 可以不指定別名
for top1 in tops:
#不指定別名,需用使用get方法獲取屬性
print(top1.get(Meter1.cur.top(3)))
#percentile (apercentile函數使用方式相同)
#每個屬性參數為一個元組(數組),分別定義要統計的屬性,P值(P值取值范圍0≤P≤100),可選別名。
percentile1 = Meter1.select().percentile((Meter1.cur,1,'aa'),(Meter1.curDouble,2))
print(percentile1.aa)
#不指定別名,需用使用get方法獲取屬性
print(percentile1.get(Meter1.curDouble.percentile(2)))
#leastsquares
#每個屬性參數為一個元組(數組),分別定義要統計的屬性,start_val(自變量初始值),step_val(自變量的步長值),可選別名。
leastsquares1 = Meter1.select().leastsquares((Meter1.cur,1,1,'aa'),(Meter1.curDouble,2,2))
print(leastsquares1.aa) # 結果:{slop:-0.001595, intercept:0.212111}
#不指定別名,需用使用get方法獲取屬性
print(leastsquares1.get(Meter1.curDouble.leastsquares(2,2)))
注意:當前版本并不支持多表 join 查詢,需要多表查詢的情況請使用 raw_sql 函數,執行原始 SQL 語句。后期版本會補充 join 功能。
5.?超級表定義
超級表定義與普通表的區別在于繼承自 SuperModel 。而且,在 Meta 類中,可以定義標簽。超級表與普通表的查詢操作使用方式相同,以上介紹的所有方法也可以在超級表類中使用,查詢操作時標簽字段也可以當作普通字段一樣操作。
# 超級表模型類繼承自SuperModel類
class Meters(SuperModel):
cur = FloatField(db_column='c1')
curInt = IntegerField(db_column='c2')
curDouble = DoubleField(db_column='c3')
desc = BinaryField(db_column='des')
class Meta:
database = db
db_table = 'meters'
# Meta類中定義的Field,為超級表的標簽
location = BinaryField(max_length=30)
groupid = IntegerField(db_column='gid')
#建立超級表
Meters.create_table(safe=True)
#刪除超級表
Meters.drop_table(safe=True)
#查看超級表是否存在
Meters.supertable_exists()
6. 從超級表建立子表
對于數據插入操作,就需要從超級表中建立子表。可以使用 Meters 類的 create_son_table 方法創建子表。該方法返回一個子表對應的類對象。該對象可以和以上介紹的普通表類對象(Meter1)一樣使用。
#生成字表模型類的同時,自動在數據庫中建表。
SonTable_d3 = Meters.create_son_table('d3',location='beijing',groupid=3)
# SonTable_d3的使用方法和繼承自Modle類的模型類一樣。可以進行插入與查詢操作
SonTable_d3.table_exists()
#子表中插入數據
m = SonTable_d3(cur = 65.8,curInt=10,curDouble=1.1,desc='g1',ts = datetime.datetime.now())
m.save()
上面介紹了 TDengine 的 Python ORM 連接庫 crown 的基本安裝和使用方法。除了上面介紹的內容, crown 還提供了很多非常實用的功能,比如動態建表、根據表名獲取模型類、分組查詢等。有興趣的讀者可以前往 GitHub 上查看使用文檔。也歡迎在 GitHub 上多提寶貴意見與通報 bug 。筆者將持續維護該項目,努力提供更加豐富的功能和更加完備的文檔供大家使用。
結語
TDengine 作為優秀的國產開源軟件,擁有優雅的軟件設計和出色的性能表現。非常適合在物聯網大數據應用場景下作為數據基礎平臺使用。未來隨著物聯網行業的蓬勃發展, TDengine 也必將成為物聯網大數據基礎架構的一部分而備受全世界相關領域從業者的廣泛關注。筆者作為一個物聯網行業的一名普通開發人員,非常榮幸有機會開發和維護這樣一個 TDengine 周邊的開源小項目。希望通過這個項目可以讓更多的開發人員可以更加方便便捷的使用 TDengine ,提高工作效率。也希望能夠起到拋磚引玉的作用,鼓勵更多的開發者加入到開源項目開發中來,大家一起來為豐富 TDengine 的周邊生態做貢獻。
如果你有好的idea想和TDengine一起實現,歡迎成為Contributor Family的一員。點擊下方鏈接,加入我們!



























