网上参考大神们的博客,自身做了贰个RabbitMQ即时发音信的德姆o。

话不多说,直接上代码!

注:那份文书档案是本人和多少个对象学习后一并形成的。

金沙注册送58 1

 

一:搭建三个化解方案框架:RabbitMQ_Demo

目录

  • RabbitMQ 概念
  • exchange调换机机制
    • 什么是调换机
    • binding?
    • Direct Exchange交换机
    • Topic Exchange交换机
    • Fanout Exchange交换机
    • Header Exchange交换机
  • RabbitMQ 的 Hello – Demo(springboot实现)
  • RabbitMQ 的 Hello Demo(spring xml实现)
  • RabbitMQ 在生育条件下利用和产出的标题
    • Spring RabbitMQ 注解
    • 消息的 JSON 传输
    • 音信持久化,断线重连,ACK。

1.引言

RabbitMQ——Rabbit Message
Queue的简写,但无法仅仅精晓其为消息队列,音讯代理更方便。RabbitMQ
是三个由 Erlang
语言开发的AMQP(高级新闻队列协议)的开源完毕,其内部结构如下:

金沙注册送58 2

RabbitMQ作为3个新闻代理,主要和消息交际,负责接收并转账音信。RabbitMQ提供了保障的新闻机制、跟踪机制和灵活的信息路由,支持音信集群和分布式安排。适用于排队算法、秒杀活动、信息分发、异步处理、数据同步、处理耗费时间任务、CQTiggoS等使用场景。

上边大家就来读书下RabbitMQ。

一.用到VS的NuGet安装包管理工具安装RabbitMQ.Client:

其间富含五个部分:

RabbitMQ 概念

RabbitMQ
即三个音信队列,首假如用来贯彻应用程序的异步和解耦,同时也能起到消息缓冲,音信分发的效劳。RabbitMQ使用的是AMQP协议,它是一种贰进制协议。暗许运营端口
5672。

在 RabbitMQ 中,如下图结构:

金沙注册送58 3

rabbitmq

  • 左手 P 代表 生产者,也便是往 RabbitMQ 发音讯的主次。
  • 高级中学档就是 RabbitMQ,中间囊括了 调换机 和 队列。
  • 左侧 C 代表 消费者,约等于往 RabbitMQ 拿音讯的次序。

那么,在那之中相比较根本的概念有 几个,分别为:虚拟主机,沟通机,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交流机、队列和绑定。为啥必要四个虚拟主机呢?很不难,RabbitMQ个中,用户只辛亏虚拟主机的粒度举行权力控制。
    因而,假使必要禁止A组访问B组的调换机/队列/绑定,必须为A和B分别创造一个虚拟主机。每3个RabbitMQ服务器都有1个默许的虚拟主机“/”。
  • 交换机:Exchange 用于转载消息,然则它不会做存款和储蓄 ,假设未有Queue bind 到 Exchange 的话,它会直接放弃掉 Producer
    发送过来的消息。

    • 此处有叁个相比较关键的概念:***路由键 ***
      。音讯到沟通机的时候,交互机会转载到对应的行列中,那么终归转发到哪个队列,就要依照该路由键。
  • 绑定:也正是调换机须要和队列相绑定,那其间如上海教室所示,是多对多的涉嫌。

二. 条件搭建

正文首要依照Windows下选用Vs Code 基于.net
core举办demo演示。开头在此以前大家须要准备好之下条件。

  • 安装Erlang运转条件
    下载安装Erlang。
  • 安装RabbitMQ
    下载安装Windows版本的RabbitMQ。
  • 启动RabbitMQ Server
    点击Windows伊始按钮,输入RabbitMQ找到RabbitMQ Comman Prompt,以管理员身份运转。
  • 次第执行以下命令运转RabbitMQ服务

    rabbitmq-service install
    rabbitmq-service enable
    rabbitmq-service start
    
  • 执行rabbitmqlctl status检查RabbitMQ状态

  • 安装管理平台插件
    执行rabbitmq-plugins enable rabbitmq_management即可成功安装,使用暗中同意账号密码(guest/guest)登录即可。

金沙注册送58 4

壹:RabbitMQ 公用类库项目

初识RabbitMQ连串之3。exchange沟通机机制

3. Hello RabbitMQ

在起来在此以前大家先来询问下音信模型:
金沙注册送58 5
顾客(consumer)订阅有些队列。生产者(producer)创设新闻,然后发表到行列(queue)中,队列再将消息发送到监听的顾客。

下边大家大家因此demo来精晓RabbitMQ的中央用法。

 

二:二个劳动者控制台项目

什么样是调换机

