diff --git a/src/main/java/com/qgs/dc/influx/Main.java b/src/main/java/com/qgs/dc/influx/Main.java index d31dcc8..6239a1a 100644 --- a/src/main/java/com/qgs/dc/influx/Main.java +++ b/src/main/java/com/qgs/dc/influx/Main.java @@ -1,18 +1,24 @@ package com.qgs.dc.influx; +//import com.qgs.dc.influx.config.InfluxClient; -import com.influxdb.client.*; -import com.influxdb.client.domain.WritePrecision; -import com.influxdb.client.write.Point; +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.QueryApi; +import com.influxdb.client.WriteApi; import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxTable; import com.qgs.dc.influx.param.PageInfo; import com.qgs.dc.influx.param.QueryDataParam; import com.qgs.dc.influx.param.Range; +import com.qgs.dc.influx.param.Tag; import java.text.SimpleDateFormat; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.List; /** * @Desc: "" @@ -23,7 +29,7 @@ public class Main { public static void main(String[] args) throws InterruptedException { char[] token = "lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==".toCharArray(); String org = "qgs"; - String bucket = "mytest"; + String bucket = "qgs-bucket"; InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://192.168.0.170:8086", token, org, bucket); @@ -45,23 +51,23 @@ public class Main { // InfluxClient.Client.insert(event,"ASProcessCompleteEvent"); - Point point = Point.measurement("ASProcessCompleteEvent") - .addTag("transationId", "112311") - .addTag("argName", "argName11") - .addField("argValue", 3D) - .time(Instant.now().toEpochMilli(), WritePrecision.MS); - - - Point point2 = Point.measurement("ASProcessCompleteEvent") - .addTag("transationId", "222312") - .addTag("argName", "argName11") - .addField("argValue", 4D) - .time(Instant.now().toEpochMilli(), WritePrecision.MS); - List list = new ArrayList<>(); - list.add(point); - list.add(point2); - - writeApi.writePoints(list); +// Point point = Point.measurement("ASProcessCompleteEvent") +// .addTag("transationId", "112311") +// .addTag("argName", "argName11") +// .addField("argValue", 3D) +// .time(Instant.now().toEpochMilli(), WritePrecision.MS); +// +// +// Point point2 = Point.measurement("ASProcessCompleteEvent") +// .addTag("transationId", "222312") +// .addTag("argName", "argName11") +// .addField("argValue", 4D) +// .time(Instant.now().toEpochMilli(), WritePrecision.MS); +// List list = new ArrayList<>(); +// list.add(point); +// list.add(point2); +// +// writeApi.writePoints(list); //todo api.writeMeasurements(WritePrecision.NS, Arrays.asList(new H2OFeetMeasurement("coyote_creek", 15.0D, null, Instant.ofEpochSecond(0, 15)), new H2OFeetMeasurement("coyote_creek", 16.0D, null, Instant.ofEpochSecond(0, 16)))); // List events = new ArrayList<>(); @@ -101,7 +107,7 @@ public class Main { // } -// String flux = "from(bucket:\"mytest\") |> range(start: -60m)"; +// String flux = "from(bucket:\"mytest\") |> range(start: -6000000000000000m)"; // flux += "|> filter(fn: (r) =>\n" + // " r._measurement == \"ASProcessCompleteEvent\" and \n" + //// " r._field == \"type\" and \n" + //对应 Field key @@ -113,6 +119,7 @@ public class Main { // for (FluxTable fluxTable : tables) { // List records = fluxTable.getRecords(); // for (FluxRecord fluxRecord : records) { +// Double o = (Double)fluxRecord.getValueByKey("_value"); // System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); //// System.out.println("time: "+fluxRecord.getTime() +" key:"++" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); // @@ -128,36 +135,39 @@ public class Main { // |> sort(columns: ["_time"], desc: true) // 取前10条数据 // |> limit(n: 10, offset: 0) - +// // 取 10-20 条数据 // |> limit(n: 10, offset: 10) - +// // 取 20-30 条数据 // |> limit(n: 10, offset: 20) - //22-57 22.3 - //23-20 78 - //24 -// QueryDataParam queryDataParam = new QueryDataParam(); -// queryDataParam.setBucket("mytest"); -// queryDataParam.setRange(new Range(getDate().toInstant(),new Date().toInstant())); -// queryDataParam.setMeasurement("ASProcessCompleteEvent"); -// queryDataParam.setTag(new Tag("argName","arg4")); + + QueryDataParam queryDataParam = new QueryDataParam(); + queryDataParam.setBucket("qgs-bucket"); + queryDataParam.setRange(new Range(getDate().toInstant(),new Date().toInstant())); + queryDataParam.setMeasurement("Weight"); + queryDataParam.setTag(new Tag("argName","LTWeight")); // queryDataParam.setDropedTagName("transationId"); -// queryDataParam.setPageInfo(new PageInfo(1,100)); -// -// List tables = query(queryDataParam,influxDBClient); + List dropNames = new ArrayList<>(); + dropNames.add("transationId"); + dropNames.add("inspectionSheetId"); + dropNames.add("batchNum"); + queryDataParam.setDropedTagNames(dropNames); + queryDataParam.setPageInfo(new PageInfo(1,10000)); + + List tables = query(queryDataParam,influxDBClient); // List records1 = tables.get(0).getRecords(); // List> lists = Utils.fixedGroup(records1, 10); -// for (FluxTable fluxTable : tables) { -// List records = fluxTable.getRecords(); -// for (FluxRecord fluxRecord : records) { -// System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); -// -// } -// } + for (FluxTable fluxTable : tables) { + List records = fluxTable.getRecords(); + for (FluxRecord fluxRecord : records) { + Instant timms = fluxRecord.getTime(); + System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); + } + } influxDBClient.close(); } @@ -170,16 +180,16 @@ public class Main { //两种写法都可以获取到前三天的日期 // calendar1.set(Calendar.DAY_OF_YEAR,calendar1.get(Calendar.DAY_OF_YEAR) -3); //在当前时间的基础上获取前三天的日期 - calendar1.add(Calendar.DATE, -3); + calendar1.add(Calendar.DATE, -1000); //add方法 参数也可传入 月份,获取的是前几月或后几月的日期 //calendar1.add(Calendar.MONTH, -3); Date day = calendar1.getTime(); return day; } - private static List query(QueryDataParam param, InfluxDBClient influxDBClient){ + private static List query(QueryDataParam param,InfluxDBClient influxDBClient){ String measurement = param.getMeasurement(); - String dropedTagName = param.getDropedTagName(); + List dropedTagName = param.getDropedTagNames(); Range range = param.getRange(); String bucket = param.getBucket(); String tagName = param.getTag().getTagName(); @@ -190,8 +200,11 @@ public class Main { flux += "|> range(start: "+range.getBegin().toString()+",stop:"+range.getEnd().toString()+") \n"; flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\") \n"; flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\") \n"; - flux += "|> drop(columns: [\""+dropedTagName+"\"]) \n"; + for(String dropName:dropedTagName){ + flux += "|> drop(columns: [\""+ dropName +"\"]) \n"; + } flux += "|> sort(columns: [\"_time\"], desc: true) \n"; + flux += "|> window(every: 1y) \n"; if(pageInfo!=null){ flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; } diff --git a/src/main/java/com/qgs/dc/influx/common/Utils.java b/src/main/java/com/qgs/dc/influx/common/DataUtils.java similarity index 52% rename from src/main/java/com/qgs/dc/influx/common/Utils.java rename to src/main/java/com/qgs/dc/influx/common/DataUtils.java index abcbab8..beb9b05 100644 --- a/src/main/java/com/qgs/dc/influx/common/Utils.java +++ b/src/main/java/com/qgs/dc/influx/common/DataUtils.java @@ -9,9 +9,9 @@ import java.util.List; /** * @Desc: "" * @Author: caixiang - * @DATE: 2022/6/29 16:23 + * @DATE: 2022/7/12 14:23 */ -public class Utils { +public class DataUtils { public static void main(String[] args) { ArrayList arrs = new ArrayList<>(); @@ -23,7 +23,45 @@ public class Utils { } + public List> split(List value) { + List> result = new ArrayList<>(); + int day = value.iterator().next().getDate(); + List newListEntry = new ArrayList<>(); + + for (Date date : value) { + if (date.getDate() == day) { + newListEntry.add(date); + } + else { + day = date.getDate(); + result.add(newListEntry); + newListEntry = new ArrayList<>(); + newListEntry.add(date); + } + } + result.add(newListEntry);//because the last sublist was not added + + return result; + } + + public static String splitToNeed(String s,Integer type){ + String[] s1 = s.split(" "); + String[] split = s1[0].split("-"); + String year = split[0]; + String mon = split[1]; + String day = split[2]; + if(type == 1 ){ + //年 + return year; + }else if(type == 2 ){ + //月 + return year+"-"+mon; + }else { + //日 + return s1[0]; + } + } public static Date getBeforeDate(Integer number){ Date date = new Date();//获取当前日期 SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");//格式化一下 @@ -37,6 +75,19 @@ public class Utils { Date day = calendar1.getTime(); return day; } + public static Date getAfterDate(Integer number){ + Date date = new Date();//获取当前日期 + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");//格式化一下 + Calendar calendar1 = Calendar.getInstance();//获取对日期操作的类对象 + //两种写法都可以获取到前三天的日期 + // calendar1.set(Calendar.DAY_OF_YEAR,calendar1.get(Calendar.DAY_OF_YEAR) -3); + //在当前时间的基础上获取前三天的日期 + calendar1.add(Calendar.DATE, 0+number); + //add方法 参数也可传入 月份,获取的是前几月或后几月的日期 + //calendar1.add(Calendar.MONTH, -3); + Date day = calendar1.getTime(); + return day; + } /** * 将一组数据固定分组,每组n个元素 @@ -64,6 +115,4 @@ public class Utils { } return result; } - - } diff --git a/src/main/java/com/qgs/dc/influx/config/InfluxClient.java b/src/main/java/com/qgs/dc/influx/config/InfluxClient.java index f437875..c3834f7 100644 --- a/src/main/java/com/qgs/dc/influx/config/InfluxClient.java +++ b/src/main/java/com/qgs/dc/influx/config/InfluxClient.java @@ -1,6 +1,5 @@ package com.qgs.dc.influx.config; - import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.QueryApi; @@ -9,6 +8,7 @@ import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.influxdb.query.FluxTable; import com.qgs.dc.influx.param.PageInfo; +import com.qgs.dc.influx.param.QueryDataGroupByTimeParam; import com.qgs.dc.influx.param.QueryDataParam; import com.qgs.dc.influx.param.Range; import com.qgs.dc.influx.template.Event; @@ -60,16 +60,17 @@ public enum InfluxClient { * true 服务正常健康 * false 异常 */ - private boolean ping() { + public boolean ping() { boolean isConnected = false; Boolean pong; try { pong = influxDBClient.ping(); if (pong != null) { - isConnected = true; + isConnected = pong; } } catch (Exception e) { e.printStackTrace(); + return false; } return isConnected; } @@ -77,6 +78,7 @@ public enum InfluxClient { public void insert(Event event, String measurement){ Point point = Point.measurement(measurement) .addTag("transationId", event.getTransationId()) + .addTag("inspectionSheetId", event.getInspectionSheetId()) .addTag("argName", event.getArgName()) .addField("argValue", event.getArgValue()) .time(event.getTime().toEpochMilli(), WritePrecision.MS); @@ -89,11 +91,25 @@ public enum InfluxClient { public void batchInsert(List events, String measurement){ List list = new ArrayList<>(); for(Event event:events){ - Point point = Point.measurement(measurement) - .addTag("transationId", event.getTransationId()) - .addTag("argName", event.getArgName()) - .addField("argValue", event.getArgValue()) - .time(event.getTime().toEpochMilli(), WritePrecision.MS); + Point point = null; + if(event.getBatchNum()==null){ + point = Point.measurement(measurement) + .addTag("transationId", event.getTransationId()) + .addTag("inspectionSheetId", event.getInspectionSheetId()) + .addTag("argName", event.getArgName()) + .addField("argValue", event.getArgValue()) + .time(event.getTime().toEpochMilli(), WritePrecision.MS); + }else { + point = Point.measurement(measurement) + .addTag("transationId", event.getTransationId()) + .addTag("inspectionSheetId", event.getInspectionSheetId()) + .addTag("batchNum", event.getBatchNum().toString()) + .addTag("argName", event.getArgName()) + .addField("argValue", event.getArgValue()) + .time(event.getTime().toEpochMilli(), WritePrecision.MS); + } + + list.add(point); } writeApi.writePoints(list); @@ -102,7 +118,55 @@ public enum InfluxClient { public List query(QueryDataParam param){ String measurement = param.getMeasurement(); - String dropedTagName = param.getDropedTagName(); + List dropedTagNames = param.getDropedTagNames(); + Range range = param.getRange(); + String bucket = param.getBucket(); + String tagName = param.getTag().getTagName(); + String tagValue = param.getTag().getTagValue(); + PageInfo pageInfo = param.getPageInfo(); + + String flux = "from(bucket:\""+bucket+"\")"; + flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; + flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; + flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; + for(String dropName:dropedTagNames){ + flux += "|> drop(columns: [\""+dropName+"\"])"; + } + flux += "|> sort(columns: [\"_time\"], desc: true)"; + if(pageInfo!=null){ + flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; + } + return queryApi.query(flux); + } + + public List queryByGroup(QueryDataParam param){ + String measurement = param.getMeasurement(); + List dropedTagNames = param.getDropedTagNames(); + Range range = param.getRange(); + String bucket = param.getBucket(); + String tagName = param.getTag().getTagName(); + String tagValue = param.getTag().getTagValue(); + String groupName = param.getGroupName(); + PageInfo pageInfo = param.getPageInfo(); + + String flux = "from(bucket:\""+bucket+"\")"; + flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; + flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; + flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; + flux += "|> group(columns: [\""+groupName+"\"], mode: \"except\")"; + for(String dropName:dropedTagNames){ + flux += "|> drop(columns: [\""+dropName+"\"])"; + } + flux += "|> sort(columns: [\"_time\"], desc: true)"; + if(pageInfo!=null){ + flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; + } + return queryApi.query(flux); + } + + public List queryGroupByTime(QueryDataGroupByTimeParam param){ + String measurement = param.getMeasurement(); + List dropedTagNames = param.getDropedTagNames(); Range range = param.getRange(); String bucket = param.getBucket(); String tagName = param.getTag().getTagName(); @@ -113,7 +177,17 @@ public enum InfluxClient { flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; - flux += "|> drop(columns: [\""+dropedTagName+"\"])"; + for(String dropName:dropedTagNames){ + flux += "|> drop(columns: [\""+dropName+"\"])"; + } + //|> window(every: 1mo) + if(param.getTimeType() == 1){ + flux += "|> window(every: 1y)"; + }else if(param.getTimeType() == 2 ){ + flux += "|> window(every: 1mo)"; + }else{ + flux += "|> window(every: 1d)"; + } flux += "|> sort(columns: [\"_time\"], desc: true)"; if(pageInfo!=null){ flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; diff --git a/src/main/java/com/qgs/dc/influx/constant/Constant.java b/src/main/java/com/qgs/dc/influx/constant/Constant.java index 8ac20f8..fcda73c 100644 --- a/src/main/java/com/qgs/dc/influx/constant/Constant.java +++ b/src/main/java/com/qgs/dc/influx/constant/Constant.java @@ -18,6 +18,7 @@ public class Constant { public static final LogLevel readTimeout = LogLevel.BODY; public static final LogLevel writeTimeout = LogLevel.BODY; public static final LogLevel connectTimeout = LogLevel.BODY; + public static final String measurement = "WeightHeiHei"; } diff --git a/src/main/java/com/qgs/dc/influx/controller/InfluxController.java b/src/main/java/com/qgs/dc/influx/controller/InfluxController.java new file mode 100644 index 0000000..d8dbaf0 --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/controller/InfluxController.java @@ -0,0 +1,239 @@ +package com.qgs.dc.influx.controller; + + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.parser.JSONToken; +import com.influxdb.client.WriteApi; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.Point; +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; +import com.qgs.dc.influx.common.DataUtils; +import com.qgs.dc.influx.config.InfluxClient; +import com.qgs.dc.influx.param.*; +import com.qgs.dc.influx.template.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + +@RestController +@RequestMapping("/influx") +public class InfluxController { + private static final Logger logger = LoggerFactory.getLogger(InfluxController.class); + + + + + @PostMapping("/insertBatch") + public void insertBatch() throws InterruptedException { + List list = new ArrayList<>(); + Random r = new Random(); + + for(int i=0;i<999;i++){ + Thread.sleep(10); + Event event = new Event(); + event.setTime(Instant.now()); + event.setTransationId("asas"+i); + event.setArgName("LTWeight"); + Double d = r.nextDouble() * 2.5 + 66; + event.setArgValue(d); + + list.add(event); + } + InfluxClient.Client.batchInsert(list,"Weight"); + } + + @PostMapping("/forTestInsertBatch") + public void forTestInsertBatch() throws InterruptedException { + List list = new ArrayList<>(); + Random r = new Random(); + for(int i=0;i<99;i++){ + Thread.sleep(10); + Event event = new Event(); + event.setTime(Instant.now()); + event.setTransationId("forbatch"+i); + event.setArgName("LTWeight"); + Double d = r.nextDouble() * 2.5 + 66; + event.setArgValue(d); + event.setBatchNum(4); + event.setInspectionSheetId(i+""); + list.add(event); + } + InfluxClient.Client.batchInsert(list,"Weight"); + } + + + + + @PostMapping("/insertBatchJYD") + public void insertBatchJYD() throws InterruptedException { + List list = new ArrayList<>(); + Random r = new Random(); + Instant instant = DataUtils.getBeforeDate(400).toInstant(); + + for(int j=0;j<10;j++){ + for(int i=0;i<99;i++){ + Thread.sleep(10); + Event event = new Event(); + event.setTime(instant); + event.setTransationId("asas"+i); + event.setArgName("LTWeight"); + Double d = r.nextDouble() * 2.5 + 66; + event.setInspectionSheetId(j+""); + event.setArgValue(d); + event.setBatchNum(i); + list.add(event); + } + } + InfluxClient.Client.batchInsert(list,"WeightHei"); + } + + + + @PostMapping("/insertBatchJYDForTest") + public void insertBatchJYDForTest() throws InterruptedException { + List list = new ArrayList<>(); + Random r = new Random(); + + for(int i=0;i<999;i++){ + Thread.sleep(10); + Event event = new Event(); + event.setTime(DataUtils.getAfterDate(i).toInstant()); + event.setTransationId("asas"+i); + event.setArgName("LostDays"); + int i1 = r.nextInt(10); +// if(i1<4){ +// event.setArgValue(new Double(0)); +// }else { +// event.setArgValue(new Double(1)); +// } + event.setInspectionSheetId(i+""); + + event.setBatchNum(i); + list.add(event); + } + InfluxClient.Client.batchInsert(list,"WeightHeiHei"); + } + + /** + * 测试连接是否正常 + * + * @return + * true 服务正常健康 + * false 异常 + */ + @PostMapping("/ping") + public void ping() throws InterruptedException { + boolean ping = InfluxClient.Client.ping(); + System.out.println(ping); + } + + @PostMapping("/query") + public List query() throws InterruptedException { + List list = new ArrayList<>(); + + QueryDataParam queryDataParam = new QueryDataParam(); + queryDataParam.setBucket("qgs-bucket"); + queryDataParam.setMeasurement("ASProcessCompleteEventAS"); + List dropNames = new ArrayList<>(); + dropNames.add("transationId"); + dropNames.add("inspectionSheetId"); + queryDataParam.setDropedTagNames(dropNames); + queryDataParam.setTag(new Tag("argName","arg6")); + queryDataParam.setRange(new Range(DataUtils.getBeforeDate(10).toInstant(),Instant.now())); + queryDataParam.setPageInfo(new PageInfo(1,10)); + List query = InfluxClient.Client.query(queryDataParam); + + + for (FluxTable fluxTable : query) { + List records = fluxTable.getRecords(); + for (FluxRecord fluxRecord : records) { + System.out.println("value: " + fluxRecord.getValueByKey("_value")); + } + } + System.out.println(); + + return query; + } + + public static void main(String[] args) { + List list = new ArrayList<>(); + + QueryDataParam queryDataParam = new QueryDataParam(); + queryDataParam.setBucket("qgs-bucket"); + queryDataParam.setMeasurement("ASProcessCompleteEventAS"); + List dropNames = new ArrayList<>(); + dropNames.add("transationId"); + dropNames.add("inspectionSheetId"); + queryDataParam.setDropedTagNames(dropNames); + queryDataParam.setTag(new Tag("argName","arg7")); + queryDataParam.setRange(new Range(DataUtils.getBeforeDate(10).toInstant(),Instant.now())); + queryDataParam.setPageInfo(new PageInfo(2,10)); + List query = InfluxClient.Client.query(queryDataParam); + + + for (FluxTable fluxTable : query) { + List records = fluxTable.getRecords(); + for (FluxRecord fluxRecord : records) { + System.out.println("value: " + fluxRecord.getValueByKey("_value")); + } + } + } + + + public Point insert(Event event, String measurement){ + Point point = Point.measurement(measurement) + .addTag("transationId", event.getTransationId()) + .addTag("argName", event.getArgName()) + .addField("argValue", event.getArgValue()) + .time(event.getTime().toEpochMilli(), WritePrecision.MS); + return point; + } + + @PostMapping("/insert") + public void insert() throws InterruptedException { + Event event = new Event(); + event.setTime(Instant.now()); + event.setTransationId("asasd11"); + event.setArgName("argName11"); + event.setArgValue(900001d); + Point asProcessCompleteEvent = insert(event, "ASProcessCompleteEvent"); + InfluxClient.Client.insert(event,"ASProcessCompleteEvent"); + } + + @PostMapping("/insertEvents") + public void insertEvents(@RequestBody InsertParam insertParam) throws InterruptedException{ + InfluxClient.Client.batchInsert(insertParam.getList(),insertParam.getMeasurement()); + } + + @PostMapping("/queryEvents") + public JSONArray queryEvents(@RequestBody QueryDataParam queryDataParam) throws InterruptedException{ + List fluxTables = InfluxClient.Client.query(queryDataParam); + List fluxRecords = fluxTables.get(0).getRecords(); + JSONArray jsonArray = new JSONArray(); + for (FluxRecord fluxRecord:fluxRecords){ + Map map = fluxRecord.getValues(); + System.out.println(map); + //todo + JSONObject jsonObject = new JSONObject(); + jsonObject.put(map.get("argName").toString(),map.get("_value").toString()); + jsonObject.put("transationId",map.get("transationId").toString()); + jsonObject.put("measurement",map.get("_measurement").toString()); + jsonObject.put("time",map.get("_time").toString()); + jsonArray.add(jsonObject); + } + System.out.println(jsonArray); + return jsonArray; + } +} diff --git a/src/main/java/com/qgs/dc/influx/controller/InfluxDemoController.java b/src/main/java/com/qgs/dc/influx/controller/InfluxDemoController.java deleted file mode 100644 index 63c0228..0000000 --- a/src/main/java/com/qgs/dc/influx/controller/InfluxDemoController.java +++ /dev/null @@ -1,73 +0,0 @@ -package com.qgs.dc.influx.controller; - -import com.influxdb.client.domain.WritePrecision; -import com.influxdb.client.write.Point; -import com.qgs.dc.influx.config.InfluxClient; -import com.qgs.dc.influx.param.InsertParam; -import com.qgs.dc.influx.template.Event; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; - -@RestController -@RequestMapping("/influx") -public class InfluxDemoController { - private static final Logger logger = LoggerFactory.getLogger(InfluxDemoController.class); - - - -// try (WriteApi writeApi = influxDBClient.makeWriteApi()) { -// Temperature temperature = new Temperature(); -// temperature.setLocation("east"); -// temperature.setValue(106.2D); -// temperature.setTime(Instant.now()); -// writeApi.writeMeasurement(WritePrecision.NS,temperature); -// } -// -// try (WriteApi writeApi = influxDBClient.makeWriteApi()) { -// Point point = Point.measurement("temperature") -// .addTag("location","south") -// .addTag("owner","wxm") -// .addField("wxm",230.8); -// writeApi.writePoint(point); -// } - - - @PostMapping("/insertBatch") - public void insertBatch(@RequestBody InsertParam insertParam) throws InterruptedException { - InfluxClient.Client.batchInsert(insertParam.getList(),insertParam.getMeasurement()); - } - - - public Point insert(Event event, String measurement){ - Point point = Point.measurement(measurement) - .addTag("transationId", event.getTransationId()) - .addTag("argName", event.getArgName()) - .addField("argValue", event.getArgValue()) - .time(event.getTime().toEpochMilli(), WritePrecision.MS); - return point; - - } - - - @PostMapping("/insert") - public void insert() throws InterruptedException { - Event event = new Event(); - event.setTime(Instant.now()); - event.setTransationId("asasd11"); - event.setArgName("argName11"); - event.setArgValue(99765d); - Point asProcessCompleteEvent = insert(event, "ASProcessCompleteEvent"); - InfluxClient.Client.insert(event,"ASProcessCompleteEvent"); - } - - - - -} diff --git a/src/main/java/com/qgs/dc/influx/param/BaseParam.java b/src/main/java/com/qgs/dc/influx/param/BaseParam.java index 3452566..5488e11 100644 --- a/src/main/java/com/qgs/dc/influx/param/BaseParam.java +++ b/src/main/java/com/qgs/dc/influx/param/BaseParam.java @@ -21,4 +21,6 @@ public class BaseParam implements Serializable { @NotNull(message = "查询时间段不能为空") private Range range; + + } diff --git a/src/main/java/com/qgs/dc/influx/param/InsertParam.java b/src/main/java/com/qgs/dc/influx/param/InsertParam.java index 58d462d..5ec7004 100644 --- a/src/main/java/com/qgs/dc/influx/param/InsertParam.java +++ b/src/main/java/com/qgs/dc/influx/param/InsertParam.java @@ -5,11 +5,6 @@ import lombok.Data; import java.util.List; -/** - * @Desc: "" - * @Author: caixiang - * @DATE: 2022/10/13 16:10 - */ @Data public class InsertParam { List list; diff --git a/src/main/java/com/qgs/dc/influx/param/QueryDataGroupByTimeParam.java b/src/main/java/com/qgs/dc/influx/param/QueryDataGroupByTimeParam.java new file mode 100644 index 0000000..98d3d9c --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/param/QueryDataGroupByTimeParam.java @@ -0,0 +1,34 @@ +package com.qgs.dc.influx.param; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.Accessors; + +import java.util.List; + +/** + * @Desc: "influx 查询条件构造" + * @Author: caixiang + * @DATE: 2022/6/29 10:17 + * + * 注意: + * 必填 + * ① measurement 不能为空 + * ② 时间段 不能为空 + * ③ bucket 不能为空 + * 非必填 + * ① 分页信息可选 + * ② tag + * + */ +@Data +@EqualsAndHashCode(callSuper = false) +@Accessors(chain = true) +public class QueryDataGroupByTimeParam extends BaseParam{ + private Tag tag; + //查询的时候,需要忽略的字段。(transationId是唯一标识会对 最终的查询结果集产生影响) + private List dropedTagNames; + private String bucket; + //1-按年分组; 2-按月分组; 3-按日分组 + private Integer timeType; +} diff --git a/src/main/java/com/qgs/dc/influx/param/QueryDataParam.java b/src/main/java/com/qgs/dc/influx/param/QueryDataParam.java index 604cc93..31c8c06 100644 --- a/src/main/java/com/qgs/dc/influx/param/QueryDataParam.java +++ b/src/main/java/com/qgs/dc/influx/param/QueryDataParam.java @@ -4,6 +4,8 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; +import java.util.List; + /** * @Desc: "influx 查询条件构造" * @Author: caixiang @@ -25,7 +27,8 @@ import lombok.experimental.Accessors; public class QueryDataParam extends BaseParam{ private Tag tag; //查询的时候,需要忽略的字段。(transationId是唯一标识会对 最终的查询结果集产生影响) - private String dropedTagName; + private List dropedTagNames; private String bucket; + private String groupName; } diff --git a/src/main/java/com/qgs/dc/influx/template/Event.java b/src/main/java/com/qgs/dc/influx/template/Event.java index 822df54..3093827 100644 --- a/src/main/java/com/qgs/dc/influx/template/Event.java +++ b/src/main/java/com/qgs/dc/influx/template/Event.java @@ -1,13 +1,11 @@ package com.qgs.dc.influx.template; -import com.influxdb.annotations.Column; -import com.influxdb.annotations.Measurement; import lombok.Data; import java.time.Instant; /** - * @Desc: "" + * @Desc: "常规计量值 - entity" * @Author: caixiang * @DATE: 2022/6/25 11:13 */ @@ -16,9 +14,15 @@ public class Event { private Instant time; + private String inspectionSheetId; + private String transationId; private String argName; private Double argValue; -} + + //批次号,可选的 + private Integer batchNum; + +} \ No newline at end of file diff --git a/src/main/java/com/qgs/dc/influx/template/EventForCount.java b/src/main/java/com/qgs/dc/influx/template/EventForCount.java new file mode 100644 index 0000000..65577bb --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/template/EventForCount.java @@ -0,0 +1,33 @@ +package com.qgs.dc.influx.template; + +import lombok.Data; + +import java.time.Instant; + +/** + * @Desc: "常规计数值 - entity" + * @Author: caixiang + * @DATE: 2022/6/25 11:13 + */ +@Data +public class EventForCount { + + private Instant time; + + private String inspectionSheetId; + + private String transationId; + + private String argName; + + + //如果是计数类型,,1 = 代表ok ;2 = nok + //todo 剩下样本量怎么估算。 + private Double argValue; + + //类型 : 1 计量型 ;2.计数型 + private Integer type; + + + +} \ No newline at end of file