This commit is contained in:
caixiang 2021-09-09 11:02:00 +08:00
parent 15fbe0f14a
commit 032f8af8cf
7 changed files with 90 additions and 26 deletions

View File

@ -138,6 +138,7 @@ public class CommonFunction {
*/
public static MQMessage parse(Message<?> message){
byte[] bytes =(byte[]) message.getPayload();
String s = new String(bytes);
MQMessage mqMessage = JSONObject.parseObject(bytes, MQMessage.class);
return mqMessage;
}

View File

@ -2,6 +2,7 @@ package com.qgs.dc.mq.consumer;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.qgs.dc.common.utils.CommonFunction;
import com.qgs.dc.mq.Constant.Constant;
@ -21,6 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -75,6 +77,7 @@ public class PID00BReceived {
//throw new Exception("11111");
}catch (Exception e){
// 第一个false 不批量签收第二个false 不重回队列
logger.error(e.getMessage());
channel.basicNack(deliveryTag,false,false);
return;
}
@ -83,6 +86,7 @@ public class PID00BReceived {
}
public static void main(String[] args) {
//localhost:8001
@ -97,16 +101,42 @@ public class PID00BReceived {
vids.add("10001");
vids.add("10002");
queryEQStatusBody.setVidList(vids);
mqMessage.setBody(JSONObject.toJSONBytes(queryEQStatusBody));
//queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"}
//bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125]
//"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="
//message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
//new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]}
String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody);
byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody);
System.out.println("myString :"+ new String(bytes));
mqMessage.setBody(queryEQStatusBodys);
mqMessage.setHeader(header);
//{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
String s = JSONObject.toJSONString(mqMessage);
MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class);
System.out.println("mqMessagessss : " + mqMessagessss.toString());
System.out.println(s);
String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s);
//String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s);
System.out.println(result);
//{"header":{"transactionId":"PID00B_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PID00B","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
String s22 = "{\"header\":{\"transactionId\":\"PID00B_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PID00B\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}";
String s2 = "{\"header\":{\"transactionId\":\"PID00B_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PID00B\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PID00B_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}";
JSONObject jsonObject = JSON.parseObject(s2);
String header1 = jsonObject.getString("header");
String body = jsonObject.getString("body");
String returns = jsonObject.getString("returns");
System.out.println("header: "+header1);
System.out.println("body: "+body);
System.out.println("returns: "+returns);
byte[] bytes2 = s2.getBytes();
MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class);
System.out.println(mqMessage2.toString());
}
@RabbitListener(queues = ConfigOf00B.MES_RESPONSE_QUEUE_00B)

View File

