Browse Source

PID2/PID10,查询接口

master
于连琛 1 year ago
parent
commit
ca72d89ab1
6 changed files with 554 additions and 7 deletions
  1. +10
    -3
      src/main/java/com/qgs/dc/influx/config/InfluxClient.java
  2. +8
    -4
      src/main/java/com/qgs/dc/influx/controller/InfluxController.java
  3. +98
    -0
      src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID10.java
  4. +98
    -0
      src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID2.java
  5. +170
    -0
      src/main/java/com/qgs/dc/mq/consumer/PID10Received.java
  6. +170
    -0
      src/main/java/com/qgs/dc/mq/consumer/PID2Received.java

+ 10
- 3
src/main/java/com/qgs/dc/influx/config/InfluxClient.java View File

@@ -143,14 +143,20 @@ public enum InfluxClient {
List<String> dropedTagNames = param.getDropedTagNames(); List<String> dropedTagNames = param.getDropedTagNames();
Range range = param.getRange(); Range range = param.getRange();
String bucket = param.getBucket(); String bucket = param.getBucket();
String tagName = param.getTag().getTagName();
String tagValue = param.getTag().getTagValue();
String tagName = null;
String tagValue = null;
if (param.getTag()!=null){
tagName = param.getTag().getTagName();
tagValue = param.getTag().getTagValue();
}
PageInfo pageInfo = param.getPageInfo(); PageInfo pageInfo = param.getPageInfo();


String flux = "from(bucket:\""+bucket+"\")"; String flux = "from(bucket:\""+bucket+"\")";
flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")";
flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")";
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")";
if (tagName!=null){
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")";
}
for(String dropName:dropedTagNames){ for(String dropName:dropedTagNames){
flux += "|> drop(columns: [\""+dropName+"\"])"; flux += "|> drop(columns: [\""+dropName+"\"])";
} }
@@ -158,6 +164,7 @@ public enum InfluxClient {
if(pageInfo!=null){ if(pageInfo!=null){
flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")";
} }
System.out.println(flux);
return queryApi.query(flux); return queryApi.query(flux);
} }




+ 8
- 4
src/main/java/com/qgs/dc/influx/controller/InfluxController.java View File