rabbitmq的message
model实际上海消防息不直接发送到queue中,中间有二个exchange是做信息分发,producer甚至不明白音信发送到那个队列中去。因而,当exchange收到message时,必须可信赖了然该怎么分发。是append到一定规则的queue,依然append到多个queue中,还是被撇下?那些规则都以通过exchagne的4种type去定义的。

The core idea in the messaging model in RabbitMQ is that the producer
never sends any messages directly to a queue. Actually, quite often
the producer doesn’t even know if a message will be delivered to any
queue at all.

Instead, the producer can only send messages to an exchange. An
exchange is a very simple thing. On one side it receives messages from
producers and the other side it pushes them to queues. The exchange
must know exactly what to do with a message it receives. Should it be
appended to a particular queue? Should it be appended to many queues?
Or should it get discarded. The rules for that are defined by the
exchange type.

exchange是三个音讯的agent,每一个虚拟的host中都有定义。它的义务是把message路由到区别的queue中。

③.1.音讯的出殡和接受

创建RabbitMQ文件夹,打开命令指示符,分别创造八个控制台项目Send、Receive。

dotnet new console --name Send //创建发送端控制台应用
cd Send //进入Send目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

dotnet new console --name Receive //创建接收端控制台应用
cd Receive //进入Receive目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

大家先来添加音讯发送端逻辑:

