package com.cnbm.influx; import com.cnbm.influx.config.InfluxClient; import com.cnbm.influx.param.QueryDataParam; import com.cnbm.influx.param.PageInfo; import com.cnbm.influx.param.Range; import com.cnbm.influx.param.Tag; import com.cnbm.influx.template.Event; import com.influxdb.client.*; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxTable; import java.text.SimpleDateFormat; import java.time.Instant; import java.util.*; /** * @Desc: "" * @Author: caixiang * @DATE: 2022/6/25 11:19 */ public class Main { public static void main(String[] args) throws InterruptedException { char[] token = "lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==".toCharArray(); String org = "qgs"; String bucket = "mytest"; InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://192.168.0.170:8086", token, org, bucket); WriteApi writeApi = influxDBClient.makeWriteApi(); // InfluxService influxService = new InfluxService(); // Event event = new Event(); // event.time = Instant.now(); // event.transationId = "asas"; // event.argName = "arg5"; // event.argValue = new Double(11); // influxService.insert(event); // Event event = new Event(); // event.setTime(Instant.now()); // event.setTransationId("asasd11"); // event.setArgName("argName11"); // event.setArgValue(3d); // InfluxClient.Client.insert(event,"ASProcessCompleteEvent"); Point point = Point.measurement("ASProcessCompleteEvent") .addTag("transationId", "112311") .addTag("argName", "argName11") .addField("argValue", 3D) .time(Instant.now().toEpochMilli(), WritePrecision.MS); Point point2 = Point.measurement("ASProcessCompleteEvent") .addTag("transationId", "222312") .addTag("argName", "argName11") .addField("argValue", 4D) .time(Instant.now().toEpochMilli(), WritePrecision.MS); List list = new ArrayList<>(); list.add(point); list.add(point2); writeApi.writePoints(list); //todo api.writeMeasurements(WritePrecision.NS, Arrays.asList(new H2OFeetMeasurement("coyote_creek", 15.0D, null, Instant.ofEpochSecond(0, 15)), new H2OFeetMeasurement("coyote_creek", 16.0D, null, Instant.ofEpochSecond(0, 16)))); // List events = new ArrayList<>(); // for(int i=0;i<99;i++){ // // Event event = new Event(); // event.time = Instant.now(); // event.transationId = "asas"+i; // event.argName = "arg7"; // event.argValue = new Double(i); // events.add(event); // } // List qList = new ArrayList<>(); // Event event = new Event(); // event.time = Instant.now(); // event.transationId = "asas"; // event.argName = "arg7"; // event.argValue = new Double(1); // Thread.sleep(100); // Event event2 = new Event(); // event2.time = Instant.now(); // event2.transationId = "asas"; // event2.argName = "arg7"; // event2.argValue = new Double(2); // qList.add(event); // qList.add(event2); // writeApi.writeMeasurement( WritePrecision.NS, qList); // for(int i=0;i<10;i++){ // Temperature temperature = new Temperature(); // temperature.location = "south"; // temperature.value = new Double(i); // temperature.type = "equipment3"; // temperature.time = Instant.now(); // // writeApi.writeMeasurement( WritePrecision.NS, temperature); // } // String flux = "from(bucket:\"mytest\") |> range(start: -60m)"; // flux += "|> filter(fn: (r) =>\n" + // " r._measurement == \"ASProcessCompleteEvent\" and \n" + //// " r._field == \"type\" and \n" + //对应 Field key // " r.argName == \"arg3\"\n" + //对应 Tags key (Tag 信息无法在FluxRecord 里面获取。) // " )"; // QueryApi queryApi = influxDBClient.getQueryApi(); // // List tables = queryApi.query(flux); // for (FluxTable fluxTable : tables) { // List records = fluxTable.getRecords(); // for (FluxRecord fluxRecord : records) { // System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); //// System.out.println("time: "+fluxRecord.getTime() +" key:"++" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); // // } // } // from(bucket: "mytest") // |> range(start: 2022-06-29T11:30:00Z, stop: 2022-06-29T12:30:00Z) // |> filter(fn: (r) => r["_measurement"] == "ASProcessCompleteEvent") // |> filter(fn: (r) => r["argName"] == "arg4") // |> drop(columns: ["transationId"]) // |> sort(columns: ["_time"], desc: true) // 取前10条数据 // |> limit(n: 10, offset: 0) // 取 10-20 条数据 // |> limit(n: 10, offset: 10) // 取 20-30 条数据 // |> limit(n: 10, offset: 20) // QueryDataParam queryDataParam = new QueryDataParam(); // queryDataParam.setBucket("mytest"); // queryDataParam.setRange(new Range(getDate().toInstant(),new Date().toInstant())); // queryDataParam.setMeasurement("ASProcessCompleteEvent"); // queryDataParam.setTag(new Tag("argName","arg4")); // queryDataParam.setDropedTagName("transationId"); // queryDataParam.setPageInfo(new PageInfo(1,100)); // // List tables = query(queryDataParam,influxDBClient); // List records1 = tables.get(0).getRecords(); // List> lists = Utils.fixedGroup(records1, 10); // for (FluxTable fluxTable : tables) { // List records = fluxTable.getRecords(); // for (FluxRecord fluxRecord : records) { // System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); // // } // } influxDBClient.close(); } public static Date getDate(){ Date date = new Date();//获取当前日期 SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");//格式化一下 Calendar calendar1 = Calendar.getInstance();//获取对日期操作的类对象 //两种写法都可以获取到前三天的日期 // calendar1.set(Calendar.DAY_OF_YEAR,calendar1.get(Calendar.DAY_OF_YEAR) -3); //在当前时间的基础上获取前三天的日期 calendar1.add(Calendar.DATE, -3); //add方法 参数也可传入 月份,获取的是前几月或后几月的日期 //calendar1.add(Calendar.MONTH, -3); Date day = calendar1.getTime(); return day; } private static List query(QueryDataParam param,InfluxDBClient influxDBClient){ String measurement = param.getMeasurement(); String dropedTagName = param.getDropedTagName(); Range range = param.getRange(); String bucket = param.getBucket(); String tagName = param.getTag().getTagName(); String tagValue = param.getTag().getTagValue(); PageInfo pageInfo = param.getPageInfo(); String flux = "from(bucket:\""+bucket+"\")"; flux += "|> range(start: "+range.getBegin().toString()+",stop:"+range.getEnd().toString()+") \n"; flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\") \n"; flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\") \n"; flux += "|> drop(columns: [\""+dropedTagName+"\"]) \n"; flux += "|> sort(columns: [\"_time\"], desc: true) \n"; if(pageInfo!=null){ flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; } QueryApi queryApi = influxDBClient.getQueryApi(); List tables = queryApi.query(flux); for (FluxTable fluxTable : tables) { List records = fluxTable.getRecords(); for (FluxRecord fluxRecord : records) { System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); } } return tables; } }