Conflicts:
	src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID10_1.java
	src/main/java/com/qgs/dc/mq/consumer/PID10_1Received.java
This commit is contained in:
于连琛 2022-11-17 09:50:03 +08:00
commit 84b17fd995
16 changed files with 518 additions and 64 deletions

View File

@ -1,5 +1,6 @@
package com.qgs.dc.common.utils; package com.qgs.dc.common.utils;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.qgs.dc.mq.entity.MQMessage; import com.qgs.dc.mq.entity.MQMessage;
@ -11,6 +12,9 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.*;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Arrays; import java.util.Arrays;
@ -153,12 +157,97 @@ public class CommonFunction {
*/ */
public static MQMessage parse(Message<?> message){ public static MQMessage parse(Message<?> message){
byte[] bytes =(byte[]) message.getPayload(); byte[] bytes =(byte[]) message.getPayload();
String s = new String(bytes);
MQMessage mqMessage = JSONObject.parseObject(bytes, MQMessage.class); MQMessage mqMessage = JSONObject.parseObject(bytes, MQMessage.class);
return mqMessage; return mqMessage;
} }
public static String unescapeJava(String str) throws IOException
{
Writer out = new StringWriter();
if (str != null)
{
int sz = str.length();
StringBuilder unicode = new StringBuilder(4);
boolean hadSlash = false;
boolean inUnicode = false;
for (int i = 0; i < sz; ++i)
{
char ch = str.charAt(i);
if (inUnicode)
{
unicode.append(ch);
if (unicode.length() == 4)
{
try
{
int nfe = Integer.parseInt(unicode.toString(), 16);
out.write((char) nfe);
unicode.setLength(0);
inUnicode = false;
hadSlash = false;
}
catch (NumberFormatException var9)
{
}
}
}
else if (hadSlash)
{
hadSlash = false;
switch (ch)
{
case '\"':
out.write(34);
break;
case '\'':
out.write(39);
break;
case '\\':
out.write(92);
break;
case 'b':
out.write(8);
break;
case 'f':
out.write(12);
break;
case 'n':
out.write(10);
break;
case 'r':
out.write(13);
break;
case 't':
out.write(9);
break;
case 'u':
inUnicode = true;
break;
default:
out.write(ch);
}
}
else if (ch == 92)
{
hadSlash = true;
}
else
{
out.write(ch);
}
}
if (hadSlash)
{
out.write(92);
}
}
return out.toString();
}
/** /**
* 解析body * 解析body

View File

@ -0,0 +1,98 @@
package com.qgs.dc.mq.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Desc: "设备PID2 相关信息定义"
* @Author: caixiang
* @DATE: 2021/6/7 9:11
*/
@Configuration
public class ConfigOfPID2 {
//水平扩展其他设备的时候 只要control+R 然后 PID2=>00C 然后replace all
public static final String EQUIPMENT_NAME_PID2 = "PID2";
public static final String EXCHANGE_NAME_PID2 = EQUIPMENT_NAME_PID2 +"_Exchange";
public static final String EAP_REQUEST_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Request_Queue";
public static final String EAP_RESPONSE_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Response_Queue";
public static final String MES_REQUEST_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Request_Queue";
public static final String MES_RESPONSE_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Response_Queue";
public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Request_Queue_RoutingKey";
public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Response_Queue_RoutingKey";
public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Request_Queue_RoutingKey";
public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Response_Queue_RoutingKey";
@Bean
public DirectExchange EXCHANGE_NAME_PID2(){
return new DirectExchange(EXCHANGE_NAME_PID2);
}
//todo
@Bean
public Queue MES_REQUEST_QUEUE_PID2(){
Queue queue = new Queue(MES_REQUEST_QUEUE_PID2);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Queue MES_RESPONSE_QUEUE_PID2(){
Queue queue = new Queue(MES_RESPONSE_QUEUE_PID2);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Queue EAP_REQUEST_QUEUE_PID2(){
Queue queue = new Queue(EAP_REQUEST_QUEUE_PID2);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Queue EAP_RESPONSE_QUEUE_PID2(){
Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID2);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Binding bindExchangeAndQueueA_PID2(){
return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID2()).to(EXCHANGE_NAME_PID2())
.with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID2);
}
@Bean
public Binding bindExchangeAndQueueB_PID2(){
return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID2()).to(EXCHANGE_NAME_PID2())
.with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID2);
}
@Bean
public Binding bindExchangeAndQueueC_PID2(){
return BindingBuilder.bind(MES_REQUEST_QUEUE_PID2()).to(EXCHANGE_NAME_PID2())
.with(MES_REQUEST_QUEUE_ROUTINGKEY_PID2);
}
@Bean
public Binding bindExchangeAndQueueD_PID2(){
return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID2()).to(EXCHANGE_NAME_PID2())
.with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID2);
}
}

