本文介紹(shao)了 TAOS-JDBCDriver 訂閱功能的使用(yong)場景(jing)、使用(yong)方法(fa)和一些限制。本文的預期讀者(zhe)是(shi)基(ji)于 TAOS-JDBCDriver 開發各(ge)種應用(yong)的軟件開發人員。
如何使用TDengine中的訂閱功能?
TDengine的Java訂閱(yue)接口(kou),目前是與TAOS-JDBCDriver 的 API 配合使用(yong),相(xiang)關源碼(ma)(ma)文件(jian)也在(zai)TDengine JDBC驅動的源碼(ma)(ma)所在(zai)目錄下:和。與訂閱(yue)相(xiang)關的方(fang)法(fa)主要有以下三個,均在(zai)TSDBSubscribe.java中定義和實(shi)現:
subscribe
public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack) throws SQLException
訂閱一個topic,并設置訂閱內容、輪詢周期、以及異步調用選擇。如果訂閱成功,此方法會返回一個long型值(本質是連接的一個指針值,此處用于表征此成功建立的訂閱);如果訂閱失敗,則此方法應直接拋出異常。
topic:訂閱的名字。
sql:訂閱的內容,用戶可以傳入一條查詢SQL語句,此時訂閱關注的內容為此查詢結果集中的新增記錄。
period:訂閱執行時,內部輪詢的時間。
callBack:執行異步訂閱時的回調,空值表示不使用異步。
public TSDBResultSet consume(long subscription) throws OperationsException, SQLException
獲取最新的訂閱結果,訂閱結果返回為一個TSDBResultSet類的實例。
subscription:subscribe成功后返回的long型值。
public void unsubscribe(long subscription, boolean isKeep) throws SQLException
取消已經建立的訂閱,如果失敗則直接拋出異常。
subscription:subscribe成功后返回的long型值。
isKeep:是否保持訂閱的記錄。
下(xia)面結合(he)一個示例,介(jie)紹(shao)下(xia)其使用方法。
首先是創建訂閱:
/** * sync subscribe * * @param topic * @param sql * @param restart * @param period * @return * @throws SQLException */ public long subscribe(String topic, String sql, boolean restart, int period) /** * async subscribe * * @param topic * @param sql * @param restart * @param period * @param callBack * @throws SQLException */ public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack)
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":" + port + "/" + dbName + "?user=root&password=taosdata"
, properties);
String rawSql = "select * from devices ;";
subscribe = ((TSDBConnection) connection).createSubscribe();
subscribId = subscribe.subscribe(topic, rawSql, false, 1000);
TDengine中的訂閱既可以是同步的,也可以是異步的。這里,同步的意思是用戶程序要直接調用 consume 來拉取數據,而異步則由 API 在內部的另一個線程中調用 consume,然后把拉取到的數據交給回調函數 callback 去處理。
注意,這里沒有指定起(qi)始(shi)時(shi)間,所(suo)以會讀(du)到所(suo)有時(shi)間的數據(ju)。如果只想從一天前的數據(ju)開始(shi)訂閱,而不需要(yao)更(geng)早的歷史數據(ju),可以再加(jia)上(shang)一個時(shi)間條件:
select * from devices where ts > now - 1d and temperature > 80;
訂閱的 topic 實際上是它的(de)名(ming)字(zi),因為訂閱功能(neng)是在客(ke)戶端 API 中實現的(de),所(suo)以沒必(bi)要保證它全(quan)局(ju)唯一,但需(xu)要它在一臺客(ke)戶端機器(qi)上唯一。
如果名稱為 topic 的訂閱不存在,參數 restart 沒有意義;但如果用戶程序創建這個訂閱后退出,當它再次啟動并重新使用這個 topic 時,restart 就會被用于決定是從頭開始讀取數據,還是接續上次的位置進行讀取。本例中,如果 restart 是 true,用戶程序肯定會讀到所有數據。但如果這個訂閱之前就存在了,并且已經讀取了一部分數據,且 restart 是 false,用戶程序就不會讀到(dao)之前已經讀取的數據了(le)。
subscribe 的最后一個參數是以毫秒為單位的輪詢周期(間隔需要大于 1000 )。在同步模式下,如過前后兩次調用 consume 的時間間隔小于此時間,consume 會阻塞,直(zhi)到間隔(ge)(ge)超(chao)過此(ci)時(shi)間。異步(bu)模式下,這個時(shi)間是兩次調(diao)用(yong)回調(diao)函數(shu)的最(zui)小時(shi)間間隔(ge)(ge)。
當(dang)要結束一(yi)次數(shu)據訂(ding)閱時,需要調用 unsubscribe:
/**
* cancel subscribe
*
* @param subscription
* @param isKeep
* @throws SQLException
*/
public void unsubscribe(long subscription, boolean isKeep)
其第二個參數,用于決定是否在客戶端保留訂閱的進度信息,如果大家還記得前面說過“訂閱功能是在客戶端 API 中實現的”的話,應該可以猜到,如果這個參數是 false,那無論下次調用 subscribe 的時的 restart 參數是什么,訂閱都只能重新開始了。另外,進度信息的保存位置是 {DataDir}/subscribe/,這個目錄下,每個訂閱有一個與其 topic 同(tong)名的文(wen)件,刪掉某(mou)個文(wen)件,同(tong)樣會導致下(xia)次(ci)創建其對應(ying)的訂閱時只能(neng)重新開始。
限制條件
下面(mian)是(shi)一些 TDengine 訂閱(yue)功能的局限(xian),大(da)家需(xu)要在使用中注意(yi)。
- 訂閱的查詢語句只能是
select語句,只能查詢原始數據(不支持聚合函數),只能按時間正序查詢數據。 - 在滿足應用需求的情況下,請盡量將輪詢周期設置的大一些,否則會對系統性能造成影響。
- 暫不支持亂序數據,用戶程序可能讀不到使用 import 方式插入的數據。
- 如果用戶程序異常退出或沒有正確調用 unsubscribe,進度信息可能會有錯誤,這時,后續的同名訂閱可能讀到之前已經讀過的數據。


























