using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ARI.EAP.HOST.Common;
using ARI.EAP.HOST.Handlers.EventHandlers;
using ARI.EAP.HOST.SRD;
using Glorysoft.SECS.EQP.Utilities;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Polly;
using Polly.Retry;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using Serilog.Core;

namespace ARI.EAP.HOST.MQ
{
    /// <summary>
    /// Wraps one RabbitMQ connection and one channel: declares the exchanges and
    /// queues named in <c>Configuration.conf.mQConnectionCfg</c>, consumes the EAP
    /// response queue and the MES request queue, and publishes EAP requests,
    /// EAP responses and RTDB requests.
    /// </summary>
    public class Mq
    {
        /// <summary>Connection factory shared by all instances, built once from configuration.</summary>
        public readonly static ConnectionFactory connectionFactory;

        private IConnection conn;
        private IModel Channel;
        private EventingBasicConsumer consumer;
        private EventingBasicConsumer consumer2;

        // Guards connection creation in TryConnect.
        private readonly object sync_root = new object();

        // Serialises BasicPublish calls: the channel is shared by every publisher,
        // and channels must not be used for publishing from several threads at once.
        private readonly object publishLock = new object();

        /// <summary>True when a connection exists, is open, and <see cref="Close"/> has not been called.</summary>
        public bool IsConnected => conn != null && conn.IsOpen && !disposed;

        /// <summary>Set once the connection has been closed through <see cref="Close"/>; suppresses reconnects.</summary>
        public bool disposed;

        static Mq()
        {
            var cfg = Configuration.conf.mQConnectionCfg;
            connectionFactory = new ConnectionFactory()
            {
                HostName = cfg.HostName,
                VirtualHost = cfg.VirtualHost,
                UserName = cfg.UserName,
                Password = cfg.Password,
                Port = cfg.Port,
                ClientProvidedName = cfg.ClientProvidedName,
                RequestedHeartbeat = TimeSpan.FromSeconds(5)
            };
        }

        /// <summary>
        /// Starts connecting and declaring the topology on a background task so that
        /// construction does not block on the broker.
        /// </summary>
        public Mq()
        {
            Task.Run(() => CreatModel());
        }

        /// <summary>
        /// Opens a new connection, retrying forever on <see cref="SocketException"/> or
        /// <see cref="BrokerUnreachableException"/>, then attaches the connection event handlers.
        /// </summary>
        /// <returns>
        /// <c>true</c> when the connection is open and usable; <c>false</c> when
        /// <see cref="IsConnected"/> is false after the connection attempt.
        /// </returns>
        public bool TryConnect()
        {
            lock (sync_root)
            {
                // BrokerUnreachableException is thrown when ConnectionFactory.CreateConnection
                // cannot open a connection. Wait and retry forever, sleeping 2^attempt seconds.
                // NOTE(review): the back-off is unbounded, so waits grow to hours after a dozen
                // failures; consider capping it.
                RetryPolicy policy = Policy.Handle<SocketException>()
                    .Or<BrokerUnreachableException>()
                    .WaitAndRetryForever(
                        retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                        (ex, time) =>
                        {
                            LoggerService.MQLogger.Error(ex);
                            LoggerService.MQLogger.Debug("MQ connect retry");
                        });

                policy.Execute(() =>
                {
                    this.conn = connectionFactory.CreateConnection();
                });

                if (!this.IsConnected)
                {
                    LoggerService.MQLogger.Debug("MQ connections could not be created and opened");
                    return false;
                }

                // Raised when the connection is destroyed. If the connection is already
                // destroyed when the handler is added, the handler fires immediately.
                this.conn.ConnectionShutdown += this.OnConnectionShutdown;
                // Raised when an exception occurs in a callback invoked by the connection,
                // for example when a ConnectionShutdown handler throws.
                this.conn.CallbackException += this.OnCallbackException;
                // Raised when the broker blocks the connection.
                this.conn.ConnectionBlocked += this.OnConnectionBlocked;

                LoggerService.MQLogger.Debug("MQ open success");
                return true;
            }
        }