View File

@ -45,6 +45,7 @@ public class PID00BReceived {
MQMessageHandler mqMessageHandler; MQMessageHandler mqMessageHandler;
//这个队列监听EAP 推送过来的Event,然后dc 把这个Event 转发给Mes //这个队列监听EAP 推送过来的Event,然后dc 把这个Event 转发给Mes
//need //need
@RabbitListener(queues = ConfigOf00B.EAP_REQUEST_QUEUE_00B) @RabbitListener(queues = ConfigOf00B.EAP_REQUEST_QUEUE_00B)
@ -77,7 +78,6 @@ public class PID00BReceived {
} }
//2.模拟异常 然后 拒签消息 然后丢到死信队列 //2.模拟异常 然后 拒签消息 然后丢到死信队列
//throw new Exception("11111");
}catch (Exception e){ }catch (Exception e){
// 第一个false 不批量签收第二个false 不重回队列 // 第一个false 不批量签收第二个false 不重回队列
logger.error(e.getMessage()); logger.error(e.getMessage());
@ -143,6 +143,8 @@ public class PID00BReceived {
} }
//这种情况指的是 MES发出远程指令然后eap处理完后把结果丢到mesResponse队列 通知MES进行收尾处理 //这种情况指的是 MES发出远程指令然后eap处理完后把结果丢到mesResponse队列 通知MES进行收尾处理
@RabbitHandler @RabbitHandler
public void mesResponse(Message<?> message, Channel channel)throws Exception{ public void mesResponse(Message<?> message, Channel channel)throws Exception{

View File

@ -9,8 +9,11 @@ import com.qgs.dc.mq.configuration.ConfigOfPID10_1;
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler;
import com.qgs.dc.mq.entity.MQMessage; import com.qgs.dc.mq.entity.MQMessage;
import com.qgs.dc.mq.entity.common.Header; import com.qgs.dc.mq.entity.common.Header;
import com.qgs.dc.mq.entity.common.Returns;
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody;
import com.qgs.dc.mq.producer.component.RabbitSender;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Return;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitHandler;
@ -25,18 +28,19 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
* @Desc: "PIDPID10_1设备 接收MQ消息 监听类" * @Desc: "PID10_1设备 接收MQ消息 监听类"
* @Author: caixiang * @Author: caixiang
* @DATE: 2021/6/22 15:30 * @DATE: 2021/6/22 15:30
* *
* Ctrl+R 替换设备名 * Ctrl+R 替换设备名
*/ */
@Component @Component
public class PID10_1Received { public class PID10_1Received{
private static final Logger logger = LoggerFactory.getLogger(PID10_1Received.class); private static final Logger logger = LoggerFactory.getLogger(PID10_1Received.class);
@Autowired
RabbitSender rabbitSender;
@Autowired @Autowired
MQMessageHandler mqMessageHandler; MQMessageHandler mqMessageHandler;
@ -45,10 +49,10 @@ public class PID10_1Received {
@RabbitHandler @RabbitHandler
public void eapRequest(Message<?> message, Channel channel)throws Exception{ public void eapRequest(Message<?> message, Channel channel)throws Exception{
logger.info("==============received message-EAP_REQUEST_QUEUE_PID10_1=================,priority:"+"equipmentName"+message.getHeaders().get("attr2")); logger.info("==============received message-EAP_REQUEST_QUEUE_PID10_1=================");
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
// //MQMessage 中的 transactionId //MQMessage 中的 transactionId
// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation"); // String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation");
// //logger.info("transactionId:"+transactionId); // //logger.info("transactionId:"+transactionId);
@ -56,22 +60,29 @@ public class PID10_1Received {
MQMessage mqMessage = CommonFunction.parse(message); MQMessage mqMessage = CommonFunction.parse(message);
String transactionId = mqMessage.getHeader().getTransactionId(); String transactionId = mqMessage.getHeader().getTransactionId();
//1. 正常情况 System.out.println("收到的消息是: "+mqMessage.toString());
//Integer integer = mqMessageHandler.requestHandler(message); System.out.println();
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); //todo reply
Integer result = Integer.valueOf(integer); reply(mqMessage);
//注意 这里特别注意 已经拒收的消息 再签收是要出错的这里要特别注意 // //回调给MES --- 开始
if(result == 1){ // //1. 正常情况
logger.info("在 EAP_REQUEST_QUEUE_PID10_1 队列中,transitionId"+transactionId+", 这条消息处理成功"); // String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage));
channel.basicAck(deliveryTag,false); // Integer result = Integer.valueOf(integer);
}else { //
logger.error("在 EAP_REQUEST_QUEUE_PID10_1 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); // //注意 这里特别注意 已经拒收的消息 再签收是要出错的这里要特别注意
channel.basicNack(deliveryTag,false,false); // if(result == 1){
} // logger.info("在 EAP_REQUEST_QUEUE_PID4B 队列中,transitionId"+transactionId+", 这条消息处理成功");
//2.模拟异常 然后 拒签消息 然后丢到死信队列 // channel.basicAck(deliveryTag,false);
// }else {
// logger.error("在 EAP_REQUEST_QUEUE_PID4B 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
// channel.basicNack(deliveryTag,false,false);
// }
// //回调给MES --- 结束
//throw new Exception("11111");
channel.basicAck(deliveryTag,false);
}catch (Exception e){ }catch (Exception e){
// 第一个false 不批量签收第二个false 不重回队列 // 第一个false 不批量签收第二个false 不重回队列
logger.error(e.getMessage()); logger.error(e.getMessage());
@ -79,10 +90,57 @@ public class PID10_1Received {
return; return;
} }
} }
public void reply(MQMessage request) throws Exception {
MQMessage response = new MQMessage();
Header header = request.getHeader();
header.setFrom("mes");
header.setTo("eap");
header.setMessageType("Response");
header.setSendTimestamp(CommonFunction.getNowDate(1));
Returns aReturn = new Returns("0","0");
response.setHeader(header);
response.setReturns(aReturn);
rabbitSender.sendDirect(response,ConfigOfPID10_1.EXCHANGE_NAME_PID10_1,ConfigOfPID10_1.EAP_RESPONSE_QUEUE_ROUTINGKEY_PID10_1);
System.out.println("回复的消息是: "+response.toString());
}
// @RabbitListener(queues = ConfigOfPID10_1.MES_REQUEST_QUEUE_PID10_1)
// @RabbitHandler
// public void mesRequest(Message<?> message, Channel channel)throws Exception{
//
// logger.info("==============received message-EAP_REQUEST_QUEUE_PID10_1=================");
//
// Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
// //MQMessage 中的 transactionId
//// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation");
//// //logger.info("transactionId:"+transactionId);
//
// try {
// System.out.println("message : "+message.toString());
// MQMessage mqMessage = CommonFunction.parse(message);
// String transactionId = mqMessage.getHeader().getTransactionId();
// System.out.println("收到的消息是: "+mqMessage.toString());
// System.out.println();
// logger.info(mqMessage.toString());
// channel.basicAck(deliveryTag,false);
// System.out.println(mqMessage.toString());
// }catch (Exception e){
// // 第一个false 不批量签收第二个false 不重回队列
// logger.error(e.getMessage());
// channel.basicNack(deliveryTag,false,false);
// return;
// }
//
// }
public static void main(String[] args) { public static void main(String[] args) {
//localhost:8001 //localhost:8001

View File

@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.qgs.dc.common.utils.CommonFunction; import com.qgs.dc.common.utils.CommonFunction;
import com.qgs.dc.mq.Constant.Constant; import com.qgs.dc.mq.Constant.Constant;
import com.qgs.dc.mq.configuration.ConfigOfPID13S;
import com.qgs.dc.mq.configuration.ConfigOfPID18; import com.qgs.dc.mq.configuration.ConfigOfPID18;
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler;
import com.qgs.dc.mq.entity.MQMessage; import com.qgs.dc.mq.entity.MQMessage;
@ -61,6 +62,8 @@ public class PID18Received {
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage));
Integer result = Integer.valueOf(integer); Integer result = Integer.valueOf(integer);
//注意 这里特别注意 已经拒收的消息 再签收是要出错的这里要特别注意 //注意 这里特别注意 已经拒收的消息 再签收是要出错的这里要特别注意
if(result == 1){ if(result == 1){
logger.info("在 EAP_REQUEST_QUEUE_PID18 队列中,transitionId"+transactionId+", 这条消息处理成功"); logger.info("在 EAP_REQUEST_QUEUE_PID18 队列中,transitionId"+transactionId+", 这条消息处理成功");

View File

@ -0,0 +1,167 @@
package com.qgs.dc.mq.consumer;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.qgs.dc.common.utils.CommonFunction;
import com.qgs.dc.mq.Constant.Constant;
import com.qgs.dc.mq.configuration.ConfigOfPID2;
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler;
import com.qgs.dc.mq.entity.MQMessage;
import com.qgs.dc.mq.entity.common.Header;
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
* @Desc: "PID2设备 接收MQ消息 监听类"
* @Author: caixiang
* @DATE: 2021/6/22 15:30
*
* Ctrl+R 替换设备名
*/
@Component
public class PID2Received {
private static final Logger logger = LoggerFactory.getLogger(PID2Received.class);
@Autowired
MQMessageHandler mqMessageHandler;
@RabbitListener(queues = ConfigOfPID2.EAP_REQUEST_QUEUE_PID2)
@RabbitHandler
public void eapRequest(Message<?> message, Channel channel)throws Exception{
logger.info("==============received message-EAP_REQUEST_QUEUE_PID2=================");
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//MQMessage 中的 transactionId
// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation");
// //logger.info("transactionId:"+transactionId);
try {
MQMessage mqMessage = CommonFunction.parse(message);
String transactionId = mqMessage.getHeader().getTransactionId();
//1. 正常情况
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage));
Integer result = Integer.valueOf(integer);
//注意 这里特别注意 已经拒收的消息 再签收是要出错的这里要特别注意
if(result == 1){
logger.info("在 EAP_REQUEST_QUEUE_PID2 队列中,transitionId"+transactionId+", 这条消息处理成功");
channel.basicAck(deliveryTag,false);
}else {
logger.error("在 EAP_REQUEST_QUEUE_PID2 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
channel.basicNack(deliveryTag,false,false);
}
//2.模拟异常 然后 拒签消息 然后丢到死信队列
}catch (Exception e){
// 第一个false 不批量签收第二个false 不重回队列
logger.error(e.getMessage());
channel.basicNack(deliveryTag,false,false);
return;
}
}
public static void main(String[] args) {
//localhost:8001
MQMessage mqMessage = new MQMessage();
Header header = new Header("Request","Execute","QUERYEQPStatus","12");
QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody();
queryEQStatusBody.setVidType("u4");
List<String> vids = new ArrayList<>();
vids.add("10000");
vids.add("10001");
vids.add("10002");
queryEQStatusBody.setVidList(vids);
//queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"}
//bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125]
//"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="
//message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
//new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]}
String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody);
byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody);
System.out.println("myString :"+ new String(bytes));
mqMessage.setBody(queryEQStatusBodys);
mqMessage.setHeader(header);
//{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
String s = JSONObject.toJSONString(mqMessage);
MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class);
System.out.println("mqMessagessss : " + mqMessagessss.toString());
System.out.println(s);
//String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s);
//{"header":{"transactionId":"PIDPID2_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID2","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
String s22 = "{\"header\":{\"transactionId\":\"PIDPID2_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID2\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}";
String s2 = "{\"header\":{\"transactionId\":\"PIDPID2_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID2\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID2_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}";
JSONObject jsonObject = JSON.parseObject(s2);
String header1 = jsonObject.getString("header");
String body = jsonObject.getString("body");
String returns = jsonObject.getString("returns");
System.out.println("header: "+header1);
System.out.println("body: "+body);
System.out.println("returns: "+returns);
byte[] bytes2 = s2.getBytes();
MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class);
System.out.println(mqMessage2.toString());
}
@RabbitListener(queues = ConfigOfPID2.MES_RESPONSE_QUEUE_PID2)
@RabbitHandler
public void mesResponse(Message<?> message, Channel channel)throws Exception{
logger.info("==============PIDPID2_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1"));
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//模拟异常 然后 拒签消息 然后丢到死信队列
try {
MQMessage mqMessage = CommonFunction.parse(message);
String transactionId = mqMessage.getHeader().getTransactionId();
Integer result = mqMessageHandler.responseHandler(message);
if(result == 1){
logger.info("在 MES_RESPONSE_QUEUE_PID2 队列中 , transitionId"+transactionId+" 这条消息处理成功");
channel.basicAck(deliveryTag,false);
}else {
logger.error("在 MES_RESPONSE_QUEUE_PID2 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
channel.basicNack(deliveryTag,false,false);
}
//throw new Exception("11111");
}catch (Exception e){
// 第一个false 不批量签收第二个false 不重回队列
channel.basicNack(deliveryTag,false,false);
return;
}
//注意 这里特别注意 已经拒收的消息 再签收是要出错的这里要特别注意
//channel.basicAck(deliveryTag,false);
}
}

View File

@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.qgs.dc.common.utils.CommonFunction; import com.qgs.dc.common.utils.CommonFunction;
import com.qgs.dc.mq.Constant.Constant; import com.qgs.dc.mq.Constant.Constant;
import com.qgs.dc.mq.configuration.ConfigOfPID13S;
import com.qgs.dc.mq.configuration.ConfigOfPID4B; import com.qgs.dc.mq.configuration.ConfigOfPID4B;
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler;
import com.qgs.dc.mq.entity.MQMessage; import com.qgs.dc.mq.entity.MQMessage;

View File

@ -68,12 +68,7 @@ public class MQController {
String routingKey = callbackMessageEntity.getRoutingKey(); String routingKey = callbackMessageEntity.getRoutingKey();
logger.info("MES => EAP (MES_Request), 状态DC已收到 , 内容:"+ callbackMessageEntity.toString()); logger.info("MES => EAP (MES_Request), 状态DC已收到 , 内容:"+ callbackMessageEntity.toString());
//properties 这里的参数是写在MQ消息 header里面的如果EAP端 需要某些参数 可以写在这里eap去取更方便一些 rabbitSender.sendDirect(mqMessage,exchangeName,routingKey);
Map<String,Object> properties = new HashMap<>();
properties.put("equipmentName","PID001");
properties.put("transitionId",transitionId);
//rabbitSender.sendDirect(mqMessage,properties,exchangeName,routingKey,"6000");
rabbitSender.sendDirect(mqMessage,properties,exchangeName,routingKey);
logger.info("MES => EAP (MES_Request) , 状态DC已发送给MQ , 内容:"+ callbackMessageEntity.toString()); logger.info("MES => EAP (MES_Request) , 状态DC已发送给MQ , 内容:"+ callbackMessageEntity.toString());
AsyncFuture<MQMessage> add = SendedList.add(transitionId,mqMessage); AsyncFuture<MQMessage> add = SendedList.add(transitionId,mqMessage);
@ -84,4 +79,30 @@ public class MQController {
return R.error(2,e.getMessage()); return R.error(2,e.getMessage());
} }
} }
@PostMapping("/mesRequestNoW")
/**
* desc : MES给EAP发送远程指令MES_Request向rabbitmq中发送消息direct模式
* MES => DC(数据采集中间件) => MQ => EAP
* return :返回 的就是这个指令的回复指令
* 1 = 成功
* 其他 = 异常
*/
public R mesRequestNoW(@RequestBody CallbackMessageEntity callbackMessageEntity){
try {
String transitionId = callbackMessageEntity.getMqMessage().getHeader().getTransactionId();
MQMessage mqMessage = callbackMessageEntity.getMqMessage();
String exchangeName = callbackMessageEntity.getExchangeName();
String routingKey = callbackMessageEntity.getRoutingKey();
logger.info("MES => EAP (MES_Request), 状态DC已收到 , 内容:"+ callbackMessageEntity.toString());
rabbitSender.sendDirect(mqMessage,exchangeName,routingKey);
logger.info("MES => EAP (MES_Request) , 状态DC已发送给MQ , 内容:"+ callbackMessageEntity.toString());
return R.ok();
}catch (Exception e){
logger.error("MES => EAP (MES_Request) , 状态DC处理异常 , 内容:"+ callbackMessageEntity.toString());
return R.error(2,e.getMessage());
}
}
} }

