This commit is contained in:
caixiang 2022-11-11 15:28:00 +08:00
parent 2cc12991b0
commit ac7f311a43
14 changed files with 707 additions and 22 deletions

View File

@ -1,5 +1,6 @@
package com.qgs.dc.common.utils; package com.qgs.dc.common.utils;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.qgs.dc.mq.entity.MQMessage; import com.qgs.dc.mq.entity.MQMessage;
@ -11,6 +12,9 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.*;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Arrays; import java.util.Arrays;
@ -153,12 +157,97 @@ public class CommonFunction {
*/ */
public static MQMessage parse(Message<?> message){ public static MQMessage parse(Message<?> message){
byte[] bytes =(byte[]) message.getPayload(); byte[] bytes =(byte[]) message.getPayload();
String s = new String(bytes);
MQMessage mqMessage = JSONObject.parseObject(bytes, MQMessage.class); MQMessage mqMessage = JSONObject.parseObject(bytes, MQMessage.class);
return mqMessage; return mqMessage;
} }
public static String unescapeJava(String str) throws IOException
{
Writer out = new StringWriter();
if (str != null)
{
int sz = str.length();
StringBuilder unicode = new StringBuilder(4);
boolean hadSlash = false;
boolean inUnicode = false;
for (int i = 0; i < sz; ++i)
{
char ch = str.charAt(i);
if (inUnicode)
{
unicode.append(ch);
if (unicode.length() == 4)
{
try
{
int nfe = Integer.parseInt(unicode.toString(), 16);
out.write((char) nfe);
unicode.setLength(0);
inUnicode = false;
hadSlash = false;
}
catch (NumberFormatException var9)
{
}
}
}
else if (hadSlash)
{
hadSlash = false;
switch (ch)
{
case '\"':
out.write(34);
break;
case '\'':
out.write(39);
break;
case '\\':
out.write(92);
break;
case 'b':
out.write(8);
break;
case 'f':
out.write(12);
break;
case 'n':
out.write(10);
break;
case 'r':
out.write(13);
break;
case 't':
out.write(9);
break;
case 'u':
inUnicode = true;
break;
default:
out.write(ch);
}
}
else if (ch == 92)
{
hadSlash = true;
}
else
{
out.write(ch);
}
}
if (hadSlash)
{
out.write(92);
}
}
return out.toString();
}
/** /**
* 解析body * 解析body

View File

@ -0,0 +1,98 @@
package com.qgs.dc.mq.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Desc: "设备PID10_1 相关信息定义"
* @Author: caixiang
* @DATE: 2021/6/7 9:11
*/
@Configuration
public class ConfigOfPID10_1 {
//水平扩展其他设备的时候 只要control+R 然后 PID10_1=>00C 然后replace all
public static final String EQUIPMENT_NAME_PID10_1 = "PID10_1";
public static final String EXCHANGE_NAME_PID10_1 = EQUIPMENT_NAME_PID10_1 +"_Exchange";
public static final String EAP_REQUEST_QUEUE_PID10_1 = EQUIPMENT_NAME_PID10_1 +"_EAP_Request_Queue";
public static final String EAP_RESPONSE_QUEUE_PID10_1 = EQUIPMENT_NAME_PID10_1 +"_EAP_Response_Queue";
public static final String MES_REQUEST_QUEUE_PID10_1 = EQUIPMENT_NAME_PID10_1 +"_MES_Request_Queue";
public static final String MES_RESPONSE_QUEUE_PID10_1 = EQUIPMENT_NAME_PID10_1 +"_MES_Response_Queue";
public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID10_1 = EQUIPMENT_NAME_PID10_1 +"_EAP_Request_Queue_RoutingKey";
public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID10_1 = EQUIPMENT_NAME_PID10_1 +"_EAP_Response_Queue_RoutingKey";
public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID10_1 = EQUIPMENT_NAME_PID10_1 +"_MES_Request_Queue_RoutingKey";
public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID10_1 = EQUIPMENT_NAME_PID10_1 +"_MES_Response_Queue_RoutingKey";
@Bean
public DirectExchange EXCHANGE_NAME_PID10_1(){
return new DirectExchange(EXCHANGE_NAME_PID10_1);
}
//todo
@Bean
public Queue MES_REQUEST_QUEUE_PID10_1(){
Queue queue = new Queue(MES_REQUEST_QUEUE_PID10_1);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Queue MES_RESPONSE_QUEUE_PID10_1(){
Queue queue = new Queue(MES_RESPONSE_QUEUE_PID10_1);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Queue EAP_REQUEST_QUEUE_PID10_1(){
Queue queue = new Queue(EAP_REQUEST_QUEUE_PID10_1);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Queue EAP_RESPONSE_QUEUE_PID10_1(){
Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID10_1);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Binding bindExchangeAndQueueA_PID10_1(){
return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID10_1()).to(EXCHANGE_NAME_PID10_1())
.with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID10_1);
}
@Bean
public Binding bindExchangeAndQueueB_PID10_1(){
return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID10_1()).to(EXCHANGE_NAME_PID10_1())
.with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID10_1);
}
@Bean
public Binding bindExchangeAndQueueC_PID10_1(){
return BindingBuilder.bind(MES_REQUEST_QUEUE_PID10_1()).to(EXCHANGE_NAME_PID10_1())
.with(MES_REQUEST_QUEUE_ROUTINGKEY_PID10_1);
}
@Bean
public Binding bindExchangeAndQueueD_PID10_1(){
return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID10_1()).to(EXCHANGE_NAME_PID10_1())
.with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID10_1);
}
}

