一、什么是RabbitMQ
RabbitMQ是什么? --"RabbitMQ是基于AMQP协议的队列服务"。
什么是AMQP?-- Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
可以理解RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(也可以叫面向消息的中间件)
二、RabbitMQ运用场景
消息通讯—因为其本身就是基于AMQP协议的队列服务,也就可以用于单纯的消息通讯,实现点对点的消息通讯或者聊天。
提速提性能—异步处理,不需要及时同步处理并且比较耗时,减少请求响应时间
流量削峰—流量过大,应用容易挂掉,可使用队列来处理。
三、RabbitMQ优势及特点
可靠—RabbitMQ具有持久化,传输确认,发布确认等机制。保证了消息的安全性,一旦发送了消息,就算接收者接收不到,它也会保存信息,一直到接收者接收消息为止
复用性—RabbitMQ可以发送多种类型消息
异步处理(提速)—把消息传给中间件,中间件后续慢慢处理,同时也可达到削峰的效果
解耦--防止引入过多的API给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。
四、CentOS 7中Docker安装RabbitMQ
如何在Linux中安装可以参考
https://www.linuxidc.com/Linux/2019-07/159426.htm
先拉取镜像(选择带有mangement的版本)
docker pull rabbitmq:management
然后查看镜像拉取情况
docker images
启动容器同时设置账号密码
docker run --name=rabbit -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -d rabbitmq:management
查看容器运行情况
docker ps
运行成功,然后我们就去浏览器看看能否访问ip:15672
然后输入刚刚输入的账号密码登入进去就可以查看消息队列的整体情况,到这里RabbitMQ已经安装好了,接下来我们看看如何在.Net Core中使用RabbitMQ.
--------------------------------------------------------------------------------
五、.Net Core 中使用RabbitMQ
RabbitMQ使用的话可以分为三个步骤
1、 创建RabbitMQ的连接
public class ConnectionMQ | |
{ | |
/// <summary> | |
/// 创建MQ连接 | |
/// </summary> | |
/// <returns></returns> | |
public static IConnection Connection() | |
{ | |
//创建连接工厂 | |
ConnectionFactory factory = new ConnectionFactory | |
{ | |
UserName = “admin”,//用户名 | |
Password =” admin”,//密码 | |
HostName = “127.0.0.1”//rabbitmq ip | |
}; | |
//创建连接 | |
var connection = factory.CreateConnection(); | |
return connection; | |
} | |
} |
2、 RabbitMQ发送消息
public class PushMQ | |
{ | |
/// <summary> | |
/// 发送MQ消息 | |
/// </summary> | |
/// <typeparam name="T"></typeparam> | |
/// <param name="item"></param> | |
/// <param name="queueName"></param> | |
public static void SendMQ<T>(T item,string queueName) | |
{ | |
string input = Newtonsoft.Json.JsonConvert.SerializeObject(item); | |
using (var channel = ConnectionMQ.Connection().CreateModel()) | |
{ | |
//声明一个队列 | |
channel.QueueDeclare( | |
queue: queueName,//队列名称 | |
durable: true,//队列是否持久化 | |
exclusive: false,//是否排外的 | |
autoDelete: false,//是否自动删除 | |
arguments: null//消息什么时候自动 | |
); | |
var sendBytes=Encoding.UTF8.GetBytes(input); | |
var properties = new BasicProperties(); | |
properties.DeliveryMode = 2;// 设置消息是否持久化,1: 非持久化 2:持久化 | |
//发布消息 | |
channel.BasicPublish( | |
exchange: "", | |
routingKey: queueName, | |
mandatory: true, | |
basicProperties: properties, | |
body: sendBytes | |
); | |
} | |
} | |
} | |
在这里连接的ConnectionMQ.Connection().CreateModel()是可以进行重写的,自己定义如何去连接 |
3、 RabbitMQ接收消息
public class ReceiveMQ | |
{ | |
/// <summary> | |
/// 接收MQ消息 | |
/// </summary> | |
/// <typeparam name="T"></typeparam> | |
/// <param name="func"></param> | |
/// <param name="queueName"></param> | |
public static void GetMQ<T>(Func<T,bool> func,string queueName) | |
{ | |
//创建连接 | |
var connection = ConnectionMQ.Connection(); | |
//创建通道 | |
var channel = connection.CreateModel(); | |
//事件基本消费者 | |
EventingBasicConsumer consumer = new EventingBasicConsumer(channel); | |
//接收到消息事件 | |
consumer.Received += (ch, ea) => | |
{ | |
var message = Encoding.UTF8.GetString(ea.Body); | |
try | |
{ | |
var item = JsonConvert.DeserializeObject<T>(message); | |
func(item); | |
} | |
catch (Exception ex) | |
{ | |
LogHelp.Error(ex); | |
} | |
//确认该消息已被消费 | |
channel.BasicAck(ea.DeliveryTag, false); | |
}; | |
//启动消费者 设置为手动应答消息 | |
channel.BasicConsum、e(queueName, false, consumer); | |
} | |
} |
4、查看消息
在这里我们发送消息,然后我们去ip+15672看看是否有未消费的消息。
发现有一条未消费的信息,队列名称是Test,正好是我们刚刚发送的消息。
然后我们去消费这一条信息,再次进入ip+15672看看
刚刚的一条消息的确被消费掉了。其中GetInfo方法可以穿插自己的很多的业务逻辑的处理。到这里也就简单的介绍了下如何在.Net Core中使用RabbitMQ,还有一些其他的属性必要之时都可以加入进来的。可以高度扩展的。