View File

@ -1,6 +1,7 @@
package com.qgs.dc.mq.entity; package com.qgs.dc.mq.entity;
import cn.hutool.http.HttpUtil; import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.annotation.JSONType; import com.alibaba.fastjson.annotation.JSONType;
import com.qgs.dc.mq.entity.common.Header; import com.qgs.dc.mq.entity.common.Header;
import com.qgs.dc.mq.entity.common.Returns; import com.qgs.dc.mq.entity.common.Returns;
@ -14,9 +15,14 @@ import java.util.HashMap;
* @DATE: 2021/8/12 15:38 * @DATE: 2021/8/12 15:38
*/ */
@Data @Data
@JSONType(orders={"header","body","returns"}) @JSONType(orders={"header","body","return"})
public class MQMessage { public class MQMessage {
private Header header; private Header header;
@JSONField(jsonDirect = true)
private String body; private String body;
//在序列化和反序列化 的时候 都会把 json 里面的return字段转成 object 里面的returns字段
@JSONField(name = "return")
private Returns returns; private Returns returns;
} }

View File

@ -37,6 +37,7 @@ public class Header {
this.to = "eap"; this.to = "eap";
} }
public String getTransactionId() { public String getTransactionId() {
return transactionId; return transactionId;
} }

View File

@ -1,5 +1,6 @@
package com.qgs.dc.mq.entity.common; package com.qgs.dc.mq.entity.common;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.annotation.JSONType; import com.alibaba.fastjson.annotation.JSONType;
import lombok.Data; import lombok.Data;
@ -9,29 +10,15 @@ import lombok.Data;
* @DATE: 2021/8/12 15:35 * @DATE: 2021/8/12 15:35
*/ */
@Data @Data
@JSONType(orders={"returnCode","ReasonCode"}) @JSONType(orders={"ReturnCode","ReasonCode"})
public class Returns { public class Returns {
private String returnCode; @JSONField(name = "ReturnCode")
private String ReturnCode;
@JSONField(name = "ReasonCode")
private String ReasonCode; private String ReasonCode;
public String getReturnCode() {
return returnCode;
}
public void setReturnCode(String returnCode) {
this.returnCode = returnCode;
}
public String getReasonCode() {
return ReasonCode;
}
public void setReasonCode(String reasonCode) {
ReasonCode = reasonCode;
}
public Returns(String returnCode, String reasonCode) { public Returns(String returnCode, String reasonCode) {
this.returnCode = returnCode; ReturnCode = returnCode;
ReasonCode = reasonCode; ReasonCode = reasonCode;
} }
} }

