134 lines
4.6 KiB
Java
134 lines
4.6 KiB
Java
|
package com.cnbm.influx.config;
|
|||
|
|
|||
|
import com.cnbm.influx.constant.Constant;
|
|||
|
import com.cnbm.influx.param.PageInfo;
|
|||
|
import com.cnbm.influx.param.QueryDataParam;
|
|||
|
import com.cnbm.influx.param.Range;
|
|||
|
import com.cnbm.influx.template.Event;
|
|||
|
import com.influxdb.client.InfluxDBClient;
|
|||
|
import com.influxdb.client.InfluxDBClientFactory;
|
|||
|
import com.influxdb.client.QueryApi;
|
|||
|
import com.influxdb.client.WriteApi;
|
|||
|
import com.influxdb.client.domain.WritePrecision;
|
|||
|
import com.influxdb.client.write.Point;
|
|||
|
import com.influxdb.query.FluxTable;
|
|||
|
|
|||
|
import java.util.ArrayList;
|
|||
|
import java.util.List;
|
|||
|
|
|||
|
public enum InfluxClient {
|
|||
|
|
|||
|
/**
|
|||
|
* influxdb 读写客户端,,如果write比较繁忙,后续可以考虑,维护 client一个线程池。
|
|||
|
* */
|
|||
|
Client("http://192.168.0.170:8086","lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==","qgs","qgs-bucket"),
|
|||
|
|
|||
|
;
|
|||
|
private String url;
|
|||
|
private String token;
|
|||
|
private String org;
|
|||
|
private String bucket;
|
|||
|
|
|||
|
private InfluxDBClient influxDBClient;
|
|||
|
private WriteApi writeApi;
|
|||
|
|
|||
|
private QueryApi queryApi;
|
|||
|
|
|||
|
InfluxClient(String url,String token,String org,String bucket){
|
|||
|
this.url = url;
|
|||
|
this.token = token;
|
|||
|
this.org = org;
|
|||
|
this.bucket = bucket;
|
|||
|
this.influxDBClient = InfluxDBClientFactory.create(this.url, this.token.toCharArray(),this.org,this.bucket);
|
|||
|
this.writeApi = influxDBClient.makeWriteApi();
|
|||
|
this.queryApi = influxDBClient.getQueryApi();
|
|||
|
}
|
|||
|
|
|||
|
|
|||
|
public QueryApi getQueryApi() {
|
|||
|
return queryApi;
|
|||
|
}
|
|||
|
|
|||
|
public WriteApi getWriteApi() {
|
|||
|
return writeApi;
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* 测试连接是否正常
|
|||
|
*
|
|||
|
* @return
|
|||
|
* true 服务正常健康
|
|||
|
* false 异常
|
|||
|
*/
|
|||
|
private boolean ping() {
|
|||
|
boolean isConnected = false;
|
|||
|
Boolean pong;
|
|||
|
try {
|
|||
|
pong = influxDBClient.ping();
|
|||
|
if (pong != null) {
|
|||
|
isConnected = true;
|
|||
|
}
|
|||
|
} catch (Exception e) {
|
|||
|
e.printStackTrace();
|
|||
|
}
|
|||
|
return isConnected;
|
|||
|
}
|
|||
|
|
|||
|
public void insert(Event event, String measurement){
|
|||
|
Point point = Point.measurement(measurement)
|
|||
|
.addTag("transationId", event.getTransationId())
|
|||
|
.addTag("argName", event.getArgName())
|
|||
|
.addField("argValue", event.getArgValue())
|
|||
|
.time(event.getTime().toEpochMilli(), WritePrecision.MS);
|
|||
|
writeApi.writePoint(point);
|
|||
|
|
|||
|
}
|
|||
|
|
|||
|
//异步 批量写入数据
|
|||
|
//如果要批量插入的话,一次也只能写入
|
|||
|
public void batchInsert(List<Event> events, String measurement){
|
|||
|
List<Point> list = new ArrayList<>();
|
|||
|
for(Event event:events){
|
|||
|
Point point = Point.measurement(measurement)
|
|||
|
.addTag("transationId", event.getTransationId())
|
|||
|
.addTag("argName", event.getArgName())
|
|||
|
.addField("argValue", event.getArgValue())
|
|||
|
.time(event.getTime().toEpochMilli(), WritePrecision.MS);
|
|||
|
list.add(point);
|
|||
|
}
|
|||
|
writeApi.writePoints(list);
|
|||
|
}
|
|||
|
|
|||
|
|
|||
|
public List<FluxTable> query(QueryDataParam param){
|
|||
|
String measurement = param.getMeasurement();
|
|||
|
String dropedTagName = param.getDropedTagName();
|
|||
|
Range range = param.getRange();
|
|||
|
String bucket = param.getBucket();
|
|||
|
String tagName = param.getTag().getTagName();
|
|||
|
String tagValue = param.getTag().getTagValue();
|
|||
|
PageInfo pageInfo = param.getPageInfo();
|
|||
|
|
|||
|
String flux = "from(bucket:\""+bucket+"\")";
|
|||
|
flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")";
|
|||
|
flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")";
|
|||
|
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")";
|
|||
|
flux += "|> drop(columns: [\""+dropedTagName+"\"])";
|
|||
|
flux += "|> sort(columns: [\"_time\"], desc: true)";
|
|||
|
if(pageInfo!=null){
|
|||
|
flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")";
|
|||
|
}
|
|||
|
|
|||
|
|
|||
|
// List<FluxTable> tables = queryApi.query(flux);
|
|||
|
// for (FluxTable fluxTable : tables) {
|
|||
|
// List<FluxRecord> records = fluxTable.getRecords();
|
|||
|
// for (FluxRecord fluxRecord : records) {
|
|||
|
// System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement());
|
|||
|
//
|
|||
|
// }
|
|||
|
// }
|
|||
|
return queryApi.query(flux);
|
|||
|
}
|
|||
|
}
|