# wosperry-rabbit-mqtest **Repository Path**: wosperry/wosperry-rabbit-mqtest ## Basic Information - **Project Name**: wosperry-rabbit-mqtest - **Description**: No description available - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 5 - **Forks**: 2 - **Created**: 2021-12-02 - **Last Updated**: 2023-12-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RabbitMQ 封装 ## 参考Abp事件总线的用法,对拷贝的Demo进行简单封装 ### 定义 `RabbitMQOptions` 用于配置 ``` json { "MyRabbitMQOptions": { "UserName": "admin", "Password": "admin", "Host": "192.168.124.220", "Port": 5672, "ExchangeName": "PerryExchange" } } ``` ``` csharp public class MyRabbitMQOptions { public string UserName { get; set; } public string Password { get; set; } public string Host { get; set; } public int Port { get; set; } public string ExchangeName { get; set; } = ""; } ``` ### 定义 `QueueNameAttribute` 控制队列名字 ``` csharp /// /// 定义队列名字,优先级高于类完整名 /// [AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)] public class QueueNameAttribute : Attribute { public string QueueName { get; } public QueueNameAttribute(string queueName) { QueueName = queueName; } } ``` ### 定义 `IMyPublisher`,通过注入某个类型的 `IMyPublisher`,自动序列化对象并发布到配置好的MQ里 ``` csharp /// /// 用于注入使用 /// public interface IMyPublisher where T : class { Task PublishAsync(T data, Encoding encoding = null); } ``` ``` csharp public class MyPublisher : IMyPublisher, IDisposable where T : class { private readonly MyRabbitMQOptions _myOptions; private readonly IConnection _connection; private readonly IModel _channel; private readonly string _queueName; /// /// 非注入时使用此构造方法 /// public MyPublisher(IConnection connection) { _connection = connection; } /// /// 依赖注入自动走这个构造方法 /// /// /// public MyPublisher(IOptionsMonitor optionsMonitor, ConnectionFactory factory) { _myOptions = optionsMonitor.CurrentValue; _connection = factory.CreateConnection(); // 创建通道 _channel = _connection.CreateModel(); // 声明一个Exchange _channel.ExchangeDeclare(_myOptions.ExchangeName, ExchangeType.Direct, false, false, null); var type = typeof(T); // 获取类上的QueueNameAttribute特性,如果不存在则使用类的完整名 var attr = type.GetCustomAttribute(); _queueName = string.IsNullOrWhiteSpace(attr?.QueueName) ? type.FullName : attr.QueueName; // 声明一个队列 _channel.QueueDeclare(_queueName, false, false, false, null); //将队列绑定到交换机 _channel.QueueBind(_queueName, _myOptions.ExchangeName, _queueName, null); } /// /// 发布消息 /// public Task PublishAsync(T data, Encoding encoding = null) { // 对象转 object[] 发送 var msg = JsonConvert.SerializeObject(data); byte[] bytes = (encoding ?? Encoding.UTF8).GetBytes(msg); _channel.BasicPublish(_myOptions.ExchangeName, _queueName, null, bytes); return Task.CompletedTask; } public void Dispose() { // 结束 _channel.Close(); _connection.Close(); } } ``` ### 定义 `IMyEventHandler` ,供 NetCore 项目注入使用,配置后,可以在程序启动的时候,找到该接口所有的实现类,并开启消费者 ``` csharp /// /// Handler的配置 /// public class MyEventHandlerOptions { /// /// 禁用 byte[] 解析 /// public bool DisableDeserializeObject { get; set; } = false; /// /// 配置Encoding /// public Encoding Encoding { get; set; } = Encoding.UTF8; } ``` ``` csharp public abstract class MyEventHandler : IMyEventHandler where T : class { private IModel _channel; private string _queueName; private EventingBasicConsumer _consumer; public MyEventHandlerOptions Options = new() { DisableDeserializeObject = false }; public void Begin(IConnection connection) { var type = typeof(T); // 获取类上的QueueNameAttribute特性,如果不存在则使用类的完整名 var attr = type.GetCustomAttribute(); _queueName = string.IsNullOrWhiteSpace(attr?.QueueName) ? type.FullName : attr.QueueName; //创建通道 _channel = connection.CreateModel(); _consumer = new EventingBasicConsumer(_channel); _consumer.Received += MyReceivedHandler; //消费者 _channel.BasicConsume(_queueName, false, _consumer); } // 收到消息后 private void MyReceivedHandler(object sender, BasicDeliverEventArgs e) { try { // 如果未配置禁用则不解析,后面抽象方法的data参数会始终为空 if (!Options.DisableDeserializeObject) { T data = null; // 反序列化为对象 var message = Options.Encoding.GetString(e.Body); data = JsonConvert.DeserializeObject(message); OnReceivedAsync(data, message).Wait(); // 确认该消息已被消费 _channel?.BasicAck(e.DeliveryTag, false); } } catch (Exception ex) { OnConsumerException(ex); } } /// /// 收到消息 /// /// 解析后的对象 /// 消息原文 /// Options.DisableDeserializeObject为true时,data始终为null public abstract Task OnReceivedAsync(T data, string message); /// /// 异常 /// /// 派生类不重写的话,异常被隐藏 public virtual void OnConsumerException(Exception ex) { } } ``` ### 给依赖注入写一些拓展方法 ``` csharp public static class MyRabbiteMQExtensions { /// /// 初始化消息队列,并添加Publisher到IoC容器 /// /// 从Configuration读取"MyRabbbitMQOptions配置项" public static IServiceCollection AddMyRabbitMQ(this IServiceCollection services, IConfiguration configuration) { #region 配置项 // 从Configuration读取"MyRabbbitMQOptions配置项 var optionSection = configuration.GetSection("MyRabbitMQOptions"); // 这个myOptions是当前方法使用 MyRabbitMQOptions myOptions = new(); optionSection.Bind(myOptions); // 加了这行,才可以注入IOptions或者IOptionsMonitor services.Configure(optionSection); #endregion // 加了这行,才可以注入任意类型参数的 IMyPublisher<> 使用 services.AddTransient(typeof(IMyPublisher<>), typeof(MyPublisher<>)); // 创建一个工厂对象,并配置单例注入 services.AddSingleton(new ConnectionFactory { UserName = myOptions.UserName, Password = myOptions.Password, HostName = myOptions.Host, Port = myOptions.Port }); return services; } /// /// IServiceCollection的拓展方法,用于发现自定义的EventHandler并添加到服务容器 /// /// 包含了自定义Handler的类集合,可以使用assembly.GetTypes() /// 遍历所有types,将继承自IMyEventHandler的类注册到容器 public static IServiceCollection AddMyRabbitMQEventHandlers(this IServiceCollection services, Type[] types) { var baseType = typeof(IMyEventHandler); foreach (var type in types) { // baseType可以放type,并且type不是baseType if (baseType.IsAssignableFrom(type) && baseType != type) { // 瞬态注入配置 services.AddTransient(typeof(IMyEventHandler), type); } } return services; } /// /// 给app拓展方法 /// /// /// 在IoC容器里获取到所有继承自IMyEvetnHandler的实现类,并开启消费者 /// public static IApplicationBuilder UseMyEventHandler(this IApplicationBuilder app) { var handlers = app.ApplicationServices.GetServices(typeof(IMyEventHandler)); var factory = app.ApplicationServices.GetService(); // 遍历调用自定义的Begin方法 foreach (var h in handlers) { var handler = h as IMyEventHandler; handler?.Begin(factory.CreateConnection()); } return app; } } ``` ## 在Net6 WebApi中使用 program.cs ``` csharp var builder = WebApplication.CreateBuilder(args); builder.Services.AddControllers(); // 添加MyRabbitMQ到services builder.Services.AddMyRabbitMQ(builder.Configuration); builder.Services.AddMyRabbitMQEventHandlers(typeof(PerryTest).Assembly.GetTypes()); var app = builder.Build(); // 使用 MyEventHandler app.UseMyEventHandler(); app.MapControllers(); app.Run(); ``` 定义ETO ``` csharp [QueueName("perry.test")] public class PerryTest { public Guid Id { get; set; } public string? Name { get; set; } public int Count { get; set; } public string? Remark { get; set; } } ``` 定义EventHandler ``` csharp public class PerryTestEventHandler : MyEventHandler { public override Task OnReceivedAsync(PerryTest data, string message) { Console.WriteLine(message); return Task.CompletedTask; } public override void OnConsumerException(Exception ex) { Console.WriteLine(ex.Message); } } ``` 控制器中检查是否可以正常使用 ``` csharp [Route("api")] [ApiController] public class TestController : ControllerBase { public IMyPublisher TestPublisher { get; } public TestController(IMyPublisher testPublisher) { TestPublisher = testPublisher; } [HttpGet("test")] public async Task TestAsync() { var data = new PerryTest() { Id = Guid.NewGuid(), Name = "AAA", Count = 123, Remark = "哈哈哈" }; await TestPublisher.PublishAsync(data); return "发送了一个消息"; } } ``` 运行截图 ![运行截图](MyRabbitMQLib/image.png) ## 参考 [.NET Core 使用RabbitMQ](https://www.cnblogs.com/stulzq/p/7551819.html),拷贝了一些Demo ### 文章里的生产者Demo ``` csharp //创建连接工厂 ConnectionFactory factory = new ConnectionFactory { UserName = "admin",//用户名 Password = "admin",//密码 HostName = "192.168.157.130"//rabbitmq ip }; //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //声明一个队列 channel.QueueDeclare("hello", false, false, false, null); Console.WriteLine("\nRabbitMQ连接成功,请输入消息,输入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //发布消息 channel.BasicPublish("", "hello", null, sendBytes); } while (input.Trim().ToLower()!="exit"); channel.Close(); connection.Close(); ``` ### 文章里的消费者Demo ``` csharp //创建连接工厂 ConnectionFactory factory = new ConnectionFactory { UserName = "admin",//用户名 Password = "admin",//密码 HostName = "192.168.157.130"//rabbitmq ip }; //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //事件基本消费者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"收到消息: {message}"); //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); }; //启动消费者 设置为手动应答消息 channel.BasicConsume("hello", false, consumer); Console.WriteLine("消费者已启动"); Console.ReadKey(); channel.Dispose(); connection.Close(); ``` --- ## 这次封装的总结 1. 上网找个Demo 2. 先按最简单的写法,写完能正常使用的,功能独立的代码。其实是提醒自己,不要陷入到杂七杂八的各项优化中去,先写完实现再考虑怎么去改成更加好的。 3. 脑袋里想着,我要怎么样使用这个功能,怎么样才能让用的时候写的代码少一些,配置简单一些 5. 检查哪些内容应该分离到配置项,然后抽离出去 6. 考虑要支持哪些类型的项目,如果想要支持低版本,可能需要降级一些依赖包 7. 支持构造函数注入的话,要注意最多参数的构造函数给依赖注入使用,依赖注入用的构造函数不好被非注入时使用的话,考虑多提供一个给Framework用。