View File

@ -3,6 +3,8 @@ package com.qgs.dc.mq.producer.component;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.qgs.dc.common.utils.CommonFunction; import com.qgs.dc.common.utils.CommonFunction;
import com.qgs.dc.mq.entity.MQMessage; import com.qgs.dc.mq.entity.MQMessage;
import com.qgs.dc.mq.entity.common.Header;
import com.qgs.dc.mq.entity.common.Returns;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -17,6 +19,7 @@ import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.sql.SQLOutput;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.UUID; import java.util.UUID;
@ -190,20 +193,29 @@ public class RabbitSender {
cd cd
); );
} }
public void sendDirect(MQMessage mqMessage, Map<String,Object> properties, String exchange, String routingKey) throws Exception{
public void sendDirect(MQMessage mqMessage, String exchange, String routingKey) throws Exception{
byte[] message = JSONObject.toJSONBytes(mqMessage); byte[] message = JSONObject.toJSONBytes(mqMessage);
MessageHeaders messageHeaders = new MessageHeaders(properties);
Message<?> msg = MessageBuilder.createMessage(message,messageHeaders);
//String transitionId = mqMessage.getHeader().getEquipmentId()+"_"+CommonFunction.getNowDate(2)+"_"+CommonFunction.getUUID(10);
String transitionId = mqMessage.getHeader().getTransactionId(); String transitionId = mqMessage.getHeader().getTransactionId();
CorrelationData cd = new CorrelationData(transitionId); CorrelationData cd = new CorrelationData(transitionId);
rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnsCallback(returnsCallback); rabbitTemplate.setReturnsCallback(returnsCallback);
rabbitTemplate.convertAndSend(exchange, //rabbitTemplate.send 此方法发送的消息 是不含java相关信息的rabbitTemplate.convertAndSend是包含java相关信息的
routingKey, org.springframework.amqp.core.Message mqsMessage = new org.springframework.amqp.core.Message(message);
msg, rabbitTemplate.send(exchange,routingKey,mqsMessage,cd);
cd
); // MessageHeaders messageHeaders = new MessageHeaders(properties);
// Message<?> msg = MessageBuilder.createMessage(message,messageHeaders);Message<?> msg = MessageBuilder.createMessage(message,messageHeaders);
// rabbitTemplate.convertAndSend(exchange,
// routingKey,
// msg,
// cd
// );
} }

