using EasyNetQ.Internals; using Newtonsoft.Json; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Waste.Socket { #region RabbitMQ.Client原生封装类 /// /// RabbitMQ.Client原生封装类 /// public class RabbitMqService : IDisposable { #region 初始化 //RabbitMQ建议客户端线程之间不要共用Model,至少要保证共用Model的线程发送消息必须是串行的,但是建议尽量共用Connection。 private static readonly ConcurrentDictionary ModelDic = new ConcurrentDictionary(); private static RabbitMqAttribute _rabbitMqAttribute; private const string RabbitMqAttribute = "RabbitMqAttribute"; private static IConnection _conn; private static readonly object LockObj = new object(); private static void Open(MqConfig config) { if (_conn != null) return; lock (LockObj) { var factory = new ConnectionFactory { //设置主机名 HostName = config.Host, //设置心跳时间 RequestedHeartbeat = config.HeartBeat, //设置自动重连 AutomaticRecoveryEnabled = config.AutomaticRecoveryEnabled, //重连时间 NetworkRecoveryInterval = config.NetworkRecoveryInterval, //用户名 UserName = config.UserName, //密码 Password = config.Password }; factory.AutomaticRecoveryEnabled = true; factory.NetworkRecoveryInterval = new TimeSpan(1000); _conn = _conn ?? factory.CreateConnection(); } } private static RabbitMqAttribute GetRabbitMqAttribute() { if (_rabbitMqAttribute == null) { var typeOfT = typeof(T); _rabbitMqAttribute = typeOfT.GetAttribute(); } return _rabbitMqAttribute; } public RabbitMqService(MqConfig config) { Open(config); } #endregion #region 交换器声明 /// /// 交换器声明 /// /// /// 交换器 /// 交换器类型: /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全 /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的 /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都 /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout /// 交换机转发消息是最快的。 /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多 /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” /// 只会匹配到“audit.irs”。 /// 持久化 /// 自动删除 /// 参数 private static void ExchangeDeclare(IModel iModel, string exchange, string type = "fanout", bool durable = true, bool autoDelete = false, IDictionary arguments = null) { try { exchange = string.IsNullOrWhiteSpace(exchange) ? "" : exchange.Trim(); iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments); } catch (Exception ex) { Console.WriteLine($"{DateTime.Now},交换器声明错误:{ex.Message}"); } } #endregion #region 队列声明 /// /// 队列声明 /// /// /// 队列 /// 持久化 /// 排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见, /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可 /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连 /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者 /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。 /// 自动删除 /// 参数 private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false, bool autoDelete = false, IDictionary arguments = null) { try { queue = string.IsNullOrWhiteSpace(queue) ? "UndefinedQueueName" : queue.Trim(); channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments); } catch (Exception ex) { Console.WriteLine($"{DateTime.Now},队列声明错误:{ex.Message}"); } } #endregion #region 获取Model /// /// 获取Model /// /// 交换机名称 /// 队列名称 /// /// 是否持久化 /// private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false) { return ModelDic.GetOrAdd(queue, key => { var model = _conn.CreateModel(); ExchangeDeclare(model, exchange, ExchangeType.Fanout,isProperties); QueueDeclare(model, queue,isProperties); model.QueueBind(queue, exchange, routingKey); ModelDic[queue] = model; return model; }); } /// /// 获取Model /// /// 队列名称 /// /// private static IModel GetModel(string queue, bool isProperties = false) { return ModelDic.GetOrAdd(queue, value => { var model = _conn.CreateModel(); QueueDeclare(model, queue, isProperties); //每次消费的消息数 model.BasicQos(0, 1, false); ModelDic[queue] = model; return model; }); } #endregion #region 发布消息 /// /// 发布消息 /// /// 指令 /// public void Publish(T command) where T : class { var queueInfo = GetRabbitMqAttribute(); if (queueInfo == null) throw new ArgumentException(RabbitMqAttribute); var body = JsonConvert.SerializeObject(command); var exchange = queueInfo.ExchangeName; var queue = queueInfo.QueueName; var routingKey = queueInfo.ExchangeName; var isProperties = queueInfo.IsProperties; Publish(exchange, queue, routingKey, body, isProperties); } /// /// 发布消息 /// /// 路由键 /// 队列信息 /// 交换机名称 /// 队列名 /// 是否持久化 /// public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = true) { var channel = GetModel(exchange, queue, routingKey, isProperties); try { var bytes = Encoding.UTF8.GetBytes(body); channel.BasicPublish(exchange, routingKey, null, bytes); } catch (Exception ex) { Console.WriteLine($"消息发送异常:{ex.Message}"); } } /// /// 发布消息到死信队列 /// /// 死信信息 /// 异常 /// 死信队列名称 /// private void PublishToDead(string queue, string body, Exception ex) where T : class { var queueInfo = typeof(T).GetAttribute(); if (queueInfo == null) throw new ArgumentException(RabbitMqAttribute); var deadLetterExchange = queueInfo.ExchangeName; string deadLetterQueue = queueInfo.QueueName; var deadLetterRoutingKey = deadLetterExchange; var deadLetterBody = new DeadLetterQueue { Body = body, CreateDateTime = DateTime.Now, ExceptionMsg = ex.Message, Queue = queue, RoutingKey = deadLetterExchange, Exchange = deadLetterRoutingKey }; var data= JsonConvert.SerializeObject(deadLetterBody); Publish(deadLetterExchange, deadLetterQueue, deadLetterRoutingKey, data); } #endregion #region 订阅消息 /// /// 接收消息 /// /// /// 消费处理 public void Subscribe(Action handler) where T : class { var queueInfo = GetRabbitMqAttribute(); if (queueInfo == null) throw new ArgumentException(RabbitMqAttribute); var isDeadLetter = typeof(T) == typeof(DeadLetterQueue); Subscribe(queueInfo.QueueName, queueInfo.IsProperties, handler, isDeadLetter); } /// /// 接收消息 /// /// /// 队列名称 /// /// 消费处理 /// public void Subscribe(string queue, bool isProperties, Action handler, bool isDeadLetter) where T : class { //队列声明 var channel = GetModel(queue, isProperties); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var msgStr = Encoding.UTF8.GetString(body.ToArray()); var msg = JsonConvert.DeserializeObject(msgStr); try { handler(msg); } catch (Exception ex) { Console.WriteLine($"队列接收消息异常:{ex.Message}"); if (!isDeadLetter) PublishToDead(queue, msgStr, ex); } finally { channel.BasicAck(ea.DeliveryTag, false); } }; channel.BasicConsume(queue, false, consumer); } #endregion #region 获取消息 /// /// 获取消息 /// /// /// 消费处理 public void Pull(Action handler) where T : class { var queueInfo = GetRabbitMqAttribute(); if (queueInfo == null) throw new ArgumentException("RabbitMqAttribute"); Pull(queueInfo.ExchangeName, queueInfo.QueueName, queueInfo.ExchangeName, handler); } /// /// 获取消息 /// /// /// /// /// /// 消费处理 private void Pull(string exchange, string queue, string routingKey, Action handler) where T : class { var channel = GetModel(exchange, queue, routingKey); var result = channel.BasicGet(queue, false); if (result == null) return; var msgStr = Encoding.UTF8.GetString(result.Body.ToArray()); var msg = JsonConvert.DeserializeObject(msgStr); try { handler(msg); } catch (Exception ex) { Console.WriteLine($"队列接收消息异常:{ex.Message}"); } finally { channel.BasicAck(result.DeliveryTag, false); } } #endregion #region 释放资源 /// /// 执行与释放或重置非托管资源关联的应用程序定义的任务。 /// public void Dispose() { foreach (var item in ModelDic) { item.Value.Dispose(); } _conn.Dispose(); } #endregion } #endregion /// /// 自定义的RabbitMq队列信息实体特性 /// public class RabbitMqAttribute : Attribute { public RabbitMqAttribute(string queueName) { QueueName = queueName ?? string.Empty; } /// /// 交换机名称 /// public string ExchangeName { get; set; } /// /// 队列名称 /// public string QueueName { get; private set; } /// /// 是否持久化 /// public bool IsProperties { get; set; } } /// /// /// public class MqConfig { public string Host { get; set; } public TimeSpan HeartBeat { get; set; } public bool AutomaticRecoveryEnabled { get; set; } public TimeSpan NetworkRecoveryInterval { get; set; } public string UserName { get; set; } public string Password { get; set; } } /// /// 死信队列实体 /// [RabbitMq("dead-letter-{Queue}", ExchangeName = "dead-letter-{exchange}")] public class DeadLetterQueue { public string Body { get; set; } public string Exchange { get; set; } public string Queue { get; set; } public string RoutingKey { get; set; } public int RetryCount { get; set; } public string ExceptionMsg { get; set; } public DateTime CreateDateTime { get; set; } } /// /// 交换器类型 /// public static class ExchangeType { public static string Direct = "direct"; public static string Fanout = "fanout"; public static string Topic = "topic"; } }