using EasyNetQ.Internals;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Waste.MessageHandler
{
#region RabbitMQ.Client原生封装类
///
/// RabbitMQ.Client原生封装类
///
public class RabbitMqService : IDisposable
{
#region 初始化
//RabbitMQ建议客户端线程之间不要共用Model,至少要保证共用Model的线程发送消息必须是串行的,但是建议尽量共用Connection。
private static readonly ConcurrentDictionary ModelDic =
new ConcurrentDictionary();
private static RabbitMqAttribute _rabbitMqAttribute;
private const string RabbitMqAttribute = "RabbitMqAttribute";
private static IConnection _conn;
private static readonly object LockObj = new object();
private static void Open(MqConfig config)
{
if (_conn != null) return;
lock (LockObj)
{
var factory = new ConnectionFactory
{
//设置主机名
HostName = config.Host,
//设置心跳时间
RequestedHeartbeat = config.HeartBeat,
//设置自动重连
AutomaticRecoveryEnabled = config.AutomaticRecoveryEnabled,
//重连时间
NetworkRecoveryInterval = config.NetworkRecoveryInterval,
//用户名
UserName = config.UserName,
//密码
Password = config.Password
};
factory.AutomaticRecoveryEnabled = true;
factory.NetworkRecoveryInterval = new TimeSpan(1000);
_conn = _conn ?? factory.CreateConnection();
}
}
private static RabbitMqAttribute GetRabbitMqAttribute()
{
if (_rabbitMqAttribute == null)
{
var typeOfT = typeof(T);
_rabbitMqAttribute = typeOfT.GetAttribute();
}
return _rabbitMqAttribute;
}
public RabbitMqService(MqConfig config)
{
Open(config);
}
#endregion
#region 交换器声明
///
/// 交换器声明
///
///
/// 交换器
/// 交换器类型:
/// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全
/// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的
/// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog
/// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都
/// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout
/// 交换机转发消息是最快的。
/// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多
/// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”
/// 只会匹配到“audit.irs”。
/// 持久化
/// 自动删除
/// 参数
private static void ExchangeDeclare(IModel iModel, string exchange, string type = "fanout",
bool durable = true,
bool autoDelete = false, IDictionary arguments = null)
{
exchange = string.IsNullOrWhiteSpace(exchange) ? "" : exchange.Trim();
iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);
}
#endregion
#region 队列声明
///
/// 队列声明
///
///
/// 队列
/// 持久化
/// 排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,
/// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可
/// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连
/// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者
/// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
/// 自动删除
/// 参数
private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,
bool autoDelete = false, IDictionary arguments = null)
{
queue = string.IsNullOrWhiteSpace(queue) ? "UndefinedQueueName" : queue.Trim();
channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
}
#endregion
#region 获取Model
///
/// 获取Model
///
/// 交换机名称
/// 队列名称
///
/// 是否持久化
///
private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)
{
return ModelDic.GetOrAdd(queue, key =>
{
var model = _conn.CreateModel();
ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);
QueueDeclare(model, queue, isProperties);
model.QueueBind(queue, exchange, routingKey);
ModelDic[queue] = model;
return model;
});
}
///
/// 获取Model
///
/// 队列名称
///
///
private static IModel GetModel(string queue, bool isProperties = false)
{
return ModelDic.GetOrAdd(queue, value =>
{
var model = _conn.CreateModel();
QueueDeclare(model, queue);
//每次消费的消息数
model.BasicQos(0, 1, false);
ModelDic[queue] = model;
return model;
});
}
#endregion
#region 发布消息
///
/// 发布消息
///
/// 指令
///
public void Publish(T command) where T : class
{
var queueInfo = GetRabbitMqAttribute();
if (queueInfo == null)
throw new ArgumentException(RabbitMqAttribute);
var body = JsonConvert.SerializeObject(command);
var exchange = queueInfo.ExchangeName;
var queue = queueInfo.QueueName;
var routingKey = queueInfo.ExchangeName;
var isProperties = queueInfo.IsProperties;
Publish(exchange, queue, routingKey, body, isProperties);
}
///
/// 发布消息
///
/// 路由键
/// 队列信息
/// 交换机名称
/// 队列名
/// 是否持久化
///
public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)
{
var channel = GetModel(exchange, queue, routingKey, isProperties);
try
{
var bytes = Encoding.UTF8.GetBytes(body);
channel.BasicPublish(exchange, routingKey, null, bytes);
}
catch (Exception ex)
{
Console.WriteLine($"消息发送异常:{ex.Message}");
}
}
///
/// 发布消息到死信队列
///
/// 死信信息
/// 异常
/// 死信队列名称
///
private void PublishToDead(string queue, string body, Exception ex) where T : class
{
var queueInfo = typeof(T).GetAttribute();
if (queueInfo == null)
throw new ArgumentException(RabbitMqAttribute);
var deadLetterExchange = queueInfo.ExchangeName;
string deadLetterQueue = queueInfo.QueueName;
var deadLetterRoutingKey = deadLetterExchange;
var deadLetterBody = new DeadLetterQueue
{
Body = body,
CreateDateTime = DateTime.Now,
ExceptionMsg = ex.Message,
Queue = queue,
RoutingKey = deadLetterExchange,
Exchange = deadLetterRoutingKey
};
var data= JsonConvert.SerializeObject(deadLetterBody);
Publish(deadLetterExchange, deadLetterQueue, deadLetterRoutingKey, data);
}
#endregion
#region 订阅消息
///
/// 接收消息
///
///
/// 消费处理
public void Subscribe(Action handler) where T : class
{
var queueInfo = GetRabbitMqAttribute();
if (queueInfo == null)
throw new ArgumentException(RabbitMqAttribute);
var isDeadLetter = typeof(T) == typeof(DeadLetterQueue);
Subscribe(queueInfo.QueueName, queueInfo.IsProperties, handler, isDeadLetter);
}
///
/// 接收消息
///
///
/// 队列名称
///
/// 消费处理
///
public void Subscribe(string queue, bool isProperties, Action handler, bool isDeadLetter) where T : class
{
//队列声明
var channel = GetModel(queue, isProperties);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var msgStr = Encoding.UTF8.GetString(body.ToArray());
var msg = JsonConvert.DeserializeObject(msgStr);
try
{
handler(msg);
}
catch (Exception ex)
{
Console.WriteLine($"队列接收消息异常:{ex.Message}");
if (!isDeadLetter)
PublishToDead(queue, msgStr, ex);
}
finally
{
channel.BasicAck(ea.DeliveryTag, false);
}
};
channel.BasicConsume(queue, false, consumer);
}
#endregion
#region 获取消息
///
/// 获取消息
///
///
/// 消费处理
public void Pull(Action handler) where T : class
{
var queueInfo = GetRabbitMqAttribute();
if (queueInfo == null)
throw new ArgumentException("RabbitMqAttribute");
Pull(queueInfo.ExchangeName, queueInfo.QueueName, queueInfo.ExchangeName, handler);
}
///
/// 获取消息
///
///
///
///
///
/// 消费处理
private void Pull(string exchange, string queue, string routingKey, Action handler) where T : class
{
var channel = GetModel(exchange, queue, routingKey);
var result = channel.BasicGet(queue, false);
if (result == null)
return;
var msgStr = Encoding.UTF8.GetString(result.Body.ToArray());
var msg = JsonConvert.DeserializeObject(msgStr);
try
{
handler(msg);
}
catch (Exception ex)
{
Console.WriteLine($"队列接收消息异常:{ex.Message}");
}
finally
{
channel.BasicAck(result.DeliveryTag, false);
}
}
#endregion
#region 释放资源
///
/// 执行与释放或重置非托管资源关联的应用程序定义的任务。
///
public void Dispose()
{
foreach (var item in ModelDic)
{
item.Value.Dispose();
}
_conn.Dispose();
}
#endregion
}
#endregion
///
/// 自定义的RabbitMq队列信息实体特性
///
public class RabbitMqAttribute : Attribute
{
public RabbitMqAttribute(string queueName)
{
QueueName = queueName ?? string.Empty;
}
///
/// 交换机名称
///
public string ExchangeName { get; set; }
///
/// 队列名称
///
public string QueueName { get; private set; }
///
/// 是否持久化
///
public bool IsProperties { get; set; }
}
///
///
///
public class MqConfig
{
public string Host { get; set; }
public TimeSpan HeartBeat { get; set; }
public bool AutomaticRecoveryEnabled { get; set; }
public TimeSpan NetworkRecoveryInterval { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
}
///
/// 死信队列实体
///
[RabbitMq("dead-letter-{Queue}", ExchangeName = "dead-letter-{exchange}")]
public class DeadLetterQueue
{
public string Body { get; set; }
public string Exchange { get; set; }
public string Queue { get; set; }
public string RoutingKey { get; set; }
public int RetryCount { get; set; }
public string ExceptionMsg { get; set; }
public DateTime CreateDateTime { get; set; }
}
///
/// 交换器类型
///
public static class ExchangeType
{
public static string Direct = "direct";
public static string Fanout = "fanout";
public static string Topic = "topic";
}
}