View File

@ -0,0 +1,98 @@
package com.qgs.dc.mq.configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Desc: "设备PID2 相关信息定义"
* @Author: caixiang
* @DATE: 2021/6/7 9:11
*/
@Configuration
public class ConfigOfPID2 {
//水平扩展其他设备的时候 只要control+R 然后 PID2=>00C 然后replace all
public static final String EQUIPMENT_NAME_PID2 = "PID2";
public static final String EXCHANGE_NAME_PID2 = EQUIPMENT_NAME_PID2 +"_Exchange";
public static final String EAP_REQUEST_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Request_Queue";
public static final String EAP_RESPONSE_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Response_Queue";
public static final String MES_REQUEST_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Request_Queue";
public static final String MES_RESPONSE_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Response_Queue";
public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Request_Queue_RoutingKey";
public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Response_Queue_RoutingKey";
public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Request_Queue_RoutingKey";
public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Response_Queue_RoutingKey";
@Bean
public DirectExchange EXCHANGE_NAME_PID2(){
return new DirectExchange(EXCHANGE_NAME_PID2);
}
//todo
@Bean
public Queue MES_REQUEST_QUEUE_PID2(){
Queue queue = new Queue(MES_REQUEST_QUEUE_PID2);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Queue MES_RESPONSE_QUEUE_PID2(){
Queue queue = new Queue(MES_RESPONSE_QUEUE_PID2);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Queue EAP_REQUEST_QUEUE_PID2(){
Queue queue = new Queue(EAP_REQUEST_QUEUE_PID2);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Queue EAP_RESPONSE_QUEUE_PID2(){
Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID2);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Binding bindExchangeAndQueueA_PID2(){
return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID2()).to(EXCHANGE_NAME_PID2())
.with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID2);
}
@Bean
public Binding bindExchangeAndQueueB_PID2(){
return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID2()).to(EXCHANGE_NAME_PID2())
.with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID2);
}
@Bean
public Binding bindExchangeAndQueueC_PID2(){
return BindingBuilder.bind(MES_REQUEST_QUEUE_PID2()).to(EXCHANGE_NAME_PID2())
.with(MES_REQUEST_QUEUE_ROUTINGKEY_PID2);
}
@Bean
public Binding bindExchangeAndQueueD_PID2(){
return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID2()).to(EXCHANGE_NAME_PID2())
.with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID2);
}
}

View File

