package com.cnbm.influx.config; import com.cnbm.common.spc.util.DataUtils; import com.cnbm.influx.constant.Constant; import com.cnbm.influx.param.*; import com.cnbm.influx.template.Event; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.QueryApi; import com.influxdb.client.WriteApi; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.influxdb.query.FluxTable; import java.util.ArrayList; import java.util.List; public enum InfluxClient { /** * influxdb 读写客户端,,如果write比较繁忙,后续可以考虑,维护 client一个线程池。 * */ //Client("http://192.168.0.171:8086","lkq32gtomiKd4nu-kwZVUBEbn1HLdigjRPfuA6p8c29KFfqRs8JcR3IDLDsKzVFA-TDHbG1W6EP3EcJKKqpDPg==","qgs","qgs-bucket"), //瑞昌-cdte spc-all-token Client("http://172.16.21.111:8086","XLTjiVj368gLL0j-hTcAS6HE17M7tY5fj3ipt1gXTjgVqdGRE5pEhpGMABzchYFZpmaQsGTUckR5Rs8Erz8nKQ==","qgs","qgs-bucket"), ; private String url; private String token; private String org; private String bucket; private InfluxDBClient influxDBClient; private WriteApi writeApi; private QueryApi queryApi; InfluxClient(String url,String token,String org,String bucket){ this.url = url; this.token = token; this.org = org; this.bucket = bucket; this.influxDBClient = InfluxDBClientFactory.create(this.url, this.token.toCharArray(),this.org,this.bucket); this.writeApi = influxDBClient.makeWriteApi(); this.queryApi = influxDBClient.getQueryApi(); } public QueryApi getQueryApi() { return queryApi; } public WriteApi getWriteApi() { return writeApi; } /** * 测试连接是否正常 * * @return * true 服务正常健康 * false 异常 */ public boolean ping() { boolean isConnected = false; Boolean pong; try { pong = influxDBClient.ping(); if (pong != null) { isConnected = pong; } } catch (Exception e) { e.printStackTrace(); return false; } return isConnected; } /** * desc: 异步批量 写入数据 / 更新数据 * notes: 如果是更新数据,要保证time字段不能改变 * auth: caixaing * */ public void insert(Event event, String measurement){ Point point = null; if(event.getSampleNumber()==null){ point = Point.measurement(measurement) .addTag("transationId", event.getTransationId()) .addTag("argName", event.getArgName()) .addField("argValue", event.getArgValue()) .time(event.getTime().toEpochMilli(), WritePrecision.MS); }else { point = Point.measurement(measurement) .addTag("transationId", event.getTransationId()) .addTag("inspectionSheetId", event.getInspectionSheetId()) .addTag("sampleNumber", event.getSampleNumber()) .addTag("argName", event.getArgName()) .addField("argValue", event.getArgValue()) .time(event.getTime().toEpochMilli(), WritePrecision.MS); } writeApi.writePoint(point); } /** * desc: 异步批量 写入数据/更新数据 * notes: 如果是更新数据,要保证time字段不能改变 * auth: caixaing * */ public void batchInsert(List events, String measurement){ List list = new ArrayList<>(); for(Event event:events){ Point point = null; if(event.getSampleNumber()==null){ point = Point.measurement(measurement) .addTag("transationId", event.getTransationId()==null ? "" : event.getTransationId()) .addTag("argName", event.getArgName()) .addField("argValue", event.getArgValue()) .time(event.getTime().toEpochMilli(), WritePrecision.MS); //.time(DataUtils.StringToInstantPlus8(event.getTime().toString()).toEpochMilli(), WritePrecision.MS); }else { point = Point.measurement(measurement) .addTag("transationId", event.getTransationId()==null ? "" : event.getTransationId()) .addTag("inspectionSheetId", event.getInspectionSheetId()==null ? "" : event.getInspectionSheetId()) // .addTag("batchNum", event.getBatchNum()) .addTag("sampleNumber", event.getSampleNumber()) //.addField("batchNum", event.getBatchNum()) .addTag("argName", event.getArgName()) .addField("argValue", event.getArgValue()) .time(event.getTime().toEpochMilli(), WritePrecision.MS); //.time(DataUtils.StringToInstantPlus8(event.getTime().toString()).toEpochMilli(), WritePrecision.MS); } list.add(point); } writeApi.writePoints(list); } public List query(QueryDataParam param){ String measurement = param.getMeasurement(); String bucket = param.getBucket(); String flux = "from(bucket:\""+bucket+"\")"; if(param.getRange() != null){ Range range = param.getRange(); flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; } flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; if(param.getTags()!=null && param.getTags().size()>0){ for(Tag tag:param.getTags()){ String tagName = tag.getTagName(); String tagValue = tag.getTagValue(); flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; } } //调整时区,查询出的结果 +8个小时 //flux += "|> timeShift(duration: 8h)"; if(param.getDropedTagNames() != null){ List dropedTagNames = param.getDropedTagNames(); for(String dropName:dropedTagNames){ flux += "|> drop(columns: [\""+dropName+"\"])"; } } flux += "|> sort(columns: [\"_time\"], desc: true)"; if(param.getPageInfo() != null){ PageInfo pageInfo = param.getPageInfo(); flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; } return queryApi.query(flux); } // public List queryByGroup(QueryDataParam param){ // String measurement = param.getMeasurement(); // List dropedTagNames = param.getDropedTagNames(); // Range range = param.getRange(); // String bucket = param.getBucket(); // // // // String groupName = param.getGroupName(); // PageInfo pageInfo = param.getPageInfo(); // // String flux = "from(bucket:\""+bucket+"\")"; // flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; // flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; // //// String tagName = param.getTag().getTagName(); //// String tagValue = param.getTag().getTagValue(); //// flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; // if(param.getTags()!=null && param.getTags().size()>0){ // for(Tag tag:param.getTags()){ // String tagName = tag.getTagName(); // String tagValue = tag.getTagValue(); // flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; // } // } // //// flux += "|> timeShift(duration: 8h)"; // flux += "|> group(columns: [\""+groupName+"\"], mode: \"except\")"; // for(String dropName:dropedTagNames){ // flux += "|> drop(columns: [\""+dropName+"\"])"; // } // flux += "|> sort(columns: [\"_time\"], desc: true)"; // if(pageInfo!=null){ // flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; // } // return queryApi.query(flux); // } public List queryGroupByTime(QueryDataGroupByTimeParam param){ String measurement = param.getMeasurement(); List dropedTagNames = param.getDropedTagNames(); Range range = param.getRange(); String bucket = param.getBucket(); PageInfo pageInfo = param.getPageInfo(); String flux = "from(bucket:\""+bucket+"\")"; flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; // String tagName = param.getTag().getTagName(); // String tagValue = param.getTag().getTagValue(); // flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; // flux += "|> timeShift(duration: 8h)"; if(param.getTags()!=null && param.getTags().size()>0){ for(Tag tag:param.getTags()){ String tagName = tag.getTagName(); String tagValue = tag.getTagValue(); flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; } } for(String dropName:dropedTagNames){ flux += "|> drop(columns: [\""+dropName+"\"])"; } //|> window(every: 1mo) if(param.getTimeType() == 1){ flux += "|> window(every: 1y)"; }else if(param.getTimeType() == 2 ){ flux += "|> window(every: 1mo)"; }else{ flux += "|> window(every: 1d)"; } flux += "|> sort(columns: [\"_time\"], desc: true)"; if(pageInfo!=null){ flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; } // List tables = queryApi.query(flux); // for (FluxTable fluxTable : tables) { // List records = fluxTable.getRecords(); // for (FluxRecord fluxRecord : records) { // System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); // // } // } return queryApi.query(flux); } }