        /// <summary>
        /// Drops any existing connection, opens a new one, creates the channel, declares
        /// exchanges and queues, and starts both consumers.
        /// </summary>
        /// <returns><c>true</c> when everything was set up; <c>false</c> on any failure or after <see cref="Close"/>.</returns>
        public bool CreatModel()
        {
            try
            {
                if (disposed)
                {
                    // The user closed the connection; do not open (and leak) a new one.
                    LoggerService.MQLogger.Debug("MQ is closed by user, connect skipped");
                    return false;
                }

                ReleaseConnection();

                if (!TryConnect())
                {
                    return false;
                }

                var cfg = Configuration.conf.mQConnectionCfg;

                Channel = conn.CreateModel();
                Channel.ExchangeDeclare(cfg.Exchange, ExchangeType.Direct, durable: true);
                Channel.ExchangeDeclare(cfg.Exchange_RTDB, ExchangeType.Direct, durable: true);

                // Every queue is declared with the same dead-letter routing arguments.
                var arguments = new Dictionary<string, object>
                {
                    { "x-dead-letter-exchange", cfg.Exchange_Name_Dle },
                    { "x-dead-letter-routing-key", cfg.Dead_Letter_RoutingKey }
                };

                // Positional QueueDeclare arguments: durable = true, exclusive = false, autoDelete = false.
                Channel.QueueDeclare(cfg.EAP_Request_Queue, true, false, false, arguments);
                Channel.QueueBind(cfg.EAP_Request_Queue, cfg.Exchange, cfg.EAP_Request_Queue_RoutingKey);

                Channel.QueueDeclare(cfg.EAP_Response_Queue, true, false, false, arguments);
                Channel.QueueBind(cfg.EAP_Response_Queue, cfg.Exchange, cfg.EAP_Response_Queue_RoutingKey);
                consumer = new EventingBasicConsumer(Channel);
                consumer.Received += MESResponse;
                Channel.BasicConsume(cfg.EAP_Response_Queue, true, consumer); // autoAck = true

                Channel.QueueDeclare(cfg.MES_Request_Queue, true, false, false, arguments);
                Channel.QueueBind(cfg.MES_Request_Queue, cfg.Exchange, cfg.MES_Request_Queue_RoutingKey);

                Channel.QueueDeclare(cfg.MES_Response_Queue, true, false, false, arguments);
                Channel.QueueBind(cfg.MES_Response_Queue, cfg.Exchange, cfg.MES_Response_Queue_RoutingKey);

                Channel.QueueDeclare(cfg.EAP_Request_Queue_RTDB, true, false, false, arguments);
                Channel.QueueBind(cfg.EAP_Request_Queue_RTDB, cfg.Exchange_RTDB, "");

                consumer2 = new EventingBasicConsumer(Channel);
                consumer2.Received += MESRequest;
                Channel.BasicConsume(cfg.MES_Request_Queue, true, consumer2); // autoAck = true

                Global.MF.MQConnectStatusSet(true);
                return true;
            }
            catch (Exception e)
            {
                LoggerService.MQLogger.Error("MQ open connection error:" + e);
                return false;
            }
        }

        /// <summary>
        /// Closes and disposes the connection on behalf of the user and marks this
        /// instance as disposed so that no reconnect is attempted.
        /// </summary>
        /// <returns><c>true</c> when there was nothing to close or the close succeeded; <c>false</c> when closing threw.</returns>
        public bool Close()
        {
            if (conn == null)
            {
                return true;
            }

            try
            {
                // Set the flag before closing: closing raises ConnectionShutdown, and the
                // handler must already see that this shutdown was requested by the user.
                disposed = true;
                conn.Close();
                conn.Dispose();
                Global.MF.MQConnectStatusSet(false);
                LoggerService.MQLogger.Debug("MQ Connection Closed By User.");
                return true;
            }
            catch (Exception ex)
            {
                LoggerService.MQLogger.Error("MQ Connection Close Error:" + ex);
                return false;
            }
        }

