mark for doc
This commit is contained in:
parent
e2a4f08c0f
commit
8369dad0b7
10
src/main/java/com/qgs/dc/mq/Constant/Constant.java
Normal file
10
src/main/java/com/qgs/dc/mq/Constant/Constant.java
Normal file
@ -0,0 +1,10 @@
|
||||
package com.qgs.dc.mq.Constant;
|
||||
|
||||
/**
|
||||
* @Desc: ""
|
||||
* @Author: caixiang
|
||||
* @DATE: 2021/9/7 9:25
|
||||
*/
|
||||
public class Constant {
|
||||
public static final String mesCallBackUrl = "localhost:8080/api/mq/mq-message-received/receivedFromEapRequest";
|
||||
}
|
@ -4,6 +4,7 @@ import cn.hutool.http.HttpRequest;
|
||||
import cn.hutool.http.HttpUtil;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.qgs.dc.common.utils.CommonFunction;
|
||||
import com.qgs.dc.mq.Constant.Constant;
|
||||
import com.qgs.dc.mq.configuration.ConfigOf00B;
|
||||
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler;
|
||||
import com.qgs.dc.mq.entity.MQMessage;
|
||||
@ -56,7 +57,7 @@ public class PID00BReceived {
|
||||
|
||||
//1. 正常情况
|
||||
//Integer integer = mqMessageHandler.requestHandler(message);
|
||||
String integer= HttpUtil.post("localhost:8080/api/mq/mq-message-received/receivedFromEapRequest",JSONObject.toJSONString(mqMessage));
|
||||
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage));
|
||||
Integer result = Integer.valueOf(integer);
|
||||
|
||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
|
||||
@ -112,8 +113,6 @@ public class PID00BReceived {
|
||||
@RabbitHandler
|
||||
public void mesResponse(Message<?> message, Channel channel)throws Exception{
|
||||
logger.info("==============PID00B_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1"));
|
||||
|
||||
|
||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
|
||||
|
||||
//模拟异常 ,然后 拒签消息 ,然后丢到死信队列
|
||||
|
@ -1,73 +0,0 @@
|
||||
package com.qgs.dc.mq.consumer;
|
||||
|
||||
import com.qgs.dc.mq.configuration.ConfigOf00C;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @Desc: "PID00B设备 接收MQ消息 监听类"
|
||||
* @Author: caixiang
|
||||
* @DATE: 2021/6/22 15:30
|
||||
*
|
||||
* Ctrl+R 替换设备名
|
||||
*/
|
||||
@Component
|
||||
public class PID00CReceived {
|
||||
|
||||
@RabbitListener(queues = ConfigOf00C.EAP_REQUEST_QUEUE_00C)
|
||||
@RabbitHandler
|
||||
public void eapRequest(Message<?> message, Channel channel)throws Exception{
|
||||
|
||||
System.out.println("==============received message-EAP_REQUEST_QUEUE_00C=================,priority:"+"equipmentName"+message.getHeaders().get("attr2"));
|
||||
Thread.sleep(100);
|
||||
|
||||
String correlationId = (String)message.getHeaders().get("spring_returned_message_correlation");
|
||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
|
||||
|
||||
|
||||
|
||||
|
||||
//模拟异常 ,然后 拒签消息 ,然后丢到死信队列
|
||||
try {
|
||||
System.err.println("处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
|
||||
throw new Exception("11111");
|
||||
}catch (Exception e){
|
||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
return;
|
||||
}
|
||||
|
||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
|
||||
//channel.basicAck(deliveryTag,false);
|
||||
|
||||
}
|
||||
|
||||
@RabbitListener(queues = ConfigOf00C.MES_RESPONSE_QUEUE_00C)
|
||||
@RabbitHandler
|
||||
public void mesResponse(Message<?> message, Channel channel)throws Exception{
|
||||
System.out.println("==============PID00B_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1"));
|
||||
Thread.sleep(100);
|
||||
|
||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
|
||||
|
||||
|
||||
//模拟异常 ,然后 拒签消息 ,然后丢到死信队列
|
||||
try {
|
||||
System.err.println("处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
|
||||
throw new Exception("11111");
|
||||
}catch (Exception e){
|
||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
return;
|
||||
}
|
||||
|
||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
|
||||
//channel.basicAck(deliveryTag,false);
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -1,12 +1,8 @@
|
||||
package com.qgs.dc.mq.controller;
|
||||
|
||||
import cn.hutool.http.HttpUtil;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.qgs.dc.mq.Constant.SecsGemTimeout;
|
||||
import com.qgs.dc.mq.entity.MQMessage;
|
||||
import com.qgs.dc.mq.entity.CallbackMessageEntity;
|
||||
import com.qgs.dc.mq.entity.common.Header;
|
||||
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody;
|
||||
import com.qgs.dc.mq.producer.component.RabbitSender;
|
||||
import com.qgs.dc.mq.secsgem.AsyncFuture;
|
||||
import com.qgs.dc.mq.secsgem.SendedList;
|
||||
@ -18,10 +14,7 @@ import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@ -63,6 +56,8 @@ public class MQController {
|
||||
* desc : MES给EAP发送远程指令(MES_Request)(向rabbitmq中发送消息(direct模式))
|
||||
* MES => DC(数据采集中间件) => MQ => EAP
|
||||
* return :返回 的就是这个指令的回复指令
|
||||
* 1 = 成功
|
||||
* 其他 = 异常
|
||||
*/
|
||||
public R mesRequest(@RequestBody CallbackMessageEntity callbackMessageEntity){
|
||||
try {
|
||||
@ -85,7 +80,7 @@ public class MQController {
|
||||
return R.ok().put("responseMessage",mqMessageResponse);
|
||||
}catch (Exception e){
|
||||
logger.error("MES => EAP (MES_Request) , 状态:DC处理异常 , 内容:"+ callbackMessageEntity.toString());
|
||||
return R.error().put("result",e.getMessage());
|
||||
return R.error(2,e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,36 +31,6 @@ public class TestController {
|
||||
|
||||
|
||||
|
||||
//For EAP SERVER1
|
||||
@GetMapping("/putMessageDirect")
|
||||
public String putMessageDirect() throws Exception {
|
||||
Map<String,Object> properties = new HashMap<>();
|
||||
properties.put("attr1",String.valueOf(1));
|
||||
properties.put("attr2","00B");
|
||||
//序列化
|
||||
MQMessage mqMessage = new MQMessage();
|
||||
|
||||
Header header = new Header("Request","Execute","QUERYEQPStatus","12");
|
||||
header.setTransactionId("abc123456");
|
||||
QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody();
|
||||
queryEQStatusBody.setVidType("u4");
|
||||
List<String> vids = new ArrayList<>();
|
||||
vids.add("10000");
|
||||
vids.add("10001");
|
||||
vids.add("10002");
|
||||
queryEQStatusBody.setVidList(vids);
|
||||
mqMessage.setBody(JSONObject.toJSONBytes(queryEQStatusBody));
|
||||
mqMessage.setHeader(header);
|
||||
|
||||
|
||||
|
||||
//rabbitSender.sendDirect(mqMessage,properties,"00B_Exchange","00B_EAP_Request_Queue_RoutingKey","6000");
|
||||
rabbitSender.sendDirect(mqMessage,properties,"00B_Exchange","00B_MES_Response_Queue_RoutingKey");
|
||||
|
||||
Thread.sleep(30);
|
||||
return null;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
Map<String,Object> properties = new HashMap<>();
|
||||
properties.put("attr1",String.valueOf(1));
|
||||
|
@ -144,7 +144,7 @@ public class OperateController {
|
||||
return R.error().put("result",readArgEntity.getPlcName()+" 这台plc不存在 / 或参数异常");
|
||||
}
|
||||
Object value = dv.getValue().getValue();
|
||||
System.out.println(CommonFunction.judgeVarType(value));
|
||||
logger.info(CommonFunction.judgeVarType(value));
|
||||
return R.ok().put("result", CommonFunction.var(value));
|
||||
}catch (Exception e){
|
||||
return R.error().put("result", e.getMessage());
|
||||
@ -178,7 +178,7 @@ public class OperateController {
|
||||
|
||||
System.err.println("(测试是否 每隔intervel 都会执行这个回调方法) subscription value received: item:NodeId : " +
|
||||
"" + item.getReadValueId().getNodeId() + ", value :" + dataValue.getValue());
|
||||
System.out.println();
|
||||
|
||||
NodeId currentNode = item.getReadValueId().getNodeId();
|
||||
UShort namespaceIndex = currentNode.getNamespaceIndex();
|
||||
Object identifier = currentNode.getIdentifier();
|
||||
|
@ -832,6 +832,9 @@ public class UAService {
|
||||
* 注意:
|
||||
* 1.同时监听多个变量,只要有一个变量发生改变了,就把调用回调函数并且把那个改变了的变量通过websocket发给前端(如果两个同时改变也不是同时发给前端的是一个一个发的)。因为不同变量有不同的监视器,而监视器他们自己会调回调函数
|
||||
* 2.如果你要新订阅的变量已经存在了,那么就什么也不错(因为加了个过滤器)
|
||||
* 3.samplingInterval 和 publishingInterval 的区别
|
||||
* ① The sampling interval is the rate at which you want the server to sample/poll/monitor an item at. Items in a subscription may come from varying sources and each can be sampled at its own rate.
|
||||
* ② The publishing interval is the rate at which the subscription "executes" and reports any items that have changed. It also influences the calculation that determines how often a keep alive response should be returned should there be no changing data.
|
||||
* 参数:
|
||||
* 回调函数BiConsumer<UaMonitoredItem, DataValue>,是当你订阅的这个变量当变量发生 改变的时候 你执行的方法(刚开始 会执行一次)
|
||||
* 返回值:
|
||||
@ -846,8 +849,9 @@ public class UAService {
|
||||
* 如果定义为Sampling,则这个Subscription是一个Triggered Item,即被激发的订阅,需要一个定义为Reporting的Subscription(称为Triggering Item)
|
||||
* 与它连接。这样当Triggering Item更新时,会激发Triggered Item更新。
|
||||
*
|
||||
*
|
||||
* */
|
||||
public synchronized Integer subscribeForVisit(List<Integer> listNameSpace,List<String> listIdentifier,Double listenTimeInterval,UaMonitoredItem.ValueConsumer biConsumer,String plcName) throws Exception {
|
||||
public synchronized Integer subscribeForVisit(List<Integer> listNameSpace,List<String> listIdentifier,Double samplingInterval,UaMonitoredItem.ValueConsumer biConsumer,String plcName) throws Exception {
|
||||
if(!isValid(plcName)){
|
||||
return null;
|
||||
}
|
||||
@ -875,6 +879,7 @@ public class UAService {
|
||||
//SubscribeEntity 不存在的时候新建一个SubscribeEntity,并且UaSubscription也要新建一个
|
||||
if(currentNow == null){
|
||||
try {
|
||||
|
||||
subscription = client.getSubscriptionManager().createSubscription(1000.0).get();
|
||||
}catch (Exception e){
|
||||
throw new Exception("在 subscribeForVisit 的时候出现异常,,具体异常是: "+e.getMessage());
|
||||
@ -916,7 +921,7 @@ public class UAService {
|
||||
|
||||
MonitoringParameters parameters = new MonitoringParameters(
|
||||
clientHandle,
|
||||
listenTimeInterval, // sampling interval
|
||||
samplingInterval, // sampling interval
|
||||
null, // filter, null means use default
|
||||
uint(10), // queue size
|
||||
true // discard oldest
|
||||
|
@ -39,7 +39,7 @@
|
||||
<!-- 2.1 level为 DEBUG 日志,时间滚动输出 -->
|
||||
<appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<!-- 正在记录的日志文档的路径及文档名 -->
|
||||
<file>${logging.pathwork}/work-log-debug.log</file>
|
||||
<file>${logging.pathwork}/debug/work-log-debug.log</file>
|
||||
<!--日志文档输出格式-->
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
@ -79,7 +79,7 @@
|
||||
|
||||
<appender name="OPCUA_DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<!-- 正在记录的日志文档的路径及文档名 -->
|
||||
<file>${logging.pathopc}/opcua-log-debug.log</file>
|
||||
<file>${logging.pathopc}/debug/opcua-log-debug.log</file>
|
||||
<!--日志文档输出格式-->
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
@ -106,7 +106,7 @@
|
||||
|
||||
<appender name="MQ_DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<!-- 正在记录的日志文档的路径及文档名 -->
|
||||
<file>${logging.pathmq}/mq-log-debug.log</file>
|
||||
<file>${logging.pathmq}/debug/mq-log-debug.log</file>
|
||||
<!--日志文档输出格式-->
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
@ -142,7 +142,7 @@
|
||||
<!-- 2.2 level为 INFO 日志,时间滚动输出 -->
|
||||
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<!-- 正在记录的日志文档的路径及文档名 -->
|
||||
<file>${logging.pathwork}/work-log-info.log</file>
|
||||
<file>${logging.pathwork}/info/work-log-info.log</file>
|
||||
<!--日志文档输出格式-->
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
@ -176,7 +176,7 @@
|
||||
|
||||
<appender name="OPCUA_INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<!-- 正在记录的日志文档的路径及文档名 -->
|
||||
<file>${logging.pathopc}/opcua-log-info.log</file>
|
||||
<file>${logging.pathopc}/info/opcua-log-info.log</file>
|
||||
<!--日志文档输出格式-->
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
@ -210,7 +210,7 @@
|
||||
</appender>
|
||||
<appender name="MQ_INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<!-- 正在记录的日志文档的路径及文档名 -->
|
||||
<file>${logging.pathmq}/mq-log-info.log</file>
|
||||
<file>${logging.pathmq}/info/mq-log-info.log</file>
|
||||
<!--日志文档输出格式-->
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
@ -246,7 +246,7 @@
|
||||
<!-- 2.3 level为 WARN 日志,时间滚动输出 -->
|
||||
<appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<!-- 正在记录的日志文档的路径及文档名 -->
|
||||
<file>${logging.pathwork}/work-log-warn.log</file>
|
||||
<file>${logging.pathwork}/warn/work-log-warn.log</file>
|
||||
<!--日志文档输出格式-->
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
@ -280,7 +280,7 @@
|
||||
|
||||
<appender name="OPCUA_WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<!-- 正在记录的日志文档的路径及文档名 -->
|
||||
<file>${logging.pathopc}/opcua-log-warn.log</file>
|
||||
<file>${logging.pathopc}/warn/opcua-log-warn.log</file>
|
||||
<!--日志文档输出格式-->
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
@ -312,7 +312,7 @@
|
||||
</appender>
|
||||
<appender name="MQ_WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<!-- 正在记录的日志文档的路径及文档名 -->
|
||||
<file>${logging.pathmq}/mq-log-warn.log</file>
|
||||
<file>${logging.pathmq}/warn/mq-log-warn.log</file>
|
||||
<!--日志文档输出格式-->
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
@ -347,7 +347,7 @@
|
||||
<!-- 2.4 level为 ERROR 日志,时间滚动输出 -->
|
||||
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<!-- 正在记录的日志文档的路径及文档名 -->
|
||||
<file>${logging.pathwork}/work-log-error.log</file>
|
||||
<file>${logging.pathwork}/error/work-log-error.log</file>
|
||||
<!--日志文档输出格式-->
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
@ -382,7 +382,7 @@
|
||||
|
||||
<appender name="OPCUA_ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<!-- 正在记录的日志文档的路径及文档名 -->
|
||||
<file>${logging.pathopc}/opcua-log-error.log</file>
|
||||
<file>${logging.pathopc}/error/opcua-log-error.log</file>
|
||||
<!--日志文档输出格式-->
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
@ -416,7 +416,7 @@
|
||||
</appender>
|
||||
<appender name="MQ_ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<!-- 正在记录的日志文档的路径及文档名 -->
|
||||
<file>${logging.pathmq}/mq-log-error.log</file>
|
||||
<file>${logging.pathmq}/error/mq-log-error.log</file>
|
||||
<!--日志文档输出格式-->
|
||||
<encoder>
|
||||
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
|
||||
|
Loading…
Reference in New Issue
Block a user