package com.cnbm.influx.config;

import com.cnbm.influx.constant.Constant;
import com.cnbm.influx.param.PageInfo;
import com.cnbm.influx.param.QueryDataGroupByTimeParam;
import com.cnbm.influx.param.QueryDataParam;
import com.cnbm.influx.param.Range;
import com.cnbm.influx.template.Event;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxTable;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * Enum-singleton wrapper around an InfluxDB client that exposes write and Flux query helpers.
 *
 * <p>A single {@link InfluxDBClient}, {@link WriteApi} and {@link QueryApi} are created when the
 * enum constant is initialised and are shared by every caller.
 */
public enum InfluxClient {

    /**
     * InfluxDB read/write client. If writes become a bottleneck, a pool of clients could be
     * maintained instead of this single instance.
     *
     * <p>NOTE(review): the server URL and API token are hard-coded in source; they should be
     * moved to external configuration and the token rotated.
     */
    Client("http://192.168.0.170:8086","lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==","qgs","qgs-bucket"),
    ;

    private final String url;
    private final String token;
    private final String org;
    private final String bucket;

    private final InfluxDBClient influxDBClient;
    private final WriteApi writeApi;
    private final QueryApi queryApi;

    /**
     * Creates the underlying client and its write/query APIs.
     *
     * @param url    base URL of the InfluxDB server
     * @param token  API token used for authentication
     * @param org    default organisation passed to the client factory
     * @param bucket default bucket passed to the client factory
     */
    InfluxClient(String url, String token, String org, String bucket) {
        this.url = url;
        this.token = token;
        this.org = org;
        this.bucket = bucket;
        this.influxDBClient =
                InfluxDBClientFactory.create(this.url, this.token.toCharArray(), this.org, this.bucket);
        this.writeApi = influxDBClient.makeWriteApi();
        this.queryApi = influxDBClient.getQueryApi();
    }

    /** @return the shared query API */
    public QueryApi getQueryApi() {
        return queryApi;
    }

    /** @return the shared write API */
    public WriteApi getWriteApi() {
        return writeApi;
    }