        /// <summary>
        /// Detaches this instance's handlers from the current connection, then closes and
        /// disposes it, swallowing (but logging) errors from an already-dead connection.
        /// </summary>
        private void ReleaseConnection()
        {
            IConnection old = conn;
            if (old == null)
            {
                return;
            }

            // Detach first so that closing the old connection does not re-enter the
            // reconnect logic through ConnectionShutdown.
            old.ConnectionShutdown -= this.OnConnectionShutdown;
            old.CallbackException -= this.OnCallbackException;
            old.ConnectionBlocked -= this.OnConnectionBlocked;

            try
            {
                // Close() throws on a connection that is already closed, which is the
                // normal state when we get here from a shutdown notification.
                if (old.IsOpen)
                {
                    old.Close();
                }
            }
            catch (Exception ex)
            {
                LoggerService.MQLogger.Debug("MQ old connection close ignored:" + ex.Message);
            }

            try
            {
                old.Dispose();
            }
            catch (Exception ex)
            {
                LoggerService.MQLogger.Debug("MQ old connection dispose ignored:" + ex.Message);
            }
        }

        /// <summary>
        /// Shared body of the connection event handlers: reports the link as down and,
        /// unless the user closed it, rebuilds the connection and topology.
        /// </summary>
        /// <param name="logMessage">Debug message describing why a reconnect is attempted.</param>
        private void Reconnect(string logMessage)
        {
            Global.MF.MQConnectStatusSet(false);
            if (this.disposed)
            {
                return;
            }

            LoggerService.MQLogger.Debug(logMessage);
            if (CreatModel())
            {
                Global.MF.MQConnectStatusSet(true);
            }
        }

        /// <summary>Handles <c>ConnectionShutdown</c> by attempting a reconnect.</summary>
        void OnConnectionShutdown(object sender, ShutdownEventArgs ea)
        {
            Reconnect("MQ is shutdown,try to reconnect");
        }

        /// <summary>Handles <c>CallbackException</c> by attempting a reconnect.</summary>
        void OnCallbackException(object sender, CallbackExceptionEventArgs e)
        {
            Reconnect("MQ throw an exception,try to reconnect");
        }

        /// <summary>Handles <c>ConnectionBlocked</c> by attempting a reconnect.</summary>
        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {
            Reconnect("MQ is blocked,try to reconnect");
        }

        /// <summary>
        /// Decodes a delivery body as UTF-8 JSON into an <c>MQMessage</c>.
        /// </summary>
        /// <param name="ea">The delivery received from the broker.</param>
        /// <param name="parsed">The decoded message when the method returns <c>true</c>; otherwise <c>null</c>.</param>
        /// <returns><c>false</c> (after logging) when the payload is not valid JSON or has no header.</returns>
        private static bool TryReadMessage(BasicDeliverEventArgs ea, out MQMessage parsed)
        {
            parsed = null;
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);

            try
            {
                parsed = JsonConvert.DeserializeObject<MQMessage>(message);
            }
            catch (JsonException ex)
            {
                LoggerService.MQLogger.Error("MQ receive message error:" + message, ex);
                return false;
            }

            // DeserializeObject yields null for an empty or "null" payload.
            if (parsed?.header == null)
            {
                parsed = null;
                LoggerService.MQLogger.Error("MQ receive message error:" + message);
                return false;
            }

            return true;
        }