@ -42,7 +42,7 @@ public class MQController {
Map<String,Object> properties = new HashMap<>();
properties.put("equipmentName","PID001");
properties.put("transitionId", callbackMessageEntity.getMqMessage().getHeader().getTransactionId());
rabbitSender.reply(callbackMessageEntity.getMqMessage(),properties, callbackMessageEntity.getExchangeName(), callbackMessageEntity.getRoutingKey(),"6000");
rabbitSender.reply(callbackMessageEntity.getMqMessage(),properties, callbackMessageEntity.getExchangeName(), callbackMessageEntity.getRoutingKey());
logger.info("MES => EAP (EAP_Response) , 状态DC已发送给MQ , 内容:"+ callbackMessageEntity.toString());
return R.ok("回复成功");
}catch (Exception e){
@ -71,8 +71,8 @@ public class MQController {
Map<String,Object> properties = new HashMap<>();
properties.put("equipmentName","PID001");
properties.put("transitionId",transitionId);
rabbitSender.reply(mqMessage,properties,exchangeName,routingKey,"6000");
//rabbitSender.sendDirect(mqMessage,properties,exchangeName,routingKey,"6000");
rabbitSender.sendDirect(mqMessage,properties,exchangeName,routingKey);
logger.info("MES => EAP (MES_Request) , 状态DC已发送给MQ , 内容:"+ callbackMessageEntity.toString());
AsyncFuture<MQMessage> add = SendedList.add(transitionId,mqMessage);

View File

@ -14,6 +14,6 @@ import lombok.Data;
@JSONType(orders={"header","body","returns"})
public class MQMessage {
private Header header;
private byte[] body;
private String body;
private Returns returns;
}

View File

@ -190,6 +190,22 @@ public class RabbitSender {
cd
);
}
public void sendDirect(MQMessage mqMessage, Map<String,Object> properties, String exchange, String routingKey) throws Exception{
byte[] message = JSONObject.toJSONBytes(mqMessage);
MessageHeaders messageHeaders = new MessageHeaders(properties);
Message<?> msg = MessageBuilder.createMessage(message,messageHeaders);
//String transitionId = mqMessage.getHeader().getEquipmentId()+"_"+CommonFunction.getNowDate(2)+"_"+CommonFunction.getUUID(10);
String transitionId = mqMessage.getHeader().getTransactionId();
CorrelationData cd = new CorrelationData(transitionId);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnsCallback(returnsCallback);
rabbitTemplate.convertAndSend(exchange,
routingKey,
msg,
cd
);
}
/**
* @Description 回复EAP的request向rabbitmq中发送消息direct模式
@ -262,25 +278,42 @@ public class RabbitSender {
cd
);
}
public void reply(MQMessage mqMessage, Map<String,Object> properties, String exchange, String routingKey) throws Exception{
byte[] message = JSONObject.toJSONBytes(mqMessage);
MessageHeaders messageHeaders = new MessageHeaders(properties);
Message<?> msg = MessageBuilder.createMessage(message,messageHeaders);
String transitionId = mqMessage.getHeader().getEquipmentId()+"_"+ CommonFunction.getNowDate(2)+"_"+CommonFunction.getUUID(5);
CorrelationData cd = new CorrelationData(transitionId);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnsCallback(returnsCallback);
MessagePostProcessor mpp = new MessagePostProcessor(){
@Override
public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
return message;
}
};
rabbitTemplate.convertAndSend(exchange,
routingKey,
msg,
mpp,
cd
);
}
//todo sendDirect应该改成 sendCommon加上T3超时校验还要加一个reply() 方法 专门用于 eapRequest 回复用
public void sendDirect(MQMessage mqMessage, Map<String,Object> properties, String exchange, String routingKey) throws Exception{
byte[] message = JSONObject.toJSONBytes(mqMessage);
MessageHeaders messageHeaders = new MessageHeaders(properties);
Message<?> msg = MessageBuilder.createMessage(message,messageHeaders);
//String transitionId = mqMessage.getHeader().getEquipmentId()+"_"+CommonFunction.getNowDate(2)+"_"+CommonFunction.getUUID(10);
String transitionId = mqMessage.getHeader().getTransactionId();
CorrelationData cd = new CorrelationData(transitionId);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnsCallback(returnsCallback);
rabbitTemplate.convertAndSend(exchange,
routingKey,
msg,
cd
);
}
/**
* @Description 向rabbitmq中发送消息direct模式

View File

@ -47,7 +47,7 @@ public class TestController {
vids.add("10001");
vids.add("10002");
queryEQStatusBody.setVidList(vids);
mqMessage.setBody(JSONObject.toJSONBytes(queryEQStatusBody));
mqMessage.setBody(JSONObject.toJSONString(queryEQStatusBody));
mqMessage.setHeader(header);
System.out.println(mqMessage);
//序列化

View File

@ -33,7 +33,7 @@ public class Main {
vids.add("10001");
vids.add("10002");
queryEQStatusBody.setVidList(vids);
mqMessage.setBody(JSONObject.toJSONBytes(queryEQStatusBody));
mqMessage.setBody(JSONObject.toJSONString(queryEQStatusBody));
mqMessage.setHeader(header);
AsyncFuture<MQMessage> add = SendedList.add(header.getTransactionId(),mqMessage);