关于我们

质量为本、客户为根、勇于拼搏、务实创新

< 返回新闻公共列表

云南大王-C#队列学习笔记:RabbitMQ延迟队列

发布时间:2020-04-13 00:00:00
    一、引言     日常生活中,很多的APP都有延迟队列的影子。比如在手机淘宝上,经常遇到APP派发的限时消费红包,一般有几个小时或24小时不等。假如在红包倒计时的过程中,没有消费掉红包的话,红包会自动失效。假如上述行为使用RabbitMQ延时队列来理解的话,就是在你收到限时消费红包的时候,手机淘宝会自动发一条延时消息到队列中以供消费。在规定时间内,则可正常消费,否则依TTL自动失效。     在RabbitMQ中,有两种方式来实现延时队列:一种是基于队列方式,另外一种是基于消息方式。     二、示例     2.1、发送端(生产端)     新建一个控制台项目Send,并添加一个类RabbitMQConfig。 class RabbitMQConfig { public static string Host { get; set; } public static string VirtualHost { get; set; } public static string UserName { get; set; } public static string Password { get; set; } public static int Port { get; set; } static RabbitMQConfig() { Host = "192.168.2.242"; VirtualHost = "/"; UserName = "hello"; Password = "world"; Port = 5672; } } RabbitMQConfig.cs class Program { static void Main(string[] args) { Console.WriteLine("C# RabbitMQ实现延迟队列有以下两种方式:"); Console.WriteLine("1、基于队列方式实现延迟队列,请按1开始生产。"); Console.WriteLine("2、基于消息方式实现延迟队列,请按2开始生产。"); string chooseChar = Console.ReadLine(); if (chooseChar == "1") { DelayMessagePublishByQueueExpires(); } else if (chooseChar == "2") { DelayMessagePublishByMessageTTL(); } Console.ReadLine(); } /// /// 基于队列方式实现延迟队列 /// 将队列中所有消息的TTL(Time To Live,即过期时间)设置为一样 /// private static void DelayMessagePublishByQueueExpires() { const string MessagePrefix = "message_"; const int PublishMessageCount = 6; const int QuequeExpirySeconds = 1000 * 30; const int MessageExpirySeconds = 1000 * 10; var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //当同时指定了queue和message的TTL值,则两者中较小的那个才会起作用。 Dictionary dict = new Dictionary { { "x-expires", QuequeExpirySeconds },//队列过期时间 { "x-message-ttl", MessageExpirySeconds },//消息过期时间 { "x-dead-letter-exchange", "dead exchange 1" },//过期消息转向路由 { "x-dead-letter-routing-key", "dead routing key 1" }//过期消息转向路由的routing key }; //声明队列 channel.QueueDeclare(queue: "delay1", durable: true, exclusive: false, autoDelete: false, arguments: dict); //向该消息队列发送消息message for (int i = 0; i < PublishMessageCount; i++) { var message = MessagePrefix + i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "delay1", basicProperties: null, body: body); Thread.Sleep(1000 * 2); Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}"); } } } } /// /// 基于消息方式实现延迟队列 /// 对队列中消息进行单独设置,每条消息的TTL可以不同。 /// private static void DelayMessagePublishByMessageTTL() { const string MessagePrefix = "message_"; const int PublishMessageCount = 6; int MessageExpirySeconds = 0; var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { Dictionary dict = new Dictionary { { "x-dead-letter-exchange", "dead exchange 2" },//过期消息转向路由 { "x-dead-letter-routing-key", "dead routing key 2" }//过期消息转向路由的routing key }; //声明队列 channel.QueueDeclare(queue: "delay2", durable: true, exclusive: false, autoDelete: false, arguments: dict); //向该消息队列发送消息message Random random = new Random(); for (int i = 0; i < PublishMessageCount; i++) { MessageExpirySeconds = i * 1000; var properties = channel.CreateBasicProperties(); properties.Expiration = MessageExpirySeconds.ToString(); var message = MessagePrefix + i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "delay2", basicProperties: properties, body: body); Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}"); } } } } } Program.cs     2.2、接收端(消费端)     新建一个控制台项目Receive,按住Alt键,将发送端RabbitMQConfig类拖一个快捷方式到Receive项目中。 class Program { static void Main(string[] args) { Console.WriteLine("C# RabbitMQ实现延迟队列有以下两种方式:"); Console.WriteLine("1、基于队列方式实现延迟队列,请按1开始消费。"); Console.WriteLine("2、基于消息方式实现延迟队列,请按2开始消费。"); string chooseChar = Console.ReadLine(); if (chooseChar == "1") { DelayMessageConsumeByQueueExpires(); } else if (chooseChar == "2") { DelayMessageConsumeByMessageTTL(); } Console.ReadLine(); } public static void DelayMessageConsumeByQueueExpires() { var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "dead exchange 1", type: "direct"); string name = channel.QueueDeclare().QueueName; channel.QueueBind(queue: name, exchange: "dead exchange 1", routingKey: "dead routing key 1"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"{DateTime.Now.ToString()} Received {message}"); }; channel.BasicConsume(queue: name, noAck: true, consumer: consumer); Console.ReadKey(); } } } public static void DelayMessageConsumeByMessageTTL() { var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "dead exchange 2", type: "direct"); string name = channel.QueueDeclare().QueueName; channel.QueueBind(queue: name, exchange: "dead exchange 2", routingKey: "dead routing key 2"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"{DateTime.Now.ToString()} Received {message}"); }; channel.BasicConsume(queue: name, noAck: true, consumer: consumer); Console.ReadKey(); } } } } Program.cs     2.3、运行结果 -----------------------------------------------------------------------------------------------------------

相关阅读

centos7系统中忘记了root管理员账号密码的解决方式公司管理必须的20条军规[参考]云南昆明天猫旗舰店如何开_怎么开_要什么条件云南大王-通俗理解spring源码(三)—— 获取xml的验证模式 云南大王-用户登录 云南大王-【Golang进阶】指针的详细讲解 云南大王-Java 单线程代码ThreadLocal串值问题 云南大王-Java 实例级别的锁和类级别的锁 云南大王-工作流引擎会签,加签,主持人,组长模式专题讲解 云南大王-Android连载5云南大王-NTP对时器(NTP对时服务器)重要性!京准电子科技 云南大王-关于redis单线程的分析 云南大王-CVE云南大王-PHP SESSION反序列化本地样例分析 云南大王-这不就是多线程ThreadPoolExecutor和阻塞队列吗 云南大王-Tomcat AJP 文件包含漏洞(CVE云南大王-讲真,这两款idea插件,能治愈你英语不好的病 云南大王-消息中间件ActiveMQ、RabbitMQ、RocketMQ、ZeroMQ、Kafka如何选型? 云南大王-JVM系列十三(类加载器). 云南大王-Win10安装MySQL8压缩包版 云南大王-初始WebApi(2) 云南大王-初识人工智能(二):机器学习(一):sklearn特征抽取 云南大王-Popup中ListBox的SelectChange事件关闭弹出窗体后主窗体点击无效BUG 云南大王-基础知识记录 云南大王-FastDFS搭建图片服务器 云南大王-git/sourcetree解决本地仓库历史合并到线上仓的历史数据合并问题_refusing to merge unrelated histories 云南大王-js判断字符是否在数组中【转】 云南大王-Python 云南大王-面向对象之多线程(可捎带电梯调度) 云南大王-Python练习题2.5求奇数分之一序列前N项和(存在问题) 云南大王-React 中的前端路由 react云南大王-VSCODE 远程开发树莓派 云南大王-React新闻网站云南大王-vs .net CS0006 C# 未能找到元数据文件 .dll 云南大王-Vue.js 技术揭秘 云南大王-流程控制语句云南大王-Python学习笔记:Python的时间操作(time,datetime,timedelta,calendar) 云南大王-流程控制语句云南大王-golang Gin framework with websocket 云南大王-多重判断if..else嵌套语句 云南大王-用户登录 云南大王-流程控制语句云南大王-密码类 云南大王-Unity2018发布WebGL注意事项总结 云南大王-web系统安全运营之基础云南大王- 流程控制语句云南大王-中型WPF客户端开发项目总结(3.3.4) 云南大王-流程控制语句云南大王-流程控制语句云南大王-流程控制语句云南大王-中型WPF客户端开发项目总结(4) 云南大王-流程控制语句云南大王-ASP.NET Core笔记(4) 云南大王-C# 基础知识系列云南大王-让 .NET 轻松构建中间件模式代码(二) 云南大王-基于 HTML5 WebGL 的 水泥工厂可视化系统 云南大王-.NET Core 3 WPF MVVM框架 Prism系列之导航系统 云南大王-《JavaScript异步编程》精读笔记 云南大王-合理使用CSS框架,加速UI设计进程 云南大王-CLSID 为 {000209FF云南大王-从零基础转行到前端大牛,需要经过哪几个阶段? 云南大王-一个简单的例子看明白 async await Task 云南大王-【目前】宇宙第一IDE Visual Studio 合并压缩css、js扩展组件云南大王-写一个通用的List集合导出excel的通用方法 云南大王-Bootstrap4 按钮组+徽章(Badges)+进度条+分页+列表组 云南大王-Web前端工程师需要学些什么? 云南大王-react嵌套路由 云南大王-【java框架】Struts2(2) 云南大王-javaSE笔记云南大王-.net core 集成 sentry 进行异常报警 云南大王-Java の 四种引用 云南大王-JVM 虚拟机&&类加载(一) 云南大王-使用Fastjson实现JSON与JavaBean之间互相转换 云南大王-Python操作Oracle数据库:cx_Oracle 云南大王-为什么要用内插字符串代替string.format 云南大王-作为字节跳动的面试官,有些话我不得不说! 云南大王-微信公众号自定义菜单与启用服务器配置冲突(图文消息、链接及文本回复) 云南大王-C#队列学习笔记:RabbitMQ延迟队列 云南大王-Disruptor 基础篇 云南大王-C#获取设备(Audio和Video)名称 简单整理 云南大王-基于注解的IOC配置 云南大王-C#调用EnumDevice获取设备信息 云南大王-Jenkins基础系统之更换镜像源 云南大王-Jenkins基础系统之完整的.net项目编译 云南大王-Scala学习系列(二)——环境安装配置 云南大王-WinForm中DataGridView复制选中单元格内容解决方案 云南大王-关键词匹配优化(第0篇)—— 问题和思路 云南大王-ASP.NET CORE WEBAPI文件下载 云南大王-GC垃圾回收器 云南大王-多线程之旅(Task 任务) 云南大王-当模板方法遇到了委托函数,你的代码又可以精简了 云南大王-基于.NetCore3.1搭建项目系列 —— 使用Swagger导出文档 (补充篇) 云南大王-关键词匹配优化(第1篇)—— 测试计算过程 云南大王-原理解密 → Spring AOP 实现动态数据源(读写分离),底层原理是什么 云南大王-Navicat 密码加密算法 云南大王-【WPF学习】第六十六章 支持可视化状态 云南大王-composer安装 windows 云南大王-ASP.NET Core中的Controller 云南大王-HttpClient来自官方的JSON扩展方法 云南大王-Python3标准库:http.cookies HTTP cookie
/template/Home/Zkeys/PC/Static