View File

@ -20,10 +20,16 @@ public enum DaveArea {
ANALOGOUTPUTS200(7), // System flags of 200 family ANALOGOUTPUTS200(7), // System flags of 200 family
COUNTER(28), // analog inputs of 200 family COUNTER(28), // analog inputs of 200 family
COUNTER200(30), // analog outputs of 200 family COUNTER200(30), // analog outputs of 200 family
DB(0x84), // Peripheral I/O //这个是plc中的不同区有V区input区output区 DB(0x84), // Peripheral I/O //DB块
DI(0x85), FLAGS(0x83), INPUTS(0x81), LOCAL(0x86), // data blocks DI(0x85), //DBI块
OUTPUTS(0x82), // instance data blocks FLAGS(0x83), //M块
P(0x80), // not tested INPUTS(0x81), //I块
T(0x1D), //T块ed
C(0x1C), //C块
LOCAL(0x86), // data blocks //LD块
OUTPUTS(0x82), // instance data blocks //Q块
P(0x80), // not tested //D块
SYSINFO(3), // local of caller SYSINFO(3), // local of caller
SYSTEMFLAGS(5), // S7 counters SYSTEMFLAGS(5), // S7 counters
TIMER(29), // S7 timers TIMER(29), // S7 timers

View File

@ -476,6 +476,8 @@ public final class PDU {
//0x03响应数据Ack-Data响应0x01的请求; //0x03响应数据Ack-Data响应0x01的请求;
//0x07自定义数据Userdata扩展协议类型 //0x07自定义数据Userdata扩展协议类型
this.mem[this.header + 1] = (byte) type; this.mem[this.header + 1] = (byte) type;
//dlen = data length ( 报文中存在两个字节 ) //dlen = data length ( 报文中存在两个字节 )
this.dlen = 0; this.dlen = 0;
//plen = param length ( 报文中存在两个字节 ) //plen = param length ( 报文中存在两个字节 )
@ -608,6 +610,7 @@ public final class PDU {
if (this.mem[this.param] != FUNC_WRITE) { if (this.mem[this.param] != FUNC_WRITE) {
return Nodave.RESULT_UNEXPECTED_FUNC; return Nodave.RESULT_UNEXPECTED_FUNC;
} }
if ((this.mem[this.data] == 255)) { if ((this.mem[this.data] == 255)) {
res = Nodave.RESULT_OK; res = Nodave.RESULT_OK;
} else { } else {

View File

@ -8,7 +8,7 @@ server:
spring: spring:
rabbitmq: rabbitmq:
# 如果是rabbitmq+haproxy+keepalived集群 那么192.168.0.176是haproxy代理的地址严格来说是keepalived的vip # 如果是rabbitmq+haproxy+keepalived集群 那么192.168.0.176是haproxy代理的地址严格来说是keepalived的vip
addresses: 192.168.0.176:5672 # 新版rabbitmq 版本还未测试 addresses: 192.168.0.170:5672 # 新版rabbitmq 版本还未测试
#addresses: 172.16.21.133:5672 #addresses: 172.16.21.133:5672
username: cigs username: cigs
password: cigs password: cigs