//Send.cs 
public static void Main(string[] args)
{
    //1.1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构建byte消息数据包
            string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
            var body = Encoding.UTF8.GetBytes(message);
            //6. 发送数据包
            channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

再来完善音信接收端逻辑:

//Receive.cs  省略部分代码
public static void Main()
{
    //1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构造消费者实例
            var consumer = new EventingBasicConsumer(channel);
            //6. 绑定消息接收后的事件委托
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(" [x] Received {0}", message);
                Thread.Sleep(6000);//模拟耗时
                Console.WriteLine (" [x] Done");
            };
            //7. 启动消费者
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

先运转音信接收端,再运营音信发送端,结果如下图。

金沙注册送58 6

从上边的代码中能够看看,发送端和消费端的代码前肆步都以一致的。重要的分别在于发送端调用channel.BasicPublish主意发送音讯;而接收端要求实例化三个EventingBasicConsumer实例来进行音信处理逻辑。此外1些索要留意的是:音信接收端和出殡和埋葬端的队列名称(queue)必须保持1致,那里钦定的行列名为hello。

二.劳动者端代码:

3:三个买主要控制制台项目

binding?

exchange和queue通过routing-key关联,那两者之间的涉嫌是便是binding。如下图所示,X表示交流机,铁蓝表示队列,沟通机通过一个routing-key去binding三个queue,routing-key有哪些功效呢?看Direct
exchange类型沟通机。

金沙注册送58 7

3.二. 循环往复调度

运用工作行列的补益即是它亦可互相的处理队列。假设堆积了好多职分,大家只供给加上更加多的工小编(workers)就足以了。我们先运行多少个接收端,等待音信接收,再起步二个发送端举办音讯发送。

金沙注册送58 8

大家扩充运转3个消费端后的运转结果:

金沙注册送58 9

从图中可见,大家循环发送4条音讯,八个音信接收端按梯次被循环分配。
暗中认可情状下,RabbitMQ将按梯次将每条音信发送给下一个买主。平均各个顾客将获得同样数量的音讯。那种分发音讯的措施叫做循环(round-robin)。

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 
 8 namespace RabbitMQ.Producter
 9 {
10     class Program
11     {
12         /// <summary>
13         /// 连接配置
14         /// </summary>
15         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
16         {
17             HostName="localhost",
18             UserName = "guest",
19             Password = "guest",
20             Port = 5672,
21             //VirtualHost = "JentVirtualHost"
22         };
23         /// <summary>
24         /// 路由名称
25         /// </summary>
26         const string ExchangeName = "Jent.Exchange";
27         /// <summary>
28         /// 队列名称
29         /// </summary>
30         const string QueueName = "Jent.Queue";
31         static void Main(string[] args)
32         {
33             DirectExchangeSendMsg();
34             Console.WriteLine("按任意键退出程序!");
35             Console.ReadKey();
36         }
37         /// <summary>
38         /// 单点精确路由模式
39         /// </summary>
40         private static void DirectExchangeSendMsg()
41         {
42             using (IConnection conn = rabbitMqFactory.CreateConnection())
43             {
44                 using (IModel channel = conn.CreateModel())
45                 {
46                     channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
47                     channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
48                     channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
49 
50                     var props = channel.CreateBasicProperties();
51                     props.Persistent = true;
52                     Console.WriteLine("请输入需要发送的消息:");
53                     string vadata = Console.ReadLine();
54                     while (vadata != "exit")
55                     {
56                         var msgBody = Encoding.UTF8.GetBytes(vadata);
57                         channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
58                         Console.WriteLine(string.Format("发送时间:{0},发送完毕,输入exit退出消息发送", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));
59                         vadata = Console.ReadLine();
60                     }
61                 }
62             }
63         }
64     }
65 }

体系结构如图:

Directed Exchange

路由键exchange,该沟通机械收割到新闻后会把新闻发送到钦命routing-key的queue中。那信息调换机是怎么明白的吗?其实,producer
deliver新闻的时候会把routing-key add到 message
header中。routing-key只是3个messgae的attribute。

A direct exchange delivers messages to queues based on a message routing key. The routing key is a message attribute added into the message header by the producer. The routing key can be seen as an "address" that the exchange use to decide how to route the message. A message goes to the queue(s) whose binding key exactly matches the routing key of the message.

Default Exchange
那种是出格的Direct
Exchange,是rabbitmq内部默许的多个调换机。该调换机的name是空字符串,全数queue都暗许binding
到该调换机上。全体binding到该沟通机上的queue,routing-key都和queue的name一样。

三.三. 音信确认

依据大家地点的demo,壹旦RabbitMQ将新闻发送到消费端,新闻就会马上从内部存款和储蓄器中移出,无论消费端是还是不是处理完了。在那种景况下,信息就会丢掉。

为了确认保障1个音讯永远不会丢掉,RabbitMQ协助新闻确认(message
acknowledgments)
。当消费端接收新闻还要处理完毕后,会发送一个ack(音讯确认)复信号到RabbitMQ,RabbitMQ接收到那一个功率信号后,就能够去除掉那条已经处理的新闻任务。但借使消费端挂掉了(比如,通道关闭、连接丢失等)没有发送ack频域信号。RabbitMQ就会通晓有个别新闻并未有符合规律处理,RabbitMQ将会再一次将音信入队,假如有其它2个消费端在线,就会飞速的再一次发送到别的两个消费端。

RabbitMQ中平昔不新闻超时的概念,唯有当消费端关闭或奔溃时,RabbitMQ才会重新分发音讯。

微调下Receive中的代码逻辑:

 //5. 构造消费者实例
 var consumer = new EventingBasicConsumer(channel);
 //6. 绑定消息接收后的事件委托
 consumer.Received += (model, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine(" [x] Received {0}", message);
     Thread.Sleep(6000);//模拟耗时
     Console.WriteLine(" [x] Done");
     // 7. 发送消息确认信号(手动消息确认)
     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
 };
 //8. 启动消费者
 //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
 //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认
 channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

重视改动的是将
autoAck:true修改为autoAck:fasle,以及在新闻处理完成后手动调用BasicAck艺术开始展览手动音讯确认。

金沙注册送58 10

从图中可见,音信发送端连接发送四条音信,个中消费端1先被分配处理第1条新闻,消费端二被循环分配第一条音讯,第一条新闻由于未有空闲消费者依旧在队列中。
在消费端2未处理完第壹条新闻在此以前,手动中断(ctrl+c)。我们能够窥见RabbitMQ在下1回分发时,会优先将被中断的新闻分发给消费端1处理。

三.消费者端代码:

金沙注册送58 11

Topic Exchange

通配符沟通机,exchange会把音信发送到一个要么五个满意通配符规则的routing-key的queue。其中*表号匹配三个word,#相称八个word和途径,路径之间通过.隔断。如满足a.*.c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。

三.四. 新闻持久化

音信确认确认保证了固然消费端万分,新闻也不会丢掉可以被重复分发处理。不过一旦RabbitMQ服务端至极,音讯依旧会丢掉。除非大家钦点durable:true,不然当RabbitMQ退出或奔溃时,新闻将依旧会丢掉。通过点名durable:true,并指定Persistent=true,来告诉RabbitMQ将音信持久化。

//send.cs
//4. 申明队列(指定durable:true,告知rabbitmq对消息进行持久化)
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5. 构建byte消息数据包
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 发送数据包(指定basicProperties)
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);

