查看原文
其他

.NET Core Redis 消息队列中间件

DotNet 2022-07-19

↓推荐关注↓

前言


这是一篇拖更很久的博客,不知不觉InitQ在nuget下载量已经过15K了,奈何胸无点墨也不晓得怎么写(懒),随便在github上挂了个md,现在好好唠唠如何在redis里使用队列


队列缓存分布式 异步调优堆配置 ------(来自某位不知名码友)


诞生背景


redis在项目中使用的越来越频繁,通常我们是用来做缓存,使用较多的就是String,Hash这两种类型,以及分布式锁,redis的List类型,就可以用于消息队列,使用起来更加简单,且速度更快,非常适合子服务内部之间的消息流转,创造灵感来自于杨老板的CAP(地址:https://www.cnblogs.com/tibos/p/11858095.html),采用注解的方式消费队列,让业务逻辑更加的清晰,方便维护


安装环

  • .net core版本:2.1
  • redis版本:3.0以上

特点

1.通过注解的方式,订阅队列
2.可以设置消费消息的频次
3.支持消息广播
4.支持延迟队列

使用介绍

1、获取initQ包

  • 方案A. install-package InitQ

  • 方案B. nuget包管理工具搜索 InitQ

2、添加中间件(该中间件依赖 StackExchange.Redis)

  services.AddInitQ(m=> 
  {
      m.SuspendTime = 1000;
      m.IntervalTime = 1000
      m.ConnectionString = "127.0.0.1,connectTimeout=15000,syncTimeout=5000,password=123456";
      m.ListSubscribe = new List<Type>() { typeof(RedisSubscribeA), typeof(RedisSubscribeB) };
      m.ShowLog = false;
  });
3、配置说明
  public class InitQOptions
  {
      /// <summary>
      /// redis连接字符串
      /// </summary>
      public string ConnectionString { getset; }
  
      /// <summary>
      /// 没消息时挂起时长(毫秒)
      /// </summary>
      public int SuspendTime { getset; }
  
      /// <summary>
      /// 每次消费消息间隔时间(毫秒)
      /// </summary>
      public int IntervalTime { getset; }
  
      /// <summary>
      /// 是否显示日志
      /// </summary>
      public bool ShowLog { getset; }
  
      /// <summary>
      /// 需要注入的类型
      /// </summary>
      public IList<Type> ListSubscribe { getset; }
  
      public InitQOptions()
      {
          ConnectionString = "";
          IntervalTime = 0;
          SuspendTime = 1000;
          ShowLog = false;
      }
  }

消息发布/订阅

消息的发布/订阅是最基础的功能,这里做了几个优化

  1. 采用的是长轮询模式,可以控制消息消费的频次,以及轮询空消息的间隔,避免资源浪费
  2. 支持多个类订阅消息,可以很方便的根据业务进行分类,前提是这些类 必须注册
  3. 支持多线程消费消息(在执行耗时任务的时候,非常有用)

示例如下(Thread.Sleep):

    public class RedisSubscribeAIRedisSubscribe
    {
        [Subscribe("tibos_test_1")]
        private async Task SubRedisTest(string msg)
        {
            Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
            Thread.Sleep(3000); //使用堵塞线程模式,同步延时
            Console.WriteLine($"A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
        }
    }
    public class RedisSubscribeAIRedisSubscribe
    {
        [Subscribe("tibos_test_1")]
        private async Task SubRedisTest(string msg)
        {
            Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
            Thread.Sleep(3000); //使用堵塞线程模式,同步延时
            Console.WriteLine($"A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
        }
        [Subscribe("tibos_test_1")]
        private async Task SubRedisTest2(string msg)
        {
            Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
            Thread.Sleep(3000); //使用堵塞线程模式,同步延时
            Console.WriteLine($"A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
        }
    }

示例如下(Task.Delay):

    [Subscribe("tibos_test_1")]
    private async Task SubRedisTest(string msg)
    {
        Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
        await Task.Delay(3000); //使用非堵塞线程模式,异步延时
        Console.WriteLine($"A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
    }

根据业务情况,合理的选择堵塞模式

1、订阅发布者

    using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
    {
        //redis对象
        var _redis = scope.ServiceProvider.GetService<ICacheService>();
        //循环向 tibos_test_1 队列发送消息
        for (int i = 0; i < 1000; i++)
        {
            await _redis.ListRightPushAsync("tibos_test_1"$"我是消息{i + 1}号");
        }
    }
2、定义消费者类 RedisSubscribeA
  public class RedisSubscribeAIRedisSubscribe
  {
      [Subscribe("tibos_test_1")]
      private async Task SubRedisTest(string msg)
      {
          Console.WriteLine($"A类--->订阅者A消息消息:{msg}");
      }
  
      [Subscribe("tibos_test_1")]
      private async Task SubRedisTest1(string msg)
      {
          Console.WriteLine($"A类--->订阅者A1消息消息:{msg}");
      }
  
      [Subscribe("tibos_test_1")]
      private async Task SubRedisTest2(string msg)
      {
          Console.WriteLine($"A类--->订阅者A2消息消息:{msg}");
      }
  
      [Subscribe("tibos_test_1")]
      private async Task SubRedisTest3(string msg)
      {
          Console.WriteLine($"A类--->订阅者A3消息消息:{msg}");
      }
  }
3、定义消费者类 RedisSubscribeB
  public class RedisSubscribeB : IRedisSubscribe
  {
      /// <summary>
      /// 测试
      /// </summary>
      /// <param name="msg"></param>
      /// <returns></returns>
      [Subscribe("tibos_test_1")]
      private async Task SubRedisTest(string msg)
      {
          Console.WriteLine($"B类--->订阅者B消费消息:{msg}");
      }
  }

消息广播/订阅

消息广播是StackExchange.Redis已经封装好的,我们只用起个线程监听即可,只要监听了这个key的线程,都会收到消息

1、订阅消息通道,订阅者需要在程序初始化的时候启动一个线程侦听通道,这里使用HostedService来实现,并注册到容器
    public class ChannelSubscribeA : IHostedServiceIDisposable
    {
        private readonly IServiceProvider _provider;
        private readonly ILogger _logger;
  
        public ChannelSubscribeA(ILogger<TestMain> logger, IServiceProvider provider)
        {
            _logger = logger;
            _provider = provider;
        }
        public void Dispose()
        {
            _logger.LogInformation("退出");
        }
  
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("程序启动");
            Task.Run(async () =>
            {
                using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
                {
                    //redis对象
                    var _redis = scope.ServiceProvider.GetService<ICacheService>();
                    await _redis.SubscribeAsync("test_channel"new Action<RedisChannel, RedisValue>((channel, message) =>
                    {
                        Console.WriteLine("test_channel" + " 订阅服务A收到消息:" + message);
                    }));
  
                }
            });
            return Task.CompletedTask;
        }
  
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("结束");
            return Task.CompletedTask;
        }
    }
  public class ChannelSubscribeB : IHostedServiceIDisposable
  {
      private readonly IServiceProvider _provider;
      private readonly ILogger _logger;

      public ChannelSubscribeB(ILogger<TestMain> logger, IServiceProvider provider)
      {
          _logger = logger;
          _provider = provider;
      }
      public void Dispose()
      {
          _logger.LogInformation("退出");
      }

      public Task StartAsync(CancellationToken cancellationToken)
      {
          _logger.LogInformation("程序启动");
          Task.Run(async () =>
          {
              using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
              {
                  //redis对象
                  var _redis = scope.ServiceProvider.GetService<ICacheService>();
                  await _redis.SubscribeAsync("test_channel"new Action<RedisChannel, RedisValue>((channel, message) =>
                  {
                      Console.WriteLine("test_channel" + " 订阅服务B收到消息:" + message);
                  }));

              }
          });
          return Task.CompletedTask;
      }

      public Task StopAsync(CancellationToken cancellationToken)
      {
          _logger.LogInformation("结束");
          return Task.CompletedTask;
      }
  }
2、将HostedService类注入到容器
    services.AddHostedService<ChannelSubscribeA>();
    services.AddHostedService<ChannelSubscribeB>();
3、广播消息
    using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
    {
        //redis对象
        var _redis = scope.ServiceProvider.GetService<ICacheService>();
        for (int i = 0; i < 1000; i++)
        {
            await _redis.PublishAsync("test_channel"$"往通道发送第{i}条消息");
        }
    }

延迟消息

延迟消息非常适用处理一些定时任务的场景,如订单15分钟未付款,自动取消, xxx天后,自动续费...... 这里使用zset+redis锁来实现,这里的操作方式,跟发布/订阅非常类似写入延迟消息:SortedSetAddAsync注解使用:SubscribeDelay

1、定义发布者
    Task.Run(async () =>
    {
  
        using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
        {
            //redis对象
            var _redis = scope.ServiceProvider.GetService<ICacheService>();
  
            for (int i = 0; i < 100; i++)
            {
                var dt = DateTime.Now.AddSeconds(3 * (i + 1));
                //key:redis里的key,唯一
                //msg:任务
                //time:延时执行的时间
                await _redis.SortedSetAddAsync("test_0625"$"延迟任务,第{i + 1}个元素,执行时间:{dt.ToString("yyyy-MM-dd HH:mm:ss")}", dt);
            }
        }
    });
2、定义消费者
    //延迟队列
    [SubscribeDelay("test_0625")]
    private async Task SubRedisTest1(string msg)
    {
        Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者延迟队列消息开始--->{msg}");
        //模拟任务执行耗时
        await Task.Delay(TimeSpan.FromSeconds(3));
        Console.WriteLine($"A类--->{msg} 结束<---");
    }

版本

V1.0 更新时间:2019-12-30

版本库:

Git获取:https://github.com/wmowm/InitQ


转自:提伯斯

链接:cnblogs.com/tibos/p/14944832.html

- EOF -

推荐阅读  点击标题可跳转

1、.NET Core/.NET5/.NET6 开源项目:框架与架构设计

2、.NET 与树莓派 LED 数码管驱动模块
3、.NET Core/.NET5/.NET6 开源项目汇总:(权限)管理系统

看完本文有收获?请转发分享给更多人

推荐关注「DotNet」,提升.Net技能 

点赞和在看就是最大的支持❤️

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存