@@ -95,6 +95,8 @@ | |||
</dependency> | |||
<!-- hutool 结束 --> | |||
<!-- rabbitmq 依赖 开始 --> | |||
<dependency> | |||
<groupId>org.springframework.boot</groupId> | |||
@@ -182,6 +184,23 @@ | |||
<artifactId>spring-boot-starter-test</artifactId> | |||
<scope>test</scope> | |||
</dependency> | |||
<dependency> | |||
<groupId>org.springframework.boot</groupId> | |||
<artifactId>spring-boot-starter-validation</artifactId> | |||
</dependency> | |||
<!-- influx begin --> | |||
<dependency> | |||
<groupId>com.influxdb</groupId> | |||
<artifactId>influxdb-client-java</artifactId> | |||
<version>6.3.0</version> | |||
</dependency> | |||
<!-- influx end --> | |||
</dependencies> | |||
<build> | |||
@@ -0,0 +1,210 @@ | |||
package com.qgs.dc.influx; | |||
import com.influxdb.client.*; | |||
import com.influxdb.client.domain.WritePrecision; | |||
import com.influxdb.client.write.Point; | |||
import com.influxdb.query.FluxRecord; | |||
import com.influxdb.query.FluxTable; | |||
import com.qgs.dc.influx.param.PageInfo; | |||
import com.qgs.dc.influx.param.QueryDataParam; | |||
import com.qgs.dc.influx.param.Range; | |||
import java.text.SimpleDateFormat; | |||
import java.time.Instant; | |||
import java.util.*; | |||
/** | |||
* @Desc: "" | |||
* @Author: caixiang | |||
* @DATE: 2022/6/25 11:19 | |||
*/ | |||
public class Main { | |||
public static void main(String[] args) throws InterruptedException { | |||
char[] token = "lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==".toCharArray(); | |||
String org = "qgs"; | |||
String bucket = "mytest"; | |||
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://192.168.0.170:8086", token, org, bucket); | |||
WriteApi writeApi = influxDBClient.makeWriteApi(); | |||
// InfluxService influxService = new InfluxService(); | |||
// Event event = new Event(); | |||
// event.time = Instant.now(); | |||
// event.transationId = "asas"; | |||
// event.argName = "arg5"; | |||
// event.argValue = new Double(11); | |||
// influxService.insert(event); | |||
// Event event = new Event(); | |||
// event.setTime(Instant.now()); | |||
// event.setTransationId("asasd11"); | |||
// event.setArgName("argName11"); | |||
// event.setArgValue(3d); | |||
// InfluxClient.Client.insert(event,"ASProcessCompleteEvent"); | |||
Point point = Point.measurement("ASProcessCompleteEvent") | |||
.addTag("transationId", "112311") | |||
.addTag("argName", "argName11") | |||
.addField("argValue", 3D) | |||
.time(Instant.now().toEpochMilli(), WritePrecision.MS); | |||
Point point2 = Point.measurement("ASProcessCompleteEvent") | |||
.addTag("transationId", "222312") | |||
.addTag("argName", "argName11") | |||
.addField("argValue", 4D) | |||
.time(Instant.now().toEpochMilli(), WritePrecision.MS); | |||
List<Point> list = new ArrayList<>(); | |||
list.add(point); | |||
list.add(point2); | |||
writeApi.writePoints(list); | |||
//todo api.writeMeasurements(WritePrecision.NS, Arrays.asList(new H2OFeetMeasurement("coyote_creek", 15.0D, null, Instant.ofEpochSecond(0, 15)), new H2OFeetMeasurement("coyote_creek", 16.0D, null, Instant.ofEpochSecond(0, 16)))); | |||
// List<Event> events = new ArrayList<>(); | |||
// for(int i=0;i<99;i++){ | |||
// | |||
// Event event = new Event(); | |||
// event.time = Instant.now(); | |||
// event.transationId = "asas"+i; | |||
// event.argName = "arg7"; | |||
// event.argValue = new Double(i); | |||
// events.add(event); | |||
// } | |||
// List<Event> qList = new ArrayList<>(); | |||
// Event event = new Event(); | |||
// event.time = Instant.now(); | |||
// event.transationId = "asas"; | |||
// event.argName = "arg7"; | |||
// event.argValue = new Double(1); | |||
// Thread.sleep(100); | |||
// Event event2 = new Event(); | |||
// event2.time = Instant.now(); | |||
// event2.transationId = "asas"; | |||
// event2.argName = "arg7"; | |||
// event2.argValue = new Double(2); | |||
// qList.add(event); | |||
// qList.add(event2); | |||
// writeApi.writeMeasurement( WritePrecision.NS, qList); | |||
// for(int i=0;i<10;i++){ | |||
// Temperature temperature = new Temperature(); | |||
// temperature.location = "south"; | |||
// temperature.value = new Double(i); | |||
// temperature.type = "equipment3"; | |||
// temperature.time = Instant.now(); | |||
// | |||
// writeApi.writeMeasurement( WritePrecision.NS, temperature); | |||
// } | |||
// String flux = "from(bucket:\"mytest\") |> range(start: -60m)"; | |||
// flux += "|> filter(fn: (r) =>\n" + | |||
// " r._measurement == \"ASProcessCompleteEvent\" and \n" + | |||
//// " r._field == \"type\" and \n" + //对应 Field key | |||
// " r.argName == \"arg3\"\n" + //对应 Tags key (Tag 信息无法在FluxRecord 里面获取。) | |||
// " )"; | |||
// QueryApi queryApi = influxDBClient.getQueryApi(); | |||
// | |||
// List<FluxTable> tables = queryApi.query(flux); | |||
// for (FluxTable fluxTable : tables) { | |||
// List<FluxRecord> records = fluxTable.getRecords(); | |||
// for (FluxRecord fluxRecord : records) { | |||
// System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); | |||
//// System.out.println("time: "+fluxRecord.getTime() +" key:"++" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); | |||
// | |||
// } | |||
// } | |||
// from(bucket: "mytest") | |||
// |> range(start: 2022-06-29T11:30:00Z, stop: 2022-06-29T12:30:00Z) | |||
// |> filter(fn: (r) => r["_measurement"] == "ASProcessCompleteEvent") | |||
// |> filter(fn: (r) => r["argName"] == "arg4") | |||
// |> drop(columns: ["transationId"]) | |||
// |> sort(columns: ["_time"], desc: true) | |||
// 取前10条数据 | |||
// |> limit(n: 10, offset: 0) | |||
// 取 10-20 条数据 | |||
// |> limit(n: 10, offset: 10) | |||
// 取 20-30 条数据 | |||
// |> limit(n: 10, offset: 20) | |||
// QueryDataParam queryDataParam = new QueryDataParam(); | |||
// queryDataParam.setBucket("mytest"); | |||
// queryDataParam.setRange(new Range(getDate().toInstant(),new Date().toInstant())); | |||
// queryDataParam.setMeasurement("ASProcessCompleteEvent"); | |||
// queryDataParam.setTag(new Tag("argName","arg4")); | |||
// queryDataParam.setDropedTagName("transationId"); | |||
// queryDataParam.setPageInfo(new PageInfo(1,100)); | |||
// | |||
// List<FluxTable> tables = query(queryDataParam,influxDBClient); | |||
// List<FluxRecord> records1 = tables.get(0).getRecords(); | |||
// List<List<FluxRecord>> lists = Utils.fixedGroup(records1, 10); | |||
// for (FluxTable fluxTable : tables) { | |||
// List<FluxRecord> records = fluxTable.getRecords(); | |||
// for (FluxRecord fluxRecord : records) { | |||
// System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); | |||
// | |||
// } | |||
// } | |||
influxDBClient.close(); | |||
} | |||
public static Date getDate(){ | |||
Date date = new Date();//获取当前日期 | |||
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");//格式化一下 | |||
Calendar calendar1 = Calendar.getInstance();//获取对日期操作的类对象 | |||
//两种写法都可以获取到前三天的日期 | |||
// calendar1.set(Calendar.DAY_OF_YEAR,calendar1.get(Calendar.DAY_OF_YEAR) -3); | |||
//在当前时间的基础上获取前三天的日期 | |||
calendar1.add(Calendar.DATE, -3); | |||
//add方法 参数也可传入 月份,获取的是前几月或后几月的日期 | |||
//calendar1.add(Calendar.MONTH, -3); | |||
Date day = calendar1.getTime(); | |||
return day; | |||
} | |||
private static List<FluxTable> query(QueryDataParam param, InfluxDBClient influxDBClient){ | |||
String measurement = param.getMeasurement(); | |||
String dropedTagName = param.getDropedTagName(); | |||
Range range = param.getRange(); | |||
String bucket = param.getBucket(); | |||
String tagName = param.getTag().getTagName(); | |||
String tagValue = param.getTag().getTagValue(); | |||
PageInfo pageInfo = param.getPageInfo(); | |||
String flux = "from(bucket:\""+bucket+"\")"; | |||
flux += "|> range(start: "+range.getBegin().toString()+",stop:"+range.getEnd().toString()+") \n"; | |||
flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\") \n"; | |||
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\") \n"; | |||
flux += "|> drop(columns: [\""+dropedTagName+"\"]) \n"; | |||
flux += "|> sort(columns: [\"_time\"], desc: true) \n"; | |||
if(pageInfo!=null){ | |||
flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; | |||
} | |||
QueryApi queryApi = influxDBClient.getQueryApi(); | |||
List<FluxTable> tables = queryApi.query(flux); | |||
for (FluxTable fluxTable : tables) { | |||
List<FluxRecord> records = fluxTable.getRecords(); | |||
for (FluxRecord fluxRecord : records) { | |||
System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); | |||
} | |||
} | |||
return tables; | |||
} | |||
} |
@@ -0,0 +1,69 @@ | |||
package com.qgs.dc.influx.common; | |||
import java.text.SimpleDateFormat; | |||
import java.util.ArrayList; | |||
import java.util.Calendar; | |||
import java.util.Date; | |||
import java.util.List; | |||
/** | |||
* @Desc: "" | |||
* @Author: caixiang | |||
* @DATE: 2022/6/29 16:23 | |||
*/ | |||
public class Utils { | |||
public static void main(String[] args) { | |||
ArrayList<Integer> arrs = new ArrayList<>(); | |||
for(int i=0;i<100;i++){ | |||
arrs.add(i); | |||
} | |||
List<List<Integer>> lists = fixedGroup(arrs, 10); | |||
System.out.println(); | |||
} | |||
public static Date getBeforeDate(Integer number){ | |||
Date date = new Date();//获取当前日期 | |||
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");//格式化一下 | |||
Calendar calendar1 = Calendar.getInstance();//获取对日期操作的类对象 | |||
//两种写法都可以获取到前三天的日期 | |||
// calendar1.set(Calendar.DAY_OF_YEAR,calendar1.get(Calendar.DAY_OF_YEAR) -3); | |||
//在当前时间的基础上获取前三天的日期 | |||
calendar1.add(Calendar.DATE, 0-number); | |||
//add方法 参数也可传入 月份,获取的是前几月或后几月的日期 | |||
//calendar1.add(Calendar.MONTH, -3); | |||
Date day = calendar1.getTime(); | |||
return day; | |||
} | |||
/** | |||
* 将一组数据固定分组,每组n个元素 | |||
* | |||
* @param source 要分组的数据源 | |||
* @param limit 每组n个元素 | |||
* @param <T> | |||
* @return | |||
*/ | |||
public static <T> List<List<T>> fixedGroup(List<T> source, int limit) { | |||
if (null == source || source.size() == 0 || limit <= 0) | |||
return null; | |||
List<List<T>> result = new ArrayList<List<T>>(); | |||
int remainder = source.size() % limit; | |||
int size = (source.size() / limit); | |||
for (int i = 0; i < size; i++) { | |||
List<T> subset = null; | |||
subset = source.subList(i * limit, (i + 1) * limit); | |||
result.add(subset); | |||
} | |||
if (remainder > 0) { | |||
List<T> subset = null; | |||
subset = source.subList(size * limit, size * limit + remainder); | |||
result.add(subset); | |||
} | |||
return result; | |||
} | |||
} |
@@ -0,0 +1,133 @@ | |||
package com.qgs.dc.influx.config; | |||
import com.influxdb.client.InfluxDBClient; | |||
import com.influxdb.client.InfluxDBClientFactory; | |||
import com.influxdb.client.QueryApi; | |||
import com.influxdb.client.WriteApi; | |||
import com.influxdb.client.domain.WritePrecision; | |||
import com.influxdb.client.write.Point; | |||
import com.influxdb.query.FluxTable; | |||
import com.qgs.dc.influx.param.PageInfo; | |||
import com.qgs.dc.influx.param.QueryDataParam; | |||
import com.qgs.dc.influx.param.Range; | |||
import com.qgs.dc.influx.template.Event; | |||
import java.util.ArrayList; | |||
import java.util.List; | |||
public enum InfluxClient { | |||
/** | |||
* influxdb 读写客户端,,如果write比较繁忙,后续可以考虑,维护 client一个线程池。 | |||
* */ | |||
Client("http://192.168.0.170:8086","lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==","qgs","qgs-bucket"), | |||
; | |||
private String url; | |||
private String token; | |||
private String org; | |||
private String bucket; | |||
private InfluxDBClient influxDBClient; | |||
private WriteApi writeApi; | |||
private QueryApi queryApi; | |||
InfluxClient(String url,String token,String org,String bucket){ | |||
this.url = url; | |||
this.token = token; | |||
this.org = org; | |||
this.bucket = bucket; | |||
this.influxDBClient = InfluxDBClientFactory.create(this.url, this.token.toCharArray(),this.org,this.bucket); | |||
this.writeApi = influxDBClient.makeWriteApi(); | |||
this.queryApi = influxDBClient.getQueryApi(); | |||
} | |||
public QueryApi getQueryApi() { | |||
return queryApi; | |||
} | |||
public WriteApi getWriteApi() { | |||
return writeApi; | |||
} | |||
/** | |||
* 测试连接是否正常 | |||
* | |||
* @return | |||
* true 服务正常健康 | |||
* false 异常 | |||
*/ | |||
private boolean ping() { | |||
boolean isConnected = false; | |||
Boolean pong; | |||
try { | |||
pong = influxDBClient.ping(); | |||
if (pong != null) { | |||
isConnected = true; | |||
} | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||
} | |||
return isConnected; | |||
} | |||
public void insert(Event event, String measurement){ | |||
Point point = Point.measurement(measurement) | |||
.addTag("transationId", event.getTransationId()) | |||
.addTag("argName", event.getArgName()) | |||
.addField("argValue", event.getArgValue()) | |||
.time(event.getTime().toEpochMilli(), WritePrecision.MS); | |||
writeApi.writePoint(point); | |||
} | |||
//异步 批量写入数据 | |||
//如果要批量插入的话,一次也只能写入 | |||
public void batchInsert(List<Event> events, String measurement){ | |||
List<Point> list = new ArrayList<>(); | |||
for(Event event:events){ | |||
Point point = Point.measurement(measurement) | |||
.addTag("transationId", event.getTransationId()) | |||
.addTag("argName", event.getArgName()) | |||
.addField("argValue", event.getArgValue()) | |||
.time(event.getTime().toEpochMilli(), WritePrecision.MS); | |||
list.add(point); | |||
} | |||
writeApi.writePoints(list); | |||
} | |||
public List<FluxTable> query(QueryDataParam param){ | |||
String measurement = param.getMeasurement(); | |||
String dropedTagName = param.getDropedTagName(); | |||
Range range = param.getRange(); | |||
String bucket = param.getBucket(); | |||
String tagName = param.getTag().getTagName(); | |||
String tagValue = param.getTag().getTagValue(); | |||
PageInfo pageInfo = param.getPageInfo(); | |||
String flux = "from(bucket:\""+bucket+"\")"; | |||
flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; | |||
flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; | |||
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; | |||
flux += "|> drop(columns: [\""+dropedTagName+"\"])"; | |||
flux += "|> sort(columns: [\"_time\"], desc: true)"; | |||
if(pageInfo!=null){ | |||
flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; | |||
} | |||
// List<FluxTable> tables = queryApi.query(flux); | |||
// for (FluxTable fluxTable : tables) { | |||
// List<FluxRecord> records = fluxTable.getRecords(); | |||
// for (FluxRecord fluxRecord : records) { | |||
// System.out.println("time: "+fluxRecord.getTime() +" key:"+fluxRecord.getField()+" value: " + fluxRecord.getValueByKey("_value")+" measurement: " + fluxRecord.getMeasurement()); | |||
// | |||
// } | |||
// } | |||
return queryApi.query(flux); | |||
} | |||
} |
@@ -0,0 +1,23 @@ | |||
package com.qgs.dc.influx.constant; | |||
import com.influxdb.LogLevel; | |||
/** | |||
* @Desc: "" | |||
* @Author: caixiang | |||
* @DATE: 2022/7/1 9:18 | |||
*/ | |||
public class Constant { | |||
public static final String url = "http://192.168.0.170:8086"; | |||
public static final String token = "lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw=="; | |||
public static final String org = "qgs"; | |||
public static final String bucket = "qgs-bucket"; | |||
public static final String username = "caixiang"; | |||
public static final String password = "25112856"; | |||
public static final LogLevel logLevel = LogLevel.BODY; | |||
public static final LogLevel readTimeout = LogLevel.BODY; | |||
public static final LogLevel writeTimeout = LogLevel.BODY; | |||
public static final LogLevel connectTimeout = LogLevel.BODY; | |||
} |
@@ -0,0 +1,74 @@ | |||
package com.qgs.dc.influx.controller; | |||
import com.influxdb.client.domain.WritePrecision; | |||
import com.influxdb.client.write.Point; | |||
import com.qgs.dc.influx.config.InfluxClient; | |||
import com.qgs.dc.influx.template.Event; | |||
import org.slf4j.Logger; | |||
import org.slf4j.LoggerFactory; | |||
import org.springframework.web.bind.annotation.PostMapping; | |||
import org.springframework.web.bind.annotation.RequestMapping; | |||
import org.springframework.web.bind.annotation.RestController; | |||
import java.time.Instant; | |||
@RestController | |||
@RequestMapping("/influx") | |||
public class InfluxDemoController { | |||
private static final Logger logger = LoggerFactory.getLogger(InfluxDemoController.class); | |||
// try (WriteApi writeApi = influxDBClient.makeWriteApi()) { | |||
// Temperature temperature = new Temperature(); | |||
// temperature.setLocation("east"); | |||
// temperature.setValue(106.2D); | |||
// temperature.setTime(Instant.now()); | |||
// writeApi.writeMeasurement(WritePrecision.NS,temperature); | |||
// } | |||
// | |||
// try (WriteApi writeApi = influxDBClient.makeWriteApi()) { | |||
// Point point = Point.measurement("temperature") | |||
// .addTag("location","south") | |||
// .addTag("owner","wxm") | |||
// .addField("wxm",230.8); | |||
// writeApi.writePoint(point); | |||
// } | |||
@PostMapping("/insertBatch") | |||
public void insertBatch() throws InterruptedException { | |||
// List<Event> list = new ArrayList<>(); | |||
// | |||
// for(int i=0;i<99;i++){ | |||
// //Thread.sleep(1000); | |||
// Event event = new Event(); | |||
// event.time = Instant.now(); | |||
// event.transationId = "asas"+i; | |||
// event.argName = "arg5"; | |||
// event.argValue = new Double(i); | |||
// list.add(event); | |||
// } | |||
// influxService.batchInsert(list); | |||
} | |||
public Point insert(Event event, String measurement){ | |||
Point point = Point.measurement(measurement) | |||
.addTag("transationId", event.getTransationId()) | |||
.addTag("argName", event.getArgName()) | |||
.addField("argValue", event.getArgValue()) | |||
.time(event.getTime().toEpochMilli(), WritePrecision.MS); | |||
return point; | |||
} | |||
@PostMapping("/insert") | |||
public void insert() throws InterruptedException { | |||
Event event = new Event(); | |||
event.setTime(Instant.now()); | |||
event.setTransationId("asasd11"); | |||
event.setArgName("argName11"); | |||
event.setArgValue(99765d); | |||
Point asProcessCompleteEvent = insert(event, "ASProcessCompleteEvent"); | |||
InfluxClient.Client.insert(event,"ASProcessCompleteEvent"); | |||
} | |||
} |
@@ -0,0 +1,30 @@ | |||
package com.qgs.dc.influx.entity; | |||
import lombok.Data; | |||
/** | |||
* @Desc: "" | |||
* @Author: caixiang | |||
* @DATE: 2022/6/27 10:46 | |||
*/ | |||
@Data | |||
public class Header { | |||
private String transationId; | |||
private String messageName; | |||
private String messageType; | |||
private String fromWhere; | |||
private String toWhere; | |||
private String equipmentId; | |||
private String sendTimestamp; | |||
private String argName; | |||
private Double argValue; | |||
} |
@@ -0,0 +1,24 @@ | |||
package com.qgs.dc.influx.param; | |||
import lombok.Data; | |||
import javax.validation.constraints.NotEmpty; | |||
import javax.validation.constraints.NotNull; | |||
import java.io.Serializable; | |||
/** | |||
* @Desc: "" | |||
* @Author: caixiang | |||
* @DATE: 2022/6/29 10:18 | |||
*/ | |||
@Data | |||
public class BaseParam implements Serializable { | |||
//page 信息可选 | |||
private PageInfo pageInfo; | |||
@NotEmpty(message = "measurement 不能为空") | |||
private String measurement; | |||
@NotNull(message = "查询时间段不能为空") | |||
private Range range; | |||
} |
@@ -0,0 +1,24 @@ | |||
package com.qgs.dc.influx.param; | |||
import lombok.Data; | |||
import org.hibernate.validator.constraints.Range; | |||
/** | |||
* @Desc: "" | |||
* @Author: caixiang | |||
* @DATE: 2022/6/29 10:19 | |||
*/ | |||
@Data | |||
public class PageInfo { | |||
@Range(min = 1, message = "页码必须大于等于1") | |||
private Integer current; | |||
// @NotNull(message = "每页显示条数不能为空") | |||
@Range(min = 1, max = 1000, message = "每页显示条数范围需在1-1000之间") | |||
private Integer size; | |||
public PageInfo(Integer current,Integer size){ | |||
this.current = current; | |||
this.size = size; | |||
} | |||
} |
@@ -0,0 +1,31 @@ | |||
package com.qgs.dc.influx.param; | |||
import lombok.Data; | |||
import lombok.EqualsAndHashCode; | |||
import lombok.experimental.Accessors; | |||
/** | |||
* @Desc: "influx 查询条件构造" | |||
* @Author: caixiang | |||
* @DATE: 2022/6/29 10:17 | |||
* | |||
* 注意: | |||
* 必填 | |||
* ① measurement 不能为空 | |||
* ② 时间段 不能为空 | |||
* ③ bucket 不能为空 | |||
* 非必填 | |||
* ① 分页信息可选 | |||
* ② tag | |||
* | |||
*/ | |||
@Data | |||
@EqualsAndHashCode(callSuper = false) | |||
@Accessors(chain = true) | |||
public class QueryDataParam extends BaseParam{ | |||
private Tag tag; | |||
//查询的时候,需要忽略的字段。(transationId是唯一标识会对 最终的查询结果集产生影响) | |||
private String dropedTagName; | |||
private String bucket; | |||
} |
@@ -0,0 +1,30 @@ | |||
package com.qgs.dc.influx.param; | |||
import lombok.Data; | |||
import javax.validation.constraints.NotNull; | |||
import java.time.Instant; | |||
/** | |||
* @Desc: "influxdb查询 时间范围" | |||
* @Author: caixiang | |||
* @DATE: 2022/6/29 11:14 | |||
*/ | |||
@Data | |||
public class Range { | |||
@NotNull(message = "起始时间不能为空") | |||
private Instant begin; | |||
@NotNull(message = "终点时间不能为空") | |||
private Instant end; | |||
public Range(Instant begin,Instant end){ | |||
this.begin = begin; | |||
this.end = end; | |||
} | |||
// public static void main(String[] args) { | |||
// Date date = new Date(); | |||
// System.out.println(date.toInstant().toString()); | |||
// } | |||
} |
@@ -0,0 +1,19 @@ | |||
package com.qgs.dc.influx.param; | |||
import lombok.Data; | |||
/** | |||
* @Desc: "" | |||
* @Author: caixiang | |||
* @DATE: 2022/6/29 14:38 | |||
*/ | |||
@Data | |||
public class Tag { | |||
private String tagName; | |||
private String tagValue; | |||
public Tag(String tagName,String tagValue){ | |||
this.tagName = tagName; | |||
this.tagValue = tagValue; | |||
} | |||
} |
@@ -0,0 +1,23 @@ | |||
package com.qgs.dc.influx.template; | |||
import com.influxdb.annotations.Column; | |||
import com.influxdb.annotations.Measurement; | |||
import lombok.Data; | |||
import java.time.Instant; | |||
/** | |||
* @Desc: "" | |||
* @Author: caixiang | |||
* @DATE: 2022/6/25 11:13 | |||
*/ | |||
@Data | |||
public class Event { | |||
private Instant time; | |||
private String transationId; | |||
private String argName; | |||
private Double argValue; | |||
} |
@@ -0,0 +1,28 @@ | |||
package com.qgs.dc.influx.template; | |||
import com.influxdb.annotations.Column; | |||
import com.influxdb.annotations.Measurement; | |||
import java.time.Instant; | |||
/** | |||
* @Desc: "" | |||
* @Author: caixiang | |||
* @DATE: 2022/6/22 15:52 | |||
*/ | |||
//Temperature.java | |||
@Measurement(name = "temperature") | |||
public class Temperature { | |||
@Column(tag = true) | |||
public String location; | |||
@Column | |||
public Double value; | |||
@Column | |||
public String type; | |||
@Column(timestamp = true) | |||
public Instant time; | |||
} |
@@ -1,18 +1,26 @@ | |||
package com.qgs.dc.s7.controller; | |||
import com.influxdb.client.InfluxDBClient; | |||
import com.influxdb.client.domain.WritePrecision; | |||
import com.influxdb.client.write.Point; | |||
import com.qgs.dc.opcua.controller.R; | |||
import com.qgs.dc.s7.entity.AGVInfoCallBack; | |||
import com.qgs.dc.s7.my.s7connector.enmuc.PlcVarActual; | |||
import com.qgs.dc.s7.my.s7connector.enmuc.S7Client; | |||
import com.qgs.dc.s7.my.s7connector.type.PlcVar; | |||
import org.apache.plc4x.java.api.exceptions.PlcConnectionException; | |||
import org.slf4j.Logger; | |||
import org.slf4j.LoggerFactory; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.web.bind.annotation.*; | |||
import java.io.UnsupportedEncodingException; | |||
import java.text.ParseException; | |||
import java.time.Instant; | |||
import java.util.Arrays; | |||
import java.util.List; | |||
import java.util.Random; | |||
@@ -22,18 +30,42 @@ import java.util.Random; | |||
public class S7DemoController { | |||
private static final Logger logger = LoggerFactory.getLogger(S7DemoController.class); | |||
// @Autowired | |||
// InfluxDBClient influxDBClient; | |||
// | |||
// @PostMapping("/insert") | |||
// public void insert() throws InterruptedException { | |||
// Event event = new Event(); | |||
// event.setTime(Instant.now()); | |||
// event.setTransationId("asasd11"); | |||
// event.setArgName("argName11"); | |||
// event.setArgValue(7d); | |||
// Point asProcessCompleteEvent = insert(event, "ASProcessCompleteEventSSS"); | |||
// influxDBClient.makeWriteApi().writePoint(asProcessCompleteEvent); | |||
// } | |||
@PostMapping(value = "testFor") | |||
private void testFor(@RequestBody AGVInfoCallBack agvInfoCallBack) { | |||
System.out.println(agvInfoCallBack.toString()); | |||
} | |||
//demo1 | |||
@PostMapping("/testReadAll") | |||
public R testReadAll() throws UnsupportedEncodingException, ParseException { | |||
for(PlcVarActual actual:PlcVarActual.values()){ | |||
System.out.println(read(S7Client.S7_1500,actual)); | |||
} | |||
return R.ok(); | |||
} | |||
// public Point insert(Event event, String measurement){ | |||
// Point point = Point.measurement(measurement) | |||
// .addTag("transationId", event.getTransationId()) | |||
// .addTag("argName", event.getArgName()) | |||
// .addField("argValue", event.getArgValue()) | |||
// .time(event.getTime().toEpochMilli(), WritePrecision.MS); | |||
// return point; | |||
// } | |||
//demo2 | |||
@PostMapping("/readTest") | |||
public R getTestForS7() throws UnsupportedEncodingException, ParseException { | |||
@@ -42,11 +74,13 @@ public class S7DemoController { | |||
List<Character> characters = (List<Character>)read(S7Client.S7_1500,PlcVarActual.CharArrays); | |||
List<Boolean> booleans = (List<Boolean>)read(S7Client.S7_1500,PlcVarActual.BooleanArrays); | |||
String stri = (String)read(S7Client.S7_1500,PlcVarActual.STRING1); | |||
return R.ok().put("res",heartBeat).put("characters",characters).put("ddtl",ddtl).put("bools",booleans).put("str",stri); | |||
} | |||
@PostMapping("/readTest2") | |||
public R getTest2ForS7() throws UnsupportedEncodingException, ParseException { | |||
// List<Character> characters = (List<Character>)read(PlcVarActual.CharArrays,S7Client.S7_1500); | |||
@@ -99,7 +133,7 @@ public class S7DemoController { | |||
long l = System.currentTimeMillis(); | |||
String[] subs = (String[])read(S7Client.S7_1200,PlcVarActual.SubIdArrays1200); //65ms | |||
long l1 = System.currentTimeMillis(); | |||
System.out.println(Arrays.toString(subs)); | |||
//System.out.println(Arrays.toString(subs)); | |||
String[] toWrite = new String[63]; | |||
for(int i=0;i<63;i++){ | |||
@@ -111,8 +145,8 @@ public class S7DemoController { | |||
write(S7Client.S7_1200,PlcVarActual.SubIdArrays1200,toWrite); | |||
long c2 = System.currentTimeMillis(); | |||
String[] subs2 = (String[])read(S7Client.S7_1200,PlcVarActual.SubIdArrays1200); | |||
logger.info("正常测试: l:"+(l1-l)+"c:"+(c2-c1)+";;read1:"+Arrays.toString(subs)+";;read2:"+Arrays.toString(subs2)); | |||
return R.ok().put("l",(l1-l)).put("c",(c2-c1)); | |||
} | |||
@@ -160,8 +194,19 @@ public class S7DemoController { | |||
} | |||
//PlcVarActual 到时候改成你们的 xxxPlcToWcs 或者 xxxWcsToPlc | |||
/** | |||
* return | |||
* 成功: 返回相应的object对象 | |||
* 失败: 返回null | |||
* */ | |||
private Object read(S7Client s7Client,PlcVarActual var) throws UnsupportedEncodingException, ParseException { | |||
return s7Client.read(var.getArea(), var.getAreaNumber(), var.getByteOffset(), var.getBitOffset(), var.getLength(), var.getStrSize(), var.getType()); | |||
try { | |||
return s7Client.read(var.getArea(), var.getAreaNumber(), var.getByteOffset(), var.getBitOffset(), var.getLength(), var.getStrSize(), var.getType()); | |||
}catch (Exception e){ | |||
logger.error("host:"+s7Client.getHost()+" ; read 操作出现问题: "+e.getMessage()); | |||
e.printStackTrace(); | |||
return null; | |||
} | |||
} | |||
private void write(S7Client s7Client,PlcVarActual var,Object newValue) { | |||
if(var.getType().equals(PlcVar.STRING_Array)){ | |||
@@ -179,25 +224,25 @@ public class S7DemoController { | |||
} | |||
} | |||
//demo3 | |||
@PostMapping("/writeTest") | |||
public R writeTest() throws PlcConnectionException, UnsupportedEncodingException { | |||
write(S7Client.S7_1500,PlcVarActual.HeartBeat, false); | |||
char[] charArrays_content = new char[2]; | |||
charArrays_content[0] = '1'; | |||
charArrays_content[1] = 'c'; | |||
write( S7Client.S7_1500,PlcVarActual.CharArrays, charArrays_content); | |||
boolean[] boolArrays_content = new boolean[2]; | |||
boolArrays_content[0] = true; | |||
boolArrays_content[1] = false; | |||
write(S7Client.S7_1500,PlcVarActual.BooleanArrays, boolArrays_content); | |||
String str = "你好啊aa"; | |||
//todo string 的读写有问题 待会看看 | |||
write(S7Client.S7_1500,PlcVarActual.STRING1, str); | |||
return R.ok().put("res",true); | |||
} | |||
// //demo3 | |||
// @PostMapping("/writeTest") | |||
// public R writeTest() throws PlcConnectionException, UnsupportedEncodingException { | |||
// write(S7Client.S7_1500,PlcVarActual.HeartBeat, false); | |||
// | |||
// char[] charArrays_content = new char[2]; | |||
// charArrays_content[0] = '1'; | |||
// charArrays_content[1] = 'c'; | |||
// write( S7Client.S7_1500,PlcVarActual.CharArrays, charArrays_content); | |||
// | |||
// boolean[] boolArrays_content = new boolean[2]; | |||
// boolArrays_content[0] = true; | |||
// boolArrays_content[1] = false; | |||
// write(S7Client.S7_1500,PlcVarActual.BooleanArrays, boolArrays_content); | |||
// | |||
// String str = "你好啊aa"; | |||
// //todo string 的读写有问题 待会看看 | |||
// write(S7Client.S7_1500,PlcVarActual.STRING1, str); | |||
// return R.ok().put("res",true); | |||
// } | |||
} |
@@ -0,0 +1,24 @@ | |||
package com.qgs.dc.s7.entity; | |||
import lombok.Data; | |||
import lombok.EqualsAndHashCode; | |||
import java.io.Serializable; | |||
/** | |||
* <p> | |||
* 表 | |||
* </p> | |||
* | |||
* @author caixiang | |||
* @since 2022-05-31 | |||
*/ | |||
@Data | |||
@EqualsAndHashCode(callSuper = false) | |||
public class AGVInfoCallBack implements Serializable { | |||
private static final long serialVersionUID = 1L; | |||
private String warehouseName; | |||
private Integer status; | |||
private String content; | |||
} |
@@ -1,28 +0,0 @@ | |||
package com.qgs.dc.s7.enums; | |||
import org.apache.plc4x.java.PlcDriverManager; | |||
import org.apache.plc4x.java.api.PlcConnection; | |||
import org.apache.plc4x.java.api.exceptions.PlcConnectionException; | |||
import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager; | |||
/** | |||
* @Author: 蔡翔 | |||
* @Date: 2019/12/5 14:53 | |||
* @Version 1.0 | |||
*/ | |||
public enum S7DriveManage { | |||
INSTANCE("driverManagerPool", new PooledPlcDriverManager()) | |||
; | |||
PlcDriverManager driverManager; | |||
private String desc; | |||
S7DriveManage(String desc, PooledPlcDriverManager object){ | |||
this.driverManager = object; | |||
this.desc = desc; | |||
} | |||
public PlcDriverManager getInstance(){ | |||
return this.driverManager; | |||
} | |||
} |
@@ -7,10 +7,12 @@ import com.qgs.dc.s7.my.s7connector.api.utils.ByteUtils; | |||
import com.qgs.dc.s7.my.s7connector.type.PlcVar; | |||
import org.slf4j.Logger; | |||
import org.slf4j.LoggerFactory; | |||
import org.springframework.http.ResponseEntity; | |||
import java.io.UnsupportedEncodingException; | |||
import java.text.ParseException; | |||
import java.util.ArrayList; | |||
import java.util.Arrays; | |||
import java.util.List; | |||
import java.util.concurrent.Executors; | |||
import java.util.concurrent.ScheduledExecutorService; | |||
@@ -23,9 +25,9 @@ import java.util.concurrent.ScheduledExecutorService; | |||
public enum S7Client { | |||
//TODO 步骤1 这里是配置多PLC 的,,,有多个plc 就在这里配置一个枚举类 | |||
//1500 西门子200smart、1200、1500默认的 机架号=0 槽位号=1; 300/400 默认的 机架-0 插槽-2 | |||
S7_1500("192.168.0.51",0,1,3,PlcVarActual.HeartBeat), | |||
S7_1500("192.168.0.51",0,1,2,PlcVarActual.HeartBeat), | |||
//1500 机架-0 插槽-1 | |||
S7_1200("192.168.0.52",0,1,3,PlcVarActual.HeartBeatFor1200) | |||
S7_1200("192.168.0.52",0,1,2,PlcVarActual.HeartBeatFor1200) | |||
//后续 在这里扩展 多PLC应用。 | |||
@@ -70,6 +72,10 @@ public enum S7Client { | |||
check_ping(); | |||
} | |||
public String getHost(){ | |||
return this.host; | |||
} | |||
/** | |||
* PlcVar(byte[]) 转 java对象 对照表 | |||
* 单体变量 | |||
@@ -159,6 +165,8 @@ public enum S7Client { | |||
} | |||
} | |||
/** | |||
* | |||
* eg : | |||
@@ -317,6 +325,7 @@ public enum S7Client { | |||
heartBeat.getByteOffset(), | |||
heartBeat.getBitOffset(), | |||
heartBeat.getType().getTransportSize()); | |||
System.out.println("host:"+host +" ;; "+connector.hashCode()+" : ping"); | |||
Thread.sleep(100); | |||
}catch (Exception e){ | |||
@@ -8,8 +8,8 @@ server: | |||
spring: | |||
rabbitmq: | |||
# 如果是rabbitmq+haproxy+keepalived集群 ,,那么192.168.0.176是haproxy代理的地址(严格来说是keepalived的vip) | |||
#addresses: 192.168.0.183:5672 ## 新版rabbitmq 版本还未测试 | |||
addresses: 192.168.0.176:5672 | |||
addresses: 192.168.0.176:5672 ## 新版rabbitmq 版本还未测试 | |||
#addresses: 172.16.21.133:5672 | |||
username: cdte | |||
password: cdte | |||
virtual-host: cdte | |||
@@ -32,4 +32,21 @@ spring: | |||
#initial-interval: 2000 #重试间隔时间(单位毫秒) | |||
#max-interval: 10000 # 重试最大间隔时间 | |||
#multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间 | |||
#================重试机制 结束 | |||
#================重试机制 结束 | |||
#influx: | |||
# influxUrl: 'http://192.168.0.170:8086' | |||
# bucket: 'qgs-bucket' | |||
# org: 'qgs' | |||
# token: 'lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==' | |||
influx: | |||
influxUrl: 'http://192.168.0.170:8086' | |||
bucket: 'qgs-bucket' | |||
org: 'qgs' | |||
token: 'lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==' | |||
# /health point | |||
#management: | |||
# health: | |||
# influxdb: | |||
# enabled: true |