将音信标记为持久性不能完全保障消息不会丢掉。即使它报告RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受新闻还要还并未有保留时​​,如故有三个相当短的小运窗口。RabbitMQ
可能只是将音讯保存到了缓存中,并不曾将其写入到磁盘上。持久化是不可能肯定保障的,不过对于一个简易职责队列来说早已够用。假使须求保险新闻队列的持久化,能够采取publisher
confirms.

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 
 8 namespace RabbitMQ.Consumer
 9 {
10     class Program
11     {
12         /// <summary>
13         /// 连接配置
14         /// </summary>
15         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
16         {
17             HostName = "127.0.0.1",
18             UserName = "guest",
19             Password = "guest",
20             Port = 5672,
21             //VirtualHost = "JentVirtualHost"
22         };
23         /// <summary>
24         /// 路由名称
25         /// </summary>
26         const string ExchangeName = "Jent.Exchange";
27         /// <summary>
28         /// 队列名称
29         /// </summary>
30         const string QueueName = "Jent.Queue";
31 
32         static void Main(string[] args)
33         {
34             DirectAcceptExchange();
35 
36             Console.WriteLine("输入任意值退出程序!");
37             Console.ReadKey();
38         }
39 
40         private static void DirectAcceptExchange()
41         {
42             using (IConnection conn = rabbitMqFactory.CreateConnection())
43             {
44                 using (IModel channel = conn.CreateModel())
45                 {
46                     channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
47                     channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
48                     channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
49 
50                     while (true)
51                     {
52                         BasicGetResult msgResponse = channel.BasicGet(QueueName, autoAck: false);
53                         if (msgResponse != null)
54                         {
55                             var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
56                             Console.WriteLine(string.Format("接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
57                         }
58                         //System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
59                     }
60                 }
61             }
62         }
63     }
64 }

 

Fanout Exchange

扇形交流机,该交流机会把音讯发送到全数binding到该沟通机上的queue。那种是publisher/subcribe格局。用来做广播最棒。
拥有该exchagne上点名的routing-key都会被ignore掉。

The fanout copies and routes a received message to all queues that are
bound to it regardless of routing keys or pattern matching as with
direct and topic exchanges. Keys provided will simply be ignored.

3.伍. 持平分发

RabbitMQ的音讯分发默许遵照消费端的数目,按梯次循环分发。那样仅是保证了消费端被平均分发音讯的数额,但却不经意了消费端的闲忙意况。那就大概出现某些消费端直接处理耗费时间职务处于阻塞状态,某些消费端直接处理一般职分处于空置状态,而只是它们分配的职分数量壹样。

金沙注册送58 12

但大家能够通过channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1来报告RabbitMQ,在未接到消费端的音讯确认时,不再分发音信,也就保障了当消费端处于勤奋景色时,不再分配职务。

//Receive.cs
//4. 申明队列
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

此刻你必要留意的是壹旦具有的消费端都地处忙绿景观,你的种类或者会被塞满。你需求专注那一点,要么添加更加多的消费端,要么采纳其余策略。

4.主次结果:

二:开发之前,要求引用RabbitMQ包

Header Exchange

设置header attribute参数类型的交流机。

4. Exchange

仔细的你恐怕发现下面的demo,生产者和买主直接是经过1样队列名称实行相称衔接的。消费者订阅有些队列,生产者创造音讯发表到行列中,队列再将音讯转载到订阅的顾客。那样就会有2个局限性,即消费者1次只可以发送音讯到某3个行列。

那消费者哪些才能发送新闻到多个音信队列呢?
金沙注册送58,RabbitMQ提供了Exchange,它就如于路由器的效应,它用来对音讯举行路由,将音信发送到多少个类别上。Exchange1方面从劳动者接收新闻,另1方面将音信推送到行列。但exchange必须知道什么处理接收到的音信,是将其附加到特定队列依然外加到三个系列,依然一向忽略。而这几个规则由exchange
type定义,exchange的法则如下图所示。
金沙注册送58 13

周围的exchange type 有以下两种:

  • direct(明确的路由规则:消费端绑定的种类名称必须和音讯公布时钦定的路由名称相同)
  • topic (格局相称的路由规则:协助通配符)
  • fanout (音讯广播,将消息分发到exchange上绑定的兼具队列上)

上面我们就来挨家挨户那介绍它们的用法。

金沙注册送58 14

设置相应的Nuget包,恐怕下载相关dll也能够,可是建议在线安装nuget,更方便人民群众

RabbitMQ 的 Hello Demo

设置就隐瞒了,建议遵照法定文书档案上做。先贴代码,稍后解释,代码如下:

布局 交流机,队列,调换机与队列的绑定,新闻监视容器:

@Configuration
@Data
public class RabbitMQConfig {

    final static String queueName = "spring-boot";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("spring-boot-exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

配备接收音讯者(即消费者):

public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

计划发送新闻者(即生产者):

@RestController
public class Test {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping(value = "/test/{abc}",method = RequestMethod.GET)
    public String test(@PathVariable(value = "abc") String abc){
        rabbitTemplate.convertAndSend("spring-boot", abc + " from RabbitMQ!");
        return  "abc";
    }
}

上述便可达成八个简单的 RabbitMQ
Demo,具体代码在:点这里

那正是说,那里,分为七个部分分析:发音讯,沟通机队列,收音讯。

  • 对此发送新闻:咱们一般能够运用 RabbitTemplate,那些是 Spring
    封装给了作者们,便于大家发送音讯,大家调用
    rabbitTemplate.convertAndSend("spring-boot", xxx); 即可发送新闻。
  • 对此沟通机队列:如上代码,大家须要配备调换机
    TopicExchange,配置队列 Queue,并且配备他们中间的绑定 Binding
  • 对此收受新闻:首先须求创建三个音讯监听容器,然后把我们的接受者注册到该容器中,那样,队列中有信息,那么就会调用接收者的照应的格局。如上代码
    container.setMessageListener(listenerAdapter);
    在那之中,MessageListenerAdapter 能够当做是
    我们接收者的二个包装类,new MessageListenerAdapter(receiver, "receiveMessage");
    指明了1旦有音信来,那么调用接收者哪个方法开始展览处理。

4.1 fanout

本着由浅入深的思虑,大家先来精通下fanout的广播路由体制。fanout的路由机制如下图,即发送到
fanout 类型exchange的音信都会散发到拥有绑定该exchange的行列上去。

金沙注册送58 15

劳动者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用fanout exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到指定exchange,fanout类型无需指定routingKey
channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);

顾客示例代码:

//申明fanout类型exchange
channel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到指定fanout类型exchange,无需指定路由键
channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");

 

查找:RabbitMQ.Client  
安装新型版即可,不精通怎么设置nuget,请移步 

RabbitMQ 的 Hello Demo(spring xml实现)

spring
xml情势实现RabbitMQ不难,可读性较好,配置简单,配置和兑现如下所示。

上文已经讲述了rabbitmq的布署,xml格局通过properites文件存放用户配置音讯:

mq.host=127.0.0.1
mq.username=guest
mq.password=guest
mq.port=5672

配置application-mq.xml配置文件,申明连接、沟通机、queue以及consumer监听。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
    <description>rabbitmq 连接服务配置</description>

    <!-- 连接配置 -->
    <context:property-placeholder location="classpath:mq.properties" />
    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- spring template声明-->
    <rabbit:template exchange="amqpExchange" id="amqpTemplate"  connection-factory="connectionFactory" />

    <!--申明queue-->
    <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" />
    <!--申明exchange交换机并绑定queue-->
    <rabbit:direct-exchange name="amqpExchange" durable="true" auto-delete="false" id="amqpExchange">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_key" key="test_queue_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>


    <!--consumer配置监听-->
    <bean id="reveiver" class="com.demo.mq.receive.Reveiver" />
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="test_queue_key" ref="reveiver" method="receiveMessage"/>
    </rabbit:listener-container>
</beans>

上述代码中,引进properties文件就不多说了。

<rabbit:connection-factory>标签评释创设connection的factory工厂。

<rabbit-template>声明spring
template,和上文spring中使用template一样。template可声明exchange。

<rabbit:queue>宣示三个queue并设置queue的配置项,直接看标签属性就能够明白queue的配备项。

<rabbit:direct-exchange>声称交流机并绑定queue。

<rabbit:listener-container>表明监听container并配置consumer和监听routing-key。

剩余就大约了,application-context.xml中把rabbitmq配置import进去。

<?xml version="1.0" encoding="UTF-8"?>
<beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:task="http://www.springframework.org/schema/task"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
       http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">

    <context:component-scan base-package="com.demo.**" />
    <import resource="application-mq.xml" />
</beans>

Producer实现,发送信息照旧选择template的convertAndSend() deliver音讯。

@Service
public class Producer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    private final static Logger logger = LoggerFactory.getLogger(Producer.class);

    public void sendDataToQueue(String queueKey, Object object) {
        try {
            amqpTemplate.convertAndSend(queueKey, object);
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("exeception={}",e);
        }

    }
}

配置consumer

package com.demo.mq.receive;

import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;

@Service
public class Reveiver {
    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("reveice msg=" + message.toString());
        latch.countDown();
    }
}

测试deliver消息

Controller
@RequestMapping("/demo/")
public class TestController {
    private final static Logger logger = LoggerFactory.getLogger(TestController.class);
    @Resource
    private Producer producer;


    @RequestMapping("/test/{msg}")
    public String send(@PathVariable("msg") String msg){
        logger.info("#TestController.send#abc={msg}", msg);
        System.out.println("msg="+msg);
        producer.sendDataToQueue("test_queue_key",msg);
        return "index";
    }
}

4.2. direct

direct相对于fanout就属于完全相称、单播的格局,路由体制如下图,即队列名称和新闻发送时内定的路由完全相配时,音信才会发送到内定队列上。
金沙注册送58 16

生产者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用direct exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "directEC", type: "direct");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到direct类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);