@@ -220,15 +220,19 @@ public class InfluxController {
@PostMapping("/queryEvents") @PostMapping("/queryEvents")
public JSONArray queryEvents(@RequestBody QueryDataParam queryDataParam) throws InterruptedException{ public JSONArray queryEvents(@RequestBody QueryDataParam queryDataParam) throws InterruptedException{
List<FluxTable> fluxTables = InfluxClient.Client.query(queryDataParam); List<FluxTable> fluxTables = InfluxClient.Client.query(queryDataParam);
List<FluxRecord> fluxRecords = fluxTables.get(0).getRecords();
List<FluxRecord> fluxRecords = new ArrayList<>();
for (FluxTable fluxTable:fluxTables){
fluxRecords.addAll(fluxTable.getRecords());
}
JSONArray jsonArray = new JSONArray(); JSONArray jsonArray = new JSONArray();
for (FluxRecord fluxRecord:fluxRecords){
for (FluxRecord fluxRecord:fluxRecords) {
Map<String,Object> map = fluxRecord.getValues(); Map<String,Object> map = fluxRecord.getValues();
System.out.println(map); System.out.println(map);
//todo
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
jsonObject.put(map.get("argName").toString(),map.get("_value").toString()); jsonObject.put(map.get("argName").toString(),map.get("_value").toString());
jsonObject.put("transationId",map.get("transationId").toString());
if (map.get("transationId")!=null){
jsonObject.put("transationId",map.get("transationId").toString());
}
jsonObject.put("measurement",map.get("_measurement").toString()); jsonObject.put("measurement",map.get("_measurement").toString());
jsonObject.put("time",map.get("_time").toString()); jsonObject.put("time",map.get("_time").toString());
jsonArray.add(jsonObject); jsonArray.add(jsonObject);


+ 98
- 0
src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID10.java View File

@@ -0,0 +1,98 @@
package com.qgs.dc.mq.configuration;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Desc: "设备:PID10 相关信息定义"
* @Author: caixiang
* @DATE: 2021/6/7 9:11
*/
@Configuration
public class ConfigOfPID10 {

//水平扩展其他设备的时候 只要:control+R 然后 PID10=>00C 然后replace all
public static final String EQUIPMENT_NAME_PID10 = "PID10";

public static final String EXCHANGE_NAME_PID10 = EQUIPMENT_NAME_PID10 +"_Exchange";
public static final String EAP_REQUEST_QUEUE_PID10 = EQUIPMENT_NAME_PID10 +"_EAP_Request_Queue";
public static final String EAP_RESPONSE_QUEUE_PID10 = EQUIPMENT_NAME_PID10 +"_EAP_Response_Queue";
public static final String MES_REQUEST_QUEUE_PID10 = EQUIPMENT_NAME_PID10 +"_MES_Request_Queue";
public static final String MES_RESPONSE_QUEUE_PID10 = EQUIPMENT_NAME_PID10 +"_MES_Response_Queue";
public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID10 = EQUIPMENT_NAME_PID10 +"_EAP_Request_Queue_RoutingKey";
public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID10 = EQUIPMENT_NAME_PID10 +"_EAP_Response_Queue_RoutingKey";
public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID10 = EQUIPMENT_NAME_PID10 +"_MES_Request_Queue_RoutingKey";
public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID10 = EQUIPMENT_NAME_PID10 +"_MES_Response_Queue_RoutingKey";


@Bean
public DirectExchange EXCHANGE_NAME_PID10(){
return new DirectExchange(EXCHANGE_NAME_PID10);
}

//todo
@Bean
public Queue MES_REQUEST_QUEUE_PID10(){
Queue queue = new Queue(MES_REQUEST_QUEUE_PID10);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Queue MES_RESPONSE_QUEUE_PID10(){
Queue queue = new Queue(MES_RESPONSE_QUEUE_PID10);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}


@Bean
public Queue EAP_REQUEST_QUEUE_PID10(){
Queue queue = new Queue(EAP_REQUEST_QUEUE_PID10);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;

}
@Bean
public Queue EAP_RESPONSE_QUEUE_PID10(){
Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID10);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}


@Bean
public Binding bindExchangeAndQueueA_PID10(){
return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID10()).to(EXCHANGE_NAME_PID10())
.with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID10);
}
@Bean
public Binding bindExchangeAndQueueB_PID10(){
return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID10()).to(EXCHANGE_NAME_PID10())
.with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID10);
}

@Bean
public Binding bindExchangeAndQueueC_PID10(){
return BindingBuilder.bind(MES_REQUEST_QUEUE_PID10()).to(EXCHANGE_NAME_PID10())
.with(MES_REQUEST_QUEUE_ROUTINGKEY_PID10);
}

@Bean
public Binding bindExchangeAndQueueD_PID10(){
return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID10()).to(EXCHANGE_NAME_PID10())
.with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID10);
}
}

+ 98
- 0
src/main/java/com/qgs/dc/mq/configuration/ConfigOfPID2.java View File