    /**
     * Tests whether the connection to the server is healthy.
     *
     * @return {@code true} if the server answered the ping positively; {@code false} if it
     *         answered negatively, returned {@code null}, or the call threw
     */
    public boolean ping() {
        try {
            return Boolean.TRUE.equals(influxDBClient.ping());
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Writes (or updates) a single event through the shared {@link WriteApi}.
     *
     * <p>When updating an existing point, the {@code time} of the event must not change.
     *
     * @param event       event to write
     * @param measurement measurement the point is written to
     */
    public void insert(Event event, String measurement) {
        writeApi.writePoint(toPoint(event, measurement));
    }

    /**
     * Writes (or updates) a batch of events through the shared {@link WriteApi}.
     *
     * <p>When updating existing points, the {@code time} of each event must not change.
     * A {@code null} or empty list is a no-op.
     *
     * @param events      events to write
     * @param measurement measurement the points are written to
     */
    public void batchInsert(List<Event> events, String measurement) {
        if (events == null || events.isEmpty()) {
            return;
        }
        List<Point> points = new ArrayList<>(events.size());
        for (Event event : events) {
            points.add(toPoint(event, measurement));
        }
        writeApi.writePoints(points);
    }

    /**
     * Queries a measurement within a time range, optionally filtered by one tag, sorted by time
     * descending and optionally paginated.
     *
     * @param param query parameters; the tag, the dropped-column list and the page info are
     *              each optional ({@code null} means "not applied")
     * @return the tables returned by the Flux query
     */
    public List<FluxTable> query(QueryDataParam param) {
        StringBuilder flux = beginQuery(param.getBucket(), param.getRange(), param.getMeasurement());
        if (param.getTag() != null) {
            appendTagFilter(flux, param.getTag().getTagName(), param.getTag().getTagValue());
        }
        // Shift timestamps by +8 hours so results are reported in the UTC+8 time zone.
        flux.append("|> timeShift(duration: 8h)");
        appendDrops(flux, param.getDropedTagNames());
        appendSortAndPage(flux, param.getPageInfo());
        return queryApi.query(flux.toString());
    }

    /**
     * Queries a measurement filtered by a tag and regroups the result by every column except
     * {@code groupName}, sorted by time descending and optionally paginated.
     *
     * @param param query parameters; the tag is required
     * @return the tables returned by the Flux query
     * @throws NullPointerException if {@code param.getTag()} is {@code null}
     */
    public List<FluxTable> queryByGroup(QueryDataParam param) {
        Objects.requireNonNull(param.getTag(), "tag is required for queryByGroup");
        StringBuilder flux = beginQuery(param.getBucket(), param.getRange(), param.getMeasurement());
        appendTagFilter(flux, param.getTag().getTagName(), param.getTag().getTagValue());
        flux.append("|> timeShift(duration: 8h)");
        flux.append("|> group(columns: [\"")
                .append(escapeFluxString(param.getGroupName()))
                .append("\"], mode: \"except\")");
        appendDrops(flux, param.getDropedTagNames());
        appendSortAndPage(flux, param.getPageInfo());
        return queryApi.query(flux.toString());
    }

    /**
     * Queries a measurement filtered by a tag and windows the result by calendar period.
     *
     * <p>The window is one year when {@code timeType == 1}, one month when
     * {@code timeType == 2}, and one day for any other value.
     *
     * @param param query parameters; the tag is required
     * @return the tables returned by the Flux query
     * @throws NullPointerException if {@code param.getTag()} is {@code null}
     */
    public List<FluxTable> queryGroupByTime(QueryDataGroupByTimeParam param) {
        Objects.requireNonNull(param.getTag(), "tag is required for queryGroupByTime");
        StringBuilder flux = beginQuery(param.getBucket(), param.getRange(), param.getMeasurement());
        appendTagFilter(flux, param.getTag().getTagName(), param.getTag().getTagValue());
        flux.append("|> timeShift(duration: 8h)");
        appendDrops(flux, param.getDropedTagNames());
        if (param.getTimeType() == 1) {
            flux.append("|> window(every: 1y)");
        } else if (param.getTimeType() == 2) {
            flux.append("|> window(every: 1mo)");
        } else {
            flux.append("|> window(every: 1d)");
        }
        appendSortAndPage(flux, param.getPageInfo());
        return queryApi.query(flux.toString());
    }

    /** Converts an event into a point; the batch-related tags are added only when a batch number is set. */
    private static Point toPoint(Event event, String measurement) {
        Point point = Point.measurement(measurement)
                .addTag("transationId", event.getTransationId());
        if (event.getBatchNum() != null) {
            point.addTag("inspectionSheetId", event.getInspectionSheetId())
                    .addTag("sampleNumber", event.getSampleNumber())
                    .addTag("batchNum", event.getBatchNum());
        }
        return point.addTag("argName", event.getArgName())
                .addField("argValue", event.getArgValue())
                .time(event.getTime().toEpochMilli(), WritePrecision.MS);
    }

    /** Starts a Flux script with the bucket source, the time range and the measurement filter. */
    private static StringBuilder beginQuery(String bucket, Range range, String measurement) {
        StringBuilder flux = new StringBuilder();
        flux.append("from(bucket:\"").append(escapeFluxString(bucket)).append("\")");
        // Range bounds are Flux time expressions, not string literals, so they are emitted verbatim.
        flux.append("|> range(start: ").append(range.getBegin())
                .append(",stop:").append(range.getEnd()).append(")");
        flux.append("|> filter(fn: (r) => r[\"_measurement\"] == \"")
                .append(escapeFluxString(measurement)).append("\")");
        return flux;
    }

    /** Appends an equality filter on a single column. */
    private static void appendTagFilter(StringBuilder flux, String tagName, String tagValue) {
        flux.append("|> filter(fn: (r) => r[\"").append(escapeFluxString(tagName))
                .append("\"] == \"").append(escapeFluxString(tagValue)).append("\")");
    }

    /** Appends one {@code drop} stage per column name; a {@code null} list drops nothing. */
    private static void appendDrops(StringBuilder flux, List<String> dropedTagNames) {
        if (dropedTagNames == null) {
            return;
        }
        for (String dropName : dropedTagNames) {
            flux.append("|> drop(columns: [\"").append(escapeFluxString(dropName)).append("\"])");
        }
    }

    /** Appends the descending time sort and, when page info is given, the limit/offset stage. */
    private static void appendSortAndPage(StringBuilder flux, PageInfo pageInfo) {
        flux.append("|> sort(columns: [\"_time\"], desc: true)");
        if (pageInfo != null) {
            // Page numbers are 1-based, so page N starts at offset (N - 1) * size.
            flux.append("|> limit(n: ").append(pageInfo.getSize())
                    .append(", offset: ").append((pageInfo.getCurrent() - 1) * pageInfo.getSize())
                    .append(")");
        }
    }

    /**
     * Escapes a value for use inside a double-quoted Flux string literal, so that backslashes,
     * double quotes and the {@code ${} interpolation opener in caller-supplied values cannot
     * terminate the literal or inject Flux code. A {@code null} value is rendered as the text
     * {@code null}, which is what plain string concatenation would produce.
     */
    private static String escapeFluxString(String value) {
        if (value == null) {
            return "null";
        }
        // Backslashes must be escaped first so the escapes added afterwards are not doubled.
        return value.replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("${", "\\${");
    }
}