买主示例代码:

//申明direct类型exchange
channel.ExchangeDeclare (exchange: "directEC", type: "direct");
//绑定队列到direct类型exchange,需指定路由键routingKey
channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");

注:在首先步事先,你须要安装RabbitMQ客户端,可从

安装好未来,起初激动的写代码啦!

RabbitMQ 在生育条件下利用和产出的题材

在生育环境中,由于 Spring 对 RabbitMQ
提供了某个惠及的诠释,所以率先可以利用那些申明。例如:

  • @EnableRabbit:@EnableRabbit 和 @Configuration
    注明在3个类中结成使用,倘诺此类能够回到一个RabbitListenerContainerFactory 类型的
    bean,那么就也就是能够把该终端(消费端)和 RabbitMQ
    进行再三再四。Ps:(生成端不是透过 RabbitListenerContainerFactory 来和
    RabbitMQ 连接,而是通过 RabbitTemplate )
  • @RabbitListener:当对应的种类中有消息的时候,该申明修饰下的法子会被实施。
  • @RabbitHandler:接收者能够监听多少个体系,不相同的队列新闻的品类可能区别,该阐明能够使得分裂的音讯让差异方法来响应。

切切实实那些注脚的使用,能够参照那里的代码:点这里

先是,生产环境下的 RabbitMQ
可能不会在劳动者也许消费者本机上,所以需求再行定义
ConnectionFactory,即:

@Bean
ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
    connectionFactory.setUsername(userName);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(vhost);
    return connectionFactory;
}

此地,可以重新设置须求接二连三的 RabbitMQ 的
ip,端口,虚拟主机,用户名,密码。

接下来,能够先从生产端思念,生产端须求连接 RabbitMQ,那么可以因而RabbitTemplate 进行连接。 Ps:(RabbitTemplate
用于生产端发送新闻到沟通机中),如下代码:

@Bean(name="myTemplate")
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(integrationEventMessageConverter());
    template.setExchange(exchangeName);
    return template;
}

在该代码中,new RabbitTemplate(connectionFactory);
设置了生产端连接到RabbitMQ,template.setMessageConverter(integrationEventMessageConverter());
设置了 生产端发送给沟通机的音讯是以什么格式的,在
integrationEventMessageConverter() 代码中:

public MessageConverter integrationEventMessageConverter() {
    Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
    return messageConverter;
}

如上 Jackson2JsonMessageConverter 指明了 JSON。上述代码的最终
template.setExchange(exchangeName); 指明了
要把劳动者要把新闻发送到哪个交换机上。

有了上述,那么,大家即可使用
rabbitTemplate.convertAndSend("spring-boot", xxx); 发送信息,xxx
表示任意档次,因为上述的装置会帮大家把这一个品种转化成 JSON 传输。

继而,生产端发送我们说过了,那么以后可以看看消费端:

对于消费端,我们得以只创造
SimpleRabbitListenerContainerFactory,它亦可帮大家转变
RabbitListenerContainer,然后我们再使用 @RabbitListener
钦命接收者收到消息时处理的主意。

@Bean(name="myListenContainer")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setMessageConverter(integrationEventMessageConverter());
    factory.setConnectionFactory(connectionFactory());
    return factory;
}

这其中
factory.setMessageConverter(integrationEventMessageConverter());
钦命了大家接受音信的时候,以 JSON
传输的新闻能够转换到对应的品种传入到艺术中。例如:

@Slf4j
@Component
@RabbitListener(containerFactory = "helloRabbitListenerContainer",queues = "spring-boot")
public class Receiver {
    @RabbitHandler
    public void receiveTeacher(Teacher teacher) {
        log.info("##### = {}",teacher);
    }
}

或者出现的题材:

4.3. topic

topic是direct的升迁版,是一种情势相称的路由机制。它扶助选拔三种通配符来举办格局相配:符号#和符号*。其中*相配二个单词,
#则意味着相配0个或多个单词,单词之间用.分开。如下图所示。
金沙注册送58 17

生产者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用topic exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到topic类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);

消费者示例代码:

//申明topic类型exchange
channel.ExchangeDeclare (exchange: "topicEC", type: "topic");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到topic类型exchange,需指定路由键routingKey
channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");

        然则RabbitMQ又是借助于Erlang
OTP平台,所以,安装RabbitMQ以前,须要先从

 

音讯持久化

在生育条件中,大家要求思考万一劳动者挂了,消费者挂了,或然 rabbitmq
挂了什么。1般的话,倘诺劳动者挂了可能消费者挂了,其实是尚未影响,因为音讯就在队列之中。那么万1rabbitmq
挂了,从前在队列之中的音讯如何做,其实能够做消息持久化,RabbitMQ
会把音信保存在磁盘上。

做法是能够先从 Connection 对象中得到1个 Channel
信道对象,然后再能够透过该对象设置 音讯持久化。

5. RPC