@ -45,6 +45,7 @@ public class PID00BReceived {
MQMessageHandler mqMessageHandler; MQMessageHandler mqMessageHandler;
//这个队列监听EAP 推送过来的Event,然后dc 把这个Event 转发给Mes //这个队列监听EAP 推送过来的Event,然后dc 把这个Event 转发给Mes
//need //need
@RabbitListener(queues = ConfigOf00B.EAP_REQUEST_QUEUE_00B) @RabbitListener(queues = ConfigOf00B.EAP_REQUEST_QUEUE_00B)
@ -77,7 +78,6 @@ public class PID00BReceived {
} }
//2.模拟异常 然后 拒签消息 然后丢到死信队列 //2.模拟异常 然后 拒签消息 然后丢到死信队列
//throw new Exception("11111");
}catch (Exception e){ }catch (Exception e){
// 第一个false 不批量签收第二个false 不重回队列 // 第一个false 不批量签收第二个false 不重回队列
logger.error(e.getMessage()); logger.error(e.getMessage());
@ -143,6 +143,8 @@ public class PID00BReceived {
} }
//这种情况指的是 MES发出远程指令然后eap处理完后把结果丢到mesResponse队列 通知MES进行收尾处理 //这种情况指的是 MES发出远程指令然后eap处理完后把结果丢到mesResponse队列 通知MES进行收尾处理
@RabbitHandler @RabbitHandler
public void mesResponse(Message<?> message, Channel channel)throws Exception{ public void mesResponse(Message<?> message, Channel channel)throws Exception{

View File

@ -0,0 +1,188 @@
package com.qgs.dc.mq.consumer;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.qgs.dc.common.utils.CommonFunction;
import com.qgs.dc.mq.Constant.Constant;
import com.qgs.dc.mq.configuration.ConfigOfPID10_1;
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler;
import com.qgs.dc.mq.entity.MQMessage;
import com.qgs.dc.mq.entity.common.Header;
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
* @Desc: "PID10_1设备 接收MQ消息 监听类"
* @Author: caixiang
* @DATE: 2021/6/22 15:30
*
* Ctrl+R 替换设备名
*/
@Component
public class PID10_1Received {
private static final Logger logger = LoggerFactory.getLogger(PID10_1Received.class);
@Autowired
MQMessageHandler mqMessageHandler;
@RabbitListener(queues = ConfigOfPID10_1.EAP_REQUEST_QUEUE_PID10_1)
@RabbitHandler
public void eapRequest(Message<?> message, Channel channel)throws Exception{
logger.info("==============received message-EAP_REQUEST_QUEUE_PID10_1=================");
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//MQMessage 中的 transactionId
// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation");
// //logger.info("transactionId:"+transactionId);
try {
MQMessage mqMessage = CommonFunction.parse(message);
String transactionId = mqMessage.getHeader().getTransactionId();
System.out.println("收到的消息是: "+mqMessage.toString());
System.out.println();
logger.info(mqMessage.toString());
channel.basicAck(deliveryTag,false);
System.out.println(mqMessage.toString());
}catch (Exception e){
// 第一个false 不批量签收第二个false 不重回队列
logger.error(e.getMessage());
channel.basicNack(deliveryTag,false,false);
return;
}
}
// @RabbitListener(queues = ConfigOfPID10_1.MES_REQUEST_QUEUE_PID10_1)
// @RabbitHandler
// public void mesRequest(Message<?> message, Channel channel)throws Exception{
//
// logger.info("==============received message-EAP_REQUEST_QUEUE_PID10_1=================");
//
// Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
// //MQMessage 中的 transactionId
//// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation");
//// //logger.info("transactionId:"+transactionId);
//
// try {
// System.out.println("message : "+message.toString());
// MQMessage mqMessage = CommonFunction.parse(message);
// String transactionId = mqMessage.getHeader().getTransactionId();
// System.out.println("收到的消息是: "+mqMessage.toString());
// System.out.println();
// logger.info(mqMessage.toString());
// channel.basicAck(deliveryTag,false);
// System.out.println(mqMessage.toString());
// }catch (Exception e){
// // 第一个false 不批量签收第二个false 不重回队列
// logger.error(e.getMessage());
// channel.basicNack(deliveryTag,false,false);
// return;
// }
//
// }
public static void main(String[] args) {
//localhost:8001
MQMessage mqMessage = new MQMessage();
Header header = new Header("Request","Execute","QUERYEQPStatus","12");
QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody();
queryEQStatusBody.setVidType("u4");
List<String> vids = new ArrayList<>();
vids.add("10000");
vids.add("10001");
vids.add("10002");
queryEQStatusBody.setVidList(vids);
//queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"}
//bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125]
//"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="
//message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
//new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]}
String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody);
byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody);
System.out.println("myString :"+ new String(bytes));
mqMessage.setBody(queryEQStatusBodys);
mqMessage.setHeader(header);
//{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
String s = JSONObject.toJSONString(mqMessage);
MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class);
System.out.println("mqMessagessss : " + mqMessagessss.toString());
System.out.println(s);
//String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s);
//{"header":{"transactionId":"PIDPID10_1_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID10_1","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
String s22 = "{\"header\":{\"transactionId\":\"PIDPID10_1_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID10_1\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}";
String s2 = "{\"header\":{\"transactionId\":\"PIDPID10_1_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID10_1\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID10_1_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}";
JSONObject jsonObject = JSON.parseObject(s2);
String header1 = jsonObject.getString("header");
String body = jsonObject.getString("body");
String returns = jsonObject.getString("returns");
System.out.println("header: "+header1);
System.out.println("body: "+body);
System.out.println("returns: "+returns);
byte[] bytes2 = s2.getBytes();
MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class);
System.out.println(mqMessage2.toString());
}
@RabbitListener(queues = ConfigOfPID10_1.MES_RESPONSE_QUEUE_PID10_1)
@RabbitHandler
public void mesResponse(Message<?> message, Channel channel)throws Exception{
logger.info("==============PIDPID10_1_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1"));
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//模拟异常 然后 拒签消息 然后丢到死信队列
try {
MQMessage mqMessage = CommonFunction.parse(message);
String transactionId = mqMessage.getHeader().getTransactionId();
Integer result = mqMessageHandler.responseHandler(message);
if(result == 1){
logger.info("在 MES_RESPONSE_QUEUE_PID10_1 队列中 , transitionId"+transactionId+" 这条消息处理成功");
channel.basicAck(deliveryTag,false);
}else {
logger.error("在 MES_RESPONSE_QUEUE_PID10_1 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
channel.basicNack(deliveryTag,false,false);
}
//throw new Exception("11111");
}catch (Exception e){
// 第一个false 不批量签收第二个false 不重回队列
channel.basicNack(deliveryTag,false,false);
return;
}
//注意 这里特别注意 已经拒收的消息 再签收是要出错的这里要特别注意
//channel.basicAck(deliveryTag,false);
}
}