        /// <summary>Records a received message in the UI log and the MQ log file.</summary>
        private static void LogReceived(MQMessage received)
        {
            Global.MF.addMQlog(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), "MQ=>H", LoggerService.Receive,
                received.header.messageName, received.header.transactionId);
            LoggerService.MQLogger.Info(received.header.transactionId + " MQ Receive:" + SECSUtil.ToJson(received)
                + System.Environment.NewLine + ".");
        }

        /// <summary>Records a sent message in the UI log and the MQ log file.</summary>
        private static void LogSent(MQMessage sent, string text)
        {
            Global.MF.addMQlog(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), "H=>MQ", LoggerService.Send,
                sent.header.messageName, sent.header.transactionId);
            LoggerService.MQLogger.Info(sent.header.transactionId + " MQ Send:" + text
                + System.Environment.NewLine + ".");
        }

        /// <summary>Consumer callback for the EAP response queue: logs the received message.</summary>
        private void MESResponse(object sender, BasicDeliverEventArgs ea)
        {
            if (!TryReadMessage(ea, out MQMessage mQMessage))
            {
                return;
            }

            LogReceived(mQMessage);
        }

        /// <summary>
        /// Consumer callback for the MES request queue: logs the message and hands it to
        /// <c>MESRequestHandler.Execute</c> on a background task.
        /// </summary>
        private void MESRequest(object sender, BasicDeliverEventArgs ea)
        {
            if (!TryReadMessage(ea, out MQMessage mesMessage))
            {
                return;
            }

            LogReceived(mesMessage);
            Task.Run(() => MESRequestHandler.Execute(mesMessage));
        }

        /// <summary>Publishes one payload on the shared channel, one publisher at a time.</summary>
        private void Publish(string exchange, string routingKey, byte[] data)
        {
            lock (publishLock)
            {
                Channel.BasicPublish(exchange, routingKey, null, data);
            }
        }

        /// <summary>
        /// Stamps the message with the current local time and publishes it with the EAP
        /// request routing key. Failures are logged, not thrown.
        /// </summary>
        /// <param name="message">Message to send; its <c>header.sendTimestamp</c> is overwritten.</param>
        public void EAPRequest(MQMessage message)
        {
            message.header.sendTimestamp = System.DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss.fff");
            var text = SECSUtil.ToJson(message);
            var data = Encoding.UTF8.GetBytes(text);
            try
            {
                if (!IsConnected)
                {
                    Global.MF.MQConnectStatusSet(false);
                    LoggerService.MQLogger.Debug("MQ is closed");
                    return;
                }

                var cfg = Configuration.conf.mQConnectionCfg;
                Publish(cfg.Exchange, cfg.EAP_Request_Queue_RoutingKey, data);
                LogSent(message, text);
            }
            catch (Exception e)
            {
                LoggerService.MQLogger.Error("MQ send message error:" + text, e);
            }
        }

        /// <summary>
        /// Stamps the message with the current local time and publishes it with the MES
        /// response routing key. Failures are logged, not thrown.
        /// </summary>
        /// <param name="message">Message to send; its <c>header.sendTimestamp</c> is overwritten.</param>
        public void EAPResponse(MQMessage message)
        {
            message.header.sendTimestamp = System.DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss.fff");
            var text = SECSUtil.ToJson(message);
            var data = Encoding.UTF8.GetBytes(text);
            try
            {
                if (!IsConnected)
                {
                    Global.MF.MQConnectStatusSet(false);
                    LoggerService.MQLogger.Debug("MQ is closed");
                    return;
                }

                var cfg = Configuration.conf.mQConnectionCfg;
                Publish(cfg.Exchange, cfg.MES_Response_Queue_RoutingKey, data);
                LogSent(message, text);
            }
            catch (Exception e)
            {
                // Logged at Error level, consistent with the other publish methods.
                LoggerService.MQLogger.Error("MQ send message error:" + text, e);
            }
        }

        /// <summary>
        /// Stamps the message with the current local time and publishes it to the RTDB
        /// exchange with an empty routing key. Silently returns when not connected;
        /// publish failures are logged, not thrown.
        /// </summary>
        /// <param name="message">Message to send; its <c>header.sendTimestamp</c> is overwritten.</param>
        public void EAPRequestToRTDB(MQMessage message)
        {
            message.header.sendTimestamp = System.DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss.fff");
            var text = SECSUtil.ToJson(message);
            var data = Encoding.UTF8.GetBytes(text);
            try
            {
                if (!IsConnected)
                {
                    return;
                }

                Publish(Configuration.conf.mQConnectionCfg.Exchange_RTDB, "", data);
            }
            catch (Exception e)
            {
                LoggerService.MQLogger.Error("MQ send message error:" + text, e);
            }
        }
    }
}