兰德PAJEROPC——Remote Procedure Call,远程进度调用。
那RabbitMQ如何实行远程调用呢?示意图如下:
金沙注册送58 18
第贰步,主假使开始展览长途调用的客户端需求内定接收远程回调的类别,并发明消费者监听此行列。
第1步,远程调用的服务端除了要表明消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的行列中去。

长距离调用客户端:

 //申明唯一guid用来标识此次发送的远程调用请求
 var correlationId = Guid.NewGuid().ToString();
 //申明需要监听的回调队列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回调队列
 properties.CorrelationId = correlationId;//指定消息唯一标识
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //发布消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //创建消费者用于处理消息回调(远程调用返回结果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };

长途调用服务端:

//申明队列接收远程调用请求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//请求处理逻辑
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //从请求的参数中获取请求的唯一标识,在消息回传时同样绑定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //将远程调用结果发送到客户端监听的队列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手动发回消息确认
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

       
关于那有个别的剧情,推荐阅读:

三:达成RabbitMQ基本收发新闻的效果

劳动者可能消费者断线重连

此间 Spring 有活动重连机制。

6. 总结

听大人说下边包车型客车demo和对两种分化exchange路由体制的上学,大家发现RabbitMQ主假诺关乎到以下几个基本概念:

  1. Publisher:生产者,音信的发送方。
  2. Connection:网络连接。
  3. Channel:信道,多路复用连接中的一条独立的双向数据流通道。
  4. Exchange:交流器(路由器),负责消息的路由到相应队列。
  5. Binding:队列与交流器间的涉嫌绑定。消费者将关爱的行列绑定到内定交换器上,以便Exchange能准确分发音信到钦命队列。
  6. Queue:队列,新闻的缓冲存款和储蓄区。
  7. Virtual
    Host:虚拟主机,虚拟主机提供财富的逻辑分组和分手。包涵连接,交流,队列,绑定,用户权限,策略等。
  8. Broker:新闻队列的服务器实体。
  9. Consumer:消费者,音讯的接收方。

本次作为入门就讲到那里,下次大家来上课下EventBus +
RabbitMQ
怎么完成事件的散发。

参考资料:
RabbitMQ Tutorials
Demo路径——RabbitMQ

 

1:在RabbitMQ_Lib类库中新建类:MyRabbitMQ.cs

ACK 确认机制

每一个Consumer大概要求1段时间才能处理完收到的数目。要是在那个进度中,Consumer出错了,至极退出了,而数据还尚无拍卖达成,那么
至极不幸,那段数据就不见了。因为我们利用no-ack的方式展开确认,相当于说,每一回Consumer接到数据后,而任由是不是处理完结,RabbitMQ Server会立刻把这么些Message标记为形成,然后从queue中除去了。

假诺二个Consumer分外退出了,它处理的数量可见被其它的Consumer处理,那样数据在那种情况下就不会丢掉了(注意是那种状态下)。
为了保险数据不被丢掉,RabbitMQ补助音信确认机制,即acknowledgments。为了保障数据能被正确处理而不光是被Consumer收到,那么大家无法利用no-ack。而相应是在拍卖完数据后发送ack。

在拍卖数量后发送的ack,就是报告RabbitMQ数据现已被接收,处理到位,RabbitMQ能够去安全的删减它了。
倘诺Consumer退出了可是未有发送ack,那么RabbitMQ就会把那一个Message发送到下二个Consumer。那样就确认保证了在Consumer极度退出的意况下多少也不会丢掉。

  此德姆o只是‘direct’方式的新闻发送接收格局。

金沙注册送58 19金沙注册送58 20

村办对 RabbitMQ ACK 的一部分疑点,求助:点这里

public class MyRabbitMQ
    {
        //连接工厂
        private ConnectionFactory factory { get; set; } = new ConnectionFactory
        {
            HostName = "localhost",
            Port = 5672,
            UserName = "guest",
            Password = "guest"
        };

        private IConnection connection { get; set; }

        private IModel channel { get; set; }

        //生产方 构造函数
        public MyRabbitMQ(string exchangeName = "",string exchangeType = ExchangeType.Direct)
        {
            //创建一个连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //创建一个转发器
            channel.ExchangeDeclare(exchangeName, exchangeType);
        }

        //消费方 构造函数
        public MyRabbitMQ(string exchangeName = "", string queueName = "", string routingKey = "")
        {
            //创建一个连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //创建一个队列
            channel.QueueDeclare(queueName, true, false, false);

            //队列绑定
            channel.QueueBind(queueName, exchangeName, routingKey);
        }

        public void SendMessage(string message="", string exchangeName = "", string routingKey = "")
        {
            channel.BasicPublish(exchangeName, routingKey, null, Encoding.UTF8.GetBytes(message));
        }

        public QueueingBasicConsumer ReceiveMessage(string queueName = "")
        {
            //EventingBasicConsumer
            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queueName, true, consumer);
            return consumer;
        }
    }

