243 lines
9.1 KiB
Java
243 lines
9.1 KiB
Java
package com.cnbm.influx.config;
|
||
|
||
import com.cnbm.influx.constant.Constant;
|
||
import com.cnbm.influx.param.PageInfo;
|
||
import com.cnbm.influx.param.QueryDataGroupByTimeParam;
|
||
import com.cnbm.influx.param.QueryDataParam;
|
||
import com.cnbm.influx.param.Range;
|
||
import com.cnbm.influx.template.Event;
|
||
import com.influxdb.client.InfluxDBClient;
|
||
import com.influxdb.client.InfluxDBClientFactory;
|
||
import com.influxdb.client.QueryApi;
|
||
import com.influxdb.client.WriteApi;
|
||
import com.influxdb.client.domain.WritePrecision;
|
||
import com.influxdb.client.write.Point;
|
||
import com.influxdb.query.FluxTable;
|
||
|
||
import java.util.ArrayList;
|
||
import java.util.List;
|
||
|
||
public enum InfluxClient {
|
||
|
||
/**
|
||
* influxdb 读写客户端,,如果write比较繁忙,后续可以考虑,维护 client一个线程池。
|
||
* */
|
||
Client("http://192.168.0.170:8086","lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==","qgs","qgs-bucket"),
|
||
|
||
;
|
||
private String url;
|
||
private String token;
|
||
private String org;
|
||
private String bucket;
|
||
|
||
private InfluxDBClient influxDBClient;
|
||
private WriteApi writeApi;
|
||
|
||
private QueryApi queryApi;
|
||
|
||
InfluxClient(String url,String token,String org,String bucket){
|
||
this.url = url;
|
||
this.token = token;
|
||
this.org = org;
|
||
this.bucket = bucket;
|
||
this.influxDBClient = InfluxDBClientFactory.create(this.url, this.token.toCharArray(),this.org,this.bucket);
|
||
this.writeApi = influxDBClient.makeWriteApi();
|
||
this.queryApi = influxDBClient.getQueryApi();
|
||
}
|
||
|
||
|
||
public QueryApi getQueryApi() {
|
||
return queryApi;
|
||
}
|
||
|
||
public WriteApi getWriteApi() {
|
||
return writeApi;
|
||
}
|
||
|
||
/**
|
||
* 测试连接是否正常
|
||
*
|
||
* @return
|
||
* true 服务正常健康
|
||
* false 异常
|
||
*/
|
||
public boolean ping() {
|
||
boolean isConnected = false;
|
||
Boolean pong;
|
||
try {
|
||
pong = influxDBClient.ping();
|
||
if (pong != null) {
|
||
isConnected = pong;
|
||
}
|
||
} catch (Exception e) {
|
||
e.printStackTrace();
|
||
return false;
|
||
}
|
||
return isConnected;
|
||
}
|
||
|
||
/**
|
||
* desc: 异步批量 写入数据/更新数据
|
||
* notes: 如果是更新数据,要保证time字段不能改变
|
||
* auth: caixaing
|
||
* */
|
||
public void insert(Event event, String measurement){
|
||
|
||
Point point = null;
|
||
if(event.getBatchNum()==null){
|
||
point = Point.measurement(measurement)
|
||
.addTag("transationId", event.getTransationId())
|
||
.addTag("argName", event.getArgName())
|
||
.addField("argValue", event.getArgValue())
|
||
.time(event.getTime().toEpochMilli(), WritePrecision.MS);
|
||
}else {
|
||
point = Point.measurement(measurement)
|
||
.addTag("transationId", event.getTransationId())
|
||
|
||
.addTag("inspectionSheetId", event.getInspectionSheetId())
|
||
.addTag("sampleNumber", event.getSampleNumber())
|
||
.addTag("batchNum", event.getBatchNum())
|
||
|
||
.addTag("argName", event.getArgName())
|
||
.addField("argValue", event.getArgValue())
|
||
.time(event.getTime().toEpochMilli(), WritePrecision.MS);
|
||
}
|
||
writeApi.writePoint(point);
|
||
|
||
}
|
||
|
||
/**
|
||
* desc: 异步批量 写入数据/更新数据
|
||
* notes: 如果是更新数据,要保证time字段不能改变
|
||
* auth: caixaing
|
||
* */
|
||
public void batchInsert(List<Event> events, String measurement){
|
||
List<Point> list = new ArrayList<>();
|
||
for(Event event:events){
|
||
Point point = null;
|
||
if(event.getBatchNum()==null){
|
||
point = Point.measurement(measurement)
|
||
.addTag("transationId", event.getTransationId())
|
||
.addTag("argName", event.getArgName())
|
||
.addField("argValue", event.getArgValue())
|
||
.time(event.getTime().toEpochMilli(), WritePrecision.MS);
|
||
}else {
|
||
point = Point.measurement(measurement)
|
||
.addTag("transationId", event.getTransationId())
|
||
.addTag("inspectionSheetId", event.getInspectionSheetId())
|
||
|
||
// .addTag("batchNum", event.getBatchNum())
|
||
.addTag("sampleNumber", event.getSampleNumber())
|
||
.addField("batchNum", event.getBatchNum())
|
||
|
||
.addTag("argName", event.getArgName())
|
||
.addField("argValue", event.getArgValue())
|
||
.time(event.getTime().toEpochMilli(), WritePrecision.MS);
|
||
}
|
||
|
||
|
||
list.add(point);
|
||
}
|
||
writeApi.writePoints(list);
|
||
}
|
||
|
||
|
||
public List<FluxTable> query(QueryDataParam param){
|
||
String measurement = param.getMeasurement();
|
||
List<String> dropedTagNames = param.getDropedTagNames();
|
||
Range range = param.getRange();
|
||
String bucket = param.getBucket();
|
||
PageInfo pageInfo = param.getPageInfo();
|
||
|
||
String flux = "from(bucket:\""+bucket+"\")";
|
||
flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")";
|
||
flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")";
|
||
|
||
if(param.getTag()!=null){
|
||
String tagName = param.getTag().getTagName();
|
||
String tagValue = param.getTag().getTagValue();
|
||
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")";
|
||
}
|
||
|
||
//调整时区,查询出的结果 +8个小时
|
||
//flux += "|> timeShift(duration: 8h)";
|
||
|
||
|
||
for(String dropName:dropedTagNames){
|
||
flux += "|> drop(columns: [\""+dropName+"\"])";
|
||
}
|
||
flux += "|> sort(columns: [\"_time\"], desc: true)";
|
||
if(pageInfo!=null){
|
||
flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")";
|
||
}
|
||
return queryApi.query(flux);
|
||
}
|
||
|
||
public List<FluxTable> queryByGroup(QueryDataParam param){
|
||
String measurement = param.getMeasurement();
|
||
List<String> dropedTagNames = param.getDropedTagNames();
|
||
Range range = param.getRange();
|
||
String bucket = param.getBucket();
|
||
String tagName = param.getTag().getTagName();
|
||
String tagValue = param.getTag().getTagValue();
|
||
String groupName = param.getGroupName();
|
||
PageInfo pageInfo = param.getPageInfo();
|
||
|
||
String flux = "from(bucket:\""+bucket+"\")";
|
||
flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")";
|
||
flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")";
|
||
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")";
|
||
flux += "|> timeShift(duration: 8h)";
|
||
flux += "|> group(columns: [\""+groupName+"\"], mode: \"except\")";
|
||
for(String dropName:dropedTagNames){
|
||
flux += "|> drop(columns: [\""+dropName+"\"])";
|
||
}
|
||
flux += "|> sort(columns: [\"_time\"], desc: true)";
|
||
if(pageInfo!=null){
|
||
flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")";
|
||
}
|
||
return queryApi.query(flux);
|
||
}
|
||
|
||
public List<FluxTable> queryGroupByTime(QueryDataGroupByTimeParam param){
|
||
String measurement = param.getMeasurement();
|
||
List<String> dropedTagNames = param.getDropedTagNames();
|
||
Range range = param.getRange();
|
||
String bucket = param.getBucket();
|
||
String tagName = param.getTag().getTagName();
|
||
String tagValue = param.getTag().getTagValue();
|
||
PageInfo pageInfo = param.getPageInfo();
|
||
|
||
String flux = "from(bucket:\""+bucket+"\")";
|
||
flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")";
|
||
flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")";
|
||
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")";
|
||
flux += "|> timeShift(duration: 8h)";
|
||
for(String dropName:dropedTagNames){
|
||
flux += "|> drop(columns: [\""+dropName+"\"])";
|
||
}
|
||
//|> window(every: 1mo)
|
||
if(param.getTimeType() == 1){
|
||
flux += "|> window(every: 1y)";
|
||
}else if(param.getTimeType() == 2 ){
|
||
flux += "|> window(every: 1mo)";
|
||
}else{
|
||
flux += "|> window(every: 1d)";
|
||
}
|
||
flux += "|> sort(columns: [\"_time\"], desc: true)";
|
||
if(pageInfo!=null){
|
||
flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")";
|
||
}
|
||
|
||
|
||
// List<FluxTable> tables = queryApi.query(flux);
|
||
// for (FluxTable fluxTable : tables) {
|
||
// List<FluxRecord> records = fluxTable.getRecords();
|
||
// for (FluxRecord fluxRecord : records) {
|
||
// System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement());
|
||
//
|
||
// }
|
||
// }
|
||
return queryApi.query(flux);
|
||
}
|
||
} |