View File

@ -61,6 +61,8 @@ public class PID18Received {
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage));
Integer result = Integer.valueOf(integer); Integer result = Integer.valueOf(integer);
//注意 这里特别注意 已经拒收的消息 再签收是要出错的这里要特别注意 //注意 这里特别注意 已经拒收的消息 再签收是要出错的这里要特别注意
if(result == 1){ if(result == 1){
logger.info("在 EAP_REQUEST_QUEUE_PID18 队列中,transitionId"+transactionId+", 这条消息处理成功"); logger.info("在 EAP_REQUEST_QUEUE_PID18 队列中,transitionId"+transactionId+", 这条消息处理成功");

View File

@ -0,0 +1,167 @@
package com.qgs.dc.mq.consumer;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.qgs.dc.common.utils.CommonFunction;
import com.qgs.dc.mq.Constant.Constant;
import com.qgs.dc.mq.configuration.ConfigOfPID2;
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler;
import com.qgs.dc.mq.entity.MQMessage;
import com.qgs.dc.mq.entity.common.Header;
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
* @Desc: "PID2设备 接收MQ消息 监听类"
* @Author: caixiang
* @DATE: 2021/6/22 15:30
*
* Ctrl+R 替换设备名
*/
@Component
public class PID2Received {
private static final Logger logger = LoggerFactory.getLogger(PID2Received.class);
@Autowired
MQMessageHandler mqMessageHandler;
@RabbitListener(queues = ConfigOfPID2.EAP_REQUEST_QUEUE_PID2)
@RabbitHandler
public void eapRequest(Message<?> message, Channel channel)throws Exception{
logger.info("==============received message-EAP_REQUEST_QUEUE_PID2=================");
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//MQMessage 中的 transactionId
// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation");
// //logger.info("transactionId:"+transactionId);
try {
MQMessage mqMessage = CommonFunction.parse(message);
String transactionId = mqMessage.getHeader().getTransactionId();
//1. 正常情况
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage));
Integer result = Integer.valueOf(integer);
//注意 这里特别注意 已经拒收的消息 再签收是要出错的这里要特别注意
if(result == 1){
logger.info("在 EAP_REQUEST_QUEUE_PID2 队列中,transitionId"+transactionId+", 这条消息处理成功");
channel.basicAck(deliveryTag,false);
}else {
logger.error("在 EAP_REQUEST_QUEUE_PID2 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
channel.basicNack(deliveryTag,false,false);
}
//2.模拟异常 然后 拒签消息 然后丢到死信队列
}catch (Exception e){
// 第一个false 不批量签收第二个false 不重回队列
logger.error(e.getMessage());
channel.basicNack(deliveryTag,false,false);
return;
}
}
public static void main(String[] args) {
//localhost:8001
MQMessage mqMessage = new MQMessage();
Header header = new Header("Request","Execute","QUERYEQPStatus","12");
QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody();
queryEQStatusBody.setVidType("u4");
List<String> vids = new ArrayList<>();
vids.add("10000");
vids.add("10001");
vids.add("10002");
queryEQStatusBody.setVidList(vids);
//queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"}
//bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125]
//"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="
//message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
//new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]}
String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody);
byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody);
System.out.println("myString :"+ new String(bytes));
mqMessage.setBody(queryEQStatusBodys);
mqMessage.setHeader(header);
//{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
String s = JSONObject.toJSONString(mqMessage);
MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class);
System.out.println("mqMessagessss : " + mqMessagessss.toString());
System.out.println(s);
//String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s);
//{"header":{"transactionId":"PIDPID2_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID2","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
String s22 = "{\"header\":{\"transactionId\":\"PIDPID2_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID2\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}";
String s2 = "{\"header\":{\"transactionId\":\"PIDPID2_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID2\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID2_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}";
JSONObject jsonObject = JSON.parseObject(s2);
String header1 = jsonObject.getString("header");
String body = jsonObject.getString("body");
String returns = jsonObject.getString("returns");
System.out.println("header: "+header1);
System.out.println("body: "+body);
System.out.println("returns: "+returns);
byte[] bytes2 = s2.getBytes();
MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class);
System.out.println(mqMessage2.toString());
}
@RabbitListener(queues = ConfigOfPID2.MES_RESPONSE_QUEUE_PID2)
@RabbitHandler
public void mesResponse(Message<?> message, Channel channel)throws Exception{
logger.info("==============PIDPID2_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1"));
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//模拟异常 然后 拒签消息 然后丢到死信队列
try {
MQMessage mqMessage = CommonFunction.parse(message);
String transactionId = mqMessage.getHeader().getTransactionId();
Integer result = mqMessageHandler.responseHandler(message);
if(result == 1){
logger.info("在 MES_RESPONSE_QUEUE_PID2 队列中 , transitionId"+transactionId+" 这条消息处理成功");
channel.basicAck(deliveryTag,false);
}else {
logger.error("在 MES_RESPONSE_QUEUE_PID2 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
channel.basicNack(deliveryTag,false,false);
}
//throw new Exception("11111");
}catch (Exception e){
// 第一个false 不批量签收第二个false 不重回队列
channel.basicNack(deliveryTag,false,false);
return;
}
//注意 这里特别注意 已经拒收的消息 再签收是要出错的这里要特别注意
//channel.basicAck(deliveryTag,false);
}
}