总结

  1. RabbitMQ 成效:异步,解耦,缓冲,新闻分发。
  2. RabbitMQ 首要分为3个部分,生产者,调换机和队列,消费者。
  3. 亟需小心音信持久化,目标为了防患 RabbitMQ 宕机;怀想 ACK
    机制,目标为了假使消费者对音信的拍卖退步了,那么继续要哪些处理。

View Code

写在终极

  1. 写出来,说出来才清楚对不对,知道不对才能核查,勘误了才能成才。
  2. 在技术方面,希望大家眼里都容不得沙子。倘若有不规则的地点或然必要勘误的地点希望得以提出,极度感激。

可以观察,首先有三个构造函数,二个是给劳动者选取,二个是给消费者选取,注意参数有所不相同,能够看出生产者与顾客关注的点是不等同的。

不管生产大概花费,都以二个客户端,都亟需创制三个RabbitMQ连接并成立二个channel,才足以展开有关的操作,那个操作都以由channel发起的,那样说应该相比白话了。

结构生产者的时候,主借使创建2个转载器,转发器的名字及项目供给定义,

转载器常用类型包蕴三种:direct、fanout、topic,

那二种档次这里说的更精通:

本例子中是以topic为例子的

 

2:MQ_Producter项目中发送音信(生产者中发送消息)

金沙注册送58 21金沙注册送58 22

class Program
    {
        static void Main(string[] args)
        {
            string exchangeName = "07281616_exchange_topic";
            string routingkeya = "0728.a.c.routingkey";
            string routingkeyb = "0728.b.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, ExchangeType.Topic);
            for (int i = 0; i < 3600; i++)
            {
                System.Threading.Thread.Sleep(1000);
                if (i % 2 == 0)
                {
                    var message = $"{routingkeyb} -- {DateTime.Now.ToLongTimeString()}";
                    Console.WriteLine($"auto send: {message}  for {routingkeyb}");
                    myMQ.SendMessage(message, exchangeName, routingkeyb);
                }
                else
                {
                    var message = $"{routingkeya} -- {DateTime.Now.ToLongTimeString()}";
                    Console.WriteLine($"auto send: {message}  for {routingkeya}");
                    myMQ.SendMessage(message, exchangeName, routingkeya);
                }

            }
        }
    }

View Code

此处发送了3600次,每秒发二遍新闻,奇数和偶数发送的路由规则不同,会有八个例外的客户端来接受,那样方便我们测试信息是或不是被分发到了差异的队列上

 

3:七个买主项目开始展览音讯的选用

买主1:

金沙注册送58 23金沙注册送58 24

class Program
    {
        static void Main(string[] args)
        {
            string queueName = "07281616_queue";
            string exchangeName = "07281616_exchange_topic";
            var routingRule = "0728.*.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
            var consumer = myMQ.ReceiveMessage(queueName);
            while (true)
            {
                //BasicConsume 方法是可阻塞的,比较好
                var msgResponse = consumer.Queue.Dequeue();
                //这种方法不好,没有阻塞等待
                //var msgResponse = channel.BasicGet("zzs_queue", true);
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                Console.WriteLine($"Received: {msgBody}  (only for {routingRule})");
            }
        }
    }

View Code

 

顾客2:

金沙注册送58 25金沙注册送58 26

class Program
    {
        static void Main(string[] args)
        {
            string queueName = "07281626_queue";
            string exchangeName = "07281616_exchange_topic";
            var routingRule = "0728.a.*.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
            var consumer = myMQ.ReceiveMessage(queueName);
            while (true)
            {
                //BasicConsume 方法是可阻塞的,比较好
                var msgResponse = consumer.Queue.Dequeue();
                //这种方法不好,没有阻塞等待
                //var msgResponse = channel.BasicGet("zzs_queue", true);
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                Console.WriteLine($"Received: {msgBody} (only for {routingRule})");
            }
        }
    }

View Code

 

八个买主分别收受不相同队列上的新闻

 

4:运行!

先编写翻译一下,到bin目录下先运维生产者,在运维多个买主

金沙注册送58 27

 

也足以先关掉消费端,过柒秒再关闭生产端,在web
管理界面能够看来今后有1个连串里有消息,1个三条二个四条

金沙注册送58 28

 

 四:总结

总体例子的拥有代码都在此处了,代码里有关切释也很领悟,是本身要好实现的首先个RabbitMQ收发作用,实际运用中必然能够有这些恢宏,新手们有狐疑或然本身了解的有畸形的地点,烦请评论处提议哈,我们共同升高!

相关文章

网站地图xml地图