diff --git a/src/main/java/com/qgs/dc/common/utils/CommonFunction.java b/src/main/java/com/qgs/dc/common/utils/CommonFunction.java index d41d71e..b6ccefb 100644 --- a/src/main/java/com/qgs/dc/common/utils/CommonFunction.java +++ b/src/main/java/com/qgs/dc/common/utils/CommonFunction.java @@ -1,5 +1,6 @@ package com.qgs.dc.common.utils; + import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.qgs.dc.mq.entity.MQMessage; @@ -11,6 +12,9 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.*; import org.springframework.messaging.Message; import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; @@ -153,12 +157,97 @@ public class CommonFunction { */ public static MQMessage parse(Message message){ byte[] bytes =(byte[]) message.getPayload(); - String s = new String(bytes); MQMessage mqMessage = JSONObject.parseObject(bytes, MQMessage.class); return mqMessage; } + public static String unescapeJava(String str) throws IOException + { + Writer out = new StringWriter(); + if (str != null) + { + int sz = str.length(); + StringBuilder unicode = new StringBuilder(4); + boolean hadSlash = false; + boolean inUnicode = false; + + for (int i = 0; i < sz; ++i) + { + char ch = str.charAt(i); + if (inUnicode) + { + unicode.append(ch); + if (unicode.length() == 4) + { + try + { + int nfe = Integer.parseInt(unicode.toString(), 16); + out.write((char) nfe); + unicode.setLength(0); + inUnicode = false; + hadSlash = false; + } + catch (NumberFormatException var9) + { + } + } + } + else if (hadSlash) + { + hadSlash = false; + switch (ch) + { + case '\"': + out.write(34); + break; + case '\'': + out.write(39); + break; + case '\\': + out.write(92); + break; + case 'b': + out.write(8); + break; + case 'f': + out.write(12); + break; + case 'n': + out.write(10); + break; + case 'r': + out.write(13); + break; + case 't': + out.write(9); + break; + case 'u': + inUnicode = true; + break; + default: + out.write(ch); + } + } + else if (ch == 92) + { + hadSlash = true; + } + else + { + out.write(ch); + } + } + + if (hadSlash) + { + out.write(92); + } + + } + return out.toString(); + } + /** * 解析body diff --git a/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID10_1.java b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID10_1.java index f6d27b5..2696dba 100644 --- a/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID10_1.java +++ b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID10_1.java @@ -71,7 +71,7 @@ public class ConfigOfPID10_1 { //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); return queue; } - + @Bean public Binding bindExchangeAndQueueA_PID10_1(){ diff --git a/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID2.java b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID2.java new file mode 100644 index 0000000..900e98d --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID2.java @@ -0,0 +1,98 @@ +package com.qgs.dc.mq.configuration; + + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Desc: "设备:PID2 相关信息定义" + * @Author: caixiang + * @DATE: 2021/6/7 9:11 + */ +@Configuration +public class ConfigOfPID2 { + + //水平扩展其他设备的时候 只要:control+R 然后 PID2=>00C 然后replace all + public static final String EQUIPMENT_NAME_PID2 = "PID2"; + + public static final String EXCHANGE_NAME_PID2 = EQUIPMENT_NAME_PID2 +"_Exchange"; + public static final String EAP_REQUEST_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Request_Queue"; + public static final String EAP_RESPONSE_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Response_Queue"; + public static final String MES_REQUEST_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Request_Queue"; + public static final String MES_RESPONSE_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Response_Queue"; + public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Request_Queue_RoutingKey"; + public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Response_Queue_RoutingKey"; + public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Request_Queue_RoutingKey"; + public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Response_Queue_RoutingKey"; + + + @Bean + public DirectExchange EXCHANGE_NAME_PID2(){ + return new DirectExchange(EXCHANGE_NAME_PID2); + } + + //todo + @Bean + public Queue MES_REQUEST_QUEUE_PID2(){ + Queue queue = new Queue(MES_REQUEST_QUEUE_PID2); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + @Bean + public Queue MES_RESPONSE_QUEUE_PID2(){ + Queue queue = new Queue(MES_RESPONSE_QUEUE_PID2); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + + + @Bean + public Queue EAP_REQUEST_QUEUE_PID2(){ + Queue queue = new Queue(EAP_REQUEST_QUEUE_PID2); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + + } + @Bean + public Queue EAP_RESPONSE_QUEUE_PID2(){ + Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID2); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + + + @Bean + public Binding bindExchangeAndQueueA_PID2(){ + return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID2()).to(EXCHANGE_NAME_PID2()) + .with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID2); + } + @Bean + public Binding bindExchangeAndQueueB_PID2(){ + return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID2()).to(EXCHANGE_NAME_PID2()) + .with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID2); + } + + @Bean + public Binding bindExchangeAndQueueC_PID2(){ + return BindingBuilder.bind(MES_REQUEST_QUEUE_PID2()).to(EXCHANGE_NAME_PID2()) + .with(MES_REQUEST_QUEUE_ROUTINGKEY_PID2); + } + + @Bean + public Binding bindExchangeAndQueueD_PID2(){ + return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID2()).to(EXCHANGE_NAME_PID2()) + .with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID2); + } +} \ No newline at end of file diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID00BReceived.java b/src/main/java/com/qgs/dc/mq/consumer/PID00BReceived.java index e35ed8d..b3d1787 100644 --- a/src/main/java/com/qgs/dc/mq/consumer/PID00BReceived.java +++ b/src/main/java/com/qgs/dc/mq/consumer/PID00BReceived.java @@ -45,6 +45,7 @@ public class PID00BReceived { MQMessageHandler mqMessageHandler; + //这个队列监听EAP 推送过来的Event,然后dc 把这个Event 转发给Mes //need @RabbitListener(queues = ConfigOf00B.EAP_REQUEST_QUEUE_00B) @@ -77,7 +78,6 @@ public class PID00BReceived { } //2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列 - //throw new Exception("11111"); }catch (Exception e){ // 第一个false 是 不批量签收;第二个false 是 不重回队列 logger.error(e.getMessage()); @@ -143,6 +143,8 @@ public class PID00BReceived { } + + //这种情况:指的是 MES发出远程指令,然后eap处理完后,把结果丢到mesResponse队列 通知MES进行收尾处理。 @RabbitHandler public void mesResponse(Message message, Channel channel)throws Exception{ diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID10_1Received.java b/src/main/java/com/qgs/dc/mq/consumer/PID10_1Received.java index d5bbb66..6c19f6e 100644 --- a/src/main/java/com/qgs/dc/mq/consumer/PID10_1Received.java +++ b/src/main/java/com/qgs/dc/mq/consumer/PID10_1Received.java @@ -9,8 +9,11 @@ import com.qgs.dc.mq.configuration.ConfigOfPID10_1; import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; import com.qgs.dc.mq.entity.MQMessage; import com.qgs.dc.mq.entity.common.Header; +import com.qgs.dc.mq.entity.common.Returns; import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; +import com.qgs.dc.mq.producer.component.RabbitSender; import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Return; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; @@ -25,18 +28,19 @@ import java.util.ArrayList; import java.util.List; /** - * @Desc: "PIDPID10_1设备 接收MQ消息 监听类" + * @Desc: "PID10_1设备 接收MQ消息 监听类" * @Author: caixiang * @DATE: 2021/6/22 15:30 * * Ctrl+R 替换设备名 */ @Component -public class PID10_1Received { +public class PID10_1Received{ private static final Logger logger = LoggerFactory.getLogger(PID10_1Received.class); - + @Autowired + RabbitSender rabbitSender; @Autowired MQMessageHandler mqMessageHandler; @@ -45,10 +49,10 @@ public class PID10_1Received { @RabbitHandler public void eapRequest(Message message, Channel channel)throws Exception{ - logger.info("==============received message-EAP_REQUEST_QUEUE_PID10_1=================,priority:"+"equipmentName"+message.getHeaders().get("attr2")); + logger.info("==============received message-EAP_REQUEST_QUEUE_PID10_1================="); Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); -// //MQMessage 中的 transactionId + //MQMessage 中的 transactionId // String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation"); // //logger.info("transactionId:"+transactionId); @@ -56,22 +60,29 @@ public class PID10_1Received { MQMessage mqMessage = CommonFunction.parse(message); String transactionId = mqMessage.getHeader().getTransactionId(); - //1. 正常情况 - //Integer integer = mqMessageHandler.requestHandler(message); - String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); - Integer result = Integer.valueOf(integer); + System.out.println("收到的消息是: "+mqMessage.toString()); + System.out.println(); + //todo reply + reply(mqMessage); - //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 - if(result == 1){ - logger.info("在 EAP_REQUEST_QUEUE_PID10_1 队列中,transitionId"+transactionId+", 这条消息处理成功"); - channel.basicAck(deliveryTag,false); - }else { - logger.error("在 EAP_REQUEST_QUEUE_PID10_1 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); - channel.basicNack(deliveryTag,false,false); - } - //2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列 +// //回调给MES --- 开始 +// //1. 正常情况 +// String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); +// Integer result = Integer.valueOf(integer); +// +// //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 +// if(result == 1){ +// logger.info("在 EAP_REQUEST_QUEUE_PID4B 队列中,transitionId"+transactionId+", 这条消息处理成功"); +// channel.basicAck(deliveryTag,false); +// }else { +// logger.error("在 EAP_REQUEST_QUEUE_PID4B 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); +// channel.basicNack(deliveryTag,false,false); +// } +// //回调给MES --- 结束 - //throw new Exception("11111"); + + + channel.basicAck(deliveryTag,false); }catch (Exception e){ // 第一个false 是 不批量签收;第二个false 是 不重回队列 logger.error(e.getMessage()); @@ -79,10 +90,57 @@ public class PID10_1Received { return; } - - } + public void reply(MQMessage request) throws Exception { + MQMessage response = new MQMessage(); + + Header header = request.getHeader(); + header.setFrom("mes"); + header.setTo("eap"); + header.setMessageType("Response"); + header.setSendTimestamp(CommonFunction.getNowDate(1)); + Returns aReturn = new Returns("0","0"); + + response.setHeader(header); + response.setReturns(aReturn); + + rabbitSender.sendDirect(response,ConfigOfPID10_1.EXCHANGE_NAME_PID10_1,ConfigOfPID10_1.EAP_RESPONSE_QUEUE_ROUTINGKEY_PID10_1); + System.out.println("回复的消息是: "+response.toString()); + } + + + +// @RabbitListener(queues = ConfigOfPID10_1.MES_REQUEST_QUEUE_PID10_1) +// @RabbitHandler +// public void mesRequest(Message message, Channel channel)throws Exception{ +// +// logger.info("==============received message-EAP_REQUEST_QUEUE_PID10_1================="); +// +// Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); +// //MQMessage 中的 transactionId +//// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation"); +//// //logger.info("transactionId:"+transactionId); +// +// try { +// System.out.println("message : "+message.toString()); +// MQMessage mqMessage = CommonFunction.parse(message); +// String transactionId = mqMessage.getHeader().getTransactionId(); +// System.out.println("收到的消息是: "+mqMessage.toString()); +// System.out.println(); +// logger.info(mqMessage.toString()); +// channel.basicAck(deliveryTag,false); +// System.out.println(mqMessage.toString()); +// }catch (Exception e){ +// // 第一个false 是 不批量签收;第二个false 是 不重回队列 +// logger.error(e.getMessage()); +// channel.basicNack(deliveryTag,false,false); +// return; +// } +// +// } + + public static void main(String[] args) { //localhost:8001 diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID18Received.java b/src/main/java/com/qgs/dc/mq/consumer/PID18Received.java index 9328e17..b726ec2 100644 --- a/src/main/java/com/qgs/dc/mq/consumer/PID18Received.java +++ b/src/main/java/com/qgs/dc/mq/consumer/PID18Received.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.qgs.dc.common.utils.CommonFunction; import com.qgs.dc.mq.Constant.Constant; +import com.qgs.dc.mq.configuration.ConfigOfPID13S; import com.qgs.dc.mq.configuration.ConfigOfPID18; import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; import com.qgs.dc.mq.entity.MQMessage; @@ -61,6 +62,8 @@ public class PID18Received { String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); Integer result = Integer.valueOf(integer); + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 if(result == 1){ logger.info("在 EAP_REQUEST_QUEUE_PID18 队列中,transitionId"+transactionId+", 这条消息处理成功"); diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID2Received.java b/src/main/java/com/qgs/dc/mq/consumer/PID2Received.java new file mode 100644 index 0000000..6c541c7 --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/consumer/PID2Received.java @@ -0,0 +1,167 @@ +package com.qgs.dc.mq.consumer; + +import cn.hutool.http.HttpUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.qgs.dc.common.utils.CommonFunction; +import com.qgs.dc.mq.Constant.Constant; +import com.qgs.dc.mq.configuration.ConfigOfPID2; +import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; +import com.qgs.dc.mq.entity.MQMessage; +import com.qgs.dc.mq.entity.common.Header; +import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; +import com.rabbitmq.client.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +/** + * @Desc: "PID2设备 接收MQ消息 监听类" + * @Author: caixiang + * @DATE: 2021/6/22 15:30 + * + * Ctrl+R 替换设备名 + */ +@Component +public class PID2Received { + + private static final Logger logger = LoggerFactory.getLogger(PID2Received.class); + + + + @Autowired + MQMessageHandler mqMessageHandler; + + @RabbitListener(queues = ConfigOfPID2.EAP_REQUEST_QUEUE_PID2) + @RabbitHandler + public void eapRequest(Message message, Channel channel)throws Exception{ + + logger.info("==============received message-EAP_REQUEST_QUEUE_PID2================="); + + Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); + //MQMessage 中的 transactionId +// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation"); +// //logger.info("transactionId:"+transactionId); + + try { + + MQMessage mqMessage = CommonFunction.parse(message); + String transactionId = mqMessage.getHeader().getTransactionId(); + //1. 正常情况 + String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); + Integer result = Integer.valueOf(integer); + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 + if(result == 1){ + logger.info("在 EAP_REQUEST_QUEUE_PID2 队列中,transitionId"+transactionId+", 这条消息处理成功"); + channel.basicAck(deliveryTag,false); + }else { + logger.error("在 EAP_REQUEST_QUEUE_PID2 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); + channel.basicNack(deliveryTag,false,false); + } + //2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列 + }catch (Exception e){ + // 第一个false 是 不批量签收;第二个false 是 不重回队列 + logger.error(e.getMessage()); + channel.basicNack(deliveryTag,false,false); + return; + } + + + + } + + + public static void main(String[] args) { + //localhost:8001 + + + MQMessage mqMessage = new MQMessage(); + + Header header = new Header("Request","Execute","QUERYEQPStatus","12"); + QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody(); + queryEQStatusBody.setVidType("u4"); + List vids = new ArrayList<>(); + vids.add("10000"); + vids.add("10001"); + vids.add("10002"); + queryEQStatusBody.setVidList(vids); + + //queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"} + //bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125] + //"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==" + //message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + //new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]} + String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody); + + + byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody); + System.out.println("myString :"+ new String(bytes)); + mqMessage.setBody(queryEQStatusBodys); + mqMessage.setHeader(header); + + //{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + String s = JSONObject.toJSONString(mqMessage); + MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class); + System.out.println("mqMessagessss : " + mqMessagessss.toString()); + System.out.println(s); + //String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s); + + //{"header":{"transactionId":"PIDPID2_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID2","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + String s22 = "{\"header\":{\"transactionId\":\"PIDPID2_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID2\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}"; + + String s2 = "{\"header\":{\"transactionId\":\"PIDPID2_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID2\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID2_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}"; + JSONObject jsonObject = JSON.parseObject(s2); + String header1 = jsonObject.getString("header"); + String body = jsonObject.getString("body"); + String returns = jsonObject.getString("returns"); + System.out.println("header: "+header1); + System.out.println("body: "+body); + System.out.println("returns: "+returns); + + byte[] bytes2 = s2.getBytes(); + MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class); + System.out.println(mqMessage2.toString()); + } + + @RabbitListener(queues = ConfigOfPID2.MES_RESPONSE_QUEUE_PID2) + @RabbitHandler + public void mesResponse(Message message, Channel channel)throws Exception{ + logger.info("==============PIDPID2_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1")); + Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); + + //模拟异常 ,然后 拒签消息 ,然后丢到死信队列 + try { + MQMessage mqMessage = CommonFunction.parse(message); + String transactionId = mqMessage.getHeader().getTransactionId(); + Integer result = mqMessageHandler.responseHandler(message); + if(result == 1){ + logger.info("在 MES_RESPONSE_QUEUE_PID2 队列中 , transitionId"+transactionId+" 这条消息处理成功"); + channel.basicAck(deliveryTag,false); + }else { + logger.error("在 MES_RESPONSE_QUEUE_PID2 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); + channel.basicNack(deliveryTag,false,false); + } + + //throw new Exception("11111"); + }catch (Exception e){ + // 第一个false 是 不批量签收;第二个false 是 不重回队列 + channel.basicNack(deliveryTag,false,false); + return; + } + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 + //channel.basicAck(deliveryTag,false); + + } + +} diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID4BReceived.java b/src/main/java/com/qgs/dc/mq/consumer/PID4BReceived.java index 4406ba0..a30205c 100644 --- a/src/main/java/com/qgs/dc/mq/consumer/PID4BReceived.java +++ b/src/main/java/com/qgs/dc/mq/consumer/PID4BReceived.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.qgs.dc.common.utils.CommonFunction; import com.qgs.dc.mq.Constant.Constant; +import com.qgs.dc.mq.configuration.ConfigOfPID13S; import com.qgs.dc.mq.configuration.ConfigOfPID4B; import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; import com.qgs.dc.mq.entity.MQMessage; diff --git a/src/main/java/com/qgs/dc/mq/controller/MQController.java b/src/main/java/com/qgs/dc/mq/controller/MQController.java index 271a1a3..fd6518a 100644 --- a/src/main/java/com/qgs/dc/mq/controller/MQController.java +++ b/src/main/java/com/qgs/dc/mq/controller/MQController.java @@ -68,12 +68,7 @@ public class MQController { String routingKey = callbackMessageEntity.getRoutingKey(); logger.info("MES => EAP (MES_Request), 状态:DC已收到 , 内容:"+ callbackMessageEntity.toString()); - //properties 这里的参数是写在MQ消息 header里面的,如果EAP端 需要某些参数 可以写在这里,eap去取更方便一些 - Map properties = new HashMap<>(); - properties.put("equipmentName","PID001"); - properties.put("transitionId",transitionId); - //rabbitSender.sendDirect(mqMessage,properties,exchangeName,routingKey,"6000"); - rabbitSender.sendDirect(mqMessage,properties,exchangeName,routingKey); + rabbitSender.sendDirect(mqMessage,exchangeName,routingKey); logger.info("MES => EAP (MES_Request) , 状态:DC已发送给MQ , 内容:"+ callbackMessageEntity.toString()); AsyncFuture add = SendedList.add(transitionId,mqMessage); @@ -84,4 +79,30 @@ public class MQController { return R.error(2,e.getMessage()); } } + + @PostMapping("/mesRequestNoW") + /** + * desc : MES给EAP发送远程指令(MES_Request)(向rabbitmq中发送消息(direct模式)) + * MES => DC(数据采集中间件) => MQ => EAP + * return :返回 的就是这个指令的回复指令 + * 1 = 成功 + * 其他 = 异常 + */ + public R mesRequestNoW(@RequestBody CallbackMessageEntity callbackMessageEntity){ + try { + String transitionId = callbackMessageEntity.getMqMessage().getHeader().getTransactionId(); + MQMessage mqMessage = callbackMessageEntity.getMqMessage(); + String exchangeName = callbackMessageEntity.getExchangeName(); + String routingKey = callbackMessageEntity.getRoutingKey(); + + logger.info("MES => EAP (MES_Request), 状态:DC已收到 , 内容:"+ callbackMessageEntity.toString()); + rabbitSender.sendDirect(mqMessage,exchangeName,routingKey); + logger.info("MES => EAP (MES_Request) , 状态:DC已发送给MQ , 内容:"+ callbackMessageEntity.toString()); + + return R.ok(); + }catch (Exception e){ + logger.error("MES => EAP (MES_Request) , 状态:DC处理异常 , 内容:"+ callbackMessageEntity.toString()); + return R.error(2,e.getMessage()); + } + } } diff --git a/src/main/java/com/qgs/dc/mq/entity/MQMessage.java b/src/main/java/com/qgs/dc/mq/entity/MQMessage.java index 79f4036..687fde5 100644 --- a/src/main/java/com/qgs/dc/mq/entity/MQMessage.java +++ b/src/main/java/com/qgs/dc/mq/entity/MQMessage.java @@ -1,6 +1,7 @@ package com.qgs.dc.mq.entity; import cn.hutool.http.HttpUtil; +import com.alibaba.fastjson.annotation.JSONField; import com.alibaba.fastjson.annotation.JSONType; import com.qgs.dc.mq.entity.common.Header; import com.qgs.dc.mq.entity.common.Returns; @@ -14,9 +15,14 @@ import java.util.HashMap; * @DATE: 2021/8/12 15:38 */ @Data -@JSONType(orders={"header","body","returns"}) +@JSONType(orders={"header","body","return"}) public class MQMessage { private Header header; + @JSONField(jsonDirect = true) private String body; + //在序列化和反序列化 的时候 都会把 json 里面的return字段转成 object 里面的returns字段 + @JSONField(name = "return") private Returns returns; + + } \ No newline at end of file diff --git a/src/main/java/com/qgs/dc/mq/entity/common/Header.java b/src/main/java/com/qgs/dc/mq/entity/common/Header.java index 3d94aa4..a99ad1f 100644 --- a/src/main/java/com/qgs/dc/mq/entity/common/Header.java +++ b/src/main/java/com/qgs/dc/mq/entity/common/Header.java @@ -37,6 +37,7 @@ public class Header { this.to = "eap"; } + public String getTransactionId() { return transactionId; } diff --git a/src/main/java/com/qgs/dc/mq/entity/common/Returns.java b/src/main/java/com/qgs/dc/mq/entity/common/Returns.java index 4d48826..247889d 100644 --- a/src/main/java/com/qgs/dc/mq/entity/common/Returns.java +++ b/src/main/java/com/qgs/dc/mq/entity/common/Returns.java @@ -1,5 +1,6 @@ package com.qgs.dc.mq.entity.common; +import com.alibaba.fastjson.annotation.JSONField; import com.alibaba.fastjson.annotation.JSONType; import lombok.Data; @@ -9,29 +10,15 @@ import lombok.Data; * @DATE: 2021/8/12 15:35 */ @Data -@JSONType(orders={"returnCode","ReasonCode"}) +@JSONType(orders={"ReturnCode","ReasonCode"}) public class Returns { - private String returnCode; + @JSONField(name = "ReturnCode") + private String ReturnCode; + @JSONField(name = "ReasonCode") private String ReasonCode; - public String getReturnCode() { - return returnCode; - } - - public void setReturnCode(String returnCode) { - this.returnCode = returnCode; - } - - public String getReasonCode() { - return ReasonCode; - } - - public void setReasonCode(String reasonCode) { - ReasonCode = reasonCode; - } - public Returns(String returnCode, String reasonCode) { - this.returnCode = returnCode; + ReturnCode = returnCode; ReasonCode = reasonCode; } } diff --git a/src/main/java/com/qgs/dc/mq/producer/component/RabbitSender.java b/src/main/java/com/qgs/dc/mq/producer/component/RabbitSender.java index 1994d62..07c4454 100644 --- a/src/main/java/com/qgs/dc/mq/producer/component/RabbitSender.java +++ b/src/main/java/com/qgs/dc/mq/producer/component/RabbitSender.java @@ -3,6 +3,8 @@ package com.qgs.dc.mq.producer.component; import com.alibaba.fastjson.JSONObject; import com.qgs.dc.common.utils.CommonFunction; import com.qgs.dc.mq.entity.MQMessage; +import com.qgs.dc.mq.entity.common.Header; +import com.qgs.dc.mq.entity.common.Returns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,6 +19,7 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; +import java.sql.SQLOutput; import java.util.Map; import java.util.Random; import java.util.UUID; @@ -190,20 +193,29 @@ public class RabbitSender { cd ); } - public void sendDirect(MQMessage mqMessage, Map properties, String exchange, String routingKey) throws Exception{ + + + + public void sendDirect(MQMessage mqMessage, String exchange, String routingKey) throws Exception{ + byte[] message = JSONObject.toJSONBytes(mqMessage); - MessageHeaders messageHeaders = new MessageHeaders(properties); - Message msg = MessageBuilder.createMessage(message,messageHeaders); - //String transitionId = mqMessage.getHeader().getEquipmentId()+"_"+CommonFunction.getNowDate(2)+"_"+CommonFunction.getUUID(10); + String transitionId = mqMessage.getHeader().getTransactionId(); CorrelationData cd = new CorrelationData(transitionId); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnsCallback(returnsCallback); - rabbitTemplate.convertAndSend(exchange, - routingKey, - msg, - cd - ); + //rabbitTemplate.send 此方法发送的消息 是不含java相关信息的,rabbitTemplate.convertAndSend是包含java相关信息的 + org.springframework.amqp.core.Message mqsMessage = new org.springframework.amqp.core.Message(message); + rabbitTemplate.send(exchange,routingKey,mqsMessage,cd); + +// MessageHeaders messageHeaders = new MessageHeaders(properties); +// Message msg = MessageBuilder.createMessage(message,messageHeaders);Message msg = MessageBuilder.createMessage(message,messageHeaders); +// rabbitTemplate.convertAndSend(exchange, +// routingKey, +// msg, +// cd +// ); + } diff --git a/src/main/java/com/qgs/dc/s7/my/s7connector/api/DaveArea.java b/src/main/java/com/qgs/dc/s7/my/s7connector/api/DaveArea.java index e335a80..26889c9 100644 --- a/src/main/java/com/qgs/dc/s7/my/s7connector/api/DaveArea.java +++ b/src/main/java/com/qgs/dc/s7/my/s7connector/api/DaveArea.java @@ -20,10 +20,16 @@ public enum DaveArea { ANALOGOUTPUTS200(7), // System flags of 200 family COUNTER(28), // analog inputs of 200 family COUNTER200(30), // analog outputs of 200 family - DB(0x84), // Peripheral I/O //这个是plc中的不同区,有V区,input区,output区 - DI(0x85), FLAGS(0x83), INPUTS(0x81), LOCAL(0x86), // data blocks - OUTPUTS(0x82), // instance data blocks - P(0x80), // not tested + DB(0x84), // Peripheral I/O //DB块 + DI(0x85), //DBI块 + FLAGS(0x83), //M块 + INPUTS(0x81), //I块 + T(0x1D), //T块ed + C(0x1C), //C块 + + LOCAL(0x86), // data blocks //LD块 + OUTPUTS(0x82), // instance data blocks //Q块 + P(0x80), // not tested //D块 SYSINFO(3), // local of caller SYSTEMFLAGS(5), // S7 counters TIMER(29), // S7 timers diff --git a/src/main/java/com/qgs/dc/s7/my/s7connector/impl/nodave/PDU.java b/src/main/java/com/qgs/dc/s7/my/s7connector/impl/nodave/PDU.java index b209dad..a2418ab 100644 --- a/src/main/java/com/qgs/dc/s7/my/s7connector/impl/nodave/PDU.java +++ b/src/main/java/com/qgs/dc/s7/my/s7connector/impl/nodave/PDU.java @@ -476,6 +476,8 @@ public final class PDU { //0x03:响应数据(Ack-Data),响应0x01的请求; //0x07:自定义数据(Userdata),扩展协议类型 this.mem[this.header + 1] = (byte) type; + + //dlen = data length ( 报文中存在两个字节 ) this.dlen = 0; //plen = param length ( 报文中存在两个字节 ) @@ -608,6 +610,7 @@ public final class PDU { if (this.mem[this.param] != FUNC_WRITE) { return Nodave.RESULT_UNEXPECTED_FUNC; } + if ((this.mem[this.data] == 255)) { res = Nodave.RESULT_OK; } else { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8dea78f..afa57ca 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -8,7 +8,7 @@ server: spring: rabbitmq: # 如果是rabbitmq+haproxy+keepalived集群 ,,那么192.168.0.176是haproxy代理的地址(严格来说是keepalived的vip) - addresses: 192.168.0.176:5672 # 新版rabbitmq 版本还未测试 + addresses: 192.168.0.170:5672 # 新版rabbitmq 版本还未测试 #addresses: 172.16.21.133:5672 username: cigs password: cigs