LaJiFenLei/Waste.MessageHandler/RabbitMqService.cs

446 lines
16 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using EasyNetQ.Internals;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Waste.MessageHandler
{
#region RabbitMQ.Client原生封装类
/// <summary>
/// RabbitMQ.Client原生封装类
/// </summary>
public class RabbitMqService : IDisposable
{
#region
//RabbitMQ建议客户端线程之间不要共用Model至少要保证共用Model的线程发送消息必须是串行的但是建议尽量共用Connection。
private static readonly ConcurrentDictionary<string, IModel> ModelDic =
new ConcurrentDictionary<string, IModel>();
private static RabbitMqAttribute _rabbitMqAttribute;
private const string RabbitMqAttribute = "RabbitMqAttribute";
private static IConnection _conn;
private static readonly object LockObj = new object();
private static void Open(MqConfig config)
{
if (_conn != null) return;
lock (LockObj)
{
var factory = new ConnectionFactory
{
//设置主机名
HostName = config.Host,
//设置心跳时间
RequestedHeartbeat = config.HeartBeat,
//设置自动重连
AutomaticRecoveryEnabled = config.AutomaticRecoveryEnabled,
//重连时间
NetworkRecoveryInterval = config.NetworkRecoveryInterval,
//用户名
UserName = config.UserName,
//密码
Password = config.Password
};
factory.AutomaticRecoveryEnabled = true;
factory.NetworkRecoveryInterval = new TimeSpan(1000);
_conn = _conn ?? factory.CreateConnection();
}
}
private static RabbitMqAttribute GetRabbitMqAttribute<T>()
{
if (_rabbitMqAttribute == null)
{
var typeOfT = typeof(T);
_rabbitMqAttribute = typeOfT.GetAttribute<RabbitMqAttribute>();
}
return _rabbitMqAttribute;
}
public RabbitMqService(MqConfig config)
{
Open(config);
}
#endregion
#region
/// <summary>
/// 交换器声明
/// </summary>
/// <param name="iModel"></param>
/// <param name="exchange">交换器</param>
/// <param name="type">交换器类型:
/// 1、Direct Exchange 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全
/// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”则只有被标记为“dog”的
/// 消息才被转发不会转发dog.puppy也不会转发dog.guard只会转发dog
/// 2、Fanout Exchange 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都
/// 会被转发到与该交换机绑定的所有队列上。很像子网广播每台子网内的主机都获得了一份复制的消息。Fanout
/// 交换机转发消息是最快的。
/// 3、Topic Exchange 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多
/// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”但是“audit.*”
/// 只会匹配到“audit.irs”。</param>
/// <param name="durable">持久化</param>
/// <param name="autoDelete">自动删除</param>
/// <param name="arguments">参数</param>
private static void ExchangeDeclare(IModel iModel, string exchange, string type = "fanout",
bool durable = true,
bool autoDelete = false, IDictionary<string, object> arguments = null)
{
exchange = string.IsNullOrWhiteSpace(exchange) ? "" : exchange.Trim();
iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);
}
#endregion
#region
/// <summary>
/// 队列声明
/// </summary>
/// <param name="channel"></param>
/// <param name="queue">队列</param>
/// <param name="durable">持久化</param>
/// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,
/// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可
/// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连
/// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者
/// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>
/// <param name="autoDelete">自动删除</param>
/// <param name="arguments">参数</param>
private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,
bool autoDelete = false, IDictionary<string, object> arguments = null)
{
queue = string.IsNullOrWhiteSpace(queue) ? "UndefinedQueueName" : queue.Trim();
channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
}
#endregion
#region Model
/// <summary>
/// 获取Model
/// </summary>
/// <param name="exchange">交换机名称</param>
/// <param name="queue">队列名称</param>
/// <param name="routingKey"></param>
/// <param name="isProperties">是否持久化</param>
/// <returns></returns>
private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)
{
return ModelDic.GetOrAdd(queue, key =>
{
var model = _conn.CreateModel();
ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);
QueueDeclare(model, queue, isProperties);
model.QueueBind(queue, exchange, routingKey);
ModelDic[queue] = model;
return model;
});
}
/// <summary>
/// 获取Model
/// </summary>
/// <param name="queue">队列名称</param>
/// <param name="isProperties"></param>
/// <returns></returns>
private static IModel GetModel(string queue, bool isProperties = false)
{
return ModelDic.GetOrAdd(queue, value =>
{
var model = _conn.CreateModel();
QueueDeclare(model, queue);
//每次消费的消息数
model.BasicQos(0, 1, false);
ModelDic[queue] = model;
return model;
});
}
#endregion
#region
/// <summary>
/// 发布消息
/// </summary>
/// <param name="command">指令</param>
/// <returns></returns>
public void Publish<T>(T command) where T : class
{
var queueInfo = GetRabbitMqAttribute<T>();
if (queueInfo == null)
throw new ArgumentException(RabbitMqAttribute);
var body = JsonConvert.SerializeObject(command);
var exchange = queueInfo.ExchangeName;
var queue = queueInfo.QueueName;
var routingKey = queueInfo.ExchangeName;
var isProperties = queueInfo.IsProperties;
Publish(exchange, queue, routingKey, body, isProperties);
}
/// <summary>
/// 发布消息
/// </summary>
/// <param name="routingKey">路由键</param>
/// <param name="body">队列信息</param>
/// <param name="exchange">交换机名称</param>
/// <param name="queue">队列名</param>
/// <param name="isProperties">是否持久化</param>
/// <returns></returns>
public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)
{
var channel = GetModel(exchange, queue, routingKey, isProperties);
try
{
var bytes = Encoding.UTF8.GetBytes(body);
channel.BasicPublish(exchange, routingKey, null, bytes);
}
catch (Exception ex)
{
Console.WriteLine($"消息发送异常:{ex.Message}");
}
}
/// <summary>
/// 发布消息到死信队列
/// </summary>
/// <param name="body">死信信息</param>
/// <param name="ex">异常</param>
/// <param name="queue">死信队列名称</param>
/// <returns></returns>
private void PublishToDead<T>(string queue, string body, Exception ex) where T : class
{
var queueInfo = typeof(T).GetAttribute<RabbitMqAttribute>();
if (queueInfo == null)
throw new ArgumentException(RabbitMqAttribute);
var deadLetterExchange = queueInfo.ExchangeName;
string deadLetterQueue = queueInfo.QueueName;
var deadLetterRoutingKey = deadLetterExchange;
var deadLetterBody = new DeadLetterQueue
{
Body = body,
CreateDateTime = DateTime.Now,
ExceptionMsg = ex.Message,
Queue = queue,
RoutingKey = deadLetterExchange,
Exchange = deadLetterRoutingKey
};
var data= JsonConvert.SerializeObject(deadLetterBody);
Publish(deadLetterExchange, deadLetterQueue, deadLetterRoutingKey, data);
}
#endregion
#region
/// <summary>
/// 接收消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="handler">消费处理</param>
public void Subscribe<T>(Action<T> handler) where T : class
{
var queueInfo = GetRabbitMqAttribute<T>();
if (queueInfo == null)
throw new ArgumentException(RabbitMqAttribute);
var isDeadLetter = typeof(T) == typeof(DeadLetterQueue);
Subscribe(queueInfo.QueueName, queueInfo.IsProperties, handler, isDeadLetter);
}
/// <summary>
/// 接收消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="queue">队列名称</param>
/// <param name="isProperties"></param>
/// <param name="handler">消费处理</param>
/// <param name="isDeadLetter"></param>
public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class
{
//队列声明
var channel = GetModel(queue, isProperties);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var msgStr = Encoding.UTF8.GetString(body.ToArray());
var msg = JsonConvert.DeserializeObject<T>(msgStr);
try
{
handler(msg);
}
catch (Exception ex)
{
Console.WriteLine($"队列接收消息异常:{ex.Message}");
if (!isDeadLetter)
PublishToDead<DeadLetterQueue>(queue, msgStr, ex);
}
finally
{
channel.BasicAck(ea.DeliveryTag, false);
}
};
channel.BasicConsume(queue, false, consumer);
}
#endregion
#region
/// <summary>
/// 获取消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="handler">消费处理</param>
public void Pull<T>(Action<T> handler) where T : class
{
var queueInfo = GetRabbitMqAttribute<T>();
if (queueInfo == null)
throw new ArgumentException("RabbitMqAttribute");
Pull(queueInfo.ExchangeName, queueInfo.QueueName, queueInfo.ExchangeName, handler);
}
/// <summary>
/// 获取消息
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="exchange"></param>
/// <param name="queue"></param>
/// <param name="routingKey"></param>
/// <param name="handler">消费处理</param>
private void Pull<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class
{
var channel = GetModel(exchange, queue, routingKey);
var result = channel.BasicGet(queue, false);
if (result == null)
return;
var msgStr = Encoding.UTF8.GetString(result.Body.ToArray());
var msg = JsonConvert.DeserializeObject<T>(msgStr);
try
{
handler(msg);
}
catch (Exception ex)
{
Console.WriteLine($"队列接收消息异常:{ex.Message}");
}
finally
{
channel.BasicAck(result.DeliveryTag, false);
}
}
#endregion
#region
/// <summary>
/// 执行与释放或重置非托管资源关联的应用程序定义的任务。
/// </summary>
public void Dispose()
{
foreach (var item in ModelDic)
{
item.Value.Dispose();
}
_conn.Dispose();
}
#endregion
}
#endregion
/// <summary>
/// 自定义的RabbitMq队列信息实体特性
/// </summary>
public class RabbitMqAttribute : Attribute
{
public RabbitMqAttribute(string queueName)
{
QueueName = queueName ?? string.Empty;
}
/// <summary>
/// 交换机名称
/// </summary>
public string ExchangeName { get; set; }
/// <summary>
/// 队列名称
/// </summary>
public string QueueName { get; private set; }
/// <summary>
/// 是否持久化
/// </summary>
public bool IsProperties { get; set; }
}
/// <summary>
///
/// </summary>
public class MqConfig
{
public string Host { get; set; }
public TimeSpan HeartBeat { get; set; }
public bool AutomaticRecoveryEnabled { get; set; }
public TimeSpan NetworkRecoveryInterval { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
}
/// <summary>
/// 死信队列实体
/// </summary>
[RabbitMq("dead-letter-{Queue}", ExchangeName = "dead-letter-{exchange}")]
public class DeadLetterQueue
{
public string Body { get; set; }
public string Exchange { get; set; }
public string Queue { get; set; }
public string RoutingKey { get; set; }
public int RetryCount { get; set; }
public string ExceptionMsg { get; set; }
public DateTime CreateDateTime { get; set; }
}
/// <summary>
/// 交换器类型
/// </summary>
public static class ExchangeType
{
public static string Direct = "direct";
public static string Fanout = "fanout";
public static string Topic = "topic";
}
}