diff --git a/src/main/java/com/qgs/dc/influx/config/InfluxClient.java b/src/main/java/com/qgs/dc/influx/config/InfluxClient.java index 8c3910f..7a95a1e 100644 --- a/src/main/java/com/qgs/dc/influx/config/InfluxClient.java +++ b/src/main/java/com/qgs/dc/influx/config/InfluxClient.java @@ -143,14 +143,20 @@ public enum InfluxClient { List dropedTagNames = param.getDropedTagNames(); Range range = param.getRange(); String bucket = param.getBucket(); - String tagName = param.getTag().getTagName(); - String tagValue = param.getTag().getTagValue(); + String tagName = null; + String tagValue = null; + if (param.getTag()!=null){ + tagName = param.getTag().getTagName(); + tagValue = param.getTag().getTagValue(); + } PageInfo pageInfo = param.getPageInfo(); String flux = "from(bucket:\""+bucket+"\")"; flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; - flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; + if (tagName!=null){ + flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; + } for(String dropName:dropedTagNames){ flux += "|> drop(columns: [\""+dropName+"\"])"; } @@ -158,6 +164,7 @@ public enum InfluxClient { if(pageInfo!=null){ flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; } + System.out.println(flux); return queryApi.query(flux); } diff --git a/src/main/java/com/qgs/dc/influx/controller/InfluxController.java b/src/main/java/com/qgs/dc/influx/controller/InfluxController.java index 9ca5863..52df8f7 100644 --- a/src/main/java/com/qgs/dc/influx/controller/InfluxController.java +++ b/src/main/java/com/qgs/dc/influx/controller/InfluxController.java @@ -220,15 +220,19 @@ public class InfluxController { @PostMapping("/queryEvents") public JSONArray queryEvents(@RequestBody QueryDataParam queryDataParam) throws InterruptedException{ List fluxTables = InfluxClient.Client.query(queryDataParam); - List fluxRecords = fluxTables.get(0).getRecords(); + List fluxRecords = new ArrayList<>(); + for (FluxTable fluxTable:fluxTables){ + fluxRecords.addAll(fluxTable.getRecords()); + } JSONArray jsonArray = new JSONArray(); - for (FluxRecord fluxRecord:fluxRecords){ + for (FluxRecord fluxRecord:fluxRecords) { Map map = fluxRecord.getValues(); System.out.println(map); - //todo JSONObject jsonObject = new JSONObject(); jsonObject.put(map.get("argName").toString(),map.get("_value").toString()); - jsonObject.put("transationId",map.get("transationId").toString()); + if (map.get("transationId")!=null){ + jsonObject.put("transationId",map.get("transationId").toString()); + } jsonObject.put("measurement",map.get("_measurement").toString()); jsonObject.put("time",map.get("_time").toString()); jsonArray.add(jsonObject); diff --git a/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID10.java b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID10.java new file mode 100644 index 0000000..c1b50c2 --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID10.java @@ -0,0 +1,98 @@ +package com.qgs.dc.mq.configuration; + + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Desc: "设备:PID10 相关信息定义" + * @Author: caixiang + * @DATE: 2021/6/7 9:11 + */ +@Configuration +public class ConfigOfPID10 { + + //水平扩展其他设备的时候 只要:control+R 然后 PID10=>00C 然后replace all + public static final String EQUIPMENT_NAME_PID10 = "PID10"; + + public static final String EXCHANGE_NAME_PID10 = EQUIPMENT_NAME_PID10 +"_Exchange"; + public static final String EAP_REQUEST_QUEUE_PID10 = EQUIPMENT_NAME_PID10 +"_EAP_Request_Queue"; + public static final String EAP_RESPONSE_QUEUE_PID10 = EQUIPMENT_NAME_PID10 +"_EAP_Response_Queue"; + public static final String MES_REQUEST_QUEUE_PID10 = EQUIPMENT_NAME_PID10 +"_MES_Request_Queue"; + public static final String MES_RESPONSE_QUEUE_PID10 = EQUIPMENT_NAME_PID10 +"_MES_Response_Queue"; + public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID10 = EQUIPMENT_NAME_PID10 +"_EAP_Request_Queue_RoutingKey"; + public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID10 = EQUIPMENT_NAME_PID10 +"_EAP_Response_Queue_RoutingKey"; + public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID10 = EQUIPMENT_NAME_PID10 +"_MES_Request_Queue_RoutingKey"; + public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID10 = EQUIPMENT_NAME_PID10 +"_MES_Response_Queue_RoutingKey"; + + + @Bean + public DirectExchange EXCHANGE_NAME_PID10(){ + return new DirectExchange(EXCHANGE_NAME_PID10); + } + + //todo + @Bean + public Queue MES_REQUEST_QUEUE_PID10(){ + Queue queue = new Queue(MES_REQUEST_QUEUE_PID10); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + @Bean + public Queue MES_RESPONSE_QUEUE_PID10(){ + Queue queue = new Queue(MES_RESPONSE_QUEUE_PID10); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + + + @Bean + public Queue EAP_REQUEST_QUEUE_PID10(){ + Queue queue = new Queue(EAP_REQUEST_QUEUE_PID10); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + + } + @Bean + public Queue EAP_RESPONSE_QUEUE_PID10(){ + Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID10); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + + + @Bean + public Binding bindExchangeAndQueueA_PID10(){ + return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID10()).to(EXCHANGE_NAME_PID10()) + .with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID10); + } + @Bean + public Binding bindExchangeAndQueueB_PID10(){ + return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID10()).to(EXCHANGE_NAME_PID10()) + .with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID10); + } + + @Bean + public Binding bindExchangeAndQueueC_PID10(){ + return BindingBuilder.bind(MES_REQUEST_QUEUE_PID10()).to(EXCHANGE_NAME_PID10()) + .with(MES_REQUEST_QUEUE_ROUTINGKEY_PID10); + } + + @Bean + public Binding bindExchangeAndQueueD_PID10(){ + return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID10()).to(EXCHANGE_NAME_PID10()) + .with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID10); + } +} \ No newline at end of file diff --git a/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID2.java b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID2.java new file mode 100644 index 0000000..27f2693 --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID2.java @@ -0,0 +1,98 @@ +package com.qgs.dc.mq.configuration; + + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Desc: "设备:PID2 相关信息定义" + * @Author: caixiang + * @DATE: 2021/6/7 9:11 + */ +@Configuration +public class ConfigOfPID2 { + + //水平扩展其他设备的时候 只要:control+R 然后 PID2=>00C 然后replace all + public static final String EQUIPMENT_NAME_PID2 = "PID2"; + + public static final String EXCHANGE_NAME_PID2 = EQUIPMENT_NAME_PID2 +"_Exchange"; + public static final String EAP_REQUEST_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Request_Queue"; + public static final String EAP_RESPONSE_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Response_Queue"; + public static final String MES_REQUEST_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Request_Queue"; + public static final String MES_RESPONSE_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Response_Queue"; + public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Request_Queue_RoutingKey"; + public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Response_Queue_RoutingKey"; + public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Request_Queue_RoutingKey"; + public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Response_Queue_RoutingKey"; + + + @Bean + public DirectExchange EXCHANGE_NAME_PID2(){ + return new DirectExchange(EXCHANGE_NAME_PID2); + } + + //todo + @Bean + public Queue MES_REQUEST_QUEUE_PID2(){ + Queue queue = new Queue(MES_REQUEST_QUEUE_PID2); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + @Bean + public Queue MES_RESPONSE_QUEUE_PID2(){ + Queue queue = new Queue(MES_RESPONSE_QUEUE_PID2); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + + + @Bean + public Queue EAP_REQUEST_QUEUE_PID2(){ + Queue queue = new Queue(EAP_REQUEST_QUEUE_PID2); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + + } + @Bean + public Queue EAP_RESPONSE_QUEUE_PID2(){ + Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID2); + queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); + queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); + //queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); + return queue; + } + + + @Bean + public Binding bindExchangeAndQueueA_PID2(){ + return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID2()).to(EXCHANGE_NAME_PID2()) + .with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID2); + } + @Bean + public Binding bindExchangeAndQueueB_PID2(){ + return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID2()).to(EXCHANGE_NAME_PID2()) + .with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID2); + } + + @Bean + public Binding bindExchangeAndQueueC_PID2(){ + return BindingBuilder.bind(MES_REQUEST_QUEUE_PID2()).to(EXCHANGE_NAME_PID2()) + .with(MES_REQUEST_QUEUE_ROUTINGKEY_PID2); + } + + @Bean + public Binding bindExchangeAndQueueD_PID2(){ + return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID2()).to(EXCHANGE_NAME_PID2()) + .with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID2); + } +} \ No newline at end of file diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID10Received.java b/src/main/java/com/qgs/dc/mq/consumer/PID10Received.java new file mode 100644 index 0000000..32d80b2 --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/consumer/PID10Received.java @@ -0,0 +1,170 @@ +package com.qgs.dc.mq.consumer; + +import cn.hutool.http.HttpUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.qgs.dc.common.utils.CommonFunction; +import com.qgs.dc.mq.Constant.Constant; +import com.qgs.dc.mq.configuration.ConfigOfPID10; +import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; +import com.qgs.dc.mq.entity.MQMessage; +import com.qgs.dc.mq.entity.common.Header; +import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; +import com.rabbitmq.client.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +/** + * @Desc: "PIDPID10设备 接收MQ消息 监听类" + * @Author: caixiang + * @DATE: 2021/6/22 15:30 + * + * Ctrl+R 替换设备名 + */ +@Component +public class PID10Received { + + private static final Logger logger = LoggerFactory.getLogger(PID10Received.class); + + + + @Autowired + MQMessageHandler mqMessageHandler; + + @RabbitListener(queues = ConfigOfPID10.EAP_REQUEST_QUEUE_PID10) + @RabbitHandler + public void eapRequest(Message message, Channel channel)throws Exception{ + + logger.info("==============received message-EAP_REQUEST_QUEUE_PID10=================,priority:"+"equipmentName"+message.getHeaders().get("attr2")); + + Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); +// //MQMessage 中的 transactionId +// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation"); +// //logger.info("transactionId:"+transactionId); + + try { + + MQMessage mqMessage = CommonFunction.parse(message); + String transactionId = mqMessage.getHeader().getTransactionId(); + //1. 正常情况 + //Integer integer = mqMessageHandler.requestHandler(message); + String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); + Integer result = Integer.valueOf(integer); + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 + if(result == 1){ + logger.info("在 EAP_REQUEST_QUEUE_PID10 队列中,transitionId"+transactionId+", 这条消息处理成功"); + channel.basicAck(deliveryTag,false); + }else { + logger.error("在 EAP_REQUEST_QUEUE_PID10 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); + channel.basicNack(deliveryTag,false,false); + } + //2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列 + + //throw new Exception("11111"); + }catch (Exception e){ + // 第一个false 是 不批量签收;第二个false 是 不重回队列 + logger.error(e.getMessage()); + channel.basicNack(deliveryTag,false,false); + return; + } + + + + } + + + public static void main(String[] args) { + //localhost:8001 + + + MQMessage mqMessage = new MQMessage(); + + Header header = new Header("Request","Execute","QUERYEQPStatus","12"); + QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody(); + queryEQStatusBody.setVidType("u4"); + List vids = new ArrayList<>(); + vids.add("10000"); + vids.add("10001"); + vids.add("10002"); + queryEQStatusBody.setVidList(vids); + + //queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"} + //bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125] + //"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==" + //message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + //new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]} + String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody); + + + byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody); + System.out.println("myString :"+ new String(bytes)); + mqMessage.setBody(queryEQStatusBodys); + mqMessage.setHeader(header); + + //{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + String s = JSONObject.toJSONString(mqMessage); + MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class); + System.out.println("mqMessagessss : " + mqMessagessss.toString()); + System.out.println(s); + //String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s); + + //{"header":{"transactionId":"PIDPID10_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID10","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + String s22 = "{\"header\":{\"transactionId\":\"PIDPID10_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID10\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}"; + + String s2 = "{\"header\":{\"transactionId\":\"PIDPID10_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID10\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID10_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}"; + JSONObject jsonObject = JSON.parseObject(s2); + String header1 = jsonObject.getString("header"); + String body = jsonObject.getString("body"); + String returns = jsonObject.getString("returns"); + System.out.println("header: "+header1); + System.out.println("body: "+body); + System.out.println("returns: "+returns); + + byte[] bytes2 = s2.getBytes(); + MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class); + System.out.println(mqMessage2.toString()); + } + + @RabbitListener(queues = ConfigOfPID10.MES_RESPONSE_QUEUE_PID10) + @RabbitHandler + public void mesResponse(Message message, Channel channel)throws Exception{ + logger.info("==============PIDPID10_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1")); + Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); + + //模拟异常 ,然后 拒签消息 ,然后丢到死信队列 + try { + MQMessage mqMessage = CommonFunction.parse(message); + String transactionId = mqMessage.getHeader().getTransactionId(); + Integer result = mqMessageHandler.responseHandler(message); + if(result == 1){ + logger.info("在 MES_RESPONSE_QUEUE_PID10 队列中 , transitionId"+transactionId+" 这条消息处理成功"); + channel.basicAck(deliveryTag,false); + }else { + logger.error("在 MES_RESPONSE_QUEUE_PID10 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); + channel.basicNack(deliveryTag,false,false); + } + + //throw new Exception("11111"); + }catch (Exception e){ + // 第一个false 是 不批量签收;第二个false 是 不重回队列 + channel.basicNack(deliveryTag,false,false); + return; + } + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 + //channel.basicAck(deliveryTag,false); + + } + +} diff --git a/src/main/java/com/qgs/dc/mq/consumer/PID2Received.java b/src/main/java/com/qgs/dc/mq/consumer/PID2Received.java new file mode 100644 index 0000000..6f102f8 --- /dev/null +++ b/src/main/java/com/qgs/dc/mq/consumer/PID2Received.java @@ -0,0 +1,170 @@ +package com.qgs.dc.mq.consumer; + +import cn.hutool.http.HttpUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.qgs.dc.common.utils.CommonFunction; +import com.qgs.dc.mq.Constant.Constant; +import com.qgs.dc.mq.configuration.ConfigOfPID2; +import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; +import com.qgs.dc.mq.entity.MQMessage; +import com.qgs.dc.mq.entity.common.Header; +import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; +import com.rabbitmq.client.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +/** + * @Desc: "PIDPID2设备 接收MQ消息 监听类" + * @Author: caixiang + * @DATE: 2021/6/22 15:30 + * + * Ctrl+R 替换设备名 + */ +@Component +public class PID2Received { + + private static final Logger logger = LoggerFactory.getLogger(PID2Received.class); + + + + @Autowired + MQMessageHandler mqMessageHandler; + + @RabbitListener(queues = ConfigOfPID2.EAP_REQUEST_QUEUE_PID2) + @RabbitHandler + public void eapRequest(Message message, Channel channel)throws Exception{ + + logger.info("==============received message-EAP_REQUEST_QUEUE_PID2=================,priority:"+"equipmentName"+message.getHeaders().get("attr2")); + + Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); +// //MQMessage 中的 transactionId +// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation"); +// //logger.info("transactionId:"+transactionId); + + try { + + MQMessage mqMessage = CommonFunction.parse(message); + String transactionId = mqMessage.getHeader().getTransactionId(); + //1. 正常情况 + //Integer integer = mqMessageHandler.requestHandler(message); + String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); + Integer result = Integer.valueOf(integer); + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 + if(result == 1){ + logger.info("在 EAP_REQUEST_QUEUE_PID2 队列中,transitionId"+transactionId+", 这条消息处理成功"); + channel.basicAck(deliveryTag,false); + }else { + logger.error("在 EAP_REQUEST_QUEUE_PID2 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); + channel.basicNack(deliveryTag,false,false); + } + //2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列 + + //throw new Exception("11111"); + }catch (Exception e){ + // 第一个false 是 不批量签收;第二个false 是 不重回队列 + logger.error(e.getMessage()); + channel.basicNack(deliveryTag,false,false); + return; + } + + + + } + + + public static void main(String[] args) { + //localhost:8001 + + + MQMessage mqMessage = new MQMessage(); + + Header header = new Header("Request","Execute","QUERYEQPStatus","12"); + QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody(); + queryEQStatusBody.setVidType("u4"); + List vids = new ArrayList<>(); + vids.add("10000"); + vids.add("10001"); + vids.add("10002"); + queryEQStatusBody.setVidList(vids); + + //queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"} + //bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125] + //"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==" + //message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + //new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]} + String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody); + + + byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody); + System.out.println("myString :"+ new String(bytes)); + mqMessage.setBody(queryEQStatusBodys); + mqMessage.setHeader(header); + + //{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + String s = JSONObject.toJSONString(mqMessage); + MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class); + System.out.println("mqMessagessss : " + mqMessagessss.toString()); + System.out.println(s); + //String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s); + + //{"header":{"transactionId":"PIDPID2_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID2","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} + String s22 = "{\"header\":{\"transactionId\":\"PIDPID2_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID2\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}"; + + String s2 = "{\"header\":{\"transactionId\":\"PIDPID2_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID2\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID2_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}"; + JSONObject jsonObject = JSON.parseObject(s2); + String header1 = jsonObject.getString("header"); + String body = jsonObject.getString("body"); + String returns = jsonObject.getString("returns"); + System.out.println("header: "+header1); + System.out.println("body: "+body); + System.out.println("returns: "+returns); + + byte[] bytes2 = s2.getBytes(); + MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class); + System.out.println(mqMessage2.toString()); + } + + @RabbitListener(queues = ConfigOfPID2.MES_RESPONSE_QUEUE_PID2) + @RabbitHandler + public void mesResponse(Message message, Channel channel)throws Exception{ + logger.info("==============PIDPID2_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1")); + Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); + + //模拟异常 ,然后 拒签消息 ,然后丢到死信队列 + try { + MQMessage mqMessage = CommonFunction.parse(message); + String transactionId = mqMessage.getHeader().getTransactionId(); + Integer result = mqMessageHandler.responseHandler(message); + if(result == 1){ + logger.info("在 MES_RESPONSE_QUEUE_PID2 队列中 , transitionId"+transactionId+" 这条消息处理成功"); + channel.basicAck(deliveryTag,false); + }else { + logger.error("在 MES_RESPONSE_QUEUE_PID2 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); + channel.basicNack(deliveryTag,false,false); + } + + //throw new Exception("11111"); + }catch (Exception e){ + // 第一个false 是 不批量签收;第二个false 是 不重回队列 + channel.basicNack(deliveryTag,false,false); + return; + } + + //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 + //channel.basicAck(deliveryTag,false); + + } + +}