@@ -1,18 +1,24 @@ | |||||
package com.qgs.dc.influx; | package com.qgs.dc.influx; | ||||
//import com.qgs.dc.influx.config.InfluxClient; | |||||
import com.influxdb.client.*; | |||||
import com.influxdb.client.domain.WritePrecision; | |||||
import com.influxdb.client.write.Point; | |||||
import com.influxdb.client.InfluxDBClient; | |||||
import com.influxdb.client.InfluxDBClientFactory; | |||||
import com.influxdb.client.QueryApi; | |||||
import com.influxdb.client.WriteApi; | |||||
import com.influxdb.query.FluxRecord; | import com.influxdb.query.FluxRecord; | ||||
import com.influxdb.query.FluxTable; | import com.influxdb.query.FluxTable; | ||||
import com.qgs.dc.influx.param.PageInfo; | import com.qgs.dc.influx.param.PageInfo; | ||||
import com.qgs.dc.influx.param.QueryDataParam; | import com.qgs.dc.influx.param.QueryDataParam; | ||||
import com.qgs.dc.influx.param.Range; | import com.qgs.dc.influx.param.Range; | ||||
import com.qgs.dc.influx.param.Tag; | |||||
import java.text.SimpleDateFormat; | import java.text.SimpleDateFormat; | ||||
import java.time.Instant; | import java.time.Instant; | ||||
import java.util.*; | |||||
import java.util.ArrayList; | |||||
import java.util.Calendar; | |||||
import java.util.Date; | |||||
import java.util.List; | |||||
/** | /** | ||||
* @Desc: "" | * @Desc: "" | ||||
@@ -23,7 +29,7 @@ public class Main { | |||||
public static void main(String[] args) throws InterruptedException { | public static void main(String[] args) throws InterruptedException { | ||||
char[] token = "lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==".toCharArray(); | char[] token = "lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==".toCharArray(); | ||||
String org = "qgs"; | String org = "qgs"; | ||||
String bucket = "mytest"; | |||||
String bucket = "qgs-bucket"; | |||||
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://192.168.0.170:8086", token, org, bucket); | InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://192.168.0.170:8086", token, org, bucket); | ||||
@@ -45,23 +51,23 @@ public class Main { | |||||
// InfluxClient.Client.insert(event,"ASProcessCompleteEvent"); | // InfluxClient.Client.insert(event,"ASProcessCompleteEvent"); | ||||
Point point = Point.measurement("ASProcessCompleteEvent") | |||||
.addTag("transationId", "112311") | |||||
.addTag("argName", "argName11") | |||||
.addField("argValue", 3D) | |||||
.time(Instant.now().toEpochMilli(), WritePrecision.MS); | |||||
Point point2 = Point.measurement("ASProcessCompleteEvent") | |||||
.addTag("transationId", "222312") | |||||
.addTag("argName", "argName11") | |||||
.addField("argValue", 4D) | |||||
.time(Instant.now().toEpochMilli(), WritePrecision.MS); | |||||
List<Point> list = new ArrayList<>(); | |||||
list.add(point); | |||||
list.add(point2); | |||||
writeApi.writePoints(list); | |||||
// Point point = Point.measurement("ASProcessCompleteEvent") | |||||
// .addTag("transationId", "112311") | |||||
// .addTag("argName", "argName11") | |||||
// .addField("argValue", 3D) | |||||
// .time(Instant.now().toEpochMilli(), WritePrecision.MS); | |||||
// | |||||
// | |||||
// Point point2 = Point.measurement("ASProcessCompleteEvent") | |||||
// .addTag("transationId", "222312") | |||||
// .addTag("argName", "argName11") | |||||
// .addField("argValue", 4D) | |||||
// .time(Instant.now().toEpochMilli(), WritePrecision.MS); | |||||
// List<Point> list = new ArrayList<>(); | |||||
// list.add(point); | |||||
// list.add(point2); | |||||
// | |||||
// writeApi.writePoints(list); | |||||
//todo api.writeMeasurements(WritePrecision.NS, Arrays.asList(new H2OFeetMeasurement("coyote_creek", 15.0D, null, Instant.ofEpochSecond(0, 15)), new H2OFeetMeasurement("coyote_creek", 16.0D, null, Instant.ofEpochSecond(0, 16)))); | //todo api.writeMeasurements(WritePrecision.NS, Arrays.asList(new H2OFeetMeasurement("coyote_creek", 15.0D, null, Instant.ofEpochSecond(0, 15)), new H2OFeetMeasurement("coyote_creek", 16.0D, null, Instant.ofEpochSecond(0, 16)))); | ||||
// List<Event> events = new ArrayList<>(); | // List<Event> events = new ArrayList<>(); | ||||
@@ -101,7 +107,7 @@ public class Main { | |||||
// } | // } | ||||
// String flux = "from(bucket:\"mytest\") |> range(start: -60m)"; | |||||
// String flux = "from(bucket:\"mytest\") |> range(start: -6000000000000000m)"; | |||||
// flux += "|> filter(fn: (r) =>\n" + | // flux += "|> filter(fn: (r) =>\n" + | ||||
// " r._measurement == \"ASProcessCompleteEvent\" and \n" + | // " r._measurement == \"ASProcessCompleteEvent\" and \n" + | ||||
//// " r._field == \"type\" and \n" + //对应 Field key | //// " r._field == \"type\" and \n" + //对应 Field key | ||||
@@ -113,6 +119,7 @@ public class Main { | |||||
// for (FluxTable fluxTable : tables) { | // for (FluxTable fluxTable : tables) { | ||||
// List<FluxRecord> records = fluxTable.getRecords(); | // List<FluxRecord> records = fluxTable.getRecords(); | ||||
// for (FluxRecord fluxRecord : records) { | // for (FluxRecord fluxRecord : records) { | ||||
// Double o = (Double)fluxRecord.getValueByKey("_value"); | |||||
// System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); | // System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); | ||||
//// System.out.println("time: "+fluxRecord.getTime() +" key:"++" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); | //// System.out.println("time: "+fluxRecord.getTime() +" key:"++" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); | ||||
// | // | ||||
@@ -128,36 +135,39 @@ public class Main { | |||||
// |> sort(columns: ["_time"], desc: true) | // |> sort(columns: ["_time"], desc: true) | ||||
// 取前10条数据 | // 取前10条数据 | ||||
// |> limit(n: 10, offset: 0) | // |> limit(n: 10, offset: 0) | ||||
// | |||||
// 取 10-20 条数据 | // 取 10-20 条数据 | ||||
// |> limit(n: 10, offset: 10) | // |> limit(n: 10, offset: 10) | ||||
// | |||||
// 取 20-30 条数据 | // 取 20-30 条数据 | ||||
// |> limit(n: 10, offset: 20) | // |> limit(n: 10, offset: 20) | ||||
//22-57 22.3 | |||||
//23-20 78 | |||||
//24 | |||||
// QueryDataParam queryDataParam = new QueryDataParam(); | |||||
// queryDataParam.setBucket("mytest"); | |||||
// queryDataParam.setRange(new Range(getDate().toInstant(),new Date().toInstant())); | |||||
// queryDataParam.setMeasurement("ASProcessCompleteEvent"); | |||||
// queryDataParam.setTag(new Tag("argName","arg4")); | |||||
QueryDataParam queryDataParam = new QueryDataParam(); | |||||
queryDataParam.setBucket("qgs-bucket"); | |||||
queryDataParam.setRange(new Range(getDate().toInstant(),new Date().toInstant())); | |||||
queryDataParam.setMeasurement("Weight"); | |||||
queryDataParam.setTag(new Tag("argName","LTWeight")); | |||||
// queryDataParam.setDropedTagName("transationId"); | // queryDataParam.setDropedTagName("transationId"); | ||||
// queryDataParam.setPageInfo(new PageInfo(1,100)); | |||||
// | |||||
// List<FluxTable> tables = query(queryDataParam,influxDBClient); | |||||
List<String> dropNames = new ArrayList<>(); | |||||
dropNames.add("transationId"); | |||||
dropNames.add("inspectionSheetId"); | |||||
dropNames.add("batchNum"); | |||||
queryDataParam.setDropedTagNames(dropNames); | |||||
queryDataParam.setPageInfo(new PageInfo(1,10000)); | |||||
List<FluxTable> tables = query(queryDataParam,influxDBClient); | |||||
// List<FluxRecord> records1 = tables.get(0).getRecords(); | // List<FluxRecord> records1 = tables.get(0).getRecords(); | ||||
// List<List<FluxRecord>> lists = Utils.fixedGroup(records1, 10); | // List<List<FluxRecord>> lists = Utils.fixedGroup(records1, 10); | ||||
// for (FluxTable fluxTable : tables) { | |||||
// List<FluxRecord> records = fluxTable.getRecords(); | |||||
// for (FluxRecord fluxRecord : records) { | |||||
// System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); | |||||
// | |||||
// } | |||||
// } | |||||
for (FluxTable fluxTable : tables) { | |||||
List<FluxRecord> records = fluxTable.getRecords(); | |||||
for (FluxRecord fluxRecord : records) { | |||||
Instant timms = fluxRecord.getTime(); | |||||
System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); | |||||
} | |||||
} | |||||
influxDBClient.close(); | influxDBClient.close(); | ||||
} | } | ||||
@@ -170,16 +180,16 @@ public class Main { | |||||
//两种写法都可以获取到前三天的日期 | //两种写法都可以获取到前三天的日期 | ||||
// calendar1.set(Calendar.DAY_OF_YEAR,calendar1.get(Calendar.DAY_OF_YEAR) -3); | // calendar1.set(Calendar.DAY_OF_YEAR,calendar1.get(Calendar.DAY_OF_YEAR) -3); | ||||
//在当前时间的基础上获取前三天的日期 | //在当前时间的基础上获取前三天的日期 | ||||
calendar1.add(Calendar.DATE, -3); | |||||
calendar1.add(Calendar.DATE, -1000); | |||||
//add方法 参数也可传入 月份,获取的是前几月或后几月的日期 | //add方法 参数也可传入 月份,获取的是前几月或后几月的日期 | ||||
//calendar1.add(Calendar.MONTH, -3); | //calendar1.add(Calendar.MONTH, -3); | ||||
Date day = calendar1.getTime(); | Date day = calendar1.getTime(); | ||||
return day; | return day; | ||||
} | } | ||||
private static List<FluxTable> query(QueryDataParam param, InfluxDBClient influxDBClient){ | |||||
private static List<FluxTable> query(QueryDataParam param,InfluxDBClient influxDBClient){ | |||||
String measurement = param.getMeasurement(); | String measurement = param.getMeasurement(); | ||||
String dropedTagName = param.getDropedTagName(); | |||||
List<String> dropedTagName = param.getDropedTagNames(); | |||||
Range range = param.getRange(); | Range range = param.getRange(); | ||||
String bucket = param.getBucket(); | String bucket = param.getBucket(); | ||||
String tagName = param.getTag().getTagName(); | String tagName = param.getTag().getTagName(); | ||||
@@ -190,8 +200,11 @@ public class Main { | |||||
flux += "|> range(start: "+range.getBegin().toString()+",stop:"+range.getEnd().toString()+") \n"; | flux += "|> range(start: "+range.getBegin().toString()+",stop:"+range.getEnd().toString()+") \n"; | ||||
flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\") \n"; | flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\") \n"; | ||||
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\") \n"; | flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\") \n"; | ||||
flux += "|> drop(columns: [\""+dropedTagName+"\"]) \n"; | |||||
for(String dropName:dropedTagName){ | |||||
flux += "|> drop(columns: [\""+ dropName +"\"]) \n"; | |||||
} | |||||
flux += "|> sort(columns: [\"_time\"], desc: true) \n"; | flux += "|> sort(columns: [\"_time\"], desc: true) \n"; | ||||
flux += "|> window(every: 1y) \n"; | |||||
if(pageInfo!=null){ | if(pageInfo!=null){ | ||||
flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; | flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; | ||||
} | } | ||||
@@ -9,9 +9,9 @@ import java.util.List; | |||||
/** | /** | ||||
* @Desc: "" | * @Desc: "" | ||||
* @Author: caixiang | * @Author: caixiang | ||||
* @DATE: 2022/6/29 16:23 | |||||
* @DATE: 2022/7/12 14:23 | |||||
*/ | */ | ||||
public class Utils { | |||||
public class DataUtils { | |||||
public static void main(String[] args) { | public static void main(String[] args) { | ||||
ArrayList<Integer> arrs = new ArrayList<>(); | ArrayList<Integer> arrs = new ArrayList<>(); | ||||
@@ -23,7 +23,45 @@ public class Utils { | |||||
} | } | ||||
public List<List<Date>> split(List<Date> value) { | |||||
List<List<Date>> result = new ArrayList<>(); | |||||
int day = value.iterator().next().getDate(); | |||||
List<Date> newListEntry = new ArrayList<>(); | |||||
for (Date date : value) { | |||||
if (date.getDate() == day) { | |||||
newListEntry.add(date); | |||||
} | |||||
else { | |||||
day = date.getDate(); | |||||
result.add(newListEntry); | |||||
newListEntry = new ArrayList<>(); | |||||
newListEntry.add(date); | |||||
} | |||||
} | |||||
result.add(newListEntry);//because the last sublist was not added | |||||
return result; | |||||
} | |||||
public static String splitToNeed(String s,Integer type){ | |||||
String[] s1 = s.split(" "); | |||||
String[] split = s1[0].split("-"); | |||||
String year = split[0]; | |||||
String mon = split[1]; | |||||
String day = split[2]; | |||||
if(type == 1 ){ | |||||
//年 | |||||
return year; | |||||
}else if(type == 2 ){ | |||||
//月 | |||||
return year+"-"+mon; | |||||
}else { | |||||
//日 | |||||
return s1[0]; | |||||
} | |||||
} | |||||
public static Date getBeforeDate(Integer number){ | public static Date getBeforeDate(Integer number){ | ||||
Date date = new Date();//获取当前日期 | Date date = new Date();//获取当前日期 | ||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");//格式化一下 | SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");//格式化一下 | ||||
@@ -37,6 +75,19 @@ public class Utils { | |||||
Date day = calendar1.getTime(); | Date day = calendar1.getTime(); | ||||
return day; | return day; | ||||
} | } | ||||
public static Date getAfterDate(Integer number){ | |||||
Date date = new Date();//获取当前日期 | |||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");//格式化一下 | |||||
Calendar calendar1 = Calendar.getInstance();//获取对日期操作的类对象 | |||||
//两种写法都可以获取到前三天的日期 | |||||
// calendar1.set(Calendar.DAY_OF_YEAR,calendar1.get(Calendar.DAY_OF_YEAR) -3); | |||||
//在当前时间的基础上获取前三天的日期 | |||||
calendar1.add(Calendar.DATE, 0+number); | |||||
//add方法 参数也可传入 月份,获取的是前几月或后几月的日期 | |||||
//calendar1.add(Calendar.MONTH, -3); | |||||
Date day = calendar1.getTime(); | |||||
return day; | |||||
} | |||||
/** | /** | ||||
* 将一组数据固定分组,每组n个元素 | * 将一组数据固定分组,每组n个元素 | ||||
@@ -64,6 +115,4 @@ public class Utils { | |||||
} | } | ||||
return result; | return result; | ||||
} | } | ||||
} | } |
@@ -1,6 +1,5 @@ | |||||
package com.qgs.dc.influx.config; | package com.qgs.dc.influx.config; | ||||
import com.influxdb.client.InfluxDBClient; | import com.influxdb.client.InfluxDBClient; | ||||
import com.influxdb.client.InfluxDBClientFactory; | import com.influxdb.client.InfluxDBClientFactory; | ||||
import com.influxdb.client.QueryApi; | import com.influxdb.client.QueryApi; | ||||
@@ -9,6 +8,7 @@ import com.influxdb.client.domain.WritePrecision; | |||||
import com.influxdb.client.write.Point; | import com.influxdb.client.write.Point; | ||||
import com.influxdb.query.FluxTable; | import com.influxdb.query.FluxTable; | ||||
import com.qgs.dc.influx.param.PageInfo; | import com.qgs.dc.influx.param.PageInfo; | ||||
import com.qgs.dc.influx.param.QueryDataGroupByTimeParam; | |||||
import com.qgs.dc.influx.param.QueryDataParam; | import com.qgs.dc.influx.param.QueryDataParam; | ||||
import com.qgs.dc.influx.param.Range; | import com.qgs.dc.influx.param.Range; | ||||
import com.qgs.dc.influx.template.Event; | import com.qgs.dc.influx.template.Event; | ||||
@@ -60,16 +60,17 @@ public enum InfluxClient { | |||||
* true 服务正常健康 | * true 服务正常健康 | ||||
* false 异常 | * false 异常 | ||||
*/ | */ | ||||
private boolean ping() { | |||||
public boolean ping() { | |||||
boolean isConnected = false; | boolean isConnected = false; | ||||
Boolean pong; | Boolean pong; | ||||
try { | try { | ||||
pong = influxDBClient.ping(); | pong = influxDBClient.ping(); | ||||
if (pong != null) { | if (pong != null) { | ||||
isConnected = true; | |||||
isConnected = pong; | |||||
} | } | ||||
} catch (Exception e) { | } catch (Exception e) { | ||||
e.printStackTrace(); | e.printStackTrace(); | ||||
return false; | |||||
} | } | ||||
return isConnected; | return isConnected; | ||||
} | } | ||||
@@ -77,6 +78,7 @@ public enum InfluxClient { | |||||
public void insert(Event event, String measurement){ | public void insert(Event event, String measurement){ | ||||
Point point = Point.measurement(measurement) | Point point = Point.measurement(measurement) | ||||
.addTag("transationId", event.getTransationId()) | .addTag("transationId", event.getTransationId()) | ||||
.addTag("inspectionSheetId", event.getInspectionSheetId()) | |||||
.addTag("argName", event.getArgName()) | .addTag("argName", event.getArgName()) | ||||
.addField("argValue", event.getArgValue()) | .addField("argValue", event.getArgValue()) | ||||
.time(event.getTime().toEpochMilli(), WritePrecision.MS); | .time(event.getTime().toEpochMilli(), WritePrecision.MS); | ||||
@@ -89,11 +91,25 @@ public enum InfluxClient { | |||||
public void batchInsert(List<Event> events, String measurement){ | public void batchInsert(List<Event> events, String measurement){ | ||||
List<Point> list = new ArrayList<>(); | List<Point> list = new ArrayList<>(); | ||||
for(Event event:events){ | for(Event event:events){ | ||||
Point point = Point.measurement(measurement) | |||||
.addTag("transationId", event.getTransationId()) | |||||
.addTag("argName", event.getArgName()) | |||||
.addField("argValue", event.getArgValue()) | |||||
.time(event.getTime().toEpochMilli(), WritePrecision.MS); | |||||
Point point = null; | |||||
if(event.getBatchNum()==null){ | |||||
point = Point.measurement(measurement) | |||||
.addTag("transationId", event.getTransationId()) | |||||
.addTag("inspectionSheetId", event.getInspectionSheetId()) | |||||
.addTag("argName", event.getArgName()) | |||||
.addField("argValue", event.getArgValue()) | |||||
.time(event.getTime().toEpochMilli(), WritePrecision.MS); | |||||
}else { | |||||
point = Point.measurement(measurement) | |||||
.addTag("transationId", event.getTransationId()) | |||||
.addTag("inspectionSheetId", event.getInspectionSheetId()) | |||||
.addTag("batchNum", event.getBatchNum().toString()) | |||||
.addTag("argName", event.getArgName()) | |||||
.addField("argValue", event.getArgValue()) | |||||
.time(event.getTime().toEpochMilli(), WritePrecision.MS); | |||||
} | |||||
list.add(point); | list.add(point); | ||||
} | } | ||||
writeApi.writePoints(list); | writeApi.writePoints(list); | ||||
@@ -102,7 +118,55 @@ public enum InfluxClient { | |||||
public List<FluxTable> query(QueryDataParam param){ | public List<FluxTable> query(QueryDataParam param){ | ||||
String measurement = param.getMeasurement(); | String measurement = param.getMeasurement(); | ||||
String dropedTagName = param.getDropedTagName(); | |||||
List<String> dropedTagNames = param.getDropedTagNames(); | |||||
Range range = param.getRange(); | |||||
String bucket = param.getBucket(); | |||||
String tagName = param.getTag().getTagName(); | |||||
String tagValue = param.getTag().getTagValue(); | |||||
PageInfo pageInfo = param.getPageInfo(); | |||||
String flux = "from(bucket:\""+bucket+"\")"; | |||||
flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; | |||||
flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; | |||||
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; | |||||
for(String dropName:dropedTagNames){ | |||||
flux += "|> drop(columns: [\""+dropName+"\"])"; | |||||
} | |||||
flux += "|> sort(columns: [\"_time\"], desc: true)"; | |||||
if(pageInfo!=null){ | |||||
flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; | |||||
} | |||||
return queryApi.query(flux); | |||||
} | |||||
public List<FluxTable> queryByGroup(QueryDataParam param){ | |||||
String measurement = param.getMeasurement(); | |||||
List<String> dropedTagNames = param.getDropedTagNames(); | |||||
Range range = param.getRange(); | |||||
String bucket = param.getBucket(); | |||||
String tagName = param.getTag().getTagName(); | |||||
String tagValue = param.getTag().getTagValue(); | |||||
String groupName = param.getGroupName(); | |||||
PageInfo pageInfo = param.getPageInfo(); | |||||
String flux = "from(bucket:\""+bucket+"\")"; | |||||
flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; | |||||
flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; | |||||
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; | |||||
flux += "|> group(columns: [\""+groupName+"\"], mode: \"except\")"; | |||||
for(String dropName:dropedTagNames){ | |||||
flux += "|> drop(columns: [\""+dropName+"\"])"; | |||||
} | |||||
flux += "|> sort(columns: [\"_time\"], desc: true)"; | |||||
if(pageInfo!=null){ | |||||
flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; | |||||
} | |||||
return queryApi.query(flux); | |||||
} | |||||
public List<FluxTable> queryGroupByTime(QueryDataGroupByTimeParam param){ | |||||
String measurement = param.getMeasurement(); | |||||
List<String> dropedTagNames = param.getDropedTagNames(); | |||||
Range range = param.getRange(); | Range range = param.getRange(); | ||||
String bucket = param.getBucket(); | String bucket = param.getBucket(); | ||||
String tagName = param.getTag().getTagName(); | String tagName = param.getTag().getTagName(); | ||||
@@ -113,7 +177,17 @@ public enum InfluxClient { | |||||
flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; | flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; | ||||
flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; | flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; | ||||
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; | flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; | ||||
flux += "|> drop(columns: [\""+dropedTagName+"\"])"; | |||||
for(String dropName:dropedTagNames){ | |||||
flux += "|> drop(columns: [\""+dropName+"\"])"; | |||||
} | |||||
//|> window(every: 1mo) | |||||
if(param.getTimeType() == 1){ | |||||
flux += "|> window(every: 1y)"; | |||||
}else if(param.getTimeType() == 2 ){ | |||||
flux += "|> window(every: 1mo)"; | |||||
}else{ | |||||
flux += "|> window(every: 1d)"; | |||||
} | |||||
flux += "|> sort(columns: [\"_time\"], desc: true)"; | flux += "|> sort(columns: [\"_time\"], desc: true)"; | ||||
if(pageInfo!=null){ | if(pageInfo!=null){ | ||||
flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; | flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; | ||||
@@ -18,6 +18,7 @@ public class Constant { | |||||
public static final LogLevel readTimeout = LogLevel.BODY; | public static final LogLevel readTimeout = LogLevel.BODY; | ||||
public static final LogLevel writeTimeout = LogLevel.BODY; | public static final LogLevel writeTimeout = LogLevel.BODY; | ||||
public static final LogLevel connectTimeout = LogLevel.BODY; | public static final LogLevel connectTimeout = LogLevel.BODY; | ||||
public static final String measurement = "WeightHeiHei"; | |||||
} | } |
@@ -0,0 +1,239 @@ | |||||
package com.qgs.dc.influx.controller; | |||||
import com.alibaba.fastjson.JSON; | |||||
import com.alibaba.fastjson.JSONArray; | |||||
import com.alibaba.fastjson.JSONObject; | |||||
import com.alibaba.fastjson.parser.JSONToken; | |||||
import com.influxdb.client.WriteApi; | |||||
import com.influxdb.client.domain.WritePrecision; | |||||
import com.influxdb.client.write.Point; | |||||
import com.influxdb.query.FluxRecord; | |||||
import com.influxdb.query.FluxTable; | |||||
import com.qgs.dc.influx.common.DataUtils; | |||||
import com.qgs.dc.influx.config.InfluxClient; | |||||
import com.qgs.dc.influx.param.*; | |||||
import com.qgs.dc.influx.template.Event; | |||||
import org.slf4j.Logger; | |||||
import org.slf4j.LoggerFactory; | |||||
import org.springframework.web.bind.annotation.PostMapping; | |||||
import org.springframework.web.bind.annotation.RequestBody; | |||||
import org.springframework.web.bind.annotation.RequestMapping; | |||||
import org.springframework.web.bind.annotation.RestController; | |||||
import java.time.Instant; | |||||
import java.util.ArrayList; | |||||
import java.util.List; | |||||
import java.util.Map; | |||||
import java.util.Random; | |||||
@RestController | |||||
@RequestMapping("/influx") | |||||
public class InfluxController { | |||||
private static final Logger logger = LoggerFactory.getLogger(InfluxController.class); | |||||
@PostMapping("/insertBatch") | |||||
public void insertBatch() throws InterruptedException { | |||||
List<Event> list = new ArrayList<>(); | |||||
Random r = new Random(); | |||||
for(int i=0;i<999;i++){ | |||||
Thread.sleep(10); | |||||
Event event = new Event(); | |||||
event.setTime(Instant.now()); | |||||
event.setTransationId("asas"+i); | |||||
event.setArgName("LTWeight"); | |||||
Double d = r.nextDouble() * 2.5 + 66; | |||||
event.setArgValue(d); | |||||
list.add(event); | |||||
} | |||||
InfluxClient.Client.batchInsert(list,"Weight"); | |||||
} | |||||
@PostMapping("/forTestInsertBatch") | |||||
public void forTestInsertBatch() throws InterruptedException { | |||||
List<Event> list = new ArrayList<>(); | |||||
Random r = new Random(); | |||||
for(int i=0;i<99;i++){ | |||||
Thread.sleep(10); | |||||
Event event = new Event(); | |||||
event.setTime(Instant.now()); | |||||
event.setTransationId("forbatch"+i); | |||||
event.setArgName("LTWeight"); | |||||
Double d = r.nextDouble() * 2.5 + 66; | |||||
event.setArgValue(d); | |||||
event.setBatchNum(4); | |||||
event.setInspectionSheetId(i+""); | |||||
list.add(event); | |||||
} | |||||
InfluxClient.Client.batchInsert(list,"Weight"); | |||||
} | |||||
@PostMapping("/insertBatchJYD") | |||||
public void insertBatchJYD() throws InterruptedException { | |||||
List<Event> list = new ArrayList<>(); | |||||
Random r = new Random(); | |||||
Instant instant = DataUtils.getBeforeDate(400).toInstant(); | |||||
for(int j=0;j<10;j++){ | |||||
for(int i=0;i<99;i++){ | |||||
Thread.sleep(10); | |||||
Event event = new Event(); | |||||
event.setTime(instant); | |||||
event.setTransationId("asas"+i); | |||||
event.setArgName("LTWeight"); | |||||
Double d = r.nextDouble() * 2.5 + 66; | |||||
event.setInspectionSheetId(j+""); | |||||
event.setArgValue(d); | |||||
event.setBatchNum(i); | |||||
list.add(event); | |||||
} | |||||
} | |||||
InfluxClient.Client.batchInsert(list,"WeightHei"); | |||||
} | |||||
@PostMapping("/insertBatchJYDForTest") | |||||
public void insertBatchJYDForTest() throws InterruptedException { | |||||
List<Event> list = new ArrayList<>(); | |||||
Random r = new Random(); | |||||
for(int i=0;i<999;i++){ | |||||
Thread.sleep(10); | |||||
Event event = new Event(); | |||||
event.setTime(DataUtils.getAfterDate(i).toInstant()); | |||||
event.setTransationId("asas"+i); | |||||
event.setArgName("LostDays"); | |||||
int i1 = r.nextInt(10); | |||||
// if(i1<4){ | |||||
// event.setArgValue(new Double(0)); | |||||
// }else { | |||||
// event.setArgValue(new Double(1)); | |||||
// } | |||||
event.setInspectionSheetId(i+""); | |||||
event.setBatchNum(i); | |||||
list.add(event); | |||||
} | |||||
InfluxClient.Client.batchInsert(list,"WeightHeiHei"); | |||||
} | |||||
/** | |||||
* 测试连接是否正常 | |||||
* | |||||
* @return | |||||
* true 服务正常健康 | |||||
* false 异常 | |||||
*/ | |||||
@PostMapping("/ping") | |||||
public void ping() throws InterruptedException { | |||||
boolean ping = InfluxClient.Client.ping(); | |||||
System.out.println(ping); | |||||
} | |||||
@PostMapping("/query") | |||||
public List<FluxTable> query() throws InterruptedException { | |||||
List<Event> list = new ArrayList<>(); | |||||
QueryDataParam queryDataParam = new QueryDataParam(); | |||||
queryDataParam.setBucket("qgs-bucket"); | |||||
queryDataParam.setMeasurement("ASProcessCompleteEventAS"); | |||||
List<String> dropNames = new ArrayList<>(); | |||||
dropNames.add("transationId"); | |||||
dropNames.add("inspectionSheetId"); | |||||
queryDataParam.setDropedTagNames(dropNames); | |||||
queryDataParam.setTag(new Tag("argName","arg6")); | |||||
queryDataParam.setRange(new Range(DataUtils.getBeforeDate(10).toInstant(),Instant.now())); | |||||
queryDataParam.setPageInfo(new PageInfo(1,10)); | |||||
List<FluxTable> query = InfluxClient.Client.query(queryDataParam); | |||||
for (FluxTable fluxTable : query) { | |||||
List<FluxRecord> records = fluxTable.getRecords(); | |||||
for (FluxRecord fluxRecord : records) { | |||||
System.out.println("value: " + fluxRecord.getValueByKey("_value")); | |||||
} | |||||
} | |||||
System.out.println(); | |||||
return query; | |||||
} | |||||
public static void main(String[] args) { | |||||
List<Event> list = new ArrayList<>(); | |||||
QueryDataParam queryDataParam = new QueryDataParam(); | |||||
queryDataParam.setBucket("qgs-bucket"); | |||||
queryDataParam.setMeasurement("ASProcessCompleteEventAS"); | |||||
List<String> dropNames = new ArrayList<>(); | |||||
dropNames.add("transationId"); | |||||
dropNames.add("inspectionSheetId"); | |||||
queryDataParam.setDropedTagNames(dropNames); | |||||
queryDataParam.setTag(new Tag("argName","arg7")); | |||||
queryDataParam.setRange(new Range(DataUtils.getBeforeDate(10).toInstant(),Instant.now())); | |||||
queryDataParam.setPageInfo(new PageInfo(2,10)); | |||||
List<FluxTable> query = InfluxClient.Client.query(queryDataParam); | |||||
for (FluxTable fluxTable : query) { | |||||
List<FluxRecord> records = fluxTable.getRecords(); | |||||
for (FluxRecord fluxRecord : records) { | |||||
System.out.println("value: " + fluxRecord.getValueByKey("_value")); | |||||
} | |||||
} | |||||
} | |||||
public Point insert(Event event, String measurement){ | |||||
Point point = Point.measurement(measurement) | |||||
.addTag("transationId", event.getTransationId()) | |||||
.addTag("argName", event.getArgName()) | |||||
.addField("argValue", event.getArgValue()) | |||||
.time(event.getTime().toEpochMilli(), WritePrecision.MS); | |||||
return point; | |||||
} | |||||
@PostMapping("/insert") | |||||
public void insert() throws InterruptedException { | |||||
Event event = new Event(); | |||||
event.setTime(Instant.now()); | |||||
event.setTransationId("asasd11"); | |||||
event.setArgName("argName11"); | |||||
event.setArgValue(900001d); | |||||
Point asProcessCompleteEvent = insert(event, "ASProcessCompleteEvent"); | |||||
InfluxClient.Client.insert(event,"ASProcessCompleteEvent"); | |||||
} | |||||
@PostMapping("/insertEvents") | |||||
public void insertEvents(@RequestBody InsertParam insertParam) throws InterruptedException{ | |||||
InfluxClient.Client.batchInsert(insertParam.getList(),insertParam.getMeasurement()); | |||||
} | |||||
@PostMapping("/queryEvents") | |||||
public JSONArray queryEvents(@RequestBody QueryDataParam queryDataParam) throws InterruptedException{ | |||||
List<FluxTable> fluxTables = InfluxClient.Client.query(queryDataParam); | |||||
List<FluxRecord> fluxRecords = fluxTables.get(0).getRecords(); | |||||
JSONArray jsonArray = new JSONArray(); | |||||
for (FluxRecord fluxRecord:fluxRecords){ | |||||
Map<String,Object> map = fluxRecord.getValues(); | |||||
System.out.println(map); | |||||
//todo | |||||
JSONObject jsonObject = new JSONObject(); | |||||
jsonObject.put(map.get("argName").toString(),map.get("_value").toString()); | |||||
jsonObject.put("transationId",map.get("transationId").toString()); | |||||
jsonObject.put("measurement",map.get("_measurement").toString()); | |||||
jsonObject.put("time",map.get("_time").toString()); | |||||
jsonArray.add(jsonObject); | |||||
} | |||||
System.out.println(jsonArray); | |||||
return jsonArray; | |||||
} | |||||
} |
@@ -1,73 +0,0 @@ | |||||
package com.qgs.dc.influx.controller; | |||||
import com.influxdb.client.domain.WritePrecision; | |||||
import com.influxdb.client.write.Point; | |||||
import com.qgs.dc.influx.config.InfluxClient; | |||||
import com.qgs.dc.influx.param.InsertParam; | |||||
import com.qgs.dc.influx.template.Event; | |||||
import org.slf4j.Logger; | |||||
import org.slf4j.LoggerFactory; | |||||
import org.springframework.web.bind.annotation.PostMapping; | |||||
import org.springframework.web.bind.annotation.RequestBody; | |||||
import org.springframework.web.bind.annotation.RequestMapping; | |||||
import org.springframework.web.bind.annotation.RestController; | |||||
import java.time.Instant; | |||||
import java.util.ArrayList; | |||||
import java.util.List; | |||||
@RestController | |||||
@RequestMapping("/influx") | |||||
public class InfluxDemoController { | |||||
private static final Logger logger = LoggerFactory.getLogger(InfluxDemoController.class); | |||||
// try (WriteApi writeApi = influxDBClient.makeWriteApi()) { | |||||
// Temperature temperature = new Temperature(); | |||||
// temperature.setLocation("east"); | |||||
// temperature.setValue(106.2D); | |||||
// temperature.setTime(Instant.now()); | |||||
// writeApi.writeMeasurement(WritePrecision.NS,temperature); | |||||
// } | |||||
// | |||||
// try (WriteApi writeApi = influxDBClient.makeWriteApi()) { | |||||
// Point point = Point.measurement("temperature") | |||||
// .addTag("location","south") | |||||
// .addTag("owner","wxm") | |||||
// .addField("wxm",230.8); | |||||
// writeApi.writePoint(point); | |||||
// } | |||||
@PostMapping("/insertBatch") | |||||
public void insertBatch(@RequestBody InsertParam insertParam) throws InterruptedException { | |||||
InfluxClient.Client.batchInsert(insertParam.getList(),insertParam.getMeasurement()); | |||||
} | |||||
public Point insert(Event event, String measurement){ | |||||
Point point = Point.measurement(measurement) | |||||
.addTag("transationId", event.getTransationId()) | |||||
.addTag("argName", event.getArgName()) | |||||
.addField("argValue", event.getArgValue()) | |||||
.time(event.getTime().toEpochMilli(), WritePrecision.MS); | |||||
return point; | |||||
} | |||||
@PostMapping("/insert") | |||||
public void insert() throws InterruptedException { | |||||
Event event = new Event(); | |||||
event.setTime(Instant.now()); | |||||
event.setTransationId("asasd11"); | |||||
event.setArgName("argName11"); | |||||
event.setArgValue(99765d); | |||||
Point asProcessCompleteEvent = insert(event, "ASProcessCompleteEvent"); | |||||
InfluxClient.Client.insert(event,"ASProcessCompleteEvent"); | |||||
} | |||||
} |
@@ -21,4 +21,6 @@ public class BaseParam implements Serializable { | |||||
@NotNull(message = "查询时间段不能为空") | @NotNull(message = "查询时间段不能为空") | ||||
private Range range; | private Range range; | ||||
} | } |
@@ -5,11 +5,6 @@ import lombok.Data; | |||||
import java.util.List; | import java.util.List; | ||||
/** | |||||
* @Desc: "" | |||||
* @Author: caixiang | |||||
* @DATE: 2022/10/13 16:10 | |||||
*/ | |||||
@Data | @Data | ||||
public class InsertParam { | public class InsertParam { | ||||
List<Event> list; | List<Event> list; | ||||
@@ -0,0 +1,34 @@ | |||||
package com.qgs.dc.influx.param; | |||||
import lombok.Data; | |||||
import lombok.EqualsAndHashCode; | |||||
import lombok.experimental.Accessors; | |||||
import java.util.List; | |||||
/** | |||||
* @Desc: "influx 查询条件构造" | |||||
* @Author: caixiang | |||||
* @DATE: 2022/6/29 10:17 | |||||
* | |||||
* 注意: | |||||
* 必填 | |||||
* ① measurement 不能为空 | |||||
* ② 时间段 不能为空 | |||||
* ③ bucket 不能为空 | |||||
* 非必填 | |||||
* ① 分页信息可选 | |||||
* ② tag | |||||
* | |||||
*/ | |||||
@Data | |||||
@EqualsAndHashCode(callSuper = false) | |||||
@Accessors(chain = true) | |||||
public class QueryDataGroupByTimeParam extends BaseParam{ | |||||
private Tag tag; | |||||
//查询的时候,需要忽略的字段。(transationId是唯一标识会对 最终的查询结果集产生影响) | |||||
private List<String> dropedTagNames; | |||||
private String bucket; | |||||
//1-按年分组; 2-按月分组; 3-按日分组 | |||||
private Integer timeType; | |||||
} |
@@ -4,6 +4,8 @@ import lombok.Data; | |||||
import lombok.EqualsAndHashCode; | import lombok.EqualsAndHashCode; | ||||
import lombok.experimental.Accessors; | import lombok.experimental.Accessors; | ||||
import java.util.List; | |||||
/** | /** | ||||
* @Desc: "influx 查询条件构造" | * @Desc: "influx 查询条件构造" | ||||
* @Author: caixiang | * @Author: caixiang | ||||
@@ -25,7 +27,8 @@ import lombok.experimental.Accessors; | |||||
public class QueryDataParam extends BaseParam{ | public class QueryDataParam extends BaseParam{ | ||||
private Tag tag; | private Tag tag; | ||||
//查询的时候,需要忽略的字段。(transationId是唯一标识会对 最终的查询结果集产生影响) | //查询的时候,需要忽略的字段。(transationId是唯一标识会对 最终的查询结果集产生影响) | ||||
private String dropedTagName; | |||||
private List<String> dropedTagNames; | |||||
private String bucket; | private String bucket; | ||||
private String groupName; | |||||
} | } |
@@ -1,13 +1,11 @@ | |||||
package com.qgs.dc.influx.template; | package com.qgs.dc.influx.template; | ||||
import com.influxdb.annotations.Column; | |||||
import com.influxdb.annotations.Measurement; | |||||
import lombok.Data; | import lombok.Data; | ||||
import java.time.Instant; | import java.time.Instant; | ||||
/** | /** | ||||
* @Desc: "" | |||||
* @Desc: "常规计量值 - entity" | |||||
* @Author: caixiang | * @Author: caixiang | ||||
* @DATE: 2022/6/25 11:13 | * @DATE: 2022/6/25 11:13 | ||||
*/ | */ | ||||
@@ -16,9 +14,15 @@ public class Event { | |||||
private Instant time; | private Instant time; | ||||
private String inspectionSheetId; | |||||
private String transationId; | private String transationId; | ||||
private String argName; | private String argName; | ||||
private Double argValue; | private Double argValue; | ||||
} | |||||
//批次号,可选的 | |||||
private Integer batchNum; | |||||
} |
@@ -0,0 +1,33 @@ | |||||
package com.qgs.dc.influx.template; | |||||
import lombok.Data; | |||||
import java.time.Instant; | |||||
/** | |||||
* @Desc: "常规计数值 - entity" | |||||
* @Author: caixiang | |||||
* @DATE: 2022/6/25 11:13 | |||||
*/ | |||||
@Data | |||||
public class EventForCount { | |||||
private Instant time; | |||||
private String inspectionSheetId; | |||||
private String transationId; | |||||
private String argName; | |||||
//如果是计数类型,,1 = 代表ok ;2 = nok | |||||
//todo 剩下样本量怎么估算。 | |||||
private Double argValue; | |||||
//类型 : 1 计量型 ;2.计数型 | |||||
private Integer type; | |||||
} |