@@ -0,0 +1,98 @@
package com.qgs.dc.mq.configuration;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @Desc: "设备:PID2 相关信息定义"
* @Author: caixiang
* @DATE: 2021/6/7 9:11
*/
@Configuration
public class ConfigOfPID2 {

//水平扩展其他设备的时候 只要:control+R 然后 PID2=>00C 然后replace all
public static final String EQUIPMENT_NAME_PID2 = "PID2";

public static final String EXCHANGE_NAME_PID2 = EQUIPMENT_NAME_PID2 +"_Exchange";
public static final String EAP_REQUEST_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Request_Queue";
public static final String EAP_RESPONSE_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Response_Queue";
public static final String MES_REQUEST_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Request_Queue";
public static final String MES_RESPONSE_QUEUE_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Response_Queue";
public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Request_Queue_RoutingKey";
public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_EAP_Response_Queue_RoutingKey";
public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Request_Queue_RoutingKey";
public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID2 = EQUIPMENT_NAME_PID2 +"_MES_Response_Queue_RoutingKey";


@Bean
public DirectExchange EXCHANGE_NAME_PID2(){
return new DirectExchange(EXCHANGE_NAME_PID2);
}

//todo
@Bean
public Queue MES_REQUEST_QUEUE_PID2(){
Queue queue = new Queue(MES_REQUEST_QUEUE_PID2);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}
@Bean
public Queue MES_RESPONSE_QUEUE_PID2(){
Queue queue = new Queue(MES_RESPONSE_QUEUE_PID2);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}


@Bean
public Queue EAP_REQUEST_QUEUE_PID2(){
Queue queue = new Queue(EAP_REQUEST_QUEUE_PID2);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;

}
@Bean
public Queue EAP_RESPONSE_QUEUE_PID2(){
Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID2);
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE);
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey);
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY);
return queue;
}


@Bean
public Binding bindExchangeAndQueueA_PID2(){
return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID2()).to(EXCHANGE_NAME_PID2())
.with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID2);
}
@Bean
public Binding bindExchangeAndQueueB_PID2(){
return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID2()).to(EXCHANGE_NAME_PID2())
.with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID2);
}

@Bean
public Binding bindExchangeAndQueueC_PID2(){
return BindingBuilder.bind(MES_REQUEST_QUEUE_PID2()).to(EXCHANGE_NAME_PID2())
.with(MES_REQUEST_QUEUE_ROUTINGKEY_PID2);
}

@Bean
public Binding bindExchangeAndQueueD_PID2(){
return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID2()).to(EXCHANGE_NAME_PID2())
.with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID2);
}
}

+ 170
- 0
src/main/java/com/qgs/dc/mq/consumer/PID10Received.java View File

@@ -0,0 +1,170 @@
package com.qgs.dc.mq.consumer;

import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.qgs.dc.common.utils.CommonFunction;
import com.qgs.dc.mq.Constant.Constant;
import com.qgs.dc.mq.configuration.ConfigOfPID10;
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler;
import com.qgs.dc.mq.entity.MQMessage;
import com.qgs.dc.mq.entity.common.Header;
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
* @Desc: "PIDPID10设备 接收MQ消息 监听类"
* @Author: caixiang
* @DATE: 2021/6/22 15:30
*
* Ctrl+R 替换设备名
*/
@Component
public class PID10Received {

private static final Logger logger = LoggerFactory.getLogger(PID10Received.class);



@Autowired
MQMessageHandler mqMessageHandler;

@RabbitListener(queues = ConfigOfPID10.EAP_REQUEST_QUEUE_PID10)
@RabbitHandler
public void eapRequest(Message<?> message, Channel channel)throws Exception{

logger.info("==============received message-EAP_REQUEST_QUEUE_PID10=================,priority:"+"equipmentName"+message.getHeaders().get("attr2"));

Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
// //MQMessage 中的 transactionId
// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation");
// //logger.info("transactionId:"+transactionId);

try {

MQMessage mqMessage = CommonFunction.parse(message);
String transactionId = mqMessage.getHeader().getTransactionId();
//1. 正常情况
//Integer integer = mqMessageHandler.requestHandler(message);
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage));
Integer result = Integer.valueOf(integer);

//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
if(result == 1){
logger.info("在 EAP_REQUEST_QUEUE_PID10 队列中,transitionId"+transactionId+", 这条消息处理成功");
channel.basicAck(deliveryTag,false);
}else {
logger.error("在 EAP_REQUEST_QUEUE_PID10 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
channel.basicNack(deliveryTag,false,false);
}
//2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列

//throw new Exception("11111");
}catch (Exception e){
// 第一个false 是 不批量签收;第二个false 是 不重回队列
logger.error(e.getMessage());
channel.basicNack(deliveryTag,false,false);
return;
}



}


