Conflicts: src/main/resources/application.ymlmaster
@@ -148,14 +148,20 @@ public enum InfluxClient { | |||||
List<String> dropedTagNames = param.getDropedTagNames(); | List<String> dropedTagNames = param.getDropedTagNames(); | ||||
Range range = param.getRange(); | Range range = param.getRange(); | ||||
String bucket = param.getBucket(); | String bucket = param.getBucket(); | ||||
String tagName = param.getTag().getTagName(); | |||||
String tagValue = param.getTag().getTagValue(); | |||||
String tagName = null; | |||||
String tagValue = null; | |||||
if (param.getTag()!=null){ | |||||
tagName = param.getTag().getTagName(); | |||||
tagValue = param.getTag().getTagValue(); | |||||
} | |||||
PageInfo pageInfo = param.getPageInfo(); | PageInfo pageInfo = param.getPageInfo(); | ||||
String flux = "from(bucket:\""+bucket+"\")"; | String flux = "from(bucket:\""+bucket+"\")"; | ||||
flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; | flux += "|> range(start: "+range.getBegin()+",stop:"+range.getEnd()+")"; | ||||
flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; | flux += "|> filter(fn: (r) => r[\"_measurement\"] == \""+measurement+"\")"; | ||||
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; | |||||
if (tagName!=null){ | |||||
flux += "|> filter(fn: (r) => r[\""+tagName+"\"] == \""+tagValue+"\")"; | |||||
} | |||||
for(String dropName:dropedTagNames){ | for(String dropName:dropedTagNames){ | ||||
flux += "|> drop(columns: [\""+dropName+"\"])"; | flux += "|> drop(columns: [\""+dropName+"\"])"; | ||||
} | } | ||||
@@ -163,6 +169,7 @@ public enum InfluxClient { | |||||
if(pageInfo!=null){ | if(pageInfo!=null){ | ||||
flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; | flux += "|> limit(n: "+pageInfo.getSize()+", offset: "+(pageInfo.getCurrent()-1)* pageInfo.getSize()+")"; | ||||
} | } | ||||
System.out.println(flux); | |||||
return queryApi.query(flux); | return queryApi.query(flux); | ||||
} | } | ||||
@@ -221,15 +221,19 @@ public class InfluxController { | |||||
@PostMapping("/queryEvents") | @PostMapping("/queryEvents") | ||||
public JSONArray queryEvents(@RequestBody QueryDataParam queryDataParam) throws InterruptedException{ | public JSONArray queryEvents(@RequestBody QueryDataParam queryDataParam) throws InterruptedException{ | ||||
List<FluxTable> fluxTables = InfluxClient.Client.query(queryDataParam); | List<FluxTable> fluxTables = InfluxClient.Client.query(queryDataParam); | ||||
List<FluxRecord> fluxRecords = fluxTables.get(0).getRecords(); | |||||
List<FluxRecord> fluxRecords = new ArrayList<>(); | |||||
for (FluxTable fluxTable:fluxTables){ | |||||
fluxRecords.addAll(fluxTable.getRecords()); | |||||
} | |||||
JSONArray jsonArray = new JSONArray(); | JSONArray jsonArray = new JSONArray(); | ||||
for (FluxRecord fluxRecord:fluxRecords){ | |||||
for (FluxRecord fluxRecord:fluxRecords) { | |||||
Map<String,Object> map = fluxRecord.getValues(); | Map<String,Object> map = fluxRecord.getValues(); | ||||
System.out.println(map); | System.out.println(map); | ||||
//todo | |||||
JSONObject jsonObject = new JSONObject(); | JSONObject jsonObject = new JSONObject(); | ||||
jsonObject.put(map.get("argName").toString(),map.get("_value").toString()); | jsonObject.put(map.get("argName").toString(),map.get("_value").toString()); | ||||
jsonObject.put("transationId",map.get("transationId").toString()); | |||||
if (map.get("transationId")!=null){ | |||||
jsonObject.put("transationId",map.get("transationId").toString()); | |||||
} | |||||
jsonObject.put("measurement",map.get("_measurement").toString()); | jsonObject.put("measurement",map.get("_measurement").toString()); | ||||
jsonObject.put("time",map.get("_time").toString()); | jsonObject.put("time",map.get("_time").toString()); | ||||
jsonArray.add(jsonObject); | jsonArray.add(jsonObject); | ||||
@@ -1,98 +0,0 @@ | |||||
package com.qgs.dc.mq.configuration; | |||||
import org.springframework.amqp.core.Binding; | |||||
import org.springframework.amqp.core.BindingBuilder; | |||||
import org.springframework.amqp.core.DirectExchange; | |||||
import org.springframework.amqp.core.Queue; | |||||
import org.springframework.context.annotation.Bean; | |||||
import org.springframework.context.annotation.Configuration; | |||||
/** | |||||
* @Desc: "设备:PID13S 相关信息定义" | |||||
* @Author: caixiang | |||||
* @DATE: 2021/6/7 9:11 | |||||
*/ | |||||
@Configuration | |||||
public class ConfigOfPID13S { | |||||
//水平扩展其他设备的时候 只要:control+R 然后 PID13S=>00C 然后replace all | |||||
public static final String EQUIPMENT_NAME_PID13S = "PID13S"; | |||||
public static final String EXCHANGE_NAME_PID13S = EQUIPMENT_NAME_PID13S +"_Exchange"; | |||||
public static final String EAP_REQUEST_QUEUE_PID13S = EQUIPMENT_NAME_PID13S +"_EAP_Request_Queue"; | |||||
public static final String EAP_RESPONSE_QUEUE_PID13S = EQUIPMENT_NAME_PID13S +"_EAP_Response_Queue"; | |||||
public static final String MES_REQUEST_QUEUE_PID13S = EQUIPMENT_NAME_PID13S +"_MES_Request_Queue"; | |||||
public static final String MES_RESPONSE_QUEUE_PID13S = EQUIPMENT_NAME_PID13S +"_MES_Response_Queue"; | |||||
public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID13S = EQUIPMENT_NAME_PID13S +"_EAP_Request_Queue_RoutingKey"; | |||||
public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID13S = EQUIPMENT_NAME_PID13S +"_EAP_Response_Queue_RoutingKey"; | |||||
public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID13S = EQUIPMENT_NAME_PID13S +"_MES_Request_Queue_RoutingKey"; | |||||
public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID13S = EQUIPMENT_NAME_PID13S +"_MES_Response_Queue_RoutingKey"; | |||||
@Bean | |||||
public DirectExchange EXCHANGE_NAME_PID13S(){ | |||||
return new DirectExchange(EXCHANGE_NAME_PID13S); | |||||
} | |||||
//todo | |||||
@Bean | |||||
public Queue MES_REQUEST_QUEUE_PID13S(){ | |||||
Queue queue = new Queue(MES_REQUEST_QUEUE_PID13S); | |||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); | |||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); | |||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); | |||||
return queue; | |||||
} | |||||
@Bean | |||||
public Queue MES_RESPONSE_QUEUE_PID13S(){ | |||||
Queue queue = new Queue(MES_RESPONSE_QUEUE_PID13S); | |||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); | |||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); | |||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); | |||||
return queue; | |||||
} | |||||
@Bean | |||||
public Queue EAP_REQUEST_QUEUE_PID13S(){ | |||||
Queue queue = new Queue(EAP_REQUEST_QUEUE_PID13S); | |||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); | |||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); | |||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); | |||||
return queue; | |||||
} | |||||
@Bean | |||||
public Queue EAP_RESPONSE_QUEUE_PID13S(){ | |||||
Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID13S); | |||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); | |||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); | |||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); | |||||
return queue; | |||||
} | |||||
@Bean | |||||
public Binding bindExchangeAndQueueA_PID13S(){ | |||||
return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID13S()).to(EXCHANGE_NAME_PID13S()) | |||||
.with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID13S); | |||||
} | |||||
@Bean | |||||
public Binding bindExchangeAndQueueB_PID13S(){ | |||||
return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID13S()).to(EXCHANGE_NAME_PID13S()) | |||||
.with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID13S); | |||||
} | |||||
@Bean | |||||
public Binding bindExchangeAndQueueC_PID13S(){ | |||||
return BindingBuilder.bind(MES_REQUEST_QUEUE_PID13S()).to(EXCHANGE_NAME_PID13S()) | |||||
.with(MES_REQUEST_QUEUE_ROUTINGKEY_PID13S); | |||||
} | |||||
@Bean | |||||
public Binding bindExchangeAndQueueD_PID13S(){ | |||||
return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID13S()).to(EXCHANGE_NAME_PID13S()) | |||||
.with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID13S); | |||||
} | |||||
} |
@@ -0,0 +1,98 @@ | |||||
package com.qgs.dc.mq.configuration; | |||||
import org.springframework.amqp.core.Binding; | |||||
import org.springframework.amqp.core.BindingBuilder; | |||||
import org.springframework.amqp.core.DirectExchange; | |||||
import org.springframework.amqp.core.Queue; | |||||
import org.springframework.context.annotation.Bean; | |||||
import org.springframework.context.annotation.Configuration; | |||||
/** | |||||
* @Desc: "设备:PID13a1415a_1 相关信息定义" | |||||
* @Author: caixiang | |||||
* @DATE: 2021/6/7 9:11 | |||||
*/ | |||||
@Configuration | |||||
public class ConfigOfPID13a1415a_1 { | |||||
//水平扩展其他设备的时候 只要:control+R 然后 PID13a1415a_1=>00C 然后replace all | |||||
public static final String EQUIPMENT_NAME_PID13a1415a_1 = "PID13a1415a_1"; | |||||
public static final String EXCHANGE_NAME_PID13a1415a_1 = EQUIPMENT_NAME_PID13a1415a_1 +"_Exchange"; | |||||
public static final String EAP_REQUEST_QUEUE_PID13a1415a_1 = EQUIPMENT_NAME_PID13a1415a_1 +"_EAP_Request_Queue"; | |||||
public static final String EAP_RESPONSE_QUEUE_PID13a1415a_1 = EQUIPMENT_NAME_PID13a1415a_1 +"_EAP_Response_Queue"; | |||||
public static final String MES_REQUEST_QUEUE_PID13a1415a_1 = EQUIPMENT_NAME_PID13a1415a_1 +"_MES_Request_Queue"; | |||||
public static final String MES_RESPONSE_QUEUE_PID13a1415a_1 = EQUIPMENT_NAME_PID13a1415a_1 +"_MES_Response_Queue"; | |||||
public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID13a1415a_1 = EQUIPMENT_NAME_PID13a1415a_1 +"_EAP_Request_Queue_RoutingKey"; | |||||
public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID13a1415a_1 = EQUIPMENT_NAME_PID13a1415a_1 +"_EAP_Response_Queue_RoutingKey"; | |||||
public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID13a1415a_1 = EQUIPMENT_NAME_PID13a1415a_1 +"_MES_Request_Queue_RoutingKey"; | |||||
public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID13a1415a_1 = EQUIPMENT_NAME_PID13a1415a_1 +"_MES_Response_Queue_RoutingKey"; | |||||
@Bean | |||||
public DirectExchange EXCHANGE_NAME_PID13a1415a_1(){ | |||||
return new DirectExchange(EXCHANGE_NAME_PID13a1415a_1); | |||||
} | |||||
//todo | |||||
@Bean | |||||
public Queue MES_REQUEST_QUEUE_PID13a1415a_1(){ | |||||
Queue queue = new Queue(MES_REQUEST_QUEUE_PID13a1415a_1); | |||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); | |||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); | |||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); | |||||
return queue; | |||||
} | |||||
@Bean | |||||
public Queue MES_RESPONSE_QUEUE_PID13a1415a_1(){ | |||||
Queue queue = new Queue(MES_RESPONSE_QUEUE_PID13a1415a_1); | |||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); | |||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); | |||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); | |||||
return queue; | |||||
} | |||||
@Bean | |||||
public Queue EAP_REQUEST_QUEUE_PID13a1415a_1(){ | |||||
Queue queue = new Queue(EAP_REQUEST_QUEUE_PID13a1415a_1); | |||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); | |||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); | |||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); | |||||
return queue; | |||||
} | |||||
@Bean | |||||
public Queue EAP_RESPONSE_QUEUE_PID13a1415a_1(){ | |||||
Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID13a1415a_1); | |||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); | |||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); | |||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); | |||||
return queue; | |||||
} | |||||
@Bean | |||||
public Binding bindExchangeAndQueueA_PID13a1415a_1(){ | |||||
return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID13a1415a_1()).to(EXCHANGE_NAME_PID13a1415a_1()) | |||||
.with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID13a1415a_1); | |||||
} | |||||
@Bean | |||||
public Binding bindExchangeAndQueueB_PID13a1415a_1(){ | |||||
return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID13a1415a_1()).to(EXCHANGE_NAME_PID13a1415a_1()) | |||||
.with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID13a1415a_1); | |||||
} | |||||
@Bean | |||||
public Binding bindExchangeAndQueueC_PID13a1415a_1(){ | |||||
return BindingBuilder.bind(MES_REQUEST_QUEUE_PID13a1415a_1()).to(EXCHANGE_NAME_PID13a1415a_1()) | |||||
.with(MES_REQUEST_QUEUE_ROUTINGKEY_PID13a1415a_1); | |||||
} | |||||
@Bean | |||||
public Binding bindExchangeAndQueueD_PID13a1415a_1(){ | |||||
return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID13a1415a_1()).to(EXCHANGE_NAME_PID13a1415a_1()) | |||||
.with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID13a1415a_1); | |||||
} | |||||
} |
@@ -0,0 +1,98 @@ | |||||
package com.qgs.dc.mq.configuration; | |||||
import org.springframework.amqp.core.Binding; | |||||
import org.springframework.amqp.core.BindingBuilder; | |||||
import org.springframework.amqp.core.DirectExchange; | |||||
import org.springframework.amqp.core.Queue; | |||||
import org.springframework.context.annotation.Bean; | |||||
import org.springframework.context.annotation.Configuration; | |||||
/** | |||||
* @Desc: "设备:PID2_1 相关信息定义" | |||||
* @Author: caixiang | |||||
* @DATE: 2021/6/7 9:11 | |||||
*/ | |||||
@Configuration | |||||
public class ConfigOfPID2_1 { | |||||
//水平扩展其他设备的时候 只要:control+R 然后 PID2_1=>00C 然后replace all | |||||
public static final String EQUIPMENT_NAME_PID2_1 = "PID2_1"; | |||||
public static final String EXCHANGE_NAME_PID2_1 = EQUIPMENT_NAME_PID2_1 +"_Exchange"; | |||||
public static final String EAP_REQUEST_QUEUE_PID2_1 = EQUIPMENT_NAME_PID2_1 +"_EAP_Request_Queue"; | |||||
public static final String EAP_RESPONSE_QUEUE_PID2_1 = EQUIPMENT_NAME_PID2_1 +"_EAP_Response_Queue"; | |||||
public static final String MES_REQUEST_QUEUE_PID2_1 = EQUIPMENT_NAME_PID2_1 +"_MES_Request_Queue"; | |||||
public static final String MES_RESPONSE_QUEUE_PID2_1 = EQUIPMENT_NAME_PID2_1 +"_MES_Response_Queue"; | |||||
public static final String EAP_REQUEST_QUEUE_ROUTINGKEY_PID2_1 = EQUIPMENT_NAME_PID2_1 +"_EAP_Request_Queue_RoutingKey"; | |||||
public static final String EAP_RESPONSE_QUEUE_ROUTINGKEY_PID2_1 = EQUIPMENT_NAME_PID2_1 +"_EAP_Response_Queue_RoutingKey"; | |||||
public static final String MES_REQUEST_QUEUE_ROUTINGKEY_PID2_1 = EQUIPMENT_NAME_PID2_1 +"_MES_Request_Queue_RoutingKey"; | |||||
public static final String MES_RESPONSE_QUEUE_ROUTINGKEY_PID2_1 = EQUIPMENT_NAME_PID2_1 +"_MES_Response_Queue_RoutingKey"; | |||||
@Bean | |||||
public DirectExchange EXCHANGE_NAME_PID2_1(){ | |||||
return new DirectExchange(EXCHANGE_NAME_PID2_1); | |||||
} | |||||
//todo | |||||
@Bean | |||||
public Queue MES_REQUEST_QUEUE_PID2_1(){ | |||||
Queue queue = new Queue(MES_REQUEST_QUEUE_PID2_1); | |||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); | |||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); | |||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); | |||||
return queue; | |||||
} | |||||
@Bean | |||||
public Queue MES_RESPONSE_QUEUE_PID2_1(){ | |||||
Queue queue = new Queue(MES_RESPONSE_QUEUE_PID2_1); | |||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); | |||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); | |||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); | |||||
return queue; | |||||
} | |||||
@Bean | |||||
public Queue EAP_REQUEST_QUEUE_PID2_1(){ | |||||
Queue queue = new Queue(EAP_REQUEST_QUEUE_PID2_1); | |||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); | |||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); | |||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); | |||||
return queue; | |||||
} | |||||
@Bean | |||||
public Queue EAP_RESPONSE_QUEUE_PID2_1(){ | |||||
Queue queue = new Queue(EAP_RESPONSE_QUEUE_PID2_1); | |||||
queue.addArgument("x-dead-letter-exchange",ConfigOfDeadLetterQueue.EXCHANGE_NAME_DLE); | |||||
queue.addArgument("x-dead-letter-routing-key",ConfigOfDeadLetterQueue.Dead_Letter_RoutingKey); | |||||
//queue.addArgument("x-max-priority",ConfigOfDeadLetterQueue.MAX_PRIORITY); | |||||
return queue; | |||||
} | |||||
@Bean | |||||
public Binding bindExchangeAndQueueA_PID2_1(){ | |||||
return BindingBuilder.bind(EAP_REQUEST_QUEUE_PID2_1()).to(EXCHANGE_NAME_PID2_1()) | |||||
.with(EAP_REQUEST_QUEUE_ROUTINGKEY_PID2_1); | |||||
} | |||||
@Bean | |||||
public Binding bindExchangeAndQueueB_PID2_1(){ | |||||
return BindingBuilder.bind(EAP_RESPONSE_QUEUE_PID2_1()).to(EXCHANGE_NAME_PID2_1()) | |||||
.with(EAP_RESPONSE_QUEUE_ROUTINGKEY_PID2_1); | |||||
} | |||||
@Bean | |||||
public Binding bindExchangeAndQueueC_PID2_1(){ | |||||
return BindingBuilder.bind(MES_REQUEST_QUEUE_PID2_1()).to(EXCHANGE_NAME_PID2_1()) | |||||
.with(MES_REQUEST_QUEUE_ROUTINGKEY_PID2_1); | |||||
} | |||||
@Bean | |||||
public Binding bindExchangeAndQueueD_PID2_1(){ | |||||
return BindingBuilder.bind(MES_RESPONSE_QUEUE_PID2_1()).to(EXCHANGE_NAME_PID2_1()) | |||||
.with(MES_RESPONSE_QUEUE_ROUTINGKEY_PID2_1); | |||||
} | |||||
} |
@@ -8,6 +8,7 @@ import com.qgs.dc.mq.Constant.Constant; | |||||
import com.qgs.dc.mq.configuration.ConfigOfPID10_1; | import com.qgs.dc.mq.configuration.ConfigOfPID10_1; | ||||
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; | import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; | ||||
import com.qgs.dc.mq.entity.MQMessage; | import com.qgs.dc.mq.entity.MQMessage; | ||||
import com.qgs.dc.mq.entity.MQToMesMessage; | |||||
import com.qgs.dc.mq.entity.common.Header; | import com.qgs.dc.mq.entity.common.Header; | ||||
import com.qgs.dc.mq.entity.common.Returns; | import com.qgs.dc.mq.entity.common.Returns; | ||||
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; | import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; | ||||
@@ -66,23 +67,24 @@ public class PID10_1Received{ | |||||
reply(mqMessage); | reply(mqMessage); | ||||
// //回调给MES --- 开始 | // //回调给MES --- 开始 | ||||
// //1. 正常情况 | |||||
// String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); | |||||
// Integer result = Integer.valueOf(integer); | |||||
// | |||||
// //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 | |||||
// if(result == 1){ | |||||
// logger.info("在 EAP_REQUEST_QUEUE_PID4B 队列中,transitionId"+transactionId+", 这条消息处理成功"); | |||||
// channel.basicAck(deliveryTag,false); | |||||
// }else { | |||||
// logger.error("在 EAP_REQUEST_QUEUE_PID4B 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); | |||||
// channel.basicNack(deliveryTag,false,false); | |||||
// } | |||||
//1. 正常情况 | |||||
MQToMesMessage mqToMesMessage = new MQToMesMessage(mqMessage.getHeader(),mqMessage.getBody(),mqMessage.getReturns()); | |||||
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqToMesMessage)); | |||||
Integer result = Integer.valueOf(integer); | |||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 | |||||
if(result == 1){ | |||||
logger.info("在 EAP_REQUEST_QUEUE_PID4B 队列中,transitionId"+transactionId+", 这条消息处理成功"); | |||||
channel.basicAck(deliveryTag,false); | |||||
}else { | |||||
logger.error("在 EAP_REQUEST_QUEUE_PID4B 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); | |||||
channel.basicNack(deliveryTag,false,false); | |||||
} | |||||
// //回调给MES --- 结束 | // //回调给MES --- 结束 | ||||
channel.basicAck(deliveryTag,false); | |||||
// channel.basicAck(deliveryTag,false); | |||||
}catch (Exception e){ | }catch (Exception e){ | ||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列 | // 第一个false 是 不批量签收;第二个false 是 不重回队列 | ||||
logger.error(e.getMessage()); | logger.error(e.getMessage()); | ||||
@@ -0,0 +1,169 @@ | |||||
package com.qgs.dc.mq.consumer; | |||||
import cn.hutool.http.HttpUtil; | |||||
import com.alibaba.fastjson.JSON; | |||||
import com.alibaba.fastjson.JSONObject; | |||||
import com.qgs.dc.common.utils.CommonFunction; | |||||
import com.qgs.dc.mq.Constant.Constant; | |||||
import com.qgs.dc.mq.configuration.ConfigOfPID13a1415a_1; | |||||
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; | |||||
import com.qgs.dc.mq.entity.MQMessage; | |||||
import com.qgs.dc.mq.entity.common.Header; | |||||
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; | |||||
import com.rabbitmq.client.Channel; | |||||
import org.slf4j.Logger; | |||||
import org.slf4j.LoggerFactory; | |||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler; | |||||
import org.springframework.amqp.rabbit.annotation.RabbitListener; | |||||
import org.springframework.amqp.support.AmqpHeaders; | |||||
import org.springframework.beans.factory.annotation.Autowired; | |||||
import org.springframework.messaging.Message; | |||||
import org.springframework.stereotype.Component; | |||||
import java.nio.charset.StandardCharsets; | |||||
import java.util.ArrayList; | |||||
import java.util.List; | |||||
/** | |||||
* @Desc: "PIDPID13a1415a_1设备 接收MQ消息 监听类" | |||||
* @Author: caixiang | |||||
* @DATE: 2021/6/22 15:30 | |||||
* | |||||
* Ctrl+R 替换设备名 | |||||
*/ | |||||
@Component | |||||
public class PID13a1415a_1Received { | |||||
private static final Logger logger = LoggerFactory.getLogger(PID13a1415a_1Received.class); | |||||
@Autowired | |||||
MQMessageHandler mqMessageHandler; | |||||
@RabbitListener(queues = ConfigOfPID13a1415a_1.EAP_REQUEST_QUEUE_PID13a1415a_1) | |||||
@RabbitHandler | |||||
public void eapRequest(Message<?> message, Channel channel)throws Exception{ | |||||
logger.info("==============received message-EAP_REQUEST_QUEUE_PID13a1415a_1=================,priority:"+"equipmentName"+message.getHeaders().get("attr2")); | |||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); | |||||
// //MQMessage 中的 transactionId | |||||
// String transactionId = (String)message.getHeaders().get("spring_returned_message_correlation"); | |||||
// //logger.info("transactionId:"+transactionId); | |||||
try { | |||||
MQMessage mqMessage = CommonFunction.parse(message); | |||||
String transactionId = mqMessage.getHeader().getTransactionId(); | |||||
//1. 正常情况 | |||||
//Integer integer = mqMessageHandler.requestHandler(message); | |||||
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); | |||||
Integer result = Integer.valueOf(integer); | |||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 | |||||
if(result == 1){ | |||||
logger.info("在 EAP_REQUEST_QUEUE_PID13a1415a_1 队列中,transitionId"+transactionId+", 这条消息处理成功"); | |||||
channel.basicAck(deliveryTag,false); | |||||
}else { | |||||
logger.error("在 EAP_REQUEST_QUEUE_PID13a1415a_1 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); | |||||
channel.basicNack(deliveryTag,false,false); | |||||
} | |||||
//2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列 | |||||
}catch (Exception e){ | |||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列 | |||||
logger.error(e.getMessage()); | |||||
channel.basicNack(deliveryTag,false,false); | |||||
return; | |||||
} | |||||
} | |||||
public static void main(String[] args) { | |||||
//localhost:8001 | |||||
MQMessage mqMessage = new MQMessage(); | |||||
Header header = new Header("Request","Execute","QUERYEQPStatus","12"); | |||||
QueryEQStatusBody queryEQStatusBody = new QueryEQStatusBody(); | |||||
queryEQStatusBody.setVidType("u4"); | |||||
List<String> vids = new ArrayList<>(); | |||||
vids.add("10000"); | |||||
vids.add("10001"); | |||||
vids.add("10002"); | |||||
queryEQStatusBody.setVidList(vids); | |||||
//queryEQStatusBodys => {"vidList":["10000","10001","10002"],"vidType":"u4"} | |||||
//bytes => [123, 34, 118, 105, 100, 76, 105, 115, 116, 34, 58, 91, 34, 49, 48, 48, 48, 48, 34, 44, 34, 49, 48, 48, 48, 49, 34, 44, 34, 49, 48, 48, 48, 50, 34, 93, 44, 34, 118, 105, 100, 84, 121, 112, 101, 34, 58, 34, 117, 52, 34, 125] | |||||
//"body": "eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==" | |||||
//message => {"header":{"transactionId":"12_20210908141555_d7d88","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 14:15:56","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} | |||||
//new String(bytes) : {"vidType":"u4","vidList":["10000","10001","10002"]} | |||||
String queryEQStatusBodys = JSONObject.toJSONString(queryEQStatusBody); | |||||
byte[] bytes = JSONObject.toJSONBytes(queryEQStatusBody); | |||||
System.out.println("myString :"+ new String(bytes)); | |||||
mqMessage.setBody(queryEQStatusBodys); | |||||
mqMessage.setHeader(header); | |||||
//{"header":{"transactionId":"12_20210908094750_d62d4","messageType":"Request","messageCategory":"Execute","messageName":"QUERYEQPStatus","equipmentId":"12","sendTimestamp":"2021-09-08 09:47:50","from":"mes","to":"eap"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} | |||||
String s = JSONObject.toJSONString(mqMessage); | |||||
MQMessage mqMessagessss = JSONObject.parseObject(s.getBytes(StandardCharsets.UTF_8), MQMessage.class); | |||||
System.out.println("mqMessagessss : " + mqMessagessss.toString()); | |||||
System.out.println(s); | |||||
//String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s); | |||||
//{"header":{"transactionId":"PIDPID13a1415a_1_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID13a1415a_1","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} | |||||
String s22 = "{\"header\":{\"transactionId\":\"PIDPID13a1415a_1_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID13a1415a_1\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}"; | |||||
String s2 = "{\"header\":{\"transactionId\":\"PIDPID13a1415a_1_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID13a1415a_1\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID13a1415a_1_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}"; | |||||
JSONObject jsonObject = JSON.parseObject(s2); | |||||
String header1 = jsonObject.getString("header"); | |||||
String body = jsonObject.getString("body"); | |||||
String returns = jsonObject.getString("returns"); | |||||
System.out.println("header: "+header1); | |||||
System.out.println("body: "+body); | |||||
System.out.println("returns: "+returns); | |||||
byte[] bytes2 = s2.getBytes(); | |||||
MQMessage mqMessage2 = JSONObject.parseObject(bytes2, MQMessage.class); | |||||
System.out.println(mqMessage2.toString()); | |||||
} | |||||
@RabbitListener(queues = ConfigOfPID13a1415a_1.MES_RESPONSE_QUEUE_PID13a1415a_1) | |||||
@RabbitHandler | |||||
public void mesResponse(Message<?> message, Channel channel)throws Exception{ | |||||
logger.info("==============PIDPID13a1415a_1_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1")); | |||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); | |||||
//模拟异常 ,然后 拒签消息 ,然后丢到死信队列 | |||||
try { | |||||
MQMessage mqMessage = CommonFunction.parse(message); | |||||
String transactionId = mqMessage.getHeader().getTransactionId(); | |||||
Integer result = mqMessageHandler.responseHandler(message); | |||||
if(result == 1){ | |||||
logger.info("在 MES_RESPONSE_QUEUE_PID13a1415a_1 队列中 , transitionId"+transactionId+" 这条消息处理成功"); | |||||
channel.basicAck(deliveryTag,false); | |||||
}else { | |||||
logger.error("在 MES_RESPONSE_QUEUE_PID13a1415a_1 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); | |||||
channel.basicNack(deliveryTag,false,false); | |||||
} | |||||
//throw new Exception("11111"); | |||||
}catch (Exception e){ | |||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列 | |||||
channel.basicNack(deliveryTag,false,false); | |||||
return; | |||||
} | |||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 | |||||
//channel.basicAck(deliveryTag,false); | |||||
} | |||||
} |
@@ -5,7 +5,7 @@ import com.alibaba.fastjson.JSON; | |||||
import com.alibaba.fastjson.JSONObject; | import com.alibaba.fastjson.JSONObject; | ||||
import com.qgs.dc.common.utils.CommonFunction; | import com.qgs.dc.common.utils.CommonFunction; | ||||
import com.qgs.dc.mq.Constant.Constant; | import com.qgs.dc.mq.Constant.Constant; | ||||
import com.qgs.dc.mq.configuration.ConfigOfPID13S; | |||||
import com.qgs.dc.mq.configuration.ConfigOfPID18; | import com.qgs.dc.mq.configuration.ConfigOfPID18; | ||||
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; | import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; | ||||
import com.qgs.dc.mq.entity.MQMessage; | import com.qgs.dc.mq.entity.MQMessage; | ||||
@@ -5,9 +5,10 @@ import com.alibaba.fastjson.JSON; | |||||
import com.alibaba.fastjson.JSONObject; | import com.alibaba.fastjson.JSONObject; | ||||
import com.qgs.dc.common.utils.CommonFunction; | import com.qgs.dc.common.utils.CommonFunction; | ||||
import com.qgs.dc.mq.Constant.Constant; | import com.qgs.dc.mq.Constant.Constant; | ||||
import com.qgs.dc.mq.configuration.ConfigOfPID13S; | |||||
import com.qgs.dc.mq.configuration.ConfigOfPID2_1; | |||||
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; | import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; | ||||
import com.qgs.dc.mq.entity.MQMessage; | import com.qgs.dc.mq.entity.MQMessage; | ||||
import com.qgs.dc.mq.entity.MQToMesMessage; | |||||
import com.qgs.dc.mq.entity.common.Header; | import com.qgs.dc.mq.entity.common.Header; | ||||
import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; | import com.qgs.dc.mq.entity.specificBody.QueryEQStatusBody; | ||||
import com.rabbitmq.client.Channel; | import com.rabbitmq.client.Channel; | ||||
@@ -25,27 +26,27 @@ import java.util.ArrayList; | |||||
import java.util.List; | import java.util.List; | ||||
/** | /** | ||||
* @Desc: "PIDPID13S设备 接收MQ消息 监听类" | |||||
* @Desc: "PIDPID2_1设备 接收MQ消息 监听类" | |||||
* @Author: caixiang | * @Author: caixiang | ||||
* @DATE: 2021/6/22 15:30 | * @DATE: 2021/6/22 15:30 | ||||
* | * | ||||
* Ctrl+R 替换设备名 | * Ctrl+R 替换设备名 | ||||
*/ | */ | ||||
@Component | @Component | ||||
public class PID13SReceived { | |||||
public class PID2_1Received { | |||||
private static final Logger logger = LoggerFactory.getLogger(PID13SReceived.class); | |||||
private static final Logger logger = LoggerFactory.getLogger(PID2_1Received.class); | |||||
@Autowired | @Autowired | ||||
MQMessageHandler mqMessageHandler; | MQMessageHandler mqMessageHandler; | ||||
@RabbitListener(queues = ConfigOfPID13S.EAP_REQUEST_QUEUE_PID13S) | |||||
@RabbitListener(queues = ConfigOfPID2_1.EAP_REQUEST_QUEUE_PID2_1) | |||||
@RabbitHandler | @RabbitHandler | ||||
public void eapRequest(Message<?> message, Channel channel)throws Exception{ | public void eapRequest(Message<?> message, Channel channel)throws Exception{ | ||||
logger.info("==============received message-EAP_REQUEST_QUEUE_PID13S=================,priority:"+"equipmentName"+message.getHeaders().get("attr2")); | |||||
logger.info("==============received message-EAP_REQUEST_QUEUE_PID2_1=================,priority:"+"equipmentName"+message.getHeaders().get("attr2")); | |||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); | Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); | ||||
// //MQMessage 中的 transactionId | // //MQMessage 中的 transactionId | ||||
@@ -58,19 +59,22 @@ public class PID13SReceived { | |||||
String transactionId = mqMessage.getHeader().getTransactionId(); | String transactionId = mqMessage.getHeader().getTransactionId(); | ||||
//1. 正常情况 | //1. 正常情况 | ||||
//Integer integer = mqMessageHandler.requestHandler(message); | //Integer integer = mqMessageHandler.requestHandler(message); | ||||
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqMessage)); | |||||
MQToMesMessage mqToMesMessage = new MQToMesMessage(mqMessage.getHeader(),mqMessage.getBody(),mqMessage.getReturns()); | |||||
String integer= HttpUtil.post(Constant.mesCallBackUrl,JSONObject.toJSONString(mqToMesMessage)); | |||||
Integer result = Integer.valueOf(integer); | Integer result = Integer.valueOf(integer); | ||||
//注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 | //注意 这里特别注意 已经拒收的消息 再签收是要出错的,这里要特别注意。 | ||||
if(result == 1){ | if(result == 1){ | ||||
logger.info("在 EAP_REQUEST_QUEUE_PID13S 队列中,transitionId"+transactionId+", 这条消息处理成功"); | |||||
logger.info("在 EAP_REQUEST_QUEUE_PID2_1 队列中,transitionId"+transactionId+", 这条消息处理成功"); | |||||
channel.basicAck(deliveryTag,false); | channel.basicAck(deliveryTag,false); | ||||
}else { | }else { | ||||
logger.error("在 EAP_REQUEST_QUEUE_PID13S 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); | |||||
logger.error("在 EAP_REQUEST_QUEUE_PID2_1 队列中,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); | |||||
channel.basicNack(deliveryTag,false,false); | channel.basicNack(deliveryTag,false,false); | ||||
} | } | ||||
//2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列 | //2.模拟异常 ,然后 拒签消息 ,然后丢到死信队列 | ||||
//throw new Exception("11111"); | |||||
}catch (Exception e){ | }catch (Exception e){ | ||||
// 第一个false 是 不批量签收;第二个false 是 不重回队列 | // 第一个false 是 不批量签收;第二个false 是 不重回队列 | ||||
logger.error(e.getMessage()); | logger.error(e.getMessage()); | ||||
@@ -118,10 +122,10 @@ public class PID13SReceived { | |||||
System.out.println(s); | System.out.println(s); | ||||
//String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s); | //String result= HttpUtil.post("localhost:8001/receivedFromEapRequest",s); | ||||
//{"header":{"transactionId":"PIDPID13S_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID13S","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} | |||||
String s22 = "{\"header\":{\"transactionId\":\"PIDPID13S_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID13S\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}"; | |||||
//{"header":{"transactionId":"PIDPID2_1_20210908093729_95196","messageType":"Request","messageCategory":"Alarm","messageName":"AlarmTimeOut","from":"EAP","to":"MES","equipmentId":"PIDPID2_1","sendTimestamp":"2021-09-08 09:37:29"},"body":"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ=="} | |||||
String s22 = "{\"header\":{\"transactionId\":\"PIDPID2_1_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID2_1\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":\"eyJ2aWRMaXN0IjpbIjEwMDAwIiwiMTAwMDEiLCIxMDAwMiJdLCJ2aWRUeXBlIjoidTQifQ==\"}"; | |||||
String s2 = "{\"header\":{\"transactionId\":\"PIDPID13S_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID13S\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID13S_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}"; | |||||
String s2 = "{\"header\":{\"transactionId\":\"PIDPID2_1_20210908093729_95196\",\"messageType\":\"Request\",\"messageCategory\":\"Alarm\",\"messageName\":\"AlarmTimeOut\",\"from\":\"EAP\",\"to\":\"MES\",\"equipmentId\":\"PIDPID2_1\",\"sendTimestamp\":\"2021-09-08 09:37:29\"},\"body\":{\"TransactionId:\":\"PIDPID2_1_20210908093644_71835\",\"MessageName:\":\"TraceData\"}}"; | |||||
JSONObject jsonObject = JSON.parseObject(s2); | JSONObject jsonObject = JSON.parseObject(s2); | ||||
String header1 = jsonObject.getString("header"); | String header1 = jsonObject.getString("header"); | ||||
String body = jsonObject.getString("body"); | String body = jsonObject.getString("body"); | ||||
@@ -135,10 +139,10 @@ public class PID13SReceived { | |||||
System.out.println(mqMessage2.toString()); | System.out.println(mqMessage2.toString()); | ||||
} | } | ||||
@RabbitListener(queues = ConfigOfPID13S.MES_RESPONSE_QUEUE_PID13S) | |||||
@RabbitListener(queues = ConfigOfPID2_1.MES_RESPONSE_QUEUE_PID2_1) | |||||
@RabbitHandler | @RabbitHandler | ||||
public void mesResponse(Message<?> message, Channel channel)throws Exception{ | public void mesResponse(Message<?> message, Channel channel)throws Exception{ | ||||
logger.info("==============PIDPID13S_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1")); | |||||
logger.info("==============PIDPID2_1_Exchange-MES_Response_Queue=================,priority:"+message.getHeaders().get("priority")+",attr1"+message.getHeaders().get("attr1")); | |||||
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); | Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); | ||||
//模拟异常 ,然后 拒签消息 ,然后丢到死信队列 | //模拟异常 ,然后 拒签消息 ,然后丢到死信队列 | ||||
@@ -147,10 +151,10 @@ public class PID13SReceived { | |||||
String transactionId = mqMessage.getHeader().getTransactionId(); | String transactionId = mqMessage.getHeader().getTransactionId(); | ||||
Integer result = mqMessageHandler.responseHandler(message); | Integer result = mqMessageHandler.responseHandler(message); | ||||
if(result == 1){ | if(result == 1){ | ||||
logger.info("在 MES_RESPONSE_QUEUE_PID13S 队列中 , transitionId"+transactionId+" 这条消息处理成功"); | |||||
logger.info("在 MES_RESPONSE_QUEUE_PID2_1 队列中 , transitionId"+transactionId+" 这条消息处理成功"); | |||||
channel.basicAck(deliveryTag,false); | channel.basicAck(deliveryTag,false); | ||||
}else { | }else { | ||||
logger.error("在 MES_RESPONSE_QUEUE_PID13S 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); | |||||
logger.error("在 MES_RESPONSE_QUEUE_PID2_1 队列中 ,transitionId"+transactionId+" 处理消息的时候 出现异常,然后 拒签消息 ,然后丢到死信队列"); | |||||
channel.basicNack(deliveryTag,false,false); | channel.basicNack(deliveryTag,false,false); | ||||
} | } | ||||
@@ -5,7 +5,7 @@ import com.alibaba.fastjson.JSON; | |||||
import com.alibaba.fastjson.JSONObject; | import com.alibaba.fastjson.JSONObject; | ||||
import com.qgs.dc.common.utils.CommonFunction; | import com.qgs.dc.common.utils.CommonFunction; | ||||
import com.qgs.dc.mq.Constant.Constant; | import com.qgs.dc.mq.Constant.Constant; | ||||
import com.qgs.dc.mq.configuration.ConfigOfPID13S; | |||||
import com.qgs.dc.mq.configuration.ConfigOfPID4B; | import com.qgs.dc.mq.configuration.ConfigOfPID4B; | ||||
import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; | import com.qgs.dc.mq.consumer.commonHandler.MQMessageHandler; | ||||
import com.qgs.dc.mq.entity.MQMessage; | import com.qgs.dc.mq.entity.MQMessage; | ||||
@@ -0,0 +1,27 @@ | |||||
package com.qgs.dc.mq.entity; | |||||
import com.alibaba.fastjson.annotation.JSONField; | |||||
import com.alibaba.fastjson.annotation.JSONType; | |||||
import com.qgs.dc.mq.entity.common.Header; | |||||
import com.qgs.dc.mq.entity.common.Returns; | |||||
import lombok.Data; | |||||
/** | |||||
* @Desc: "" | |||||
* @Author: caixiang | |||||
* @DATE: 2021/8/12 15:38 | |||||
*/ | |||||
@Data | |||||
@JSONType(orders={"header","body","returns"}) | |||||
public class MQToMesMessage { | |||||
private Header header; | |||||
private String body; | |||||
//在序列化和反序列化 的时候 都会把 json 里面的return字段转成 object 里面的returns字段 | |||||
private Returns returns; | |||||
public MQToMesMessage(Header header, String body, Returns returns) { | |||||
this.header = header; | |||||
this.body = body; | |||||
this.returns = returns; | |||||
} | |||||
} |
@@ -34,6 +34,17 @@ spring: | |||||
#multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间 | #multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间 | ||||
#================重试机制 结束 | #================重试机制 结束 | ||||
#influx: | |||||
# influxUrl: 'http://192.168.0.170:8086' | |||||
# bucket: 'qgs-bucket' | |||||
# org: 'qgs' | |||||
# token: 'lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==' | |||||
influx: | |||||
influxUrl: 'http://192.168.0.170:8086' | |||||
bucket: 'qgs-bucket' | |||||
org: 'qgs' | |||||
token: 'lkBsC27QZr1W50BSPlGxpTqNNpwuUk5uz1dZZRPSPbCG5VmNDDUo8P3UkZIhGWwfJwkuz6ZGZ7Et4_KBaG3gHw==' | |||||
# /health point | # /health point | ||||
#management: | #management: | ||||
# health: | # health: | ||||