< 返回新闻公共列表
云南大王-C#队列学习笔记:RabbitMQ优先级队列
发布时间:2020-04-13 00:00:00
一、引言
在具体业务中可能会遇到一些要提前处理的消息,比如普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。在RabbitMQ中,消息优先级的实现方式是:在声明queue时设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级即可。
RabbitMQ优先级队列注意事项:
1)RabbitMQ3.5以后才支持优先级队列。
2)只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。
3)优先级取值范围在0~9之间,数值越大则优先级越高。
二、示例
2.1、发送端(生产端)
新建一个控制台项目Send,并添加一个类RabbitMQConfig。
class RabbitMQConfig
{
public static string Host { get; set; }
public static string VirtualHost { get; set; }
public static string UserName { get; set; }
public static string Password { get; set; }
public static int Port { get; set; }
static RabbitMQConfig()
{
Host = "192.168.2.242";
VirtualHost = "/";
UserName = "hello";
Password = "world";
Port = 5672;
}
}
RabbitMQConfig.cs
class Program
{
static void Main(string[] args)
{
Console.WriteLine("按任意键开始生产。");
Console.ReadLine();
PriorityMessagePublish();
Console.ReadLine();
}
private static void PriorityMessagePublish()
{
const string MessagePrefix = "message_";
const int PublishMessageCount = 6;
byte messagePriority = 0;
var factory = new ConnectionFactory()
{
HostName = RabbitMQConfig.Host,
Port = RabbitMQConfig.Port,
VirtualHost = RabbitMQConfig.VirtualHost,
UserName = RabbitMQConfig.UserName,
Password = RabbitMQConfig.Password,
Protocol = Protocols.DefaultProtocol
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
//设置队列优先级,取值范围在0~255之间。
Dictionary dict = new Dictionary
{
{ "x-max-priority", 255 }
};
//声明队列
channel.QueueDeclare(queue: "priority", durable: true, exclusive: false, autoDelete: false, arguments: dict);
//向该消息队列发送消息message
Random random = new Random();
for (int i = 0; i < PublishMessageCount; i++)
{
var properties = channel.CreateBasicProperties();
messagePriority = (byte)random.Next(0, 9);
properties.Priority = messagePriority;//设置消息优先级,取值范围在0~9之间。
var message = MessagePrefix + i.ToString();
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "priority", basicProperties: properties, body: body);
Console.WriteLine($"{DateTime.Now.ToString()} Send {message} , Priority {messagePriority}");
}
}
}
}
}
Program.cs
2.2、接收端(消费端)
新建一个控制台项目Receive,按住Alt键,将发送端RabbitMQConfig类拖一个快捷方式到Receive项目中。
class Program
{
static void Main(string[] args)
{
Console.WriteLine("按任意键开始消费。");
Console.ReadLine();
PriorityMessageSubscribe();
}
public static void PriorityMessageSubscribe()
{
var factory = new ConnectionFactory()
{
HostName = RabbitMQConfig.Host,
UserName = RabbitMQConfig.UserName,
Password = RabbitMQConfig.Password
};
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
await Task.Run(() =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Thread.Sleep(1000 * 2);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手动消息确认
Console.WriteLine($"{DateTime.Now.ToString()} Received {message}");
});
};
channel.BasicConsume(queue: "priority", noAck: false, consumer: consumer);//需要启用消息响应,否则priority无效。
Console.ReadKey();
}
}
}
}
Program.cs
2.3、运行结果
从消费情况可以看出,message_2及message_3由于priority优先级最高都是7,所以它们会被最早消费,而message_5的priority是0,所以最后才被消费。