public static void main(String[] args) {
//localhost:8001


MQMessage mqMessage = new MQMessage();

Header header = new Header("Request","Execute","QUERYEQPStatus","12");
QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody();
queryEQStatusBody.setVidType("u4");
List<String> vids = new ArrayList<>();
vids.add("10000");
vids.add("10001");
vids.add("10002");
queryEQStatusBody.setVidList(vids);

//queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"}
//bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125]
//"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="
//message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
//new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]}
String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody);


byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody);
System.out.println("myString :"+ new String(bytes));
mqMessage.setBody(queryEQStatusBodys);
mqMessage.setHeader(header);

//{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
String s = JSONObject.toJSONString(mqMessage);
MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class);
System.out.println("mqMessagessss : " + mqMessagessss.toString());
System.out.println(s);
//String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s);

//{"header":{"transactionId":"PIDPID10_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID10","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
String s22 = "{\"header\":{\"transactionId\":\"PIDPID10_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID10\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}";

String s2 = "{\"header\":{\"transactionId\":\"PIDPID10_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID10\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID10_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}";
JSONObject jsonObject = JSON.parseObject(s2);
String header1 = jsonObject.getString("header");
String body = jsonObject.getString("body");
String returns = jsonObject.getString("returns");
System.out.println("header: "+header1);
System.out.println("body: "+body);
System.out.println("returns: "+returns);

byte[] bytes2 = s2.getBytes();
MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class);
System.out.println(mqMessage2.toString());
}

@RabbitListener(queues = ConfigOfPID10.MES_RESPONSE_QUEUE_PID10)
@RabbitHandler
public void mesResponse(Message<?> message, Channel channel)throws Exception{
logger.info("==============PIDPID10_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1"));
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

//模拟异常 ,然后 拒签消息 ,然后丢到死信队列
try {
MQMessage mqMessage = CommonFunction.parse(message);
String transactionId = mqMessage.getHeader().getTransactionId();
Integer result = mqMessageHandler.responseHandler(message);
if(result == 1){
logger.info("在 MES_RESPONSE_QUEUE_PID10 队列中 , transitionId"+transactionId+" 这条消息处理成功");
channel.basicAck(deliveryTag,false);
}else {
logger.error("在 MES_RESPONSE_QUEUE_PID10 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
channel.basicNack(deliveryTag,false,false);
}

//throw new Exception("11111");
}catch (Exception e){
// 第一个false 是 不批量签收;第二个false 是 不重回队列
channel.basicNack(deliveryTag,false,false);
return;
}

//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
//channel.basicAck(deliveryTag,false);

}

}

+ 170
- 0
src/main/java/com/qgs/dc/mq/consumer/PID2Received.java View File

@@ -0,0 +1,170 @@
package com.qgs.dc.mq.consumer;

