mark
This commit is contained in:
parent
eb30974d6d
commit
28591e74dc
@ -0,0 +1,98 @@
|
||||
package com.qgs.dc.mq.configuration;
|
||||
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @Desc: "设备:PID13S 相关信息定义"
|
||||
* @Author: caixiang
|
||||
* @DATE: 2021/6/7 9:11
|
||||
*/
|
||||
@Configuration
|
||||
public class ConfigOfPID13S {
|
||||
|
||||
//水平扩展其他设备的时候 只要:control+R 然后 PID13S=>00C 然后replace all
|
||||
public static final String EQUIPMENT_NAME_PID13S = "PID13S";
|
||||
|
||||
public static final String EXCHANGE_NAME_PID13S = EQUIPMENT_NAME_PID13S +"_Exchange";
|
||||
public static final String EAP_REQUEST_QUEUE_PID13S = EQUIPMENT_NAME_PID13S +"_EAP_Request_Queue";
|
||||
public static final String EAP_RESPONSE_QUEUE_PID13S = EQUIPMENT_NAME_PID13S +"_EAP_Response_Queue";
|
||||
public static final String MES_REQUEST_QUEUE_PID13S = EQUIPMENT_NAME_PID13S +"_MES_Request_Queue";
|
||||
public static final String MES_RESPONSE_QUEUE_PID13S = EQUIPMENT_NAME_PID13S +"_MES_Response_Queue";
|
||||
public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID13S = EQUIPMENT_NAME_PID13S +"_EAP_Request_Queue_RoutingKey";
|
||||
public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID13S = EQUIPMENT_NAME_PID13S +"_EAP_Response_Queue_RoutingKey";
|
||||
public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID13S = EQUIPMENT_NAME_PID13S +"_MES_Request_Queue_RoutingKey";
|
||||
public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID13S = EQUIPMENT_NAME_PID13S +"_MES_Response_Queue_RoutingKey";
|
||||
|
||||
|
||||
@Bean
|
||||
public DirectExchange EXCHANGE_NAME_PID13S(){
|
||||
return new DirectExchange(EXCHANGE_NAME_PID13S);
|
||||
}
|
||||
|
||||
//todo
|
||||
@Bean
|
||||
public Queue MES_REQUEST_QUEUE_PID13S(){
|
||||
Queue queue = new Queue(MES_REQUEST_QUEUE_PID13S);
|
||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
|
||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
|
||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
|
||||
return queue;
|
||||
}
|
||||
@Bean
|
||||
public Queue MES_RESPONSE_QUEUE_PID13S(){
|
||||
Queue queue = new Queue(MES_RESPONSE_QUEUE_PID13S);
|
||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
|
||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
|
||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
|
||||
return queue;
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue EAP_REQUEST_QUEUE_PID13S(){
|
||||
Queue queue = new Queue(EAP_REQUEST_QUEUE_PID13S);
|
||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
|
||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
|
||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
|
||||
return queue;
|
||||
|
||||
}
|
||||
@Bean
|
||||
public Queue EAP_RESPONSE_QUEUE_PID13S(){
|
||||
Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID13S);
|
||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
|
||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
|
||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
|
||||
return queue;
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Binding bindExchangeAndQueueA_PID13S(){
|
||||
return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID13S()).to(EXCHANGE_NAME_PID13S())
|
||||
.with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID13S);
|
||||
}
|
||||
@Bean
|
||||
public Binding bindExchangeAndQueueB_PID13S(){
|
||||
return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID13S()).to(EXCHANGE_NAME_PID13S())
|
||||
.with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID13S);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding bindExchangeAndQueueC_PID13S(){
|
||||
return BindingBuilder.bind(MES_REQUEST_QUEUE_PID13S()).to(EXCHANGE_NAME_PID13S())
|
||||
.with(MES_REQUEST_QUEUE_ROUTINGKEY_PID13S);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding bindExchangeAndQueueD_PID13S(){
|
||||
return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID13S()).to(EXCHANGE_NAME_PID13S())
|
||||
.with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID13S);
|
||||
}
|
||||
}
|
98
src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID18.java
Normal file
98
src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID18.java
Normal file
@ -0,0 +1,98 @@
|
||||
package com.qgs.dc.mq.configuration;
|
||||
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @Desc: "设备:PID18 相关信息定义"
|
||||
* @Author: caixiang
|
||||
* @DATE: 2021/6/7 9:11
|
||||
*/
|
||||
@Configuration
|
||||
public class ConfigOfPID18 {
|
||||
|
||||
//水平扩展其他设备的时候 只要:control+R 然后 PID18=>00C 然后replace all
|
||||
public static final String EQUIPMENT_NAME_PID18 = "PID18";
|
||||
|
||||
public static final String EXCHANGE_NAME_PID18 = EQUIPMENT_NAME_PID18 +"_Exchange";
|
||||
public static final String EAP_REQUEST_QUEUE_PID18 = EQUIPMENT_NAME_PID18 +"_EAP_Request_Queue";
|
||||
public static final String EAP_RESPONSE_QUEUE_PID18 = EQUIPMENT_NAME_PID18 +"_EAP_Response_Queue";
|
||||
public static final String MES_REQUEST_QUEUE_PID18 = EQUIPMENT_NAME_PID18 +"_MES_Request_Queue";
|
||||
public static final String MES_RESPONSE_QUEUE_PID18 = EQUIPMENT_NAME_PID18 +"_MES_Response_Queue";
|
||||
public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID18 = EQUIPMENT_NAME_PID18 +"_EAP_Request_Queue_RoutingKey";
|
||||
public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID18 = EQUIPMENT_NAME_PID18 +"_EAP_Response_Queue_RoutingKey";
|
||||
public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID18 = EQUIPMENT_NAME_PID18 +"_MES_Request_Queue_RoutingKey";
|
||||
public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID18 = EQUIPMENT_NAME_PID18 +"_MES_Response_Queue_RoutingKey";
|
||||
|
||||
|
||||
@Bean
|
||||
public DirectExchange EXCHANGE_NAME_PID18(){
|
||||
return new DirectExchange(EXCHANGE_NAME_PID18);
|
||||
}
|
||||
|
||||
//todo
|
||||
@Bean
|
||||
public Queue MES_REQUEST_QUEUE_PID18(){
|
||||
Queue queue = new Queue(MES_REQUEST_QUEUE_PID18);
|
||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
|
||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
|
||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
|
||||
return queue;
|
||||
}
|
||||
@Bean
|
||||
public Queue MES_RESPONSE_QUEUE_PID18(){
|
||||
Queue queue = new Queue(MES_RESPONSE_QUEUE_PID18);
|
||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
|
||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
|
||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
|
||||
return queue;
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue EAP_REQUEST_QUEUE_PID18(){
|
||||
Queue queue = new Queue(EAP_REQUEST_QUEUE_PID18);
|
||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
|
||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
|
||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
|
||||
return queue;
|
||||
|
||||
}
|
||||
@Bean
|
||||
public Queue EAP_RESPONSE_QUEUE_PID18(){
|
||||
Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID18);
|
||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
|
||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
|
||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
|
||||
return queue;
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Binding bindExchangeAndQueueA_PID18(){
|
||||
return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID18()).to(EXCHANGE_NAME_PID18())
|
||||
.with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID18);
|
||||
}
|
||||
@Bean
|
||||
public Binding bindExchangeAndQueueB_PID18(){
|
||||
return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID18()).to(EXCHANGE_NAME_PID18())
|
||||
.with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID18);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding bindExchangeAndQueueC_PID18(){
|
||||
return BindingBuilder.bind(MES_REQUEST_QUEUE_PID18()).to(EXCHANGE_NAME_PID18())
|
||||
.with(MES_REQUEST_QUEUE_ROUTINGKEY_PID18);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding bindExchangeAndQueueD_PID18(){
|
||||
return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID18()).to(EXCHANGE_NAME_PID18())
|
||||
.with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID18);
|
||||
}
|
||||
}
|
98
src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID4B.java
Normal file
98
src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID4B.java
Normal file
@ -0,0 +1,98 @@
|
||||
package com.qgs.dc.mq.configuration;
|
||||
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.DirectExchange;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @Desc: "设备:PID4B 相关信息定义"
|
||||
* @Author: caixiang
|
||||
* @DATE: 2021/6/7 9:11
|
||||
*/
|
||||
@Configuration
|
||||
public class ConfigOfPID4B {
|
||||
|
||||
//水平扩展其他设备的时候 只要:control+R 然后 PID4B=>00C 然后replace all
|
||||
public static final String EQUIPMENT_NAME_PID4B = "PID4B";
|
||||
|
||||
public static final String EXCHANGE_NAME_PID4B = EQUIPMENT_NAME_PID4B +"_Exchange";
|
||||
public static final String EAP_REQUEST_QUEUE_PID4B = EQUIPMENT_NAME_PID4B +"_EAP_Request_Queue";
|
||||
public static final String EAP_RESPONSE_QUEUE_PID4B = EQUIPMENT_NAME_PID4B +"_EAP_Response_Queue";
|
||||
public static final String MES_REQUEST_QUEUE_PID4B = EQUIPMENT_NAME_PID4B +"_MES_Request_Queue";
|
||||
public static final String MES_RESPONSE_QUEUE_PID4B = EQUIPMENT_NAME_PID4B +"_MES_Response_Queue";
|
||||
public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID4B = EQUIPMENT_NAME_PID4B +"_EAP_Request_Queue_RoutingKey";
|
||||
public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID4B = EQUIPMENT_NAME_PID4B +"_EAP_Response_Queue_RoutingKey";
|
||||
public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID4B = EQUIPMENT_NAME_PID4B +"_MES_Request_Queue_RoutingKey";
|
||||
public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID4B = EQUIPMENT_NAME_PID4B +"_MES_Response_Queue_RoutingKey";
|
||||
|
||||
|
||||
@Bean
|
||||
public DirectExchange EXCHANGE_NAME_PID4B(){
|
||||
return new DirectExchange(EXCHANGE_NAME_PID4B);
|
||||
}
|
||||
|
||||
//todo
|
||||
@Bean
|
||||
public Queue MES_REQUEST_QUEUE_PID4B(){
|
||||
Queue queue = new Queue(MES_REQUEST_QUEUE_PID4B);
|
||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
|
||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
|
||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
|
||||
return queue;
|
||||
}
|
||||
@Bean
|
||||
public Queue MES_RESPONSE_QUEUE_PID4B(){
|
||||
Queue queue = new Queue(MES_RESPONSE_QUEUE_PID4B);
|
||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
|
||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
|
||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
|
||||
return queue;
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Queue EAP_REQUEST_QUEUE_PID4B(){
|
||||
Queue queue = new Queue(EAP_REQUEST_QUEUE_PID4B);
|
||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
|
||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
|
||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
|
||||
return queue;
|
||||
|
||||
}
|
||||
@Bean
|
||||
public Queue EAP_RESPONSE_QUEUE_PID4B(){
|
||||
Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID4B);
|
||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
|
||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
|
||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
|
||||
return queue;
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Binding bindExchangeAndQueueA_PID4B(){
|
||||
return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID4B()).to(EXCHANGE_NAME_PID4B())
|
||||
.with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID4B);
|
||||
}
|
||||
@Bean
|
||||
public Binding bindExchangeAndQueueB_PID4B(){
|
||||
return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID4B()).to(EXCHANGE_NAME_PID4B())
|
||||
.with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID4B);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding bindExchangeAndQueueC_PID4B(){
|
||||
return BindingBuilder.bind(MES_REQUEST_QUEUE_PID4B()).to(EXCHANGE_NAME_PID4B())
|
||||
.with(MES_REQUEST_QUEUE_ROUTINGKEY_PID4B);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding bindExchangeAndQueueD_PID4B(){
|
||||
return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID4B()).to(EXCHANGE_NAME_PID4B())
|
||||
.with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID4B);
|
||||
}
|
||||
}
|
@ -44,6 +44,9 @@ public class PID00BReceived {
|
||||
@Autowired
|
||||
MQMessageHandler mqMessageHandler;
|
||||
|
||||
|
||||
//这个队列监听EAP 推送过来的Event,然后dc 把这个Event 转发给Mes
|
||||
//need
|
||||
@RabbitListener(queues = ConfigOf00B.EAP_REQUEST_QUEUE_00B)
|
||||
@RabbitHandler
|
||||
public void eapRequest(Message<?> message, Channel channel)throws Exception{
|
||||
@ -72,10 +75,6 @@ public class PID00BReceived {
|
||||
logger.error("在 EAP_REQUEST_QUEUE_00B 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
//2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列
|
||||
|
||||
//throw new Exception("11111");
|
||||
@ -143,7 +142,8 @@ public class PID00BReceived {
|
||||
System.out.println(mqMessage2.toString());
|
||||
}
|
||||
|
||||
@RabbitListener(queues = ConfigOf00B.MES_RESPONSE_QUEUE_00B)
|
||||
|
||||
//这种情况:指的是 MES发出远程指令,然后eap处理完后,把结果丢到mesResponse队列 通知MES进行收尾处理。
|
||||
@RabbitHandler
|
||||
public void mesResponse(Message<?> message, Channel channel)throws Exception{
|
||||
logger.info("==============PID00B_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1"));
|
||||
@ -162,7 +162,6 @@ public class PID00BReceived {
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
}
|
||||
|
||||
//throw new Exception("11111");
|
||||
}catch (Exception e){
|
||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
|
170
src/main/java/com/qgs/dc/mq/consumer/PID13SReceived.java
Normal file
170
src/main/java/com/qgs/dc/mq/consumer/PID13SReceived.java
Normal file
@ -0,0 +1,170 @@
|
||||
package com.qgs.dc.mq.consumer;
|
||||
|
||||
import cn.hutool.http.HttpUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.qgs.dc.common.utils.CommonFunction;
|
||||
import com.qgs.dc.mq.Constant.Constant;
|
||||
import com.qgs.dc.mq.configuration.ConfigOfPID13S;
|
||||
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler;
|
||||
import com.qgs.dc.mq.entity.MQMessage;
|
||||
import com.qgs.dc.mq.entity.common.Header;
|
||||
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Desc: "PIDPID13S设备 接收MQ消息 监听类"
|
||||
* @Author: caixiang
|
||||
* @DATE: 2021/6/22 15:30
|
||||
*
|
||||
* Ctrl+R 替换设备名
|
||||
*/
|
||||
@Component
|
||||
public class PID13SReceived {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(PID13SReceived.class);
|
||||
|
||||
|
||||
|
||||
@Autowired
|
||||
MQMessageHandler mqMessageHandler;
|
||||
|
||||
@RabbitListener(queues = ConfigOfPID13S.EAP_REQUEST_QUEUE_PID13S)
|
||||
@RabbitHandler
|
||||
public void eapRequest(Message<?> message, Channel channel)throws Exception{
|
||||
|
||||
logger.info("==============received message-EAP_REQUEST_QUEUE_PID13S=================,priority:"+"equipmentName"+message.getHeaders().get("attr2"));
|
||||
|
||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
|
||||
// //MQMessage 中的 transactionId
|
||||
// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation");
|
||||
// //logger.info("transactionId:"+transactionId);
|
||||
|
||||
try {
|
||||
|
||||
MQMessage mqMessage = CommonFunction.parse(message);
|
||||
String transactionId = mqMessage.getHeader().getTransactionId();
|
||||
//1. 正常情况
|
||||
//Integer integer = mqMessageHandler.requestHandler(message);
|
||||
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage));
|
||||
Integer result = Integer.valueOf(integer);
|
||||
|
||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
|
||||
if(result == 1){
|
||||
logger.info("在 EAP_REQUEST_QUEUE_PID13S 队列中,transitionId"+transactionId+", 这条消息处理成功");
|
||||
channel.basicAck(deliveryTag,false);
|
||||
}else {
|
||||
logger.error("在 EAP_REQUEST_QUEUE_PID13S 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
}
|
||||
//2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列
|
||||
|
||||
//throw new Exception("11111");
|
||||
}catch (Exception e){
|
||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列
|
||||
logger.error(e.getMessage());
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static void main(String[] args) {
|
||||
//localhost:8001
|
||||
|
||||
|
||||
MQMessage mqMessage = new MQMessage();
|
||||
|
||||
Header header = new Header("Request","Execute","QUERYEQPStatus","12");
|
||||
QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody();
|
||||
queryEQStatusBody.setVidType("u4");
|
||||
List<String> vids = new ArrayList<>();
|
||||
vids.add("10000");
|
||||
vids.add("10001");
|
||||
vids.add("10002");
|
||||
queryEQStatusBody.setVidList(vids);
|
||||
|
||||
//queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"}
|
||||
//bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125]
|
||||
//"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="
|
||||
//message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
|
||||
//new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]}
|
||||
String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody);
|
||||
|
||||
|
||||
byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody);
|
||||
System.out.println("myString :"+ new String(bytes));
|
||||
mqMessage.setBody(queryEQStatusBodys);
|
||||
mqMessage.setHeader(header);
|
||||
|
||||
//{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
|
||||
String s = JSONObject.toJSONString(mqMessage);
|
||||
MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class);
|
||||
System.out.println("mqMessagessss : " + mqMessagessss.toString());
|
||||
System.out.println(s);
|
||||
//String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s);
|
||||
|
||||
//{"header":{"transactionId":"PIDPID13S_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID13S","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
|
||||
String s22 = "{\"header\":{\"transactionId\":\"PIDPID13S_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID13S\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}";
|
||||
|
||||
String s2 = "{\"header\":{\"transactionId\":\"PIDPID13S_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID13S\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID13S_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}";
|
||||
JSONObject jsonObject = JSON.parseObject(s2);
|
||||
String header1 = jsonObject.getString("header");
|
||||
String body = jsonObject.getString("body");
|
||||
String returns = jsonObject.getString("returns");
|
||||
System.out.println("header: "+header1);
|
||||
System.out.println("body: "+body);
|
||||
System.out.println("returns: "+returns);
|
||||
|
||||
byte[] bytes2 = s2.getBytes();
|
||||
MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class);
|
||||
System.out.println(mqMessage2.toString());
|
||||
}
|
||||
|
||||
@RabbitListener(queues = ConfigOfPID13S.MES_RESPONSE_QUEUE_PID13S)
|
||||
@RabbitHandler
|
||||
public void mesResponse(Message<?> message, Channel channel)throws Exception{
|
||||
logger.info("==============PIDPID13S_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1"));
|
||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
|
||||
|
||||
//模拟异常 ,然后 拒签消息 ,然后丢到死信队列
|
||||
try {
|
||||
MQMessage mqMessage = CommonFunction.parse(message);
|
||||
String transactionId = mqMessage.getHeader().getTransactionId();
|
||||
Integer result = mqMessageHandler.responseHandler(message);
|
||||
if(result == 1){
|
||||
logger.info("在 MES_RESPONSE_QUEUE_PID13S 队列中 , transitionId"+transactionId+" 这条消息处理成功");
|
||||
channel.basicAck(deliveryTag,false);
|
||||
}else {
|
||||
logger.error("在 MES_RESPONSE_QUEUE_PID13S 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
}
|
||||
|
||||
//throw new Exception("11111");
|
||||
}catch (Exception e){
|
||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
return;
|
||||
}
|
||||
|
||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
|
||||
//channel.basicAck(deliveryTag,false);
|
||||
|
||||
}
|
||||
|
||||
}
|
170
src/main/java/com/qgs/dc/mq/consumer/PID18Received.java
Normal file
170
src/main/java/com/qgs/dc/mq/consumer/PID18Received.java
Normal file
@ -0,0 +1,170 @@
|
||||
package com.qgs.dc.mq.consumer;
|
||||
|
||||
import cn.hutool.http.HttpUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.qgs.dc.common.utils.CommonFunction;
|
||||
import com.qgs.dc.mq.Constant.Constant;
|
||||
import com.qgs.dc.mq.configuration.ConfigOfPID18;
|
||||
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler;
|
||||
import com.qgs.dc.mq.entity.MQMessage;
|
||||
import com.qgs.dc.mq.entity.common.Header;
|
||||
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Desc: "PIDPID18设备 接收MQ消息 监听类"
|
||||
* @Author: caixiang
|
||||
* @DATE: 2021/6/22 15:30
|
||||
*
|
||||
* Ctrl+R 替换设备名
|
||||
*/
|
||||
@Component
|
||||
public class PID18Received {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(PID18Received.class);
|
||||
|
||||
|
||||
|
||||
@Autowired
|
||||
MQMessageHandler mqMessageHandler;
|
||||
|
||||
@RabbitListener(queues = ConfigOfPID18.EAP_REQUEST_QUEUE_PID18)
|
||||
@RabbitHandler
|
||||
public void eapRequest(Message<?> message, Channel channel)throws Exception{
|
||||
|
||||
logger.info("==============received message-EAP_REQUEST_QUEUE_PID18=================,priority:"+"equipmentName"+message.getHeaders().get("attr2"));
|
||||
|
||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
|
||||
// //MQMessage 中的 transactionId
|
||||
// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation");
|
||||
// //logger.info("transactionId:"+transactionId);
|
||||
|
||||
try {
|
||||
|
||||
MQMessage mqMessage = CommonFunction.parse(message);
|
||||
String transactionId = mqMessage.getHeader().getTransactionId();
|
||||
//1. 正常情况
|
||||
//Integer integer = mqMessageHandler.requestHandler(message);
|
||||
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage));
|
||||
Integer result = Integer.valueOf(integer);
|
||||
|
||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
|
||||
if(result == 1){
|
||||
logger.info("在 EAP_REQUEST_QUEUE_PID18 队列中,transitionId"+transactionId+", 这条消息处理成功");
|
||||
channel.basicAck(deliveryTag,false);
|
||||
}else {
|
||||
logger.error("在 EAP_REQUEST_QUEUE_PID18 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
}
|
||||
//2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列
|
||||
|
||||
//throw new Exception("11111");
|
||||
}catch (Exception e){
|
||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列
|
||||
logger.error(e.getMessage());
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static void main(String[] args) {
|
||||
//localhost:8001
|
||||
|
||||
|
||||
MQMessage mqMessage = new MQMessage();
|
||||
|
||||
Header header = new Header("Request","Execute","QUERYEQPStatus","12");
|
||||
QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody();
|
||||
queryEQStatusBody.setVidType("u4");
|
||||
List<String> vids = new ArrayList<>();
|
||||
vids.add("10000");
|
||||
vids.add("10001");
|
||||
vids.add("10002");
|
||||
queryEQStatusBody.setVidList(vids);
|
||||
|
||||
//queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"}
|
||||
//bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125]
|
||||
//"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="
|
||||
//message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
|
||||
//new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]}
|
||||
String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody);
|
||||
|
||||
|
||||
byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody);
|
||||
System.out.println("myString :"+ new String(bytes));
|
||||
mqMessage.setBody(queryEQStatusBodys);
|
||||
mqMessage.setHeader(header);
|
||||
|
||||
//{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
|
||||
String s = JSONObject.toJSONString(mqMessage);
|
||||
MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class);
|
||||
System.out.println("mqMessagessss : " + mqMessagessss.toString());
|
||||
System.out.println(s);
|
||||
//String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s);
|
||||
|
||||
//{"header":{"transactionId":"PIDPID18_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID18","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
|
||||
String s22 = "{\"header\":{\"transactionId\":\"PIDPID18_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID18\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}";
|
||||
|
||||
String s2 = "{\"header\":{\"transactionId\":\"PIDPID18_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID18\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID18_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}";
|
||||
JSONObject jsonObject = JSON.parseObject(s2);
|
||||
String header1 = jsonObject.getString("header");
|
||||
String body = jsonObject.getString("body");
|
||||
String returns = jsonObject.getString("returns");
|
||||
System.out.println("header: "+header1);
|
||||
System.out.println("body: "+body);
|
||||
System.out.println("returns: "+returns);
|
||||
|
||||
byte[] bytes2 = s2.getBytes();
|
||||
MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class);
|
||||
System.out.println(mqMessage2.toString());
|
||||
}
|
||||
|
||||
@RabbitListener(queues = ConfigOfPID18.MES_RESPONSE_QUEUE_PID18)
|
||||
@RabbitHandler
|
||||
public void mesResponse(Message<?> message, Channel channel)throws Exception{
|
||||
logger.info("==============PIDPID18_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1"));
|
||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
|
||||
|
||||
//模拟异常 ,然后 拒签消息 ,然后丢到死信队列
|
||||
try {
|
||||
MQMessage mqMessage = CommonFunction.parse(message);
|
||||
String transactionId = mqMessage.getHeader().getTransactionId();
|
||||
Integer result = mqMessageHandler.responseHandler(message);
|
||||
if(result == 1){
|
||||
logger.info("在 MES_RESPONSE_QUEUE_PID18 队列中 , transitionId"+transactionId+" 这条消息处理成功");
|
||||
channel.basicAck(deliveryTag,false);
|
||||
}else {
|
||||
logger.error("在 MES_RESPONSE_QUEUE_PID18 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
}
|
||||
|
||||
//throw new Exception("11111");
|
||||
}catch (Exception e){
|
||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
return;
|
||||
}
|
||||
|
||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
|
||||
//channel.basicAck(deliveryTag,false);
|
||||
|
||||
}
|
||||
|
||||
}
|
167
src/main/java/com/qgs/dc/mq/consumer/PID4BReceived.java
Normal file
167
src/main/java/com/qgs/dc/mq/consumer/PID4BReceived.java
Normal file
@ -0,0 +1,167 @@
|
||||
package com.qgs.dc.mq.consumer;
|
||||
|
||||
import cn.hutool.http.HttpUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.qgs.dc.common.utils.CommonFunction;
|
||||
import com.qgs.dc.mq.Constant.Constant;
|
||||
import com.qgs.dc.mq.configuration.ConfigOfPID4B;
|
||||
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler;
|
||||
import com.qgs.dc.mq.entity.MQMessage;
|
||||
import com.qgs.dc.mq.entity.common.Header;
|
||||
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @Desc: "PIDPID4B设备 接收MQ消息 监听类"
|
||||
* @Author: caixiang
|
||||
* @DATE: 2021/6/22 15:30
|
||||
*
|
||||
* Ctrl+R 替换设备名
|
||||
*/
|
||||
@Component
|
||||
public class PID4BReceived {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(PID4BReceived.class);
|
||||
|
||||
|
||||
|
||||
@Autowired
|
||||
MQMessageHandler mqMessageHandler;
|
||||
|
||||
@RabbitListener(queues = ConfigOfPID4B.EAP_REQUEST_QUEUE_PID4B)
|
||||
@RabbitHandler
|
||||
public void eapRequest(Message<?> message, Channel channel)throws Exception{
|
||||
|
||||
logger.info("==============received message-EAP_REQUEST_QUEUE_PID4B=================");
|
||||
|
||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
|
||||
//MQMessage 中的 transactionId
|
||||
// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation");
|
||||
// //logger.info("transactionId:"+transactionId);
|
||||
|
||||
try {
|
||||
|
||||
MQMessage mqMessage = CommonFunction.parse(message);
|
||||
String transactionId = mqMessage.getHeader().getTransactionId();
|
||||
//1. 正常情况
|
||||
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage));
|
||||
Integer result = Integer.valueOf(integer);
|
||||
|
||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
|
||||
if(result == 1){
|
||||
logger.info("在 EAP_REQUEST_QUEUE_PID4B 队列中,transitionId"+transactionId+", 这条消息处理成功");
|
||||
channel.basicAck(deliveryTag,false);
|
||||
}else {
|
||||
logger.error("在 EAP_REQUEST_QUEUE_PID4B 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
}
|
||||
//2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列
|
||||
}catch (Exception e){
|
||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列
|
||||
logger.error(e.getMessage());
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static void main(String[] args) {
|
||||
//localhost:8001
|
||||
|
||||
|
||||
MQMessage mqMessage = new MQMessage();
|
||||
|
||||
Header header = new Header("Request","Execute","QUERYEQPStatus","12");
|
||||
QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody();
|
||||
queryEQStatusBody.setVidType("u4");
|
||||
List<String> vids = new ArrayList<>();
|
||||
vids.add("10000");
|
||||
vids.add("10001");
|
||||
vids.add("10002");
|
||||
queryEQStatusBody.setVidList(vids);
|
||||
|
||||
//queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"}
|
||||
//bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125]
|
||||
//"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="
|
||||
//message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
|
||||
//new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]}
|
||||
String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody);
|
||||
|
||||
|
||||
byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody);
|
||||
System.out.println("myString :"+ new String(bytes));
|
||||
mqMessage.setBody(queryEQStatusBodys);
|
||||
mqMessage.setHeader(header);
|
||||
|
||||
//{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
|
||||
String s = JSONObject.toJSONString(mqMessage);
|
||||
MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class);
|
||||
System.out.println("mqMessagessss : " + mqMessagessss.toString());
|
||||
System.out.println(s);
|
||||
//String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s);
|
||||
|
||||
//{"header":{"transactionId":"PIDPID4B_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID4B","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
|
||||
String s22 = "{\"header\":{\"transactionId\":\"PIDPID4B_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID4B\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}";
|
||||
|
||||
String s2 = "{\"header\":{\"transactionId\":\"PIDPID4B_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID4B\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID4B_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}";
|
||||
JSONObject jsonObject = JSON.parseObject(s2);
|
||||
String header1 = jsonObject.getString("header");
|
||||
String body = jsonObject.getString("body");
|
||||
String returns = jsonObject.getString("returns");
|
||||
System.out.println("header: "+header1);
|
||||
System.out.println("body: "+body);
|
||||
System.out.println("returns: "+returns);
|
||||
|
||||
byte[] bytes2 = s2.getBytes();
|
||||
MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class);
|
||||
System.out.println(mqMessage2.toString());
|
||||
}
|
||||
|
||||
@RabbitListener(queues = ConfigOfPID4B.MES_RESPONSE_QUEUE_PID4B)
|
||||
@RabbitHandler
|
||||
public void mesResponse(Message<?> message, Channel channel)throws Exception{
|
||||
logger.info("==============PIDPID4B_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1"));
|
||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
|
||||
|
||||
//模拟异常 ,然后 拒签消息 ,然后丢到死信队列
|
||||
try {
|
||||
MQMessage mqMessage = CommonFunction.parse(message);
|
||||
String transactionId = mqMessage.getHeader().getTransactionId();
|
||||
Integer result = mqMessageHandler.responseHandler(message);
|
||||
if(result == 1){
|
||||
logger.info("在 MES_RESPONSE_QUEUE_PID4B 队列中 , transitionId"+transactionId+" 这条消息处理成功");
|
||||
channel.basicAck(deliveryTag,false);
|
||||
}else {
|
||||
logger.error("在 MES_RESPONSE_QUEUE_PID4B 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
}
|
||||
|
||||
//throw new Exception("11111");
|
||||
}catch (Exception e){
|
||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列
|
||||
channel.basicNack(deliveryTag,false,false);
|
||||
return;
|
||||
}
|
||||
|
||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
|
||||
//channel.basicAck(deliveryTag,false);
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -51,6 +51,7 @@ public class MQController {
|
||||
}
|
||||
}
|
||||
|
||||
//need
|
||||
@PostMapping("/mesRequest")
|
||||
/**
|
||||
* desc : MES给EAP发送远程指令(MES_Request)(向rabbitmq中发送消息(direct模式))
|
||||
|
@ -137,6 +137,14 @@ public class OperateController {
|
||||
|
||||
}
|
||||
|
||||
public String boolList(Boolean[] b){
|
||||
List<Boolean> c = new ArrayList<>();
|
||||
for(int i=0;i<b.length;i++){
|
||||
c.add(b[i]);
|
||||
}
|
||||
return c.toString();
|
||||
}
|
||||
|
||||
@PostMapping("/read")
|
||||
//public R read(Integer nameSpace,String identifier,String plcName) {
|
||||
public R read(@RequestBody ReadArgEntity readArgEntity) {
|
||||
|
@ -29,7 +29,6 @@ public class MainForRead {
|
||||
|
||||
S7Connector connector = S7Client.S7_1500.getConnector();
|
||||
|
||||
|
||||
// // [0]
|
||||
// byte[] bool = connector.read(DaveArea.DB, 3, 1, 3266,0);
|
||||
// byte[] bool2 = connector.read(DaveArea.DB, 3, 1, 3266,1);
|
||||
@ -46,6 +45,7 @@ public class MainForRead {
|
||||
System.out.println("DB3.0-bool3 : " + ByteUtils.toBoolean(bool3));
|
||||
|
||||
|
||||
|
||||
//
|
||||
// byte[] bool4 = connector.read(DaveArea.DB, 3, 1, 0);
|
||||
// System.out.println("DB3.0-bool3 : " + ByteUtils.toBoolean(bool3));
|
||||
@ -83,21 +83,21 @@ public class MainForRead {
|
||||
// System.out.println("DB3.42-DATE : "+ByteUtils.addDate("1990-01-01",aLong));
|
||||
// }
|
||||
//
|
||||
{
|
||||
//[7, -78, 1, 1, 5, 0, 0, 0, 0, 0, 0, 0] DTL#1970-01-01-00:00:00 1998-1-1 星期五 0.0.0.0
|
||||
//0000 0111 (1100 1110)=>(1011 0010 178) 178+256+512+1024=1970
|
||||
//(注意:第二个字节是负数要转成补码形式表示(因为在通行传输中 字节是原码形式传输的,但是java中long int byte... 都是以补码形式保存的 java帮你自动保存了其实这是不对的所以你要转换一下) 参考https://blog.csdn.net/csdn_ds/article/details/79106006)
|
||||
//[7, -68, 1, 21, 2, 0, 0, 0, 0, 0, 0, 0]
|
||||
//0000 0111 (1100 0100)=>(1011 1100 188) 188+256+512+1024=1980
|
||||
byte[] dtl = connector.read(DaveArea.DB, 3, 12, 44);
|
||||
byte[] year = new byte[2];
|
||||
year[0] = dtl[0];
|
||||
year[1] = dtl[1];
|
||||
byte[] month = new byte[1];
|
||||
month[0] = dtl[2];
|
||||
// System.out.println("DB3.12-DTL : " + byteArrayByteUtils.toUInt(year)+"-"+byteArrayByteUtils.toUInt(month));
|
||||
System.out.println("DB3.44-DTL : " + ByteUtils.toInt(year[0],year[1])+"-"+ByteUtils.toInt(month[0]));
|
||||
}
|
||||
// {
|
||||
// //[7, -78, 1, 1, 5, 0, 0, 0, 0, 0, 0, 0] DTL#1970-01-01-00:00:00 1998-1-1 星期五 0.0.0.0
|
||||
// //0000 0111 (1100 1110)=>(1011 0010 178) 178+256+512+1024=1970
|
||||
// //(注意:第二个字节是负数要转成补码形式表示(因为在通行传输中 字节是原码形式传输的,但是java中long int byte... 都是以补码形式保存的 java帮你自动保存了其实这是不对的所以你要转换一下) 参考https://blog.csdn.net/csdn_ds/article/details/79106006)
|
||||
// //[7, -68, 1, 21, 2, 0, 0, 0, 0, 0, 0, 0]
|
||||
// //0000 0111 (1100 0100)=>(1011 1100 188) 188+256+512+1024=1980
|
||||
// byte[] dtl = connector.read(DaveArea.DB, 3, 12, 44);
|
||||
// byte[] year = new byte[2];
|
||||
// year[0] = dtl[0];
|
||||
// year[1] = dtl[1];
|
||||
// byte[] month = new byte[1];
|
||||
// month[0] = dtl[2];
|
||||
//// System.out.println("DB3.12-DTL : " + byteArrayByteUtils.toUInt(year)+"-"+byteArrayByteUtils.toUInt(month));
|
||||
// System.out.println("DB3.44-DTL : " + ByteUtils.toInt(year[0],year[1])+"-"+ByteUtils.toInt(month[0]));
|
||||
// }
|
||||
//
|
||||
// {
|
||||
// //[59, -102, -55, -1] T#11D_13H_46M_39S_999MS
|
||||
@ -176,59 +176,59 @@ public class MainForRead {
|
||||
//
|
||||
// }
|
||||
//
|
||||
{
|
||||
try {
|
||||
//byte 占用一个字节,如果是数组的话,就读取2个(要事先知道,点表规定数组长度),实际就是读取 DB3.830
|
||||
//bytes = byteLength * arrayLength 举例:1(byteLength) * 2(arrayLength) =2(bytes)
|
||||
byte[] boolArrays = connector.read(DaveArea.DB, 3, 2, 830);
|
||||
List<Boolean> booleans = ByteUtils.toBoolArray(boolArrays);
|
||||
System.out.println("DB3.830-boolArrays : " +booleans );
|
||||
|
||||
byte[] byteArrays = connector.read(DaveArea.DB, 3, 2, 832);
|
||||
List<Byte> bytes = ByteUtils.toByteArray(byteArrays);
|
||||
System.out.println("DB3.832-byteArrays : " +bytes );
|
||||
|
||||
byte[] charArrays = connector.read(DaveArea.DB, 3, 2, 834);
|
||||
List<Character> chars = ByteUtils.toCharArray(charArrays);
|
||||
System.out.println("DB3.834-charArrays : " +chars );
|
||||
|
||||
byte[] wordArrays = connector.read(DaveArea.DB, 3, 4, 836);
|
||||
List<Integer> words = ByteUtils.toWordArray(wordArrays);
|
||||
System.out.println("DB3.836-wordArrays : " +words );
|
||||
|
||||
byte[] dwordArrays = connector.read(DaveArea.DB, 3, 8, 840);
|
||||
List<Integer> dwords = ByteUtils.toDWordArray(dwordArrays);
|
||||
System.out.println("DB3.840-dwordArrays : " +dwords );
|
||||
|
||||
byte[] sintArrays = connector.read(DaveArea.DB, 3, 2, 852);
|
||||
List<Integer> sints = ByteUtils.toSIntArray(sintArrays);
|
||||
System.out.println("DB3.852-sintArrays : " +sints );
|
||||
|
||||
byte[] intArrays = connector.read(DaveArea.DB, 3, 4, 848);
|
||||
List<Integer> ints = ByteUtils.toIntArray(intArrays);
|
||||
System.out.println("DB3.848-intArrays : " +ints );
|
||||
|
||||
byte[] dintArrays = connector.read(DaveArea.DB, 3, 8, 854);
|
||||
List<Integer> dints = ByteUtils.toDIntArray(dintArrays);
|
||||
System.out.println("DB3.852-dintArrays : " +dints);
|
||||
|
||||
|
||||
byte[] usintArrays = connector.read(DaveArea.DB, 3, 3, 3240);
|
||||
List<Integer> usints = ByteUtils.toUSIntArray(usintArrays);
|
||||
System.out.println("DB3.3240-usintArrays : " +usints );
|
||||
|
||||
byte[] uintArrays = connector.read(DaveArea.DB, 3, 6, 3256);
|
||||
List<Integer> uints = ByteUtils.toUIntArray(uintArrays);
|
||||
System.out.println("DB3.3256-uintArrays : " +uints );
|
||||
|
||||
byte[] udintArrays = connector.read(DaveArea.DB, 3, 12, 3244);
|
||||
List<Long> udints = ByteUtils.toUDIntArray(udintArrays);
|
||||
System.out.println("DB3.852-udintArrays : " +udints);
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
};
|
||||
// {
|
||||
// try {
|
||||
// //byte 占用一个字节,如果是数组的话,就读取2个(要事先知道,点表规定数组长度),实际就是读取 DB3.830
|
||||
// //bytes = byteLength * arrayLength 举例:1(byteLength) * 2(arrayLength) =2(bytes)
|
||||
// byte[] boolArrays = connector.read(DaveArea.DB, 3, 2, 830);
|
||||
// List<Boolean> booleans = ByteUtils.toBoolArray(boolArrays);
|
||||
// System.out.println("DB3.830-boolArrays : " +booleans );
|
||||
//
|
||||
// byte[] byteArrays = connector.read(DaveArea.DB, 3, 2, 832);
|
||||
// List<Byte> bytes = ByteUtils.toByteArray(byteArrays);
|
||||
// System.out.println("DB3.832-byteArrays : " +bytes );
|
||||
//
|
||||
// byte[] charArrays = connector.read(DaveArea.DB, 3, 2, 834);
|
||||
// List<Character> chars = ByteUtils.toCharArray(charArrays);
|
||||
// System.out.println("DB3.834-charArrays : " +chars );
|
||||
//
|
||||
// byte[] wordArrays = connector.read(DaveArea.DB, 3, 4, 836);
|
||||
// List<Integer> words = ByteUtils.toWordArray(wordArrays);
|
||||
// System.out.println("DB3.836-wordArrays : " +words );
|
||||
//
|
||||
// byte[] dwordArrays = connector.read(DaveArea.DB, 3, 8, 840);
|
||||
// List<Integer> dwords = ByteUtils.toDWordArray(dwordArrays);
|
||||
// System.out.println("DB3.840-dwordArrays : " +dwords );
|
||||
//
|
||||
// byte[] sintArrays = connector.read(DaveArea.DB, 3, 2, 852);
|
||||
// List<Integer> sints = ByteUtils.toSIntArray(sintArrays);
|
||||
// System.out.println("DB3.852-sintArrays : " +sints );
|
||||
//
|
||||
// byte[] intArrays = connector.read(DaveArea.DB, 3, 4, 848);
|
||||
// List<Integer> ints = ByteUtils.toIntArray(intArrays);
|
||||
// System.out.println("DB3.848-intArrays : " +ints );
|
||||
//
|
||||
// byte[] dintArrays = connector.read(DaveArea.DB, 3, 8, 854);
|
||||
// List<Integer> dints = ByteUtils.toDIntArray(dintArrays);
|
||||
// System.out.println("DB3.852-dintArrays : " +dints);
|
||||
//
|
||||
//
|
||||
// byte[] usintArrays = connector.read(DaveArea.DB, 3, 3, 3240);
|
||||
// List<Integer> usints = ByteUtils.toUSIntArray(usintArrays);
|
||||
// System.out.println("DB3.3240-usintArrays : " +usints );
|
||||
//
|
||||
// byte[] uintArrays = connector.read(DaveArea.DB, 3, 6, 3256);
|
||||
// List<Integer> uints = ByteUtils.toUIntArray(uintArrays);
|
||||
// System.out.println("DB3.3256-uintArrays : " +uints );
|
||||
//
|
||||
// byte[] udintArrays = connector.read(DaveArea.DB, 3, 12, 3244);
|
||||
// List<Long> udints = ByteUtils.toUDIntArray(udintArrays);
|
||||
// System.out.println("DB3.852-udintArrays : " +udints);
|
||||
// }catch (Exception e){
|
||||
// e.printStackTrace();
|
||||
// }
|
||||
//
|
||||
// };
|
||||
|
||||
|
||||
|
||||
|
@ -29,9 +29,10 @@ import java.util.concurrent.TimeUnit;
|
||||
public enum S7Client {
|
||||
//TODO 步骤1 这里是配置多PLC 的,,,有多个plc 就在这里配置一个枚举类
|
||||
//1500 西门子200smart、1200、1500默认的 机架号=0 槽位号=1; 300/400 默认的 机架-0 插槽-2
|
||||
S7_1200("192.168.0.52",0,1,1,PlcVarActual.HeartBeatFor1200),
|
||||
|
||||
S7_1500("192.168.0.51",0,1,1,PlcVarActual.HeartBeat),
|
||||
//1500 机架-0 插槽-1
|
||||
S7_1200("192.168.0.52",0,1,1,PlcVarActual.HeartBeatFor1200)
|
||||
//后续 在这里扩展 多PLC应用。
|
||||
|
||||
|
||||
@ -271,7 +272,7 @@ public enum S7Client {
|
||||
.build();
|
||||
return connector;
|
||||
}catch (S7Exception e){
|
||||
logger.info("创建S7Connector 连接失败,原因:"+e.getMessage());
|
||||
// logger.info("创建S7Connector 连接失败,原因:"+e.getMessage());
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -8,11 +8,11 @@ server:
|
||||
spring:
|
||||
rabbitmq:
|
||||
# 如果是rabbitmq+haproxy+keepalived集群 ,,那么192.168.0.176是haproxy代理的地址(严格来说是keepalived的vip)
|
||||
addresses: 192.168.0.176:5672 ## 新版rabbitmq 版本还未测试
|
||||
addresses: 192.168.0.176:5672 # 新版rabbitmq 版本还未测试
|
||||
#addresses: 172.16.21.133:5672
|
||||
username: cdte
|
||||
password: cdte
|
||||
virtual-host: cdte
|
||||
username: cigs
|
||||
password: cigs
|
||||
virtual-host: cigs
|
||||
connection-timeout: 15000
|
||||
publisher-confirm-type: correlated
|
||||
publisher-returns: true
|
||||
|
Loading…
Reference in New Issue
Block a user