diff --git a/src/main/java/com/qgs/dc/mq/Constant/Constant.java b/src/main/java/com/qgs/dc/mq/Constant/Constant.java new file mode 100644 index 0000000..7acfc3a --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/Constant/Constant.java @@ -0,0 +1,10 @@ +package com.qgs.dc.mq.Constant; + +/** + * @Desc: "" + * @Author: caixiang + * @DATE: 2021/9/7 9:25 + */ +public class Constant { + public static final String mesCallBackUrl = "localhost:8080/api/mq/mq-message-received/receivedFromEapRequest"; +} diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID00BReceived.java b/src/main/java/com/qgs/dc/mq/consumer/PID00BReceived.java index 2a8b17c..b46aca8 100644 --- a/src/main/java/com/qgs/dc/mq/consumer/PID00BReceived.java +++ b/src/main/java/com/qgs/dc/mq/consumer/PID00BReceived.java @@ -4,6 +4,7 @@ import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpUtil; import com.alibaba.fastjson.JSONObject; import com.qgs.dc.common.utils.CommonFunction; +import com.qgs.dc.mq.Constant.Constant; import com.qgs.dc.mq.configuration.ConfigOf00B; import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; import com.qgs.dc.mq.entity.MQMessage; @@ -56,7 +57,7 @@ public class PID00BReceived { //1. 正常情况 //Integer integer = mqMessageHandler.requestHandler(message); - String integer= HttpUtil.post("localhost:8080/api/mq/mq-message-received/receivedFromEapRequest",JSONObject.toJSONString(mqMessage)); + String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); Integer result = Integer.valueOf(integer); //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 @@ -112,8 +113,6 @@ public class PID00BReceived { @RabbitHandler public void mesResponse(Message message, Channel channel)throws Exception{ logger.info("==============PID00B_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1")); - - Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); //模拟异常 ,然后 拒签消息 ,然后丢到死信队列 diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID00CReceived.java b/src/main/java/com/qgs/dc/mq/consumer/PID00CReceived.java deleted file mode 100644 index 9b32d58..0000000 --- a/src/main/java/com/qgs/dc/mq/consumer/PID00CReceived.java +++ /dev/null @@ -1,73 +0,0 @@ -package com.qgs.dc.mq.consumer; - -import com.qgs.dc.mq.configuration.ConfigOf00C; -import com.rabbitmq.client.Channel; -import org.springframework.amqp.rabbit.annotation.RabbitHandler; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.amqp.support.AmqpHeaders; -import org.springframework.messaging.Message; -import org.springframework.stereotype.Component; - -/** - * @Desc: "PID00B设备 接收MQ消息 监听类" - * @Author: caixiang - * @DATE: 2021/6/22 15:30 - * - * Ctrl+R 替换设备名 - */ -@Component -public class PID00CReceived { - - @RabbitListener(queues = ConfigOf00C.EAP_REQUEST_QUEUE_00C) - @RabbitHandler - public void eapRequest(Message message, Channel channel)throws Exception{ - - System.out.println("==============received message-EAP_REQUEST_QUEUE_00C=================,priority:"+"equipmentName"+message.getHeaders().get("attr2")); - Thread.sleep(100); - - String correlationId = (String)message.getHeaders().get("spring_returned_message_correlation"); - Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); - - - - - //模拟异常 ,然后 拒签消息 ,然后丢到死信队列 - try { - System.err.println("处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); - throw new Exception("11111"); - }catch (Exception e){ - // 第一个false 是 不批量签收;第二个false 是 不重回队列 - channel.basicNack(deliveryTag,false,false); - return; - } - - //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 - //channel.basicAck(deliveryTag,false); - - } - - @RabbitListener(queues = ConfigOf00C.MES_RESPONSE_QUEUE_00C) - @RabbitHandler - public void mesResponse(Message message, Channel channel)throws Exception{ - System.out.println("==============PID00B_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1")); - Thread.sleep(100); - - Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); - - - //模拟异常 ,然后 拒签消息 ,然后丢到死信队列 - try { - System.err.println("处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); - throw new Exception("11111"); - }catch (Exception e){ - // 第一个false 是 不批量签收;第二个false 是 不重回队列 - channel.basicNack(deliveryTag,false,false); - return; - } - - //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 - //channel.basicAck(deliveryTag,false); - - } - -} diff --git a/src/main/java/com/qgs/dc/mq/controller/MQController.java b/src/main/java/com/qgs/dc/mq/controller/MQController.java index 925cfc7..dae0dfd 100644 --- a/src/main/java/com/qgs/dc/mq/controller/MQController.java +++ b/src/main/java/com/qgs/dc/mq/controller/MQController.java @@ -1,12 +1,8 @@ package com.qgs.dc.mq.controller; -import cn.hutool.http.HttpUtil; -import com.alibaba.fastjson.JSONObject; import com.qgs.dc.mq.Constant.SecsGemTimeout; import com.qgs.dc.mq.entity.MQMessage; import com.qgs.dc.mq.entity.CallbackMessageEntity; -import com.qgs.dc.mq.entity.common.Header; -import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; import com.qgs.dc.mq.producer.component.RabbitSender; import com.qgs.dc.mq.secsgem.AsyncFuture; import com.qgs.dc.mq.secsgem.SendedList; @@ -18,10 +14,7 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; - -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -63,6 +56,8 @@ public class MQController { * desc : MES给EAP发送远程指令(MES_Request)(向rabbitmq中发送消息(direct模式)) * MES => DC(数据采集中间件) => MQ => EAP * return :返回 的就是这个指令的回复指令 + * 1 = 成功 + * 其他 = 异常 */ public R mesRequest(@RequestBody CallbackMessageEntity callbackMessageEntity){ try { @@ -85,7 +80,7 @@ public class MQController { return R.ok().put("responseMessage",mqMessageResponse); }catch (Exception e){ logger.error("MES => EAP (MES_Request) , 状态:DC处理异常 , 内容:"+ callbackMessageEntity.toString()); - return R.error().put("result",e.getMessage()); + return R.error(2,e.getMessage()); } } } diff --git a/src/main/java/com/qgs/dc/mq/producer/controller/TestController.java b/src/main/java/com/qgs/dc/mq/producer/controller/TestController.java index 49bfa90..baff447 100644 --- a/src/main/java/com/qgs/dc/mq/producer/controller/TestController.java +++ b/src/main/java/com/qgs/dc/mq/producer/controller/TestController.java @@ -31,36 +31,6 @@ public class TestController { - //For EAP SERVER1 - @GetMapping("/putMessageDirect") - public String putMessageDirect() throws Exception { - Map properties = new HashMap<>(); - properties.put("attr1",String.valueOf(1)); - properties.put("attr2","00B"); - //序列化 - MQMessage mqMessage = new MQMessage(); - - Header header = new Header("Request","Execute","QUERYEQPStatus","12"); - header.setTransactionId("abc123456"); - QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody(); - queryEQStatusBody.setVidType("u4"); - List vids = new ArrayList<>(); - vids.add("10000"); - vids.add("10001"); - vids.add("10002"); - queryEQStatusBody.setVidList(vids); - mqMessage.setBody(JSONObject.toJSONBytes(queryEQStatusBody)); - mqMessage.setHeader(header); - - - - //rabbitSender.sendDirect(mqMessage,properties,"00B_Exchange","00B_EAP_Request_Queue_RoutingKey","6000"); - rabbitSender.sendDirect(mqMessage,properties,"00B_Exchange","00B_MES_Response_Queue_RoutingKey"); - - Thread.sleep(30); - return null; - } - public static void main(String[] args) throws IOException { Map properties = new HashMap<>(); properties.put("attr1",String.valueOf(1)); diff --git a/src/main/java/com/qgs/dc/opcua/controller/OperateController.java b/src/main/java/com/qgs/dc/opcua/controller/OperateController.java index 4c52d78..3990e15 100644 --- a/src/main/java/com/qgs/dc/opcua/controller/OperateController.java +++ b/src/main/java/com/qgs/dc/opcua/controller/OperateController.java @@ -144,7 +144,7 @@ public class OperateController { return R.error().put("result",readArgEntity.getPlcName()+" 这台plc不存在 / 或参数异常"); } Object value = dv.getValue().getValue(); - System.out.println(CommonFunction.judgeVarType(value)); + logger.info(CommonFunction.judgeVarType(value)); return R.ok().put("result", CommonFunction.var(value)); }catch (Exception e){ return R.error().put("result", e.getMessage()); @@ -178,7 +178,7 @@ public class OperateController { System.err.println("(测试是否 每隔intervel 都会执行这个回调方法) subscription value received: item:NodeId : " + "" + item.getReadValueId().getNodeId() + ", value :" + dataValue.getValue()); - System.out.println(); + NodeId currentNode = item.getReadValueId().getNodeId(); UShort namespaceIndex = currentNode.getNamespaceIndex(); Object identifier = currentNode.getIdentifier(); diff --git a/src/main/java/com/qgs/dc/opcua/selfunion/UAService.java b/src/main/java/com/qgs/dc/opcua/selfunion/UAService.java index 1520f48..1ebd6a8 100644 --- a/src/main/java/com/qgs/dc/opcua/selfunion/UAService.java +++ b/src/main/java/com/qgs/dc/opcua/selfunion/UAService.java @@ -832,6 +832,9 @@ public class UAService { * 注意: * 1.同时监听多个变量,只要有一个变量发生改变了,就把调用回调函数并且把那个改变了的变量通过websocket发给前端(如果两个同时改变也不是同时发给前端的是一个一个发的)。因为不同变量有不同的监视器,而监视器他们自己会调回调函数 * 2.如果你要新订阅的变量已经存在了,那么就什么也不错(因为加了个过滤器) + * 3.samplingInterval 和 publishingInterval 的区别 + * ① The sampling interval is the rate at which you want the server to sample/poll/monitor an item at. Items in a subscription may come from varying sources and each can be sampled at its own rate. + * ② The publishing interval is the rate at which the subscription "executes" and reports any items that have changed. It also influences the calculation that determines how often a keep alive response should be returned should there be no changing data. * 参数: * 回调函数BiConsumer,是当你订阅的这个变量当变量发生 改变的时候 你执行的方法(刚开始 会执行一次) * 返回值: @@ -846,8 +849,9 @@ public class UAService { * 如果定义为Sampling,则这个Subscription是一个Triggered Item,即被激发的订阅,需要一个定义为Reporting的Subscription(称为Triggering Item) * 与它连接。这样当Triggering Item更新时,会激发Triggered Item更新。 * + * * */ - public synchronized Integer subscribeForVisit(List listNameSpace,List listIdentifier,Double listenTimeInterval,UaMonitoredItem.ValueConsumer biConsumer,String plcName) throws Exception { + public synchronized Integer subscribeForVisit(List listNameSpace,List listIdentifier,Double samplingInterval,UaMonitoredItem.ValueConsumer biConsumer,String plcName) throws Exception { if(!isValid(plcName)){ return null; } @@ -875,6 +879,7 @@ public class UAService { //SubscribeEntity 不存在的时候新建一个SubscribeEntity,并且UaSubscription也要新建一个 if(currentNow == null){ try { + subscription = client.getSubscriptionManager().createSubscription(1000.0).get(); }catch (Exception e){ throw new Exception("在 subscribeForVisit 的时候出现异常,,具体异常是: "+e.getMessage()); @@ -916,7 +921,7 @@ public class UAService { MonitoringParameters parameters = new MonitoringParameters( clientHandle, - listenTimeInterval, // sampling interval + samplingInterval, // sampling interval null, // filter, null means use default uint(10), // queue size true // discard oldest diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 18b9e72..71090d0 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -39,7 +39,7 @@ - ${logging.pathwork}/work-log-debug.log + ${logging.pathwork}/debug/work-log-debug.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -79,7 +79,7 @@ - ${logging.pathopc}/opcua-log-debug.log + ${logging.pathopc}/debug/opcua-log-debug.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -106,7 +106,7 @@ - ${logging.pathmq}/mq-log-debug.log + ${logging.pathmq}/debug/mq-log-debug.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -142,7 +142,7 @@ - ${logging.pathwork}/work-log-info.log + ${logging.pathwork}/info/work-log-info.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -176,7 +176,7 @@ - ${logging.pathopc}/opcua-log-info.log + ${logging.pathopc}/info/opcua-log-info.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -210,7 +210,7 @@ - ${logging.pathmq}/mq-log-info.log + ${logging.pathmq}/info/mq-log-info.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -246,7 +246,7 @@ - ${logging.pathwork}/work-log-warn.log + ${logging.pathwork}/warn/work-log-warn.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -280,7 +280,7 @@ - ${logging.pathopc}/opcua-log-warn.log + ${logging.pathopc}/warn/opcua-log-warn.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -312,7 +312,7 @@ - ${logging.pathmq}/mq-log-warn.log + ${logging.pathmq}/warn/mq-log-warn.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -347,7 +347,7 @@ - ${logging.pathwork}/work-log-error.log + ${logging.pathwork}/error/work-log-error.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -382,7 +382,7 @@ - ${logging.pathopc}/opcua-log-error.log + ${logging.pathopc}/error/opcua-log-error.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n @@ -416,7 +416,7 @@ - ${logging.pathmq}/mq-log-error.log + ${logging.pathmq}/error/mq-log-error.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n