View File

@ -68,12 +68,7 @@ public class MQController {
String routingKey = callbackMessageEntity.getRoutingKey(); String routingKey = callbackMessageEntity.getRoutingKey();
logger.info("MES => EAP (MES_Request), 状态DC已收到 , 内容:"+ callbackMessageEntity.toString()); logger.info("MES => EAP (MES_Request), 状态DC已收到 , 内容:"+ callbackMessageEntity.toString());
//properties 这里的参数是写在MQ消息 header里面的如果EAP端 需要某些参数 可以写在这里eap去取更方便一些 rabbitSender.sendDirect(mqMessage,exchangeName,routingKey);
Map<String,Object> properties = new HashMap<>();
properties.put("equipmentName","PID001");
properties.put("transitionId",transitionId);
//rabbitSender.sendDirect(mqMessage,properties,exchangeName,routingKey,"6000");
rabbitSender.sendDirect(mqMessage,properties,exchangeName,routingKey);
logger.info("MES => EAP (MES_Request) , 状态DC已发送给MQ , 内容:"+ callbackMessageEntity.toString()); logger.info("MES => EAP (MES_Request) , 状态DC已发送给MQ , 内容:"+ callbackMessageEntity.toString());
AsyncFuture<MQMessage> add = SendedList.add(transitionId,mqMessage); AsyncFuture<MQMessage> add = SendedList.add(transitionId,mqMessage);
@ -84,4 +79,30 @@ public class MQController {
return R.error(2,e.getMessage()); return R.error(2,e.getMessage());
} }
} }
@PostMapping("/mesRequestNoW")
/**
* desc : MES给EAP发送远程指令MES_Request向rabbitmq中发送消息direct模式
* MES => DC(数据采集中间件) => MQ => EAP
* return :返回 的就是这个指令的回复指令
* 1 = 成功
* 其他 = 异常
*/
public R mesRequestNoW(@RequestBody CallbackMessageEntity callbackMessageEntity){
try {
String transitionId = callbackMessageEntity.getMqMessage().getHeader().getTransactionId();
MQMessage mqMessage = callbackMessageEntity.getMqMessage();
String exchangeName = callbackMessageEntity.getExchangeName();
String routingKey = callbackMessageEntity.getRoutingKey();
logger.info("MES => EAP (MES_Request), 状态DC已收到 , 内容:"+ callbackMessageEntity.toString());
rabbitSender.sendDirect(mqMessage,exchangeName,routingKey);
logger.info("MES => EAP (MES_Request) , 状态DC已发送给MQ , 内容:"+ callbackMessageEntity.toString());
return R.ok();
}catch (Exception e){
logger.error("MES => EAP (MES_Request) , 状态DC处理异常 , 内容:"+ callbackMessageEntity.toString());
return R.error(2,e.getMessage());
}
}
} }

