diff --git a/pom.xml b/pom.xml index 70583a6..ec9c664 100644 --- a/pom.xml +++ b/pom.xml @@ -95,6 +95,8 @@ + + org.springframework.boot @@ -182,6 +184,23 @@ spring-boot-starter-test test + + + + org.springframework.boot + spring-boot-starter-validation + + + + + com.influxdb + influxdb-client-java + 6.3.0 + + + + + diff --git a/src/main/java/com/qgs/dc/influx/Main.java b/src/main/java/com/qgs/dc/influx/Main.java new file mode 100644 index 0000000..224e569 --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/Main.java @@ -0,0 +1,210 @@ +package com.qgs.dc.influx; + + +import com.influxdb.client.*; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.Point; +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; +import com.qgs.dc.influx.param.PageInfo; +import com.qgs.dc.influx.param.QueryDataParam; +import com.qgs.dc.influx.param.Range; + +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.util.*; + +/** + * @Desc: "" + * @Author: caixiang + * @DATE: 2022/6/25 11:19 + */ +public class Main { + public static void main(String[] args) throws InterruptedException { + char[] token = "lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==".toCharArray(); + String org = "qgs"; + String bucket = "mytest"; + InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://192.168.0.170:8086", token, org, bucket); + + + WriteApi writeApi = influxDBClient.makeWriteApi(); + +// InfluxService influxService = new InfluxService(); +// Event event = new Event(); +// event.time = Instant.now(); +// event.transationId = "asas"; +// event.argName = "arg5"; +// event.argValue = new Double(11); +// influxService.insert(event); + +// Event event = new Event(); +// event.setTime(Instant.now()); +// event.setTransationId("asasd11"); +// event.setArgName("argName11"); +// event.setArgValue(3d); +// InfluxClient.Client.insert(event,"ASProcessCompleteEvent"); + + + Point point = Point.measurement("ASProcessCompleteEvent") + .addTag("transationId", "112311") + .addTag("argName", "argName11") + .addField("argValue", 3D) + .time(Instant.now().toEpochMilli(), WritePrecision.MS); + + + Point point2 = Point.measurement("ASProcessCompleteEvent") + .addTag("transationId", "222312") + .addTag("argName", "argName11") + .addField("argValue", 4D) + .time(Instant.now().toEpochMilli(), WritePrecision.MS); + List list = new ArrayList<>(); + list.add(point); + list.add(point2); + + writeApi.writePoints(list); + + //todo api.writeMeasurements(WritePrecision.NS, Arrays.asList(new H2OFeetMeasurement("coyote_creek", 15.0D, null, Instant.ofEpochSecond(0, 15)), new H2OFeetMeasurement("coyote_creek", 16.0D, null, Instant.ofEpochSecond(0, 16)))); +// List events = new ArrayList<>(); +// for(int i=0;i<99;i++){ +// +// Event event = new Event(); +// event.time = Instant.now(); +// event.transationId = "asas"+i; +// event.argName = "arg7"; +// event.argValue = new Double(i); +// events.add(event); +// } +// List qList = new ArrayList<>(); +// Event event = new Event(); +// event.time = Instant.now(); +// event.transationId = "asas"; +// event.argName = "arg7"; +// event.argValue = new Double(1); +// Thread.sleep(100); +// Event event2 = new Event(); +// event2.time = Instant.now(); +// event2.transationId = "asas"; +// event2.argName = "arg7"; +// event2.argValue = new Double(2); +// qList.add(event); +// qList.add(event2); +// writeApi.writeMeasurement( WritePrecision.NS, qList); + +// for(int i=0;i<10;i++){ +// Temperature temperature = new Temperature(); +// temperature.location = "south"; +// temperature.value = new Double(i); +// temperature.type = "equipment3"; +// temperature.time = Instant.now(); +// +// writeApi.writeMeasurement( WritePrecision.NS, temperature); +// } + + +// String flux = "from(bucket:\"mytest\") |> range(start: -60m)"; +// flux += "|> filter(fn: (r) =>\n" + +// " r._measurement == \"ASProcessCompleteEvent\" and \n" + +//// " r._field == \"type\" and \n" + //对应 Field key +// " r.argName == \"arg3\"\n" + //对应 Tags key (Tag 信息无法在FluxRecord 里面获取。) +// " )"; +// QueryApi queryApi = influxDBClient.getQueryApi(); +// +// List tables = queryApi.query(flux); +// for (FluxTable fluxTable : tables) { +// List records = fluxTable.getRecords(); +// for (FluxRecord fluxRecord : records) { +// System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); +//// System.out.println("time: "+fluxRecord.getTime() +" key:"++" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); +// +// } +// } + + +// from(bucket: "mytest") +// |> range(start: 2022-06-29T11:30:00Z, stop: 2022-06-29T12:30:00Z) +// |> filter(fn: (r) => r["_measurement"] == "ASProcessCompleteEvent") +// |> filter(fn: (r) => r["argName"] == "arg4") +// |> drop(columns: ["transationId"]) +// |> sort(columns: ["_time"], desc: true) +// 取前10条数据 +// |> limit(n: 10, offset: 0) + +// 取 10-20 条数据 +// |> limit(n: 10, offset: 10) + +// 取 20-30 条数据 +// |> limit(n: 10, offset: 20) + + + +// QueryDataParam queryDataParam = new QueryDataParam(); +// queryDataParam.setBucket("mytest"); +// queryDataParam.setRange(new Range(getDate().toInstant(),new Date().toInstant())); +// queryDataParam.setMeasurement("ASProcessCompleteEvent"); +// queryDataParam.setTag(new Tag("argName","arg4")); +// queryDataParam.setDropedTagName("transationId"); +// queryDataParam.setPageInfo(new PageInfo(1,100)); +// +// List tables = query(queryDataParam,influxDBClient); +// List records1 = tables.get(0).getRecords(); +// List> lists = Utils.fixedGroup(records1, 10); + +// for (FluxTable fluxTable : tables) { +// List records = fluxTable.getRecords(); +// for (FluxRecord fluxRecord : records) { +// System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); +// +// } +// } + + influxDBClient.close(); + } + + + public static Date getDate(){ + Date date = new Date();//获取当前日期 + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");//格式化一下 + Calendar calendar1 = Calendar.getInstance();//获取对日期操作的类对象 + //两种写法都可以获取到前三天的日期 + // calendar1.set(Calendar.DAY_OF_YEAR,calendar1.get(Calendar.DAY_OF_YEAR) -3); + //在当前时间的基础上获取前三天的日期 + calendar1.add(Calendar.DATE, -3); + //add方法 参数也可传入 月份,获取的是前几月或后几月的日期 + //calendar1.add(Calendar.MONTH, -3); + Date day = calendar1.getTime(); + return day; + } + + private static List query(QueryDataParam param, InfluxDBClient influxDBClient){ + String measurement = param.getMeasurement(); + String dropedTagName = param.getDropedTagName(); + Range range = param.getRange(); + String bucket = param.getBucket(); + String tagName = param.getTag().getTagName(); + String tagValue = param.getTag().getTagValue(); + PageInfo pageInfo = param.getPageInfo(); + + String flux = "from(bucket:\""+bucket+"\")"; + flux += "|> range(start: "+range.getBegin().toString()+",stop:"+range.getEnd().toString()+") \n"; + flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\") \n"; + flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\") \n"; + flux += "|> drop(columns: [\""+dropedTagName+"\"]) \n"; + flux += "|> sort(columns: [\"_time\"], desc: true) \n"; + if(pageInfo!=null){ + flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; + } + + QueryApi queryApi = influxDBClient.getQueryApi(); + + List tables = queryApi.query(flux); + for (FluxTable fluxTable : tables) { + List records = fluxTable.getRecords(); + for (FluxRecord fluxRecord : records) { + System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); + + } + } + + return tables; + } +} diff --git a/src/main/java/com/qgs/dc/influx/common/Utils.java b/src/main/java/com/qgs/dc/influx/common/Utils.java new file mode 100644 index 0000000..abcbab8 --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/common/Utils.java @@ -0,0 +1,69 @@ +package com.qgs.dc.influx.common; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.List; + +/** + * @Desc: "" + * @Author: caixiang + * @DATE: 2022/6/29 16:23 + */ +public class Utils { + public static void main(String[] args) { + ArrayList arrs = new ArrayList<>(); + + for(int i=0;i<100;i++){ + arrs.add(i); + } + List> lists = fixedGroup(arrs, 10); + System.out.println(); + + } + + + public static Date getBeforeDate(Integer number){ + Date date = new Date();//获取当前日期 + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");//格式化一下 + Calendar calendar1 = Calendar.getInstance();//获取对日期操作的类对象 + //两种写法都可以获取到前三天的日期 + // calendar1.set(Calendar.DAY_OF_YEAR,calendar1.get(Calendar.DAY_OF_YEAR) -3); + //在当前时间的基础上获取前三天的日期 + calendar1.add(Calendar.DATE, 0-number); + //add方法 参数也可传入 月份,获取的是前几月或后几月的日期 + //calendar1.add(Calendar.MONTH, -3); + Date day = calendar1.getTime(); + return day; + } + + /** + * 将一组数据固定分组,每组n个元素 + * + * @param source 要分组的数据源 + * @param limit 每组n个元素 + * @param + * @return + */ + public static List> fixedGroup(List source, int limit) { + if (null == source || source.size() == 0 || limit <= 0) + return null; + List> result = new ArrayList>(); + int remainder = source.size() % limit; + int size = (source.size() / limit); + for (int i = 0; i < size; i++) { + List subset = null; + subset = source.subList(i * limit, (i + 1) * limit); + result.add(subset); + } + if (remainder > 0) { + List subset = null; + subset = source.subList(size * limit, size * limit + remainder); + result.add(subset); + } + return result; + } + + +} diff --git a/src/main/java/com/qgs/dc/influx/config/InfluxClient.java b/src/main/java/com/qgs/dc/influx/config/InfluxClient.java new file mode 100644 index 0000000..f437875 --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/config/InfluxClient.java @@ -0,0 +1,133 @@ +package com.qgs.dc.influx.config; + + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.QueryApi; +import com.influxdb.client.WriteApi; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.Point; +import com.influxdb.query.FluxTable; +import com.qgs.dc.influx.param.PageInfo; +import com.qgs.dc.influx.param.QueryDataParam; +import com.qgs.dc.influx.param.Range; +import com.qgs.dc.influx.template.Event; + +import java.util.ArrayList; +import java.util.List; + +public enum InfluxClient { + + /** + * influxdb 读写客户端,,如果write比较繁忙,后续可以考虑,维护 client一个线程池。 + * */ + Client("http://192.168.0.170:8086","lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==","qgs","qgs-bucket"), + + ; + private String url; + private String token; + private String org; + private String bucket; + + private InfluxDBClient influxDBClient; + private WriteApi writeApi; + + private QueryApi queryApi; + + InfluxClient(String url,String token,String org,String bucket){ + this.url = url; + this.token = token; + this.org = org; + this.bucket = bucket; + this.influxDBClient = InfluxDBClientFactory.create(this.url, this.token.toCharArray(),this.org,this.bucket); + this.writeApi = influxDBClient.makeWriteApi(); + this.queryApi = influxDBClient.getQueryApi(); + } + + + public QueryApi getQueryApi() { + return queryApi; + } + + public WriteApi getWriteApi() { + return writeApi; + } + + /** + * 测试连接是否正常 + * + * @return + * true 服务正常健康 + * false 异常 + */ + private boolean ping() { + boolean isConnected = false; + Boolean pong; + try { + pong = influxDBClient.ping(); + if (pong != null) { + isConnected = true; + } + } catch (Exception e) { + e.printStackTrace(); + } + return isConnected; + } + + public void insert(Event event, String measurement){ + Point point = Point.measurement(measurement) + .addTag("transationId", event.getTransationId()) + .addTag("argName", event.getArgName()) + .addField("argValue", event.getArgValue()) + .time(event.getTime().toEpochMilli(), WritePrecision.MS); + writeApi.writePoint(point); + + } + + //异步 批量写入数据 + //如果要批量插入的话,一次也只能写入 + public void batchInsert(List events, String measurement){ + List list = new ArrayList<>(); + for(Event event:events){ + Point point = Point.measurement(measurement) + .addTag("transationId", event.getTransationId()) + .addTag("argName", event.getArgName()) + .addField("argValue", event.getArgValue()) + .time(event.getTime().toEpochMilli(), WritePrecision.MS); + list.add(point); + } + writeApi.writePoints(list); + } + + + public List query(QueryDataParam param){ + String measurement = param.getMeasurement(); + String dropedTagName = param.getDropedTagName(); + Range range = param.getRange(); + String bucket = param.getBucket(); + String tagName = param.getTag().getTagName(); + String tagValue = param.getTag().getTagValue(); + PageInfo pageInfo = param.getPageInfo(); + + String flux = "from(bucket:\""+bucket+"\")"; + flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; + flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; + flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; + flux += "|> drop(columns: [\""+dropedTagName+"\"])"; + flux += "|> sort(columns: [\"_time\"], desc: true)"; + if(pageInfo!=null){ + flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; + } + + +// List tables = queryApi.query(flux); +// for (FluxTable fluxTable : tables) { +// List records = fluxTable.getRecords(); +// for (FluxRecord fluxRecord : records) { +// System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); +// +// } +// } + return queryApi.query(flux); + } +} diff --git a/src/main/java/com/qgs/dc/influx/constant/Constant.java b/src/main/java/com/qgs/dc/influx/constant/Constant.java new file mode 100644 index 0000000..8ac20f8 --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/constant/Constant.java @@ -0,0 +1,23 @@ +package com.qgs.dc.influx.constant; + +import com.influxdb.LogLevel; + +/** + * @Desc: "" + * @Author: caixiang + * @DATE: 2022/7/1 9:18 + */ +public class Constant { + public static final String url = "http://192.168.0.170:8086"; + public static final String token = "lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw=="; + public static final String org = "qgs"; + public static final String bucket = "qgs-bucket"; + public static final String username = "caixiang"; + public static final String password = "25112856"; + public static final LogLevel logLevel = LogLevel.BODY; + public static final LogLevel readTimeout = LogLevel.BODY; + public static final LogLevel writeTimeout = LogLevel.BODY; + public static final LogLevel connectTimeout = LogLevel.BODY; + + +} diff --git a/src/main/java/com/qgs/dc/influx/controller/InfluxDemoController.java b/src/main/java/com/qgs/dc/influx/controller/InfluxDemoController.java new file mode 100644 index 0000000..e86441e --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/controller/InfluxDemoController.java @@ -0,0 +1,74 @@ +package com.qgs.dc.influx.controller; + +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.Point; +import com.qgs.dc.influx.config.InfluxClient; +import com.qgs.dc.influx.template.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import java.time.Instant; + +@RestController +@RequestMapping("/influx") +public class InfluxDemoController { + private static final Logger logger = LoggerFactory.getLogger(InfluxDemoController.class); + + + +// try (WriteApi writeApi = influxDBClient.makeWriteApi()) { +// Temperature temperature = new Temperature(); +// temperature.setLocation("east"); +// temperature.setValue(106.2D); +// temperature.setTime(Instant.now()); +// writeApi.writeMeasurement(WritePrecision.NS,temperature); +// } +// +// try (WriteApi writeApi = influxDBClient.makeWriteApi()) { +// Point point = Point.measurement("temperature") +// .addTag("location","south") +// .addTag("owner","wxm") +// .addField("wxm",230.8); +// writeApi.writePoint(point); +// } + + + @PostMapping("/insertBatch") + public void insertBatch() throws InterruptedException { +// List list = new ArrayList<>(); +// +// for(int i=0;i<99;i++){ +// //Thread.sleep(1000); +// Event event = new Event(); +// event.time = Instant.now(); +// event.transationId = "asas"+i; +// event.argName = "arg5"; +// event.argValue = new Double(i); +// list.add(event); +// } +// influxService.batchInsert(list); + } + + + public Point insert(Event event, String measurement){ + Point point = Point.measurement(measurement) + .addTag("transationId", event.getTransationId()) + .addTag("argName", event.getArgName()) + .addField("argValue", event.getArgValue()) + .time(event.getTime().toEpochMilli(), WritePrecision.MS); + return point; + + } + @PostMapping("/insert") + public void insert() throws InterruptedException { + Event event = new Event(); + event.setTime(Instant.now()); + event.setTransationId("asasd11"); + event.setArgName("argName11"); + event.setArgValue(99765d); + Point asProcessCompleteEvent = insert(event, "ASProcessCompleteEvent"); + InfluxClient.Client.insert(event,"ASProcessCompleteEvent"); + } +} diff --git a/src/main/java/com/qgs/dc/influx/entity/Header.java b/src/main/java/com/qgs/dc/influx/entity/Header.java new file mode 100644 index 0000000..c5af061 --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/entity/Header.java @@ -0,0 +1,30 @@ +package com.qgs.dc.influx.entity; + +import lombok.Data; + +/** + * @Desc: "" + * @Author: caixiang + * @DATE: 2022/6/27 10:46 + */ +@Data +public class Header { + + private String transationId; + + private String messageName; + + private String messageType; + + private String fromWhere; + + private String toWhere; + + private String equipmentId; + + private String sendTimestamp; + + private String argName; + + private Double argValue; +} diff --git a/src/main/java/com/qgs/dc/influx/param/BaseParam.java b/src/main/java/com/qgs/dc/influx/param/BaseParam.java new file mode 100644 index 0000000..3452566 --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/param/BaseParam.java @@ -0,0 +1,24 @@ +package com.qgs.dc.influx.param; + +import lombok.Data; + +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; +import java.io.Serializable; + +/** + * @Desc: "" + * @Author: caixiang + * @DATE: 2022/6/29 10:18 + */ +@Data +public class BaseParam implements Serializable { + //page 信息可选 + private PageInfo pageInfo; + + @NotEmpty(message = "measurement 不能为空") + private String measurement; + + @NotNull(message = "查询时间段不能为空") + private Range range; +} diff --git a/src/main/java/com/qgs/dc/influx/param/PageInfo.java b/src/main/java/com/qgs/dc/influx/param/PageInfo.java new file mode 100644 index 0000000..4eaf9bd --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/param/PageInfo.java @@ -0,0 +1,24 @@ +package com.qgs.dc.influx.param; + +import lombok.Data; +import org.hibernate.validator.constraints.Range; + +/** + * @Desc: "" + * @Author: caixiang + * @DATE: 2022/6/29 10:19 + */ +@Data +public class PageInfo { + @Range(min = 1, message = "页码必须大于等于1") + private Integer current; + +// @NotNull(message = "每页显示条数不能为空") + @Range(min = 1, max = 1000, message = "每页显示条数范围需在1-1000之间") + private Integer size; + + public PageInfo(Integer current,Integer size){ + this.current = current; + this.size = size; + } +} diff --git a/src/main/java/com/qgs/dc/influx/param/QueryDataParam.java b/src/main/java/com/qgs/dc/influx/param/QueryDataParam.java new file mode 100644 index 0000000..604cc93 --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/param/QueryDataParam.java @@ -0,0 +1,31 @@ +package com.qgs.dc.influx.param; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.Accessors; + +/** + * @Desc: "influx 查询条件构造" + * @Author: caixiang + * @DATE: 2022/6/29 10:17 + * + * 注意: + * 必填 + * ① measurement 不能为空 + * ② 时间段 不能为空 + * ③ bucket 不能为空 + * 非必填 + * ① 分页信息可选 + * ② tag + * + */ +@Data +@EqualsAndHashCode(callSuper = false) +@Accessors(chain = true) +public class QueryDataParam extends BaseParam{ + private Tag tag; + //查询的时候,需要忽略的字段。(transationId是唯一标识会对 最终的查询结果集产生影响) + private String dropedTagName; + private String bucket; + +} diff --git a/src/main/java/com/qgs/dc/influx/param/Range.java b/src/main/java/com/qgs/dc/influx/param/Range.java new file mode 100644 index 0000000..12fc4f8 --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/param/Range.java @@ -0,0 +1,30 @@ +package com.qgs.dc.influx.param; + +import lombok.Data; + +import javax.validation.constraints.NotNull; +import java.time.Instant; + +/** + * @Desc: "influxdb查询 时间范围" + * @Author: caixiang + * @DATE: 2022/6/29 11:14 + */ +@Data +public class Range { + @NotNull(message = "起始时间不能为空") + private Instant begin; + + @NotNull(message = "终点时间不能为空") + private Instant end; + + public Range(Instant begin,Instant end){ + this.begin = begin; + this.end = end; + } + +// public static void main(String[] args) { +// Date date = new Date(); +// System.out.println(date.toInstant().toString()); +// } +} diff --git a/src/main/java/com/qgs/dc/influx/param/Tag.java b/src/main/java/com/qgs/dc/influx/param/Tag.java new file mode 100644 index 0000000..003f87b --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/param/Tag.java @@ -0,0 +1,19 @@ +package com.qgs.dc.influx.param; + +import lombok.Data; + +/** + * @Desc: "" + * @Author: caixiang + * @DATE: 2022/6/29 14:38 + */ +@Data +public class Tag { + private String tagName; + private String tagValue; + + public Tag(String tagName,String tagValue){ + this.tagName = tagName; + this.tagValue = tagValue; + } +} diff --git a/src/main/java/com/qgs/dc/influx/template/Event.java b/src/main/java/com/qgs/dc/influx/template/Event.java new file mode 100644 index 0000000..f29fde2 --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/template/Event.java @@ -0,0 +1,23 @@ +package com.qgs.dc.influx.template; + +import com.influxdb.annotations.Column; +import com.influxdb.annotations.Measurement; +import lombok.Data; + +import java.time.Instant; + +/** + * @Desc: "" + * @Author: caixiang + * @DATE: 2022/6/25 11:13 + */ +@Data +public class Event { + private Instant time; + + private String transationId; + + private String argName; + + private Double argValue; +} diff --git a/src/main/java/com/qgs/dc/influx/template/Temperature.java b/src/main/java/com/qgs/dc/influx/template/Temperature.java new file mode 100644 index 0000000..cb90a73 --- /dev/null +++ b/src/main/java/com/qgs/dc/influx/template/Temperature.java @@ -0,0 +1,28 @@ +package com.qgs.dc.influx.template; + +import com.influxdb.annotations.Column; +import com.influxdb.annotations.Measurement; + +import java.time.Instant; + +/** + * @Desc: "" + * @Author: caixiang + * @DATE: 2022/6/22 15:52 + */ +//Temperature.java +@Measurement(name = "temperature") +public class Temperature { + + @Column(tag = true) + public String location; + + @Column + public Double value; + + @Column + public String type; + + @Column(timestamp = true) + public Instant time; +} diff --git a/src/main/java/com/qgs/dc/s7/controller/S7DemoController.java b/src/main/java/com/qgs/dc/s7/controller/S7DemoController.java index 2818380..511c790 100644 --- a/src/main/java/com/qgs/dc/s7/controller/S7DemoController.java +++ b/src/main/java/com/qgs/dc/s7/controller/S7DemoController.java @@ -1,18 +1,26 @@ package com.qgs.dc.s7.controller; +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.Point; import com.qgs.dc.opcua.controller.R; +import com.qgs.dc.s7.entity.AGVInfoCallBack; + + import com.qgs.dc.s7.my.s7connector.enmuc.PlcVarActual; import com.qgs.dc.s7.my.s7connector.enmuc.S7Client; import com.qgs.dc.s7.my.s7connector.type.PlcVar; -import org.apache.plc4x.java.api.exceptions.PlcConnectionException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import java.io.UnsupportedEncodingException; import java.text.ParseException; +import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Random; @@ -22,18 +30,42 @@ import java.util.Random; public class S7DemoController { private static final Logger logger = LoggerFactory.getLogger(S7DemoController.class); +// @Autowired +// InfluxDBClient influxDBClient; +// +// @PostMapping("/insert") +// public void insert() throws InterruptedException { +// Event event = new Event(); +// event.setTime(Instant.now()); +// event.setTransationId("asasd11"); +// event.setArgName("argName11"); +// event.setArgValue(7d); +// Point asProcessCompleteEvent = insert(event, "ASProcessCompleteEventSSS"); +// influxDBClient.makeWriteApi().writePoint(asProcessCompleteEvent); +// } + @PostMapping(value = "testFor") + private void testFor(@RequestBody AGVInfoCallBack agvInfoCallBack) { + + System.out.println(agvInfoCallBack.toString()); + } //demo1 @PostMapping("/testReadAll") public R testReadAll() throws UnsupportedEncodingException, ParseException { for(PlcVarActual actual:PlcVarActual.values()){ System.out.println(read(S7Client.S7_1500,actual)); - } return R.ok(); } - +// public Point insert(Event event, String measurement){ +// Point point = Point.measurement(measurement) +// .addTag("transationId", event.getTransationId()) +// .addTag("argName", event.getArgName()) +// .addField("argValue", event.getArgValue()) +// .time(event.getTime().toEpochMilli(), WritePrecision.MS); +// return point; +// } //demo2 @PostMapping("/readTest") public R getTestForS7() throws UnsupportedEncodingException, ParseException { @@ -42,11 +74,13 @@ public class S7DemoController { List characters = (List)read(S7Client.S7_1500,PlcVarActual.CharArrays); List booleans = (List)read(S7Client.S7_1500,PlcVarActual.BooleanArrays); + String stri = (String)read(S7Client.S7_1500,PlcVarActual.STRING1); return R.ok().put("res",heartBeat).put("characters",characters).put("ddtl",ddtl).put("bools",booleans).put("str",stri); } + @PostMapping("/readTest2") public R getTest2ForS7() throws UnsupportedEncodingException, ParseException { // List characters = (List)read(PlcVarActual.CharArrays,S7Client.S7_1500); @@ -99,7 +133,7 @@ public class S7DemoController { long l = System.currentTimeMillis(); String[] subs = (String[])read(S7Client.S7_1200,PlcVarActual.SubIdArrays1200); //65ms long l1 = System.currentTimeMillis(); - System.out.println(Arrays.toString(subs)); + //System.out.println(Arrays.toString(subs)); String[] toWrite = new String[63]; for(int i=0;i<63;i++){ @@ -111,8 +145,8 @@ public class S7DemoController { write(S7Client.S7_1200,PlcVarActual.SubIdArrays1200,toWrite); long c2 = System.currentTimeMillis(); - String[] subs2 = (String[])read(S7Client.S7_1200,PlcVarActual.SubIdArrays1200); + logger.info("正常测试: l:"+(l1-l)+"c:"+(c2-c1)+";;read1:"+Arrays.toString(subs)+";;read2:"+Arrays.toString(subs2)); return R.ok().put("l",(l1-l)).put("c",(c2-c1)); } @@ -160,8 +194,19 @@ public class S7DemoController { } //PlcVarActual 到时候改成你们的 xxxPlcToWcs 或者 xxxWcsToPlc + /** + * return + * 成功: 返回相应的object对象 + * 失败: 返回null + * */ private Object read(S7Client s7Client,PlcVarActual var) throws UnsupportedEncodingException, ParseException { - return s7Client.read(var.getArea(), var.getAreaNumber(), var.getByteOffset(), var.getBitOffset(), var.getLength(), var.getStrSize(), var.getType()); + try { + return s7Client.read(var.getArea(), var.getAreaNumber(), var.getByteOffset(), var.getBitOffset(), var.getLength(), var.getStrSize(), var.getType()); + }catch (Exception e){ + logger.error("host:"+s7Client.getHost()+" ; read 操作出现问题: "+e.getMessage()); + e.printStackTrace(); + return null; + } } private void write(S7Client s7Client,PlcVarActual var,Object newValue) { if(var.getType().equals(PlcVar.STRING_Array)){ @@ -179,25 +224,25 @@ public class S7DemoController { } } - //demo3 - @PostMapping("/writeTest") - public R writeTest() throws PlcConnectionException, UnsupportedEncodingException { - write(S7Client.S7_1500,PlcVarActual.HeartBeat, false); - - char[] charArrays_content = new char[2]; - charArrays_content[0] = '1'; - charArrays_content[1] = 'c'; - write( S7Client.S7_1500,PlcVarActual.CharArrays, charArrays_content); - - boolean[] boolArrays_content = new boolean[2]; - boolArrays_content[0] = true; - boolArrays_content[1] = false; - write(S7Client.S7_1500,PlcVarActual.BooleanArrays, boolArrays_content); - - String str = "你好啊aa"; - //todo string 的读写有问题 待会看看 - write(S7Client.S7_1500,PlcVarActual.STRING1, str); - return R.ok().put("res",true); - } +// //demo3 +// @PostMapping("/writeTest") +// public R writeTest() throws PlcConnectionException, UnsupportedEncodingException { +// write(S7Client.S7_1500,PlcVarActual.HeartBeat, false); +// +// char[] charArrays_content = new char[2]; +// charArrays_content[0] = '1'; +// charArrays_content[1] = 'c'; +// write( S7Client.S7_1500,PlcVarActual.CharArrays, charArrays_content); +// +// boolean[] boolArrays_content = new boolean[2]; +// boolArrays_content[0] = true; +// boolArrays_content[1] = false; +// write(S7Client.S7_1500,PlcVarActual.BooleanArrays, boolArrays_content); +// +// String str = "你好啊aa"; +// //todo string 的读写有问题 待会看看 +// write(S7Client.S7_1500,PlcVarActual.STRING1, str); +// return R.ok().put("res",true); +// } } diff --git a/src/main/java/com/qgs/dc/s7/entity/AGVInfoCallBack.java b/src/main/java/com/qgs/dc/s7/entity/AGVInfoCallBack.java new file mode 100644 index 0000000..f000e95 --- /dev/null +++ b/src/main/java/com/qgs/dc/s7/entity/AGVInfoCallBack.java @@ -0,0 +1,24 @@ +package com.qgs.dc.s7.entity; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.io.Serializable; + +/** + *

+ * 表 + *

+ * + * @author caixiang + * @since 2022-05-31 + */ +@Data +@EqualsAndHashCode(callSuper = false) +public class AGVInfoCallBack implements Serializable { + + private static final long serialVersionUID = 1L; + private String warehouseName; + private Integer status; + private String content; +} diff --git a/src/main/java/com/qgs/dc/s7/enums/S7DriveManage.java b/src/main/java/com/qgs/dc/s7/enums/S7DriveManage.java deleted file mode 100644 index c97d1fd..0000000 --- a/src/main/java/com/qgs/dc/s7/enums/S7DriveManage.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.qgs.dc.s7.enums; - -import org.apache.plc4x.java.PlcDriverManager; -import org.apache.plc4x.java.api.PlcConnection; -import org.apache.plc4x.java.api.exceptions.PlcConnectionException; -import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager; - -/** - * @Author: 蔡翔 - * @Date: 2019/12/5 14:53 - * @Version 1.0 - */ -public enum S7DriveManage { - INSTANCE("driverManagerPool", new PooledPlcDriverManager()) - ; - PlcDriverManager driverManager; - private String desc; - - S7DriveManage(String desc, PooledPlcDriverManager object){ - this.driverManager = object; - this.desc = desc; - } - - public PlcDriverManager getInstance(){ - return this.driverManager; - } - -} diff --git a/src/main/java/com/qgs/dc/s7/my/s7connector/enmuc/S7Client.java b/src/main/java/com/qgs/dc/s7/my/s7connector/enmuc/S7Client.java index 4351acd..79a96e7 100644 --- a/src/main/java/com/qgs/dc/s7/my/s7connector/enmuc/S7Client.java +++ b/src/main/java/com/qgs/dc/s7/my/s7connector/enmuc/S7Client.java @@ -7,10 +7,12 @@ import com.qgs.dc.s7.my.s7connector.api.utils.ByteUtils; import com.qgs.dc.s7.my.s7connector.type.PlcVar; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.ResponseEntity; import java.io.UnsupportedEncodingException; import java.text.ParseException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -23,9 +25,9 @@ import java.util.concurrent.ScheduledExecutorService; public enum S7Client { //TODO 步骤1 这里是配置多PLC 的,,,有多个plc 就在这里配置一个枚举类 //1500 西门子200smart、1200、1500默认的 机架号=0 槽位号=1; 300/400 默认的 机架-0 插槽-2 - S7_1500("192.168.0.51",0,1,3,PlcVarActual.HeartBeat), + S7_1500("192.168.0.51",0,1,2,PlcVarActual.HeartBeat), //1500 机架-0 插槽-1 - S7_1200("192.168.0.52",0,1,3,PlcVarActual.HeartBeatFor1200) + S7_1200("192.168.0.52",0,1,2,PlcVarActual.HeartBeatFor1200) //后续 在这里扩展 多PLC应用。 @@ -70,6 +72,10 @@ public enum S7Client { check_ping(); } + public String getHost(){ + return this.host; + } + /** * PlcVar(byte[]) 转 java对象 对照表 * 单体变量 @@ -159,6 +165,8 @@ public enum S7Client { } } + + /** * * eg : @@ -317,6 +325,7 @@ public enum S7Client { heartBeat.getByteOffset(), heartBeat.getBitOffset(), heartBeat.getType().getTransportSize()); + System.out.println("host:"+host +" ;; "+connector.hashCode()+" : ping"); Thread.sleep(100); }catch (Exception e){ diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0352f75..b374784 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -8,8 +8,8 @@ server: spring: rabbitmq: # 如果是rabbitmq+haproxy+keepalived集群 ,,那么192.168.0.176是haproxy代理的地址(严格来说是keepalived的vip) - #addresses: 192.168.0.183:5672 ## 新版rabbitmq 版本还未测试 - addresses: 192.168.0.176:5672 + addresses: 192.168.0.176:5672 ## 新版rabbitmq 版本还未测试 + #addresses: 172.16.21.133:5672 username: cdte password: cdte virtual-host: cdte @@ -32,4 +32,21 @@ spring: #initial-interval: 2000 #重试间隔时间(单位毫秒) #max-interval: 10000 # 重试最大间隔时间 #multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间 - #================重试机制 结束 \ No newline at end of file + #================重试机制 结束 + +#influx: +# influxUrl: 'http://192.168.0.170:8086' +# bucket: 'qgs-bucket' +# org: 'qgs' +# token: 'lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==' +influx: + influxUrl: 'http://192.168.0.170:8086' + bucket: 'qgs-bucket' + org: 'qgs' + token: 'lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==' + +# /health point +#management: +# health: +# influxdb: +# enabled: true