import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.qgs.dc.common.utils.CommonFunction;
import com.qgs.dc.mq.Constant.Constant;
import com.qgs.dc.mq.configuration.ConfigOfPID2;
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler;
import com.qgs.dc.mq.entity.MQMessage;
import com.qgs.dc.mq.entity.common.Header;
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
* @Desc: "PIDPID2设备 接收MQ消息 监听类"
* @Author: caixiang
* @DATE: 2021/6/22 15:30
*
* Ctrl+R 替换设备名
*/
@Component
public class PID2Received {

private static final Logger logger = LoggerFactory.getLogger(PID2Received.class);



@Autowired
MQMessageHandler mqMessageHandler;

@RabbitListener(queues = ConfigOfPID2.EAP_REQUEST_QUEUE_PID2)
@RabbitHandler
public void eapRequest(Message<?> message, Channel channel)throws Exception{

logger.info("==============received message-EAP_REQUEST_QUEUE_PID2=================,priority:"+"equipmentName"+message.getHeaders().get("attr2"));

Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
// //MQMessage 中的 transactionId
// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation");
// //logger.info("transactionId:"+transactionId);

try {

MQMessage mqMessage = CommonFunction.parse(message);
String transactionId = mqMessage.getHeader().getTransactionId();
//1. 正常情况
//Integer integer = mqMessageHandler.requestHandler(message);
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage));
Integer result = Integer.valueOf(integer);

//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
if(result == 1){
logger.info("在 EAP_REQUEST_QUEUE_PID2 队列中,transitionId"+transactionId+", 这条消息处理成功");
channel.basicAck(deliveryTag,false);
}else {
logger.error("在 EAP_REQUEST_QUEUE_PID2 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
channel.basicNack(deliveryTag,false,false);
}
//2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列

//throw new Exception("11111");
}catch (Exception e){
// 第一个false 是 不批量签收;第二个false 是 不重回队列
logger.error(e.getMessage());
channel.basicNack(deliveryTag,false,false);
return;
}



}


public static void main(String[] args) {
//localhost:8001


MQMessage mqMessage = new MQMessage();

Header header = new Header("Request","Execute","QUERYEQPStatus","12");
QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody();
queryEQStatusBody.setVidType("u4");
List<String> vids = new ArrayList<>();
vids.add("10000");
vids.add("10001");
vids.add("10002");
queryEQStatusBody.setVidList(vids);

//queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"}
//bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125]
//"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="
//message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
//new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]}
String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody);


byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody);
System.out.println("myString :"+ new String(bytes));
mqMessage.setBody(queryEQStatusBodys);
mqMessage.setHeader(header);

//{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
String s = JSONObject.toJSONString(mqMessage);
MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class);
System.out.println("mqMessagessss : " + mqMessagessss.toString());
System.out.println(s);
//String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s);

//{"header":{"transactionId":"PIDPID2_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID2","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="}
String s22 = "{\"header\":{\"transactionId\":\"PIDPID2_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID2\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}";

String s2 = "{\"header\":{\"transactionId\":\"PIDPID2_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID2\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID2_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}";
JSONObject jsonObject = JSON.parseObject(s2);
String header1 = jsonObject.getString("header");
String body = jsonObject.getString("body");
String returns = jsonObject.getString("returns");
System.out.println("header: "+header1);
System.out.println("body: "+body);
System.out.println("returns: "+returns);

byte[] bytes2 = s2.getBytes();
MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class);
System.out.println(mqMessage2.toString());
}

@RabbitListener(queues = ConfigOfPID2.MES_RESPONSE_QUEUE_PID2)
@RabbitHandler
public void mesResponse(Message<?> message, Channel channel)throws Exception{
logger.info("==============PIDPID2_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1"));
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

//模拟异常 ,然后 拒签消息 ,然后丢到死信队列
try {
MQMessage mqMessage = CommonFunction.parse(message);
String transactionId = mqMessage.getHeader().getTransactionId();
Integer result = mqMessageHandler.responseHandler(message);
if(result == 1){
logger.info("在 MES_RESPONSE_QUEUE_PID2 队列中 , transitionId"+transactionId+" 这条消息处理成功");
channel.basicAck(deliveryTag,false);
}else {
logger.error("在 MES_RESPONSE_QUEUE_PID2 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列");
channel.basicNack(deliveryTag,false,false);
}

//throw new Exception("11111");
}catch (Exception e){
// 第一个false 是 不批量签收;第二个false 是 不重回队列
channel.basicNack(deliveryTag,false,false);
return;
}

//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。
//channel.basicAck(deliveryTag,false);

}

}

Loading…
Cancel
Save