View File

@ -1,6 +1,7 @@
package com.qgs.dc.mq.entity; package com.qgs.dc.mq.entity;
import cn.hutool.http.HttpUtil; import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.annotation.JSONType; import com.alibaba.fastjson.annotation.JSONType;
import com.qgs.dc.mq.entity.common.Header; import com.qgs.dc.mq.entity.common.Header;
import com.qgs.dc.mq.entity.common.Returns; import com.qgs.dc.mq.entity.common.Returns;
@ -17,6 +18,7 @@ import java.util.HashMap;
@JSONType(orders={"header","body","returns"}) @JSONType(orders={"header","body","returns"})
public class MQMessage { public class MQMessage {
private Header header; private Header header;
@JSONField(jsonDirect = true)
private String body; private String body;
private Returns returns; private Returns returns;
} }

View File

@ -37,6 +37,7 @@ public class Header {
this.to = "eap"; this.to = "eap";
} }
public String getTransactionId() { public String getTransactionId() {
return transactionId; return transactionId;
} }

View File

@ -190,20 +190,28 @@ public class RabbitSender {
cd cd
); );
} }
public void sendDirect(MQMessage mqMessage, Map<String,Object> properties, String exchange, String routingKey) throws Exception{ public void sendDirect(MQMessage mqMessage, String exchange, String routingKey) throws Exception{
System.out.println("(before)mqMessage: "+mqMessage.toString());
byte[] message = JSONObject.toJSONBytes(mqMessage); byte[] message = JSONObject.toJSONBytes(mqMessage);
MessageHeaders messageHeaders = new MessageHeaders(properties);
Message<?> msg = MessageBuilder.createMessage(message,messageHeaders); System.out.println("(after)mqMessage: "+new String(message));
//String transitionId = mqMessage.getHeader().getEquipmentId()+"_"+CommonFunction.getNowDate(2)+"_"+CommonFunction.getUUID(10);
String transitionId = mqMessage.getHeader().getTransactionId(); String transitionId = mqMessage.getHeader().getTransactionId();
CorrelationData cd = new CorrelationData(transitionId); CorrelationData cd = new CorrelationData(transitionId);
rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnsCallback(returnsCallback); rabbitTemplate.setReturnsCallback(returnsCallback);
rabbitTemplate.convertAndSend(exchange, //rabbitTemplate.send 此方法发送的消息 是不含java相关信息的rabbitTemplate.convertAndSend是包含java相关信息的
routingKey, org.springframework.amqp.core.Message mqsMessage = new org.springframework.amqp.core.Message(message);
msg, rabbitTemplate.send(exchange,routingKey,mqsMessage,cd);
cd
); // MessageHeaders messageHeaders = new MessageHeaders(properties);
// Message<?> msg = MessageBuilder.createMessage(message,messageHeaders);Message<?> msg = MessageBuilder.createMessage(message,messageHeaders);
// rabbitTemplate.convertAndSend(exchange,
// routingKey,
// msg,
// cd
// );
} }

