diff --git a/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID13S.java b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID13S.java new file mode 100644 index 0000000..ffe7043 --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID13S.java @@ -0,0 +1,98 @@ +package com.qgs.dc.mq.configuration; + + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Desc: "设备:PID13S 相关信息定义" + * @Author: caixiang + * @DATE: 2021/6/7 9:11 + */ +@Configuration +public class ConfigOfPID13S { + + //水平扩展其他设备的时候 只要:control+R 然后 PID13S=>00C 然后replace all + public static final String EQUIPMENT_NAME_PID13S = "PID13S"; + + public static final String EXCHANGE_NAME_PID13S = EQUIPMENT_NAME_PID13S +"_Exchange"; + public static final String EAP_REQUEST_QUEUE_PID13S = EQUIPMENT_NAME_PID13S +"_EAP_Request_Queue"; + public static final String EAP_RESPONSE_QUEUE_PID13S = EQUIPMENT_NAME_PID13S +"_EAP_Response_Queue"; + public static final String MES_REQUEST_QUEUE_PID13S = EQUIPMENT_NAME_PID13S +"_MES_Request_Queue"; + public static final String MES_RESPONSE_QUEUE_PID13S = EQUIPMENT_NAME_PID13S +"_MES_Response_Queue"; + public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID13S = EQUIPMENT_NAME_PID13S +"_EAP_Request_Queue_RoutingKey"; + public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID13S = EQUIPMENT_NAME_PID13S +"_EAP_Response_Queue_RoutingKey"; + public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID13S = EQUIPMENT_NAME_PID13S +"_MES_Request_Queue_RoutingKey"; + public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID13S = EQUIPMENT_NAME_PID13S +"_MES_Response_Queue_RoutingKey"; + + + @Bean + public DirectExchange EXCHANGE_NAME_PID13S(){ + return new DirectExchange(EXCHANGE_NAME_PID13S); + } + + //todo + @Bean + public Queue MES_REQUEST_QUEUE_PID13S(){ + Queue queue = new Queue(MES_REQUEST_QUEUE_PID13S); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + @Bean + public Queue MES_RESPONSE_QUEUE_PID13S(){ + Queue queue = new Queue(MES_RESPONSE_QUEUE_PID13S); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + + + @Bean + public Queue EAP_REQUEST_QUEUE_PID13S(){ + Queue queue = new Queue(EAP_REQUEST_QUEUE_PID13S); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + + } + @Bean + public Queue EAP_RESPONSE_QUEUE_PID13S(){ + Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID13S); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + + + @Bean + public Binding bindExchangeAndQueueA_PID13S(){ + return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID13S()).to(EXCHANGE_NAME_PID13S()) + .with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID13S); + } + @Bean + public Binding bindExchangeAndQueueB_PID13S(){ + return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID13S()).to(EXCHANGE_NAME_PID13S()) + .with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID13S); + } + + @Bean + public Binding bindExchangeAndQueueC_PID13S(){ + return BindingBuilder.bind(MES_REQUEST_QUEUE_PID13S()).to(EXCHANGE_NAME_PID13S()) + .with(MES_REQUEST_QUEUE_ROUTINGKEY_PID13S); + } + + @Bean + public Binding bindExchangeAndQueueD_PID13S(){ + return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID13S()).to(EXCHANGE_NAME_PID13S()) + .with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID13S); + } +} \ No newline at end of file diff --git a/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID18.java b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID18.java new file mode 100644 index 0000000..6c3e938 --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID18.java @@ -0,0 +1,98 @@ +package com.qgs.dc.mq.configuration; + + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Desc: "设备:PID18 相关信息定义" + * @Author: caixiang + * @DATE: 2021/6/7 9:11 + */ +@Configuration +public class ConfigOfPID18 { + + //水平扩展其他设备的时候 只要:control+R 然后 PID18=>00C 然后replace all + public static final String EQUIPMENT_NAME_PID18 = "PID18"; + + public static final String EXCHANGE_NAME_PID18 = EQUIPMENT_NAME_PID18 +"_Exchange"; + public static final String EAP_REQUEST_QUEUE_PID18 = EQUIPMENT_NAME_PID18 +"_EAP_Request_Queue"; + public static final String EAP_RESPONSE_QUEUE_PID18 = EQUIPMENT_NAME_PID18 +"_EAP_Response_Queue"; + public static final String MES_REQUEST_QUEUE_PID18 = EQUIPMENT_NAME_PID18 +"_MES_Request_Queue"; + public static final String MES_RESPONSE_QUEUE_PID18 = EQUIPMENT_NAME_PID18 +"_MES_Response_Queue"; + public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID18 = EQUIPMENT_NAME_PID18 +"_EAP_Request_Queue_RoutingKey"; + public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID18 = EQUIPMENT_NAME_PID18 +"_EAP_Response_Queue_RoutingKey"; + public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID18 = EQUIPMENT_NAME_PID18 +"_MES_Request_Queue_RoutingKey"; + public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID18 = EQUIPMENT_NAME_PID18 +"_MES_Response_Queue_RoutingKey"; + + + @Bean + public DirectExchange EXCHANGE_NAME_PID18(){ + return new DirectExchange(EXCHANGE_NAME_PID18); + } + + //todo + @Bean + public Queue MES_REQUEST_QUEUE_PID18(){ + Queue queue = new Queue(MES_REQUEST_QUEUE_PID18); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + @Bean + public Queue MES_RESPONSE_QUEUE_PID18(){ + Queue queue = new Queue(MES_RESPONSE_QUEUE_PID18); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + + + @Bean + public Queue EAP_REQUEST_QUEUE_PID18(){ + Queue queue = new Queue(EAP_REQUEST_QUEUE_PID18); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + + } + @Bean + public Queue EAP_RESPONSE_QUEUE_PID18(){ + Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID18); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + + + @Bean + public Binding bindExchangeAndQueueA_PID18(){ + return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID18()).to(EXCHANGE_NAME_PID18()) + .with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID18); + } + @Bean + public Binding bindExchangeAndQueueB_PID18(){ + return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID18()).to(EXCHANGE_NAME_PID18()) + .with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID18); + } + + @Bean + public Binding bindExchangeAndQueueC_PID18(){ + return BindingBuilder.bind(MES_REQUEST_QUEUE_PID18()).to(EXCHANGE_NAME_PID18()) + .with(MES_REQUEST_QUEUE_ROUTINGKEY_PID18); + } + + @Bean + public Binding bindExchangeAndQueueD_PID18(){ + return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID18()).to(EXCHANGE_NAME_PID18()) + .with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID18); + } +} \ No newline at end of file diff --git a/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID4B.java b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID4B.java new file mode 100644 index 0000000..75c342b --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID4B.java @@ -0,0 +1,98 @@ +package com.qgs.dc.mq.configuration; + + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Desc: "设备:PID4B 相关信息定义" + * @Author: caixiang + * @DATE: 2021/6/7 9:11 + */ +@Configuration +public class ConfigOfPID4B { + + //水平扩展其他设备的时候 只要:control+R 然后 PID4B=>00C 然后replace all + public static final String EQUIPMENT_NAME_PID4B = "PID4B"; + + public static final String EXCHANGE_NAME_PID4B = EQUIPMENT_NAME_PID4B +"_Exchange"; + public static final String EAP_REQUEST_QUEUE_PID4B = EQUIPMENT_NAME_PID4B +"_EAP_Request_Queue"; + public static final String EAP_RESPONSE_QUEUE_PID4B = EQUIPMENT_NAME_PID4B +"_EAP_Response_Queue"; + public static final String MES_REQUEST_QUEUE_PID4B = EQUIPMENT_NAME_PID4B +"_MES_Request_Queue"; + public static final String MES_RESPONSE_QUEUE_PID4B = EQUIPMENT_NAME_PID4B +"_MES_Response_Queue"; + public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID4B = EQUIPMENT_NAME_PID4B +"_EAP_Request_Queue_RoutingKey"; + public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID4B = EQUIPMENT_NAME_PID4B +"_EAP_Response_Queue_RoutingKey"; + public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID4B = EQUIPMENT_NAME_PID4B +"_MES_Request_Queue_RoutingKey"; + public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID4B = EQUIPMENT_NAME_PID4B +"_MES_Response_Queue_RoutingKey"; + + + @Bean + public DirectExchange EXCHANGE_NAME_PID4B(){ + return new DirectExchange(EXCHANGE_NAME_PID4B); + } + + //todo + @Bean + public Queue MES_REQUEST_QUEUE_PID4B(){ + Queue queue = new Queue(MES_REQUEST_QUEUE_PID4B); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + @Bean + public Queue MES_RESPONSE_QUEUE_PID4B(){ + Queue queue = new Queue(MES_RESPONSE_QUEUE_PID4B); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + + + @Bean + public Queue EAP_REQUEST_QUEUE_PID4B(){ + Queue queue = new Queue(EAP_REQUEST_QUEUE_PID4B); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + + } + @Bean + public Queue EAP_RESPONSE_QUEUE_PID4B(){ + Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID4B); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + + + @Bean + public Binding bindExchangeAndQueueA_PID4B(){ + return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID4B()).to(EXCHANGE_NAME_PID4B()) + .with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID4B); + } + @Bean + public Binding bindExchangeAndQueueB_PID4B(){ + return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID4B()).to(EXCHANGE_NAME_PID4B()) + .with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID4B); + } + + @Bean + public Binding bindExchangeAndQueueC_PID4B(){ + return BindingBuilder.bind(MES_REQUEST_QUEUE_PID4B()).to(EXCHANGE_NAME_PID4B()) + .with(MES_REQUEST_QUEUE_ROUTINGKEY_PID4B); + } + + @Bean + public Binding bindExchangeAndQueueD_PID4B(){ + return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID4B()).to(EXCHANGE_NAME_PID4B()) + .with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID4B); + } +} \ No newline at end of file diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID00BReceived.java b/src/main/java/com/qgs/dc/mq/consumer/PID00BReceived.java index 35429fe..e35ed8d 100644 --- a/src/main/java/com/qgs/dc/mq/consumer/PID00BReceived.java +++ b/src/main/java/com/qgs/dc/mq/consumer/PID00BReceived.java @@ -44,6 +44,9 @@ public class PID00BReceived { @Autowired MQMessageHandler mqMessageHandler; + + //这个队列监听EAP 推送过来的Event,然后dc 把这个Event 转发给Mes + //need @RabbitListener(queues = ConfigOf00B.EAP_REQUEST_QUEUE_00B) @RabbitHandler public void eapRequest(Message message, Channel channel)throws Exception{ @@ -72,10 +75,6 @@ public class PID00BReceived { logger.error("在 EAP_REQUEST_QUEUE_00B 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); channel.basicNack(deliveryTag,false,false); } - - - - //2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列 //throw new Exception("11111"); @@ -143,7 +142,8 @@ public class PID00BReceived { System.out.println(mqMessage2.toString()); } - @RabbitListener(queues = ConfigOf00B.MES_RESPONSE_QUEUE_00B) + + //这种情况:指的是 MES发出远程指令,然后eap处理完后,把结果丢到mesResponse队列 通知MES进行收尾处理。 @RabbitHandler public void mesResponse(Message message, Channel channel)throws Exception{ logger.info("==============PID00B_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1")); @@ -162,7 +162,6 @@ public class PID00BReceived { channel.basicNack(deliveryTag,false,false); } - //throw new Exception("11111"); }catch (Exception e){ // 第一个false 是 不批量签收;第二个false 是 不重回队列 channel.basicNack(deliveryTag,false,false); diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID13SReceived.java b/src/main/java/com/qgs/dc/mq/consumer/PID13SReceived.java new file mode 100644 index 0000000..86ddf02 --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/consumer/PID13SReceived.java @@ -0,0 +1,170 @@ +package com.qgs.dc.mq.consumer; + +import cn.hutool.http.HttpUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.qgs.dc.common.utils.CommonFunction; +import com.qgs.dc.mq.Constant.Constant; +import com.qgs.dc.mq.configuration.ConfigOfPID13S; +import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; +import com.qgs.dc.mq.entity.MQMessage; +import com.qgs.dc.mq.entity.common.Header; +import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; +import com.rabbitmq.client.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +/** + * @Desc: "PIDPID13S设备 接收MQ消息 监听类" + * @Author: caixiang + * @DATE: 2021/6/22 15:30 + * + * Ctrl+R 替换设备名 + */ +@Component +public class PID13SReceived { + + private static final Logger logger = LoggerFactory.getLogger(PID13SReceived.class); + + + + @Autowired + MQMessageHandler mqMessageHandler; + + @RabbitListener(queues = ConfigOfPID13S.EAP_REQUEST_QUEUE_PID13S) + @RabbitHandler + public void eapRequest(Message message, Channel channel)throws Exception{ + + logger.info("==============received message-EAP_REQUEST_QUEUE_PID13S=================,priority:"+"equipmentName"+message.getHeaders().get("attr2")); + + Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); +// //MQMessage 中的 transactionId +// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation"); +// //logger.info("transactionId:"+transactionId); + + try { + + MQMessage mqMessage = CommonFunction.parse(message); + String transactionId = mqMessage.getHeader().getTransactionId(); + //1. 正常情况 + //Integer integer = mqMessageHandler.requestHandler(message); + String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); + Integer result = Integer.valueOf(integer); + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 + if(result == 1){ + logger.info("在 EAP_REQUEST_QUEUE_PID13S 队列中,transitionId"+transactionId+", 这条消息处理成功"); + channel.basicAck(deliveryTag,false); + }else { + logger.error("在 EAP_REQUEST_QUEUE_PID13S 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); + channel.basicNack(deliveryTag,false,false); + } + //2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列 + + //throw new Exception("11111"); + }catch (Exception e){ + // 第一个false 是 不批量签收;第二个false 是 不重回队列 + logger.error(e.getMessage()); + channel.basicNack(deliveryTag,false,false); + return; + } + + + + } + + + public static void main(String[] args) { + //localhost:8001 + + + MQMessage mqMessage = new MQMessage(); + + Header header = new Header("Request","Execute","QUERYEQPStatus","12"); + QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody(); + queryEQStatusBody.setVidType("u4"); + List vids = new ArrayList<>(); + vids.add("10000"); + vids.add("10001"); + vids.add("10002"); + queryEQStatusBody.setVidList(vids); + + //queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"} + //bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125] + //"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==" + //message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + //new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]} + String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody); + + + byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody); + System.out.println("myString :"+ new String(bytes)); + mqMessage.setBody(queryEQStatusBodys); + mqMessage.setHeader(header); + + //{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + String s = JSONObject.toJSONString(mqMessage); + MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class); + System.out.println("mqMessagessss : " + mqMessagessss.toString()); + System.out.println(s); + //String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s); + + //{"header":{"transactionId":"PIDPID13S_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID13S","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + String s22 = "{\"header\":{\"transactionId\":\"PIDPID13S_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID13S\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}"; + + String s2 = "{\"header\":{\"transactionId\":\"PIDPID13S_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID13S\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID13S_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}"; + JSONObject jsonObject = JSON.parseObject(s2); + String header1 = jsonObject.getString("header"); + String body = jsonObject.getString("body"); + String returns = jsonObject.getString("returns"); + System.out.println("header: "+header1); + System.out.println("body: "+body); + System.out.println("returns: "+returns); + + byte[] bytes2 = s2.getBytes(); + MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class); + System.out.println(mqMessage2.toString()); + } + + @RabbitListener(queues = ConfigOfPID13S.MES_RESPONSE_QUEUE_PID13S) + @RabbitHandler + public void mesResponse(Message message, Channel channel)throws Exception{ + logger.info("==============PIDPID13S_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1")); + Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); + + //模拟异常 ,然后 拒签消息 ,然后丢到死信队列 + try { + MQMessage mqMessage = CommonFunction.parse(message); + String transactionId = mqMessage.getHeader().getTransactionId(); + Integer result = mqMessageHandler.responseHandler(message); + if(result == 1){ + logger.info("在 MES_RESPONSE_QUEUE_PID13S 队列中 , transitionId"+transactionId+" 这条消息处理成功"); + channel.basicAck(deliveryTag,false); + }else { + logger.error("在 MES_RESPONSE_QUEUE_PID13S 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); + channel.basicNack(deliveryTag,false,false); + } + + //throw new Exception("11111"); + }catch (Exception e){ + // 第一个false 是 不批量签收;第二个false 是 不重回队列 + channel.basicNack(deliveryTag,false,false); + return; + } + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 + //channel.basicAck(deliveryTag,false); + + } + +} diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID18Received.java b/src/main/java/com/qgs/dc/mq/consumer/PID18Received.java new file mode 100644 index 0000000..9328e17 --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/consumer/PID18Received.java @@ -0,0 +1,170 @@ +package com.qgs.dc.mq.consumer; + +import cn.hutool.http.HttpUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.qgs.dc.common.utils.CommonFunction; +import com.qgs.dc.mq.Constant.Constant; +import com.qgs.dc.mq.configuration.ConfigOfPID18; +import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; +import com.qgs.dc.mq.entity.MQMessage; +import com.qgs.dc.mq.entity.common.Header; +import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; +import com.rabbitmq.client.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +/** + * @Desc: "PIDPID18设备 接收MQ消息 监听类" + * @Author: caixiang + * @DATE: 2021/6/22 15:30 + * + * Ctrl+R 替换设备名 + */ +@Component +public class PID18Received { + + private static final Logger logger = LoggerFactory.getLogger(PID18Received.class); + + + + @Autowired + MQMessageHandler mqMessageHandler; + + @RabbitListener(queues = ConfigOfPID18.EAP_REQUEST_QUEUE_PID18) + @RabbitHandler + public void eapRequest(Message message, Channel channel)throws Exception{ + + logger.info("==============received message-EAP_REQUEST_QUEUE_PID18=================,priority:"+"equipmentName"+message.getHeaders().get("attr2")); + + Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); +// //MQMessage 中的 transactionId +// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation"); +// //logger.info("transactionId:"+transactionId); + + try { + + MQMessage mqMessage = CommonFunction.parse(message); + String transactionId = mqMessage.getHeader().getTransactionId(); + //1. 正常情况 + //Integer integer = mqMessageHandler.requestHandler(message); + String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); + Integer result = Integer.valueOf(integer); + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 + if(result == 1){ + logger.info("在 EAP_REQUEST_QUEUE_PID18 队列中,transitionId"+transactionId+", 这条消息处理成功"); + channel.basicAck(deliveryTag,false); + }else { + logger.error("在 EAP_REQUEST_QUEUE_PID18 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); + channel.basicNack(deliveryTag,false,false); + } + //2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列 + + //throw new Exception("11111"); + }catch (Exception e){ + // 第一个false 是 不批量签收;第二个false 是 不重回队列 + logger.error(e.getMessage()); + channel.basicNack(deliveryTag,false,false); + return; + } + + + + } + + + public static void main(String[] args) { + //localhost:8001 + + + MQMessage mqMessage = new MQMessage(); + + Header header = new Header("Request","Execute","QUERYEQPStatus","12"); + QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody(); + queryEQStatusBody.setVidType("u4"); + List vids = new ArrayList<>(); + vids.add("10000"); + vids.add("10001"); + vids.add("10002"); + queryEQStatusBody.setVidList(vids); + + //queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"} + //bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125] + //"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==" + //message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + //new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]} + String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody); + + + byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody); + System.out.println("myString :"+ new String(bytes)); + mqMessage.setBody(queryEQStatusBodys); + mqMessage.setHeader(header); + + //{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + String s = JSONObject.toJSONString(mqMessage); + MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class); + System.out.println("mqMessagessss : " + mqMessagessss.toString()); + System.out.println(s); + //String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s); + + //{"header":{"transactionId":"PIDPID18_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID18","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + String s22 = "{\"header\":{\"transactionId\":\"PIDPID18_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID18\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}"; + + String s2 = "{\"header\":{\"transactionId\":\"PIDPID18_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID18\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID18_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}"; + JSONObject jsonObject = JSON.parseObject(s2); + String header1 = jsonObject.getString("header"); + String body = jsonObject.getString("body"); + String returns = jsonObject.getString("returns"); + System.out.println("header: "+header1); + System.out.println("body: "+body); + System.out.println("returns: "+returns); + + byte[] bytes2 = s2.getBytes(); + MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class); + System.out.println(mqMessage2.toString()); + } + + @RabbitListener(queues = ConfigOfPID18.MES_RESPONSE_QUEUE_PID18) + @RabbitHandler + public void mesResponse(Message message, Channel channel)throws Exception{ + logger.info("==============PIDPID18_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1")); + Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); + + //模拟异常 ,然后 拒签消息 ,然后丢到死信队列 + try { + MQMessage mqMessage = CommonFunction.parse(message); + String transactionId = mqMessage.getHeader().getTransactionId(); + Integer result = mqMessageHandler.responseHandler(message); + if(result == 1){ + logger.info("在 MES_RESPONSE_QUEUE_PID18 队列中 , transitionId"+transactionId+" 这条消息处理成功"); + channel.basicAck(deliveryTag,false); + }else { + logger.error("在 MES_RESPONSE_QUEUE_PID18 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); + channel.basicNack(deliveryTag,false,false); + } + + //throw new Exception("11111"); + }catch (Exception e){ + // 第一个false 是 不批量签收;第二个false 是 不重回队列 + channel.basicNack(deliveryTag,false,false); + return; + } + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 + //channel.basicAck(deliveryTag,false); + + } + +} diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID4BReceived.java b/src/main/java/com/qgs/dc/mq/consumer/PID4BReceived.java new file mode 100644 index 0000000..4406ba0 --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/consumer/PID4BReceived.java @@ -0,0 +1,167 @@ +package com.qgs.dc.mq.consumer; + +import cn.hutool.http.HttpUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.qgs.dc.common.utils.CommonFunction; +import com.qgs.dc.mq.Constant.Constant; +import com.qgs.dc.mq.configuration.ConfigOfPID4B; +import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; +import com.qgs.dc.mq.entity.MQMessage; +import com.qgs.dc.mq.entity.common.Header; +import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; +import com.rabbitmq.client.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +/** + * @Desc: "PIDPID4B设备 接收MQ消息 监听类" + * @Author: caixiang + * @DATE: 2021/6/22 15:30 + * + * Ctrl+R 替换设备名 + */ +@Component +public class PID4BReceived { + + private static final Logger logger = LoggerFactory.getLogger(PID4BReceived.class); + + + + @Autowired + MQMessageHandler mqMessageHandler; + + @RabbitListener(queues = ConfigOfPID4B.EAP_REQUEST_QUEUE_PID4B) + @RabbitHandler + public void eapRequest(Message message, Channel channel)throws Exception{ + + logger.info("==============received message-EAP_REQUEST_QUEUE_PID4B================="); + + Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); + //MQMessage 中的 transactionId +// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation"); +// //logger.info("transactionId:"+transactionId); + + try { + + MQMessage mqMessage = CommonFunction.parse(message); + String transactionId = mqMessage.getHeader().getTransactionId(); + //1. 正常情况 + String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); + Integer result = Integer.valueOf(integer); + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 + if(result == 1){ + logger.info("在 EAP_REQUEST_QUEUE_PID4B 队列中,transitionId"+transactionId+", 这条消息处理成功"); + channel.basicAck(deliveryTag,false); + }else { + logger.error("在 EAP_REQUEST_QUEUE_PID4B 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); + channel.basicNack(deliveryTag,false,false); + } + //2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列 + }catch (Exception e){ + // 第一个false 是 不批量签收;第二个false 是 不重回队列 + logger.error(e.getMessage()); + channel.basicNack(deliveryTag,false,false); + return; + } + + + + } + + + public static void main(String[] args) { + //localhost:8001 + + + MQMessage mqMessage = new MQMessage(); + + Header header = new Header("Request","Execute","QUERYEQPStatus","12"); + QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody(); + queryEQStatusBody.setVidType("u4"); + List vids = new ArrayList<>(); + vids.add("10000"); + vids.add("10001"); + vids.add("10002"); + queryEQStatusBody.setVidList(vids); + + //queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"} + //bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125] + //"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==" + //message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + //new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]} + String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody); + + + byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody); + System.out.println("myString :"+ new String(bytes)); + mqMessage.setBody(queryEQStatusBodys); + mqMessage.setHeader(header); + + //{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + String s = JSONObject.toJSONString(mqMessage); + MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class); + System.out.println("mqMessagessss : " + mqMessagessss.toString()); + System.out.println(s); + //String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s); + + //{"header":{"transactionId":"PIDPID4B_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID4B","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + String s22 = "{\"header\":{\"transactionId\":\"PIDPID4B_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID4B\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}"; + + String s2 = "{\"header\":{\"transactionId\":\"PIDPID4B_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID4B\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID4B_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}"; + JSONObject jsonObject = JSON.parseObject(s2); + String header1 = jsonObject.getString("header"); + String body = jsonObject.getString("body"); + String returns = jsonObject.getString("returns"); + System.out.println("header: "+header1); + System.out.println("body: "+body); + System.out.println("returns: "+returns); + + byte[] bytes2 = s2.getBytes(); + MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class); + System.out.println(mqMessage2.toString()); + } + + @RabbitListener(queues = ConfigOfPID4B.MES_RESPONSE_QUEUE_PID4B) + @RabbitHandler + public void mesResponse(Message message, Channel channel)throws Exception{ + logger.info("==============PIDPID4B_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1")); + Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); + + //模拟异常 ,然后 拒签消息 ,然后丢到死信队列 + try { + MQMessage mqMessage = CommonFunction.parse(message); + String transactionId = mqMessage.getHeader().getTransactionId(); + Integer result = mqMessageHandler.responseHandler(message); + if(result == 1){ + logger.info("在 MES_RESPONSE_QUEUE_PID4B 队列中 , transitionId"+transactionId+" 这条消息处理成功"); + channel.basicAck(deliveryTag,false); + }else { + logger.error("在 MES_RESPONSE_QUEUE_PID4B 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); + channel.basicNack(deliveryTag,false,false); + } + + //throw new Exception("11111"); + }catch (Exception e){ + // 第一个false 是 不批量签收;第二个false 是 不重回队列 + channel.basicNack(deliveryTag,false,false); + return; + } + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 + //channel.basicAck(deliveryTag,false); + + } + +} diff --git a/src/main/java/com/qgs/dc/mq/controller/MQController.java b/src/main/java/com/qgs/dc/mq/controller/MQController.java index ecac075..271a1a3 100644 --- a/src/main/java/com/qgs/dc/mq/controller/MQController.java +++ b/src/main/java/com/qgs/dc/mq/controller/MQController.java @@ -51,6 +51,7 @@ public class MQController { } } + //need @PostMapping("/mesRequest") /** * desc : MES给EAP发送远程指令(MES_Request)(向rabbitmq中发送消息(direct模式)) diff --git a/src/main/java/com/qgs/dc/opcua/controller/OperateController.java b/src/main/java/com/qgs/dc/opcua/controller/OperateController.java index 15b9362..57cd3af 100644 --- a/src/main/java/com/qgs/dc/opcua/controller/OperateController.java +++ b/src/main/java/com/qgs/dc/opcua/controller/OperateController.java @@ -137,6 +137,14 @@ public class OperateController { } + public String boolList(Boolean[] b){ + List c = new ArrayList<>(); + for(int i=0;i multiWorkRunnable1 = new MultiWorkRunnable<>(uaService); -// Future submit1 = QGSThreadPool.executor.submit(multiWorkRunnable1); +// Future submit1 = QGSThreadPool.execut or.submit(multiWorkRunnable1); // } // return R.ok(); // } diff --git a/src/main/java/com/qgs/dc/s7/my/s7connector/MainForRead.java b/src/main/java/com/qgs/dc/s7/my/s7connector/MainForRead.java index 3978ef8..4388636 100644 --- a/src/main/java/com/qgs/dc/s7/my/s7connector/MainForRead.java +++ b/src/main/java/com/qgs/dc/s7/my/s7connector/MainForRead.java @@ -29,7 +29,6 @@ public class MainForRead { S7Connector connector = S7Client.S7_1500.getConnector(); - // // [0] // byte[] bool = connector.read(DaveArea.DB, 3, 1, 3266,0); // byte[] bool2 = connector.read(DaveArea.DB, 3, 1, 3266,1); @@ -42,8 +41,9 @@ public class MainForRead { //bool3 => [3] 0000 0011 ==> 1100 0000 11....... //bool3 => [10] 0000 1010 ==> 0101 0000 0101..... //bool3 => [25] 0001 1001 ==> 1001 1000 10011.... - byte[] bool3 = connector.read(DaveArea.DB, 3, 1, 3267); - System.out.println("DB3.0-bool3 : " + ByteUtils.toBoolean(bool3)); + byte[] bool3 = connector.read(DaveArea.DB, 3, 1, 3267); + System.out.println("DB3.0-bool3 : " + ByteUtils.toBoolean(bool3)); + // @@ -83,21 +83,21 @@ public class MainForRead { // System.out.println("DB3.42-DATE : "+ByteUtils.addDate("1990-01-01",aLong)); // } // - { - //[7, -78, 1, 1, 5, 0, 0, 0, 0, 0, 0, 0] DTL#1970-01-01-00:00:00 1998-1-1 星期五 0.0.0.0 - //0000 0111 (1100 1110)=>(1011 0010 178) 178+256+512+1024=1970 - //(注意:第二个字节是负数要转成补码形式表示(因为在通行传输中 字节是原码形式传输的,但是java中long int byte... 都是以补码形式保存的 java帮你自动保存了其实这是不对的所以你要转换一下) 参考https://blog.csdn.net/csdn_ds/article/details/79106006) - //[7, -68, 1, 21, 2, 0, 0, 0, 0, 0, 0, 0] - //0000 0111 (1100 0100)=>(1011 1100 188) 188+256+512+1024=1980 - byte[] dtl = connector.read(DaveArea.DB, 3, 12, 44); - byte[] year = new byte[2]; - year[0] = dtl[0]; - year[1] = dtl[1]; - byte[] month = new byte[1]; - month[0] = dtl[2]; -// System.out.println("DB3.12-DTL : " + byteArrayByteUtils.toUInt(year)+"-"+byteArrayByteUtils.toUInt(month)); - System.out.println("DB3.44-DTL : " + ByteUtils.toInt(year[0],year[1])+"-"+ByteUtils.toInt(month[0])); - } +// { +// //[7, -78, 1, 1, 5, 0, 0, 0, 0, 0, 0, 0] DTL#1970-01-01-00:00:00 1998-1-1 星期五 0.0.0.0 +// //0000 0111 (1100 1110)=>(1011 0010 178) 178+256+512+1024=1970 +// //(注意:第二个字节是负数要转成补码形式表示(因为在通行传输中 字节是原码形式传输的,但是java中long int byte... 都是以补码形式保存的 java帮你自动保存了其实这是不对的所以你要转换一下) 参考https://blog.csdn.net/csdn_ds/article/details/79106006) +// //[7, -68, 1, 21, 2, 0, 0, 0, 0, 0, 0, 0] +// //0000 0111 (1100 0100)=>(1011 1100 188) 188+256+512+1024=1980 +// byte[] dtl = connector.read(DaveArea.DB, 3, 12, 44); +// byte[] year = new byte[2]; +// year[0] = dtl[0]; +// year[1] = dtl[1]; +// byte[] month = new byte[1]; +// month[0] = dtl[2]; +//// System.out.println("DB3.12-DTL : " + byteArrayByteUtils.toUInt(year)+"-"+byteArrayByteUtils.toUInt(month)); +// System.out.println("DB3.44-DTL : " + ByteUtils.toInt(year[0],year[1])+"-"+ByteUtils.toInt(month[0])); +// } // // { // //[59, -102, -55, -1] T#11D_13H_46M_39S_999MS @@ -176,59 +176,59 @@ public class MainForRead { // // } // - { - try { - //byte 占用一个字节,如果是数组的话,就读取2个(要事先知道,点表规定数组长度),实际就是读取 DB3.830 - //bytes = byteLength * arrayLength 举例:1(byteLength) * 2(arrayLength) =2(bytes) - byte[] boolArrays = connector.read(DaveArea.DB, 3, 2, 830); - List booleans = ByteUtils.toBoolArray(boolArrays); - System.out.println("DB3.830-boolArrays : " +booleans ); - - byte[] byteArrays = connector.read(DaveArea.DB, 3, 2, 832); - List bytes = ByteUtils.toByteArray(byteArrays); - System.out.println("DB3.832-byteArrays : " +bytes ); - - byte[] charArrays = connector.read(DaveArea.DB, 3, 2, 834); - List chars = ByteUtils.toCharArray(charArrays); - System.out.println("DB3.834-charArrays : " +chars ); - - byte[] wordArrays = connector.read(DaveArea.DB, 3, 4, 836); - List words = ByteUtils.toWordArray(wordArrays); - System.out.println("DB3.836-wordArrays : " +words ); - - byte[] dwordArrays = connector.read(DaveArea.DB, 3, 8, 840); - List dwords = ByteUtils.toDWordArray(dwordArrays); - System.out.println("DB3.840-dwordArrays : " +dwords ); - - byte[] sintArrays = connector.read(DaveArea.DB, 3, 2, 852); - List sints = ByteUtils.toSIntArray(sintArrays); - System.out.println("DB3.852-sintArrays : " +sints ); - - byte[] intArrays = connector.read(DaveArea.DB, 3, 4, 848); - List ints = ByteUtils.toIntArray(intArrays); - System.out.println("DB3.848-intArrays : " +ints ); - - byte[] dintArrays = connector.read(DaveArea.DB, 3, 8, 854); - List dints = ByteUtils.toDIntArray(dintArrays); - System.out.println("DB3.852-dintArrays : " +dints); - - - byte[] usintArrays = connector.read(DaveArea.DB, 3, 3, 3240); - List usints = ByteUtils.toUSIntArray(usintArrays); - System.out.println("DB3.3240-usintArrays : " +usints ); - - byte[] uintArrays = connector.read(DaveArea.DB, 3, 6, 3256); - List uints = ByteUtils.toUIntArray(uintArrays); - System.out.println("DB3.3256-uintArrays : " +uints ); - - byte[] udintArrays = connector.read(DaveArea.DB, 3, 12, 3244); - List udints = ByteUtils.toUDIntArray(udintArrays); - System.out.println("DB3.852-udintArrays : " +udints); - }catch (Exception e){ - e.printStackTrace(); - } - - }; +// { +// try { +// //byte 占用一个字节,如果是数组的话,就读取2个(要事先知道,点表规定数组长度),实际就是读取 DB3.830 +// //bytes = byteLength * arrayLength 举例:1(byteLength) * 2(arrayLength) =2(bytes) +// byte[] boolArrays = connector.read(DaveArea.DB, 3, 2, 830); +// List booleans = ByteUtils.toBoolArray(boolArrays); +// System.out.println("DB3.830-boolArrays : " +booleans ); +// +// byte[] byteArrays = connector.read(DaveArea.DB, 3, 2, 832); +// List bytes = ByteUtils.toByteArray(byteArrays); +// System.out.println("DB3.832-byteArrays : " +bytes ); +// +// byte[] charArrays = connector.read(DaveArea.DB, 3, 2, 834); +// List chars = ByteUtils.toCharArray(charArrays); +// System.out.println("DB3.834-charArrays : " +chars ); +// +// byte[] wordArrays = connector.read(DaveArea.DB, 3, 4, 836); +// List words = ByteUtils.toWordArray(wordArrays); +// System.out.println("DB3.836-wordArrays : " +words ); +// +// byte[] dwordArrays = connector.read(DaveArea.DB, 3, 8, 840); +// List dwords = ByteUtils.toDWordArray(dwordArrays); +// System.out.println("DB3.840-dwordArrays : " +dwords ); +// +// byte[] sintArrays = connector.read(DaveArea.DB, 3, 2, 852); +// List sints = ByteUtils.toSIntArray(sintArrays); +// System.out.println("DB3.852-sintArrays : " +sints ); +// +// byte[] intArrays = connector.read(DaveArea.DB, 3, 4, 848); +// List ints = ByteUtils.toIntArray(intArrays); +// System.out.println("DB3.848-intArrays : " +ints ); +// +// byte[] dintArrays = connector.read(DaveArea.DB, 3, 8, 854); +// List dints = ByteUtils.toDIntArray(dintArrays); +// System.out.println("DB3.852-dintArrays : " +dints); +// +// +// byte[] usintArrays = connector.read(DaveArea.DB, 3, 3, 3240); +// List usints = ByteUtils.toUSIntArray(usintArrays); +// System.out.println("DB3.3240-usintArrays : " +usints ); +// +// byte[] uintArrays = connector.read(DaveArea.DB, 3, 6, 3256); +// List uints = ByteUtils.toUIntArray(uintArrays); +// System.out.println("DB3.3256-uintArrays : " +uints ); +// +// byte[] udintArrays = connector.read(DaveArea.DB, 3, 12, 3244); +// List udints = ByteUtils.toUDIntArray(udintArrays); +// System.out.println("DB3.852-udintArrays : " +udints); +// }catch (Exception e){ +// e.printStackTrace(); +// } +// +// }; diff --git a/src/main/java/com/qgs/dc/s7/my/s7connector/enmuc/S7Client.java b/src/main/java/com/qgs/dc/s7/my/s7connector/enmuc/S7Client.java index b8a1748..ec1562c 100644 --- a/src/main/java/com/qgs/dc/s7/my/s7connector/enmuc/S7Client.java +++ b/src/main/java/com/qgs/dc/s7/my/s7connector/enmuc/S7Client.java @@ -29,9 +29,10 @@ import java.util.concurrent.TimeUnit; public enum S7Client { //TODO 步骤1 这里是配置多PLC 的,,,有多个plc 就在这里配置一个枚举类 //1500 西门子200smart、1200、1500默认的 机架号=0 槽位号=1; 300/400 默认的 机架-0 插槽-2 + S7_1200("192.168.0.52",0,1,1,PlcVarActual.HeartBeatFor1200), + S7_1500("192.168.0.51",0,1,1,PlcVarActual.HeartBeat), //1500 机架-0 插槽-1 - S7_1200("192.168.0.52",0,1,1,PlcVarActual.HeartBeatFor1200) //后续 在这里扩展 多PLC应用。 @@ -271,7 +272,7 @@ public enum S7Client { .build(); return connector; }catch (S7Exception e){ - logger.info("创建S7Connector 连接失败,原因:"+e.getMessage()); +// logger.info("创建S7Connector 连接失败,原因:"+e.getMessage()); return null; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index b374784..8dea78f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -8,11 +8,11 @@ server: spring: rabbitmq: # 如果是rabbitmq+haproxy+keepalived集群 ,,那么192.168.0.176是haproxy代理的地址(严格来说是keepalived的vip) - addresses: 192.168.0.176:5672 ## 新版rabbitmq 版本还未测试 + addresses: 192.168.0.176:5672 # 新版rabbitmq 版本还未测试 #addresses: 172.16.21.133:5672 - username: cdte - password: cdte - virtual-host: cdte + username: cigs + password: cigs + virtual-host: cigs connection-timeout: 15000 publisher-confirm-type: correlated publisher-returns: true