做工業互聯網(wang)或物聯網(wang)系統,最基本的(de)(de)(de)需求(qiu)是展示數據曲(qu)線,比(bi)如(ru)功率曲(qu)線,類(lei)似于股(gu)票的(de)(de)(de)分(fen)時圖(tu),通(tong)常我們會取(qu)每分(fen)鐘內(nei)該設備上(shang)(shang)報的(de)(de)(de)最后一次功率值為這一分(fen)鐘的(de)(de)(de)功率,如(ru)果某一分(fen)鐘內(nei),設備沒有(you)上(shang)(shang)報,則取(qu)上(shang)(shang)一分(fen)鐘的(de)(de)(de)功率值,以此類(lei)推(tui)。舉(ju)例如(ru)下:

得到的分鐘曲線:


通常我們會把(ba)(ba)(ba)(ba)設(she)備上(shang)報的數據先(xian)寫入(ru)(ru)Apache Kafka。如果(guo)是離線計(ji)(ji)算(suan)(suan)場景,可能會考慮(lv)把(ba)(ba)(ba)(ba)數據寫入(ru)(ru)Hive,然后(hou)使用Spark SQL定時讀取Hive,再(zai)把(ba)(ba)(ba)(ba)計(ji)(ji)算(suan)(suan)結(jie)果(guo)寫入(ru)(ru)HBase;如果(guo)是實(shi)時計(ji)(ji)算(suan)(suan)場景,則會使用Apache Flink消費Kafka數據,把(ba)(ba)(ba)(ba)結(jie)果(guo)寫入(ru)(ru)HBase,這種情況(kuang)下還(huan)需要考慮(lv)數據亂序和延遲投遞計(ji)(ji)算(suan)(suan)等(deng)問(wen)題(ti)。
而(er)且(qie),基于傳統大數據Hadoop的(de)架構(gou),需要(yao)搭建ZooKeeper和HDFS,然(ran)后才是Hive和HBase,整(zheng)個(ge)體系維護(hu)成(cheng)本(ben)很(hen)高。此外,HBase基于鍵(jian)值存儲時序數據,會浪費很(hen)多空間在同一鍵(jian)值的(de)數據設計架構(gou)上(shang)面。
以上所舉,是(shi)物聯(lian)網設備屬性曲線計算場景的其(qi)中一(yi)個痛(tong)點(dian),另(ling)外還(huan)需要考慮數(shu)據增長、數(shu)據核(he)對以及數(shu)據容災等(deng)特點(dian)。
筆者所在的(de)公(gong)司,要(yao)基于3D打印技術給(gei)客戶提供整體化解決方案,自(zi)然(ran)需(xu)要(yao)對設備(bei)的(de)運行狀態(tai)做持續追蹤,需(xu)要(yao)存儲設備(bei)的(de)運行數據。這時候我(wo)們找(zhao)到了開源(yuan)的(de)物聯網(wang)大數據平臺TDengine()。
參考(kao)TDengine Database的(de)文(wen)檔中SQL的(de)寫法(fa),在數據齊(qi)全的(de)情(qing)況下,可以輕(qing)松地(di)用一句SQL解決上面(mian)的(de)問題:
select last(val) a from super_table_xx where ts >= '2021-06-07 18:10:00' and ts <= '2021-06-07 18:20:00' interval(60s) fill(value, 0);
為什么(me)類似的SQL,TDengine Database的執行效率(lv)可以如此之高呢(ni)?
這(zhe)就在(zai)于它的(de)超級表以(yi)及子(zi)表,針對單個(ge)設(she)備的(de)數(shu)據,TDengine設(she)計了按照時間(jian)連(lian)續存儲的(de)特(te)性。而事實上,業(ye)務(wu)系統(tong)在(zai)使(shi)用物聯網數(shu)據的(de)時候,無論是(shi)即時查詢還是(shi)離線分析(xi),存在(zai)讀取單個(ge)設(she)備的(de)一個(ge)連(lian)續時間(jian)段數(shu)據的(de)特(te)點。
假設(she),我們要存(cun)儲(chu)設(she)備的溫度與濕度,我們可以設(she)計超(chao)級(ji)表如下:
create stable if not exists s_device (ts TIMESTAMP,
temperature double,
humidity double
) TAGS (device_sn BINARY(1000));
實(shi)際使用中,例如針對設備’d1’和’d2’的數(shu)據執行插入的SQL如下:
insert into s_device_d1 (ts, temperature, humidity) USING s_device (device_sn) TAGS ('d1') values (1623157875000, 35.34, 80.24);
insert into s_device_d2 (ts, temperature, humidity) USING s_device (device_sn) TAGS ('d2') values (1623157891000, 29.63, 79.48);
搜索(suo)設備’d1’某個時間段(duan)的(de)數據,其SQL如(ru)下:
select * from s_device where device_sn = 'd1' and ts > 1623157871000 and ts < 1623157890000 ;
假設統計過去7天(tian)的平均(jun)溫度曲線,每小時1個點:
select avg(temperature) temperature from s_device where device_sn = #{deviceSn} and ts >= #{startTime} and ts < #{endTime} interval(1h)
TDengine還(huan)提供了很多(duo)聚(ju)合函數,類似上(shang)面(mian)的計算(suan)1分鐘(zhong)連續曲線(xian)的last和fill,以及其他常用的sum和max等(deng)。
在(zai)和應用程序結合的過程中,我(wo)(wo)們(men)選(xuan)用MyBatis這種靈活易(yi)上(shang)手的ORM框架,例如,針對上(shang)面(mian)的數據表’s_device’,我(wo)(wo)們(men)先定義entity :
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.sql.Timestamp;
/**
* @author: DaLuo
* @date: 2021/06/25
* @description:
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@TableName(value = "s_device")
public class TestSuperDeviceEntity {
private Timestamp ts;
private Float temperature;
private Float humidity;
@TableField(value = "device_sn")
private String device_sn ;
}
再定義(yi) mapper:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.hg.device.kafka.tdengine.entity.TestSuperDeviceEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.sql.Timestamp;
import java.util.List;
/**
* @author: DaLuo
* @date: 2021/06/25
* @description:
*/
@Mapper
public interface TestSuperDeviceMapper extends BaseMapper<TestSuperDeviceEntity> {
/**
* 單個插入
* @param entity
* @return
*/
@Insert({
"INSERT INTO 's_device_${entity.deviceSn}' (ts ,temperature, humidity ) ",
"USING s_device (device_sn) TAGS (#{entity.deviceSn}) ",
"VALUES (#{entity.ts}, #{entity.temperature}, #{entity.humidity})"
})
int insertOne(@Param(value = "entity") TestSuperDeviceEntity entity);
/**
* 批量插入
* @param entities
* @return
*/
@Insert({
"<script>",
"INSERT INTO ",
"<foreach collection='list' item='item' separator=' '>",
"'s_device_${item.deviceSn}' (ts ,temperature, humidity) USING s_device (device_sn) TAGS (#{item.deviceSn}) ",
"VALUES (#{item.ts}, #{item.temperature}, #{item.humidity})",
"</foreach>",
"</script>"
})
int batchInsert(@Param("list") List<TestSuperDeviceEntity> entities);
/**
* 查詢過去一段時間范圍的平均溫度,每小時1個數據點
* @param deviceSn
* @param startTime inclusive
* @param endTime exclusive
* @return
*/
@Select("select avg(temperature) temperature from s_device where device_sn = #{deviceSn} and ts >= #{startTime} and ts < #{endTime} interval(1h)")
List<TempSevenDaysTemperature> selectSevenDaysTemperature(
@Param(value = "deviceSn") String deviceSn,
@Param(value = "startTime") long startTime,
@Param(value = "endTime") long endTime);
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
class TempSevenDaysTemperature {
private Timestamp ts;
private float temperature;
}
}
TDengine有一個(ge)很巧妙的(de)設計(ji),就是不用預(yu)先創建子表,所以(yi)我們可(ke)以(yi)很方便地利(li)用’tag’標簽作為子表名(ming)稱的(de)一部(bu)分,即(ji)時插入數據同時創建子表。
注意:考慮(lv)到跨時(shi)(shi)區(qu)的(de)(de)國際化特(te)性,我們所有(you)的(de)(de)時(shi)(shi)間存(cun)儲(chu)查詢交(jiao)互,都(dou)是使(shi)(shi)用的(de)(de)時(shi)(shi)間戳(chuo),而(er)非”yyyy-mm-dd hh:MM:ss”格(ge)式,因為數據存(cun)儲(chu)涉及到應(ying)用程序時(shi)(shi)區(qu),連(lian)接字符串時(shi)(shi)區(qu),TDengine服務時(shi)(shi)區(qu),使(shi)(shi)用”yyyy-mm-dd hh:MM:ss”格(ge)式容易導致時(shi)(shi)間存(cun)儲(chu)的(de)(de)不(bu)準確性,而(er)使(shi)(shi)用時(shi)(shi)間戳(chuo),長(chang)整型的(de)(de)數據格(ge)式則(ze)可以完(wan)美地避免此類問題。
Java使用(yong)TDengine JDBC-driver目(mu)前(qian)有兩(liang)種方式:JDBC-JNI和JDBC-RESTful,前(qian)者(zhe)在(zai)寫(xie)入性能上(shang)更有優勢(shi)。但是需(xu)要(yao)在(zai)應用(yong)程(cheng)序運行的服務器(qi)上(shang)安裝(zhuang)TDengine客戶(hu)端驅(qu)動(dong)。
我們(men)(men)的(de)應用(yong)程序(xu)(xu)用(yong)到了Kubernetes集群,程序(xu)(xu)是(shi)運(yun)行在Docker里面,為此(ci)我們(men)(men)制作了一個適(shi)合我們(men)(men)應用(yong)程序(xu)(xu)運(yun)行的(de)鏡像,例如基礎鏡像的(de)Dockerfile如下所示:
FROM openjdk:8-jdk-oraclelinux7
COPY TDengine-client-2.0.16.0-Linux-x64.tar.gz /
RUN tar -xzvf /TDengine-client-2.0.16.0-Linux-x64.tar.gz && cd /TDengine-client-2.0.16.0 && pwd && ls && ./install_client.sh
build:
docker build -t tdengine-openjdk-8-runtime:2.0.16.0 -f Dockerfile .
引用(yong)程序鏡像Dockerfile所示:
FROM tdengine-openjdk-8-runtime:2.0.16.0
ENV JAVA_OPTS="-Duser.timezone=Asia/Shanghai -Djava.security.egd=file:/dev/./urandom"
COPY app.jar /app.jar
ENTRYPOINT ["java","-jar","/app.jar"]
這樣我們的應用程序就可以調度在任意(yi)的K8s節點上了。
另外,我們(men)的(de)程序(xu)涉及到任務(wu)自動(dong)化調度,需(xu)要頻繁地和設(she)備(bei)下位機進行MQTT數據(ju)交互,比如,云端發送指令1000-“開(kai)始任務(wu)A”,下位機回復指令2000-“收到任務(wu)A”,把(ba)(ba)指令理解(jie)成設(she)備(bei),把(ba)(ba)指令序(xu)列號以及內(nei)容理解(jie)成它(ta)的(de)屬(shu)性,自然這種數據(ju)也(ye)是非(fei)常適合存儲在(zai)TDengine時序(xu)數據(ju)庫中(zhong)的(de):
*************************** 1.row ***************************
ts: 2021-06-23 16:10:30.000
msg: {"task_id":"7b40ed4edc1149f1837179c77d8c3c1f","action":"start"}
device_sn: deviceA
kind: 1000
*************************** 2.row ***************************
ts: 2021-06-23 16:10:31.000
msg: {"task_id":"7b40ed4edc1149f1837179c77d8c3c1f","action":"received"}
device_sn: deviceA
kind: 2000
我們云端在和(he)設備對(dui)接(jie)的過程(cheng)中,頻繁需要考究消息是否發送的問題,所(suo)以(yi)迫(po)切需要對(dui)指(zhi)令進行保存,從(cong)而在應用程(cheng)序中新(xin)辟線程(cheng),專門訂閱指(zhi)令集消息,批量寫入到TDengine數據庫。
最后,TDengine Database還有一(yi)個超級表log.dn,里面保留了內存(cun)、CPU等使用信息,所以(yi)我(wo)們可以(yi)利用Grafana展示這些(xie)數據,為監控(kong)提供可靠的運營數據參照!

作者介紹:大羅,黑格(ge)智造架構(gou)師,主要(yao)從事云原生,大數據系統(tong)開發(fa),曾參與國家示范級工業(ye)互(hu)聯(lian)網系統(tong)建設(she)等(deng)。


























