一文浅析node中如何使用消息队列
时间:2023-01-17 19:48
什么是消息队列?下面本篇文章带大家了解一下消息队列的基本概念,介绍一下node中如何使用消息队列,希望对大家有所帮助! 什么是消息队列 消息队列就是消息的传输过程中保存消息的容器,本质是一个队列(先进先出) 消息队列能做什么 解耦 上面介绍了,消息队列将消息的生产者和消息的接收者分开,彼此都不受影响。 异步 异步就是为了减少请求的响应时间,消息的生产者只需要处理简单的逻辑,并将数据放到消息队列中即可返回,复杂的逻辑,比如:数据库操作,IO操作由消息的接收者处理。 削峰 消息队列应用在服务时,能将瞬时大量涌入的请求信息保存到消息队列中,并立即返回。再由消息的接收者根据数据处理请求。 应用场景 游戏活动,秒杀活动,下单等会造成瞬时流量暴增的应用。 介绍完消息队列的基本信息,在开发消息队列之前先介绍一下消息队列的一些基本概念~ 消息的生产者(producer)与消费者(customer) 上文提到的 链接,通道与队列 链接(connection):表示服务程序与消息队列之间的一条链接。一个服务程序可以创建多条链接。 通道(channel):消息队列链接之间的一个通,一个链接可以有多个通道。 队列(queue):消息队列中存放数据的队列,一个消息队列服务可以有多个队列。 总结一下,链接,通道队列之间的关系是这样的 交换机(exchange) 消息队列发送消息时必须要有一个交换机,如果没有指定则用的是默认的交换机。交换机的作用就是将消息才推到对应的队列中。消息队列中一共有4种交换机 Direct: 指定队列模式,消息来了,只发给指定的Queue,其他Queue都收不到。 fanout: 广播模式,消息来了,就会发送给所有的队列。 topic: 模糊匹配模式,通过模糊匹配的方式进行相应转发。 header: 与Direct模式类似。 安装rabbitMQ 然后再本地中访问 http://localhost:15672/ 就可以看到rabbitmq服务的后台。初始的账号密码均为 node项目安装amqplib amqplib是node中使用消息队列的一套工具,可以让我们快速地使用消息队列 地址:https://www.npmjs.com/package/amqplib 创建生产者 运行后在后台可以看到新增了一个有100条消息的队列 创建消费者 执行后可以看到,两个通道可以同时工作接收消息 更多node相关知识,请访问:nodejs 教程! 以上就是一文浅析node中如何使用消息队列的详细内容,更多请关注gxlsystem.com其它相关文章!1.消息队列
消息
指的是需要传输的数据,可以是一些文本,字符串,或者是对象等信息。消息队列
则是两个应用间的通信服务,消息的产生者
将数据存放到消息队列中就可以立即返回,不需要等待消息的接收者
应答。即:生产者
保证数据插入队列,谁来取这条消息不需要管。消息的接收者
则只专注于接受消息并处理。【相关教程推荐:nodejs视频教程、编程教学】2.消息队列的概念
生产者
与消费者
,提供的是3.node使用rabbitMQ
brew install rabbitmq
guest
/** product.js 消费者 */
const amqplib = require('amqplib');
const config = require('./config');
const { connectUrl } = config;
(async () => {
const connection = await amqplib.connect(connectUrl);
const channel = await connection.createChannel();
const exchangeName = 'testExchange';
const key = 'testQueue';
const sendMsg = 'hello rabbitmq';
// 知道交换机类型
await channel.assertExchange(exchangeName, 'fanout', {
durable: true,
});
// 指定一个队列
await channel.assertQueue(key);
for (let i = 0; i < 100; i++) {
channel.publish(exchangeName, key, Buffer.from(`${sendMsg} ${i}`));
}
await channel.close();
await connection.close();
})();
/** customer.js 消费者 */
const amqplib = require('amqplib');
const config = require('./config');
const { connectUrl } = config;
(async () => {
let connection = await amqplib.connect(connectUrl);
const exchangeName = 'testExchange';
const key = 'testQueue';
// 创建两个通道
const channel1 = await connection.createChannel();
const channel2 = await connection.createChannel();
// 指定一个交换机
await channel1.assertExchange(exchangeName, 'fanout', {
durable: true,
});
// 指定一个队列
await channel1.assertQueue(key);
await channel1.bindQueue(key, exchangeName, key);
channel1.consume(key, (msg) => {
console.log('channel 1', msg.content.toString());
});
await channel2.assertExchange(exchangeName, 'fanout', {
durable: true,
});
await channel2.assertQueue(key);
await channel2.bindQueue(key, exchangeName, key);
channel2.consume(key, (msg) => {
console.log('channel 2', msg.content.toString());
});
})();