View File

@ -20,10 +20,16 @@ public enum DaveArea {
ANALOGOUTPUTS200(7), // System flags of 200 family ANALOGOUTPUTS200(7), // System flags of 200 family
COUNTER(28), // analog inputs of 200 family COUNTER(28), // analog inputs of 200 family
COUNTER200(30), // analog outputs of 200 family COUNTER200(30), // analog outputs of 200 family
DB(0x84), // Peripheral I/O //这个是plc中的不同区有V区input区output区 DB(0x84), // Peripheral I/O //DB块
DI(0x85), FLAGS(0x83), INPUTS(0x81), LOCAL(0x86), // data blocks DI(0x85), //DBI块
OUTPUTS(0x82), // instance data blocks FLAGS(0x83), //M块
P(0x80), // not tested INPUTS(0x81), //I块
T(0x1D), //T块ed
C(0x1C), //C块
LOCAL(0x86), // data blocks //LD块
OUTPUTS(0x82), // instance data blocks //Q块
P(0x80), // not tested //D块
SYSINFO(3), // local of caller SYSINFO(3), // local of caller
SYSTEMFLAGS(5), // S7 counters SYSTEMFLAGS(5), // S7 counters
TIMER(29), // S7 timers TIMER(29), // S7 timers

View File

@ -476,6 +476,8 @@ public final class PDU {
//0x03响应数据Ack-Data响应0x01的请求; //0x03响应数据Ack-Data响应0x01的请求;
//0x07自定义数据Userdata扩展协议类型 //0x07自定义数据Userdata扩展协议类型
this.mem[this.header + 1] = (byte) type; this.mem[this.header + 1] = (byte) type;
//dlen = data length ( 报文中存在两个字节 ) //dlen = data length ( 报文中存在两个字节 )
this.dlen = 0; this.dlen = 0;
//plen = param length ( 报文中存在两个字节 ) //plen = param length ( 报文中存在两个字节 )
@ -608,6 +610,7 @@ public final class PDU {
if (this.mem[this.param] != FUNC_WRITE) { if (this.mem[this.param] != FUNC_WRITE) {
return Nodave.RESULT_UNEXPECTED_FUNC; return Nodave.RESULT_UNEXPECTED_FUNC;
} }
if ((this.mem[this.data] == 255)) { if ((this.mem[this.data] == 255)) {
res = Nodave.RESULT_OK; res = Nodave.RESULT_OK;
} else { } else {

View File

@ -8,7 +8,7 @@ server:
spring: spring:
rabbitmq: rabbitmq:
# 如果是rabbitmq+haproxy+keepalived集群 那么192.168.0.176是haproxy代理的地址严格来说是keepalived的vip # 如果是rabbitmq+haproxy+keepalived集群 那么192.168.0.176是haproxy代理的地址严格来说是keepalived的vip
addresses: 192.168.0.176:5672 # 新版rabbitmq 版本还未测试 addresses: 192.168.0.170:5672 # 新版rabbitmq 版本还未测试
#addresses: 172.16.21.133:5672 #addresses: 172.16.21.133:5672
username: cigs username: cigs
password: cigs password: cigs