RabbitMQ Queues

Before we get to RabbitMQ, a word about the queues built into Python: both do the same kind of job — they are queues used to pass messages.

Python has two of them: the thread queue and the process queue (the queue in multiprocessing). A thread queue cannot cross process boundaries and is used to exchange data between threads; a process queue only works between a parent process and its child processes, or between child processes of the same parent. In other words, two completely independent programs — even two Python programs — cannot communicate through these queues. So if we have two independent Python programs in separate processes, or a Python program and a program in another language, we need an intermediate broker such as RabbitMQ.
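A minimal sketch contrasting the two in-process queue types mentioned above (the names and messages are purely illustrative):

import queue
import threading
import multiprocessing

def thread_worker(q):
    # threads in the same process share the queue object directly
    print("thread got:", q.get())

def process_worker(q):
    # a multiprocessing.Queue is handed to the child process when it starts
    print("child process got:", q.get())

if __name__ == '__main__':
    tq = queue.Queue()            # thread queue: usable only inside this process
    tq.put('hello from the main thread')
    threading.Thread(target=thread_worker, args=(tq,)).start()

    pq = multiprocessing.Queue()  # process queue: parent <-> child only
    pq.put('hello from the parent process')
    multiprocessing.Process(target=process_worker, args=(pq,)).start()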

Installation (Windows)

First, install the Erlang environment.

Official site:

Windows download:

Linux: install with yum

Then install RabbitMQ.

First download the Windows version of RabbitMQ.

Download:

Installing pika:

With pip already installed, open cmd and run pip install pika.

Once everything is installed, let's implement the simplest possible queue communication:

producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# open a channel
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='hello')
# routing_key is the queue name
channel.basic_publish(exchange='',
                      routing_key='hello',  # the queue name
                      body='Hello World!',
                      )
print("[x] Send 'Hello World!'")
connection.close()

 

First a basic socket connection is created, then a channel is opened on it; inside the channel we declare a queue, give it a name, and finally actually send the message with basic_publish.

consumer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):  # callback function
    print("---->", ch, method, properties)
    print(" [x] Received %r" % body)

channel.basic_consume(callback,  # when a message arrives, call callback to handle it
                      queue='hello',
                      no_ack=True
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

Once start_consuming() is called it runs forever: it keeps receiving messages one after another and blocks here permanently.

Notice that both the producer and the consumer above declare the queue. Why? Because we don't know whether the producer or the consumer will be started first; if one side hadn't declared the queue, publishing to or consuming from a non-existent queue would raise an error.

Now let's look at one-to-many: one producer with several consumers.

Start several consumers first, then keep sending data with the producer. You will see that the consumers receive the data in a round-robin fashion: each consumer takes one message in turn.

Suppose a consumer receives a message and needs 30 minutes to process it, but it loses power and crashes halfway through. The task hasn't been finished, yet we require it to be processed completely, so we need an acknowledgement: the producer side must be able to tell whether the consumer finished the task. Only after the consumer finishes and sends an acknowledgement back to the server is the task removed from the message queue. If the consumer dies without sending the acknowledgement, the task counts as unprocessed. Let's see how RabbitMQ handles this.

We can add a time.sleep() in the consumer's callback to simulate a long task or a crash. The callback is invoked whenever a message arrives; when the function returns, the message is considered processed. If the function never finishes, the message is considered unprocessed.

Note the no_ack=True parameter in the consumer's basic_consume(): it means no acknowledgement is sent regardless of whether the message was processed. Normally we don't pass this parameter — without it the message has to be acknowledged after processing. So let's remove it and add one line at the end of callback to acknowledge manually: ch.basic_ack(delivery_tag=method.delivery_tag).

def callback(ch, method, properties, body):  # callback function
    print("---->", ch, method, properties)
    # time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

 

The result: run the producer once and consumer 1 receives the message; but if consumer 1 is killed before it acknowledges, consumer 2 receives the same message. In other words, as long as the consumer has not sent an acknowledgement, the broker will not delete the message.

RabbitMQ message persistence:

We may end up creating many message queues; how do we check their state? With rabbitmqctl.bat list_queues.
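Besides the command-line tool, a queue's message count can also be read from Python; a minimal sketch using pika's passive declare (the 'hello' queue is the example queue from above):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# passive=True only checks the queue; it raises an error if the queue does not exist
ok = channel.queue_declare(queue='hello', passive=True)
print("messages waiting:", ok.method.message_count)
print("consumers attached:", ok.method.consumer_count)

connection.close()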

Right now there are still messages in the queue, but if the server goes down they are lost. So the messages need to be forcibly persisted:

channel.queue_declare(queue='hello2', durable=True)

 

Add the durable parameter every time the queue is declared (both the producer side and the consumer side must add it).

With only that in place, after restarting the RabbitMQ service we find that the queue name survives but the messages in it are gone. So we also need to add a properties argument to the producer's basic_publish:

producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# open a channel
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='hello2', durable=True)
# routing_key is the queue name
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print("[x] Send 'Hello World!'")
connection.close()

 

With this, the messages themselves are persisted.

Right now one producer feeds several consumers and the messages are distributed perfectly evenly. In reality, machine configurations differ: some are single-core, some multi-core; an i7 might process four messages in the time another machine processes one, so messages pile up on the weak machine while the good one sits idle. In operations work we would normally give the better machine a higher weight on a load balancer so it gets more tasks, but in RabbitMQ a single simple setting achieves fair dispatch: each consumer handles only as much as it can.

That is, before the server sends a message to a consumer it first checks how many messages that consumer still has outstanding; if the current message hasn't been acknowledged yet, the server won't send that consumer another one. Only a consumer with nothing outstanding gets the next message.

Only the consumer side needs to change — add one line of code:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello2', durable=True)


def callback(ch, method, properties, body):  # callback function
    print("---->", ch, method, properties)
    # time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,  # when a message arrives, call callback to handle it
                      queue='hello2',
                      # no_ack=False
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

Make a copy of the consumer, consume2, and add a 20-second sleep in its callback to simulate a slow worker.

Run the producer once and the message is taken by consume; run it again and the next message is taken by consumer2. But because consumer2 sleeps for 20 seconds and is slow to finish, the message published after that goes to consume again.

 

Learning Python with RabbitMQ — message queues

Preface: this time I'm putting together a post about RabbitMQ. Compared with the previous Redis post, RabbitMQ is noticeably harder. The post quotes some English passages, but they are easy to follow. For downloading and installing RabbitMQ, see the redis & rabbitMQ installation post.

Publish\Subscribe (message publish/subscribe)

So far everything has been one-to-one sending and receiving. What if we want one-to-many — like a broadcast, where the producer sends one message and every consumer receives it? That's what the exchange is for.

An exchange receives messages on one side and pushes them into queues on the other. It must know exactly what to do with each message it receives: should it go to one particular queue, to many queues, or be discarded? Those rules are defined by the exchange type.

An exchange is declared with a type, which determines which queues qualify to receive a message:

fanout: every queue bound to this exchange receives the message

direct: the single queue determined by the routingKey and the exchange receives the message

topic: every queue whose binding key matches the routingKey receives the message

headers: the message headers decide which queues receive the message
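Only fanout, direct and topic are demonstrated below; for completeness, here is a minimal sketch of the headers type, written against the same older pika API used in the rest of this post (the exchange name and header values are made up for illustration):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='headers_logs', exchange_type='headers')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# with a headers exchange the routing_key is ignored; the binding arguments are matched instead
# x-match=all means every listed header must match (x-match=any would accept any one of them)
channel.queue_bind(exchange='headers_logs', queue=queue_name,
                   arguments={'x-match': 'all', 'source': 'cron', 'level': 'error'})

channel.basic_publish(exchange='headers_logs', routing_key='',
                      properties=pika.BasicProperties(headers={'source': 'cron', 'level': 'error'}),
                      body='a cron error message')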

Message publisher:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
channel.basic_publish(exchange='logs', routing_key='', body=message)
print("[x] Send %r " % message)
connection.close()

 

The exchange argument, which was an empty string before, now has a name ('logs'); and no queue is declared here — for a broadcast the publisher doesn't declare a queue.

Message subscriber:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# exclusive: don't specify a queue name, rabbit assigns a random one
# exclusive=True deletes the queue automatically once the consumer using it disconnects
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("[X] %r" % body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()

 

On the consumer side we do declare a queue — note the comments in the code. But the sender didn't declare one; why does the receiver need a queue when the sender doesn't? Because the consumer calls channel.queue_bind(), which binds the queue to the exchange, and that call needs a queue_name as well as the exchange.

Result:

It works like a radio broadcast, in real time: open several consumers, have the producer send one message, and all of the consumers receive it at the same time.


Receiving messages selectively (exchange_type = direct)

RabbitMQ also supports routing by keyword: queues are bound to the exchange with a key, the sender publishes the message to the exchange with a routing key, and the exchange uses that key to decide which queue the message should be delivered to.

publisher:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)

print("[X] Send %r:%r" % (severity, message))
connection.close()

 

subscriber:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]#
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print('[*]Waiting for logs.To exit press CTRL+c')

def callback(ch,method,properties,body):
    print("[x] %r:%r"%(method.routing_key,body))

channel.basic_consume(callback,queue = queue_name,no_ack=True)
channel.start_consuming()

 

Even finer-grained filtering (exchange_type = topic)

publish:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

subscriber:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

 

 

In everything above, one side sends and the other receives — the message flow is one-way. If we want to send a command to a remote client, have it execute, and get the execution result back, that pattern is called RPC.

RabbitMQ RPC

rpc server:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)
    print("[.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='', routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(on_request, queue='rpc_queue')

print("[x] Awaiting rpc requests")
channel.start_consuming()

 

 

rpc client:

import pika
import uuid, time

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response,  # callback, invoked as soon as a reply arrives
                                   no_ack=True, queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='', routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id
                                   ),
                                   body=str(n),  # the message body must be a string
                                   )
        while self.response is None:
            self.connection.process_data_events()  # non-blocking version of start_consuming
            print("no message....")
            time.sleep(0.5)
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()
print("[x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print("[.] Got %r" % response)

 

start_consuming() enters a blocking mode: if there is no message it waits, and when one arrives it receives it.

self.connection.process_data_events() is a non-blocking version of start_consuming: after sending something to the server we check every so often whether a reply has arrived, and if not we are free to do other work in the meantime.

reply_to=self.callback_queue tells the server the name of the queue the reply should be sent back on.

corr_id = str(uuid.uuid4()): the correlation_id is generated on the client with uuid4, and the server sends it back along with the execution result. If the correlation_id returned by the server equals our own id, the result definitely belongs to the command this client just sent. With one server and one client the check hardly matters, but since the client is non-blocking it could issue a new command before the previous reply arrives; with two requests in flight the replies are not guaranteed to come back in order, so we need the correlation_id to decide which command each returned result belongs to.

The complete pattern is this: the requester sends a command to the worker without knowing when the reply will come back. It still has to collect the result, but it doesn't want to block, so every little while it checks whether the reply has arrived; once it has, the call is complete.

Result:

Start the server side first, then run the client: the client sends the request, keeps polling for the reply, and finally prints the computed Fibonacci number.


Commonly used message queues include:

  • RabbitMQ
  • ZeroMQ
  • ActiveMQ
  • …

1. Simple RabbitMQ queue communication

Data is first sent to the exchange, and the exchange then delivers it to the corresponding queue. The pika module is Python's API for RabbitMQ. The receiving side registers a callback function that is invoked as soon as a message arrives; once a message has been received by a consumer, it is deleted from the queue. With that background, let's look at the simplest RabbitMQ queue communication.

send side:

import pika

# connect to rabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()       # open a channel; the queues run inside the channel

# declare the queue
channel.queue_declare(queue='hello1')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
# publish data to the queue
channel.basic_publish(exchange='',          # data goes to the exchange first, which routes it to the queue
                      routing_key='hello1', # send to the 'hello1' queue
                      body='HelloWorld!!')  # the message itself
print("[x]Sent'HelloWorld!'")
connection.close()

receive side:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello1')  # declare the queue so the program works regardless of start order


def callback(ch, method, properties, body):
    print("-->ch", ch)
    print("-->method", method)
    print("-->properties", properties)
    print("[x] Received %r" % body)    # once a message is received by a consumer, it is deleted from the queue


channel.basic_consume(callback,        # callback is invoked as soon as a message arrives
                      queue='hello1',
                      no_ack=False)    # after consuming, send an ack to the server; False is the default

print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Run result (the comments in the code above should make it self-explanatory):

rabbitMQ_1_send.py
 [x] Sent 'Hello World!'


rabbitMQ_2_receive.py
 [*] Waiting for messages. To exit press CTRL+C
-->ch <pika.adapters.blocking_connection.BlockingChannel object at 0x000000000250AEB8>
-->method <Basic.Deliver(['consumer_tag=ctag1.f9533f4c8c59473c8096817670ad69d6', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello1'])>
-->properties <BasicProperties>
 [x] Received b'Hello World!!'


After some thorough testing, a few observations:

  1. Run rabbitMQ_1_send.py to send data while rabbitMQ_2_receive.py is not running; when the receiver is started later it can still receive the data.
  2. Start several (e.g. three) receiving clients, then run the sender repeatedly: client 1 receives the first message, client 2 the second, client 3 the third.

By default RabbitMQ hands the producer's (p) messages to each consumer (c) in turn, much like load balancing.

2. ack

Looking at the example above you'll notice the line no_ack=False (send an acknowledgement to the server after consuming; False is the default). Even with my barely-passed CET-4 English, the explanation of ack below struck me as very well written, so I'm sharing it:

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code once RabbitMQ delivers message to the customer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it.

If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.

Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.

Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.

Think of the sending side as the producer and the receiving side as the consumer. The producer sends task A; a consumer receives it and processes it, and once it is finished the message is removed from the queue. Now the problem: what if the consumer receives task A but crashes halfway through processing, while the queue deletes task A anyway? Task A was never actually completed — the task/message is lost. To solve this, the consumer should send an ack only after it has received task A and processed it successfully. Once the ack arrives, the server knows task A is done and only then removes it from the message queue; if no ack arrives, task A is handed to the next consumer, until it is eventually processed successfully.
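A minimal sketch of what that looks like in the consumer's callback, using the manual ack of the older pika API followed in this post (do_the_work is a made-up stand-in for the real task):

import pika

def do_the_work(body):
    # stand-in for the real task; if this raises, no ack is sent and the
    # broker will eventually re-deliver the message to another consumer
    print("processing", body)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello1')

def callback(ch, method, properties, body):
    do_the_work(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # ack only after the work succeeded

channel.basic_consume(callback, queue='hello1')     # no_ack defaults to False: acks are required
channel.start_consuming()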

 

3. Message persistence

We've seen that if the producer publishes data first, a consumer started later can still receive it.

However, if the producer publishes data and RabbitMQ is then restarted, the consumer can no longer receive that data.

For example, if the RabbitMQ server goes down while messages are in transit, you'll find the previous message queue no longer exists. That's where message persistence comes in: it keeps the queue and its messages from vanishing when the server goes down, so they are preserved. Here is the English explanation of message persistence:

We have learned how to make sure that even if the consumer dies, the task isn't lost (by default; to disable this use no_ack=True). But our tasks will still be lost if the RabbitMQ server stops.

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

channel.queue_declare(queue='hello', durable=True)

Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround – let's declare a queue with a different name, for example task_queue:

channel.queue_declare(queue='task_queue', durable=True)

This queue_declare change needs to be applied to both the producer and consumer code.

At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent – by supplying a delivery_mode property with a value 2.

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))

The passage above explains persistence well. Message persistence takes two steps:

  • Make the queue durable: channel.queue_declare(queue='hello', durable=True)
  • Make the messages in the queue persistent: properties=pika.BasicProperties(delivery_mode=2)

One thing to watch out for:

If your code has already created the hello queue without durability, then re-running it with the durable declaration (even after restarting RabbitMQ) may raise an error!

The reason: RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error.

To get around this, declare a queue with a name (queue_name) different from the one used before.
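A small sketch of that workaround, assuming the old non-durable 'hello' queue still exists on the broker (the new queue name is arbitrary):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 'hello' was declared without durable=True earlier, so re-declaring it as durable
# would be rejected by the broker; declare a fresh queue name instead
channel.queue_declare(queue='task_queue', durable=True)

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body='Hello World!',
                      properties=pika.BasicProperties(delivery_mode=2))  # make the message persistent
connection.close()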

 


4. Fair dispatch

If RabbitMQ simply dealt messages to the consumers in order, without considering consumer load, a consumer on a poorly-configured machine could easily pile up more messages than it can handle while a well-configured consumer sits idle. To solve this, configure prefetch=1 on each consumer side, which tells RabbitMQ: don't send me a new message until I've finished processing the current one.

Complete code with message persistence + fair dispatch

Producer side:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)  # durable queue

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

Consumer side:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()


When I ran the program above I was quite puzzled by the line ch.basic_ack(delivery_tag=method.delivery_tag) in the consumer's callback. Remove it and the consumer still receives messages just the same — so what on earth is it for??

Once the producer persists messages, the consumer side needs ch.basic_ack(delivery_tag=method.delivery_tag): it ensures that only after a message has actually been consumed does the consumer send an ack, and only then does the server delete that message from the queue.

 


5. Message publish/subscribe

The previous examples were basically one-to-one sending and receiving: a message could only be sent to the designated queue. Sometimes, though, you want your message to be received by all the queues, like a broadcast — that's when you need an exchange. PS: if you're interested in Redis publish/subscribe, see my blog post "python之redis".

An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

An exchange is declared with a type, which determines which queues qualify to receive a message:

 

fanout: every queue bound to this exchange receives the message

direct: the single queue determined by the routingKey and the exchange receives the message

topic: every queue whose binding key matches the routingKey (which can now be a pattern) receives the message

 

Pattern symbols: # matches zero or more words, * matches exactly one word.

    Example: #.a matches a.a, aa.a, aaa.a, etc.
             *.a matches a.a, b.a, c.a, etc.

Note: with a binding key of # and an exchange type of topic, the behaviour is equivalent to fanout.
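A small sketch of those wildcard rules, reusing the topic_logs exchange from the examples below (the binding keys are the ones from the note above, and the calls follow the older pika API used throughout this post):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# '#.a' matches any key ending in the word 'a' (a.a, kern.disk.a, ...);
# '*.a' only matches two-word keys such as a.a or b.a
for binding_key in ('#.a', '*.a'):
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

channel.basic_publish(exchange='topic_logs', routing_key='a.a', body='matches both bindings')
channel.basic_publish(exchange='topic_logs', routing_key='kern.disk.a', body='matches only #.a')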

 

Below I cover fanout, direct, and topic in turn:

1. fanout

fanout: every queue bound to this exchange receives the message

send side:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',  # empty for fanout (the default)
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

receive side:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', type='fanout')

# don't specify a queue name (we want to receive the broadcast); rabbit assigns a random name
# exclusive=True deletes the queue automatically once the consumer using it disconnects
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# bind the declared queue to the exchange
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Two points to note:

  • fanout is a broadcast; on the send side routing_key='' (empty for fanout, the default).

  • The receive side has result = channel.queue_declare(exclusive=True): it doesn't specify a queue name (so that it can receive the broadcast), RabbitMQ assigns a random name, and exclusive=True deletes the queue automatically once the consumer using it disconnects.

 

2. Receiving messages selectively (exchange type = direct)

RabbitMQ also supports routing by keyword: queues are bound to the exchange with a key, the sender publishes the data with a routing key, and the exchange uses the key to decide which queue the data should be sent to.

send side:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,  # the key is not empty: it tells the exchange where to route the message (info, error, ...)
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

receive side:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Honestly, when I first looked at this code I was completely lost~ so I tested it in cmd with one send side and two receive sides (start the receive sides first, then the send side).

 

3. Even finer-grained message filtering: topic (for reference)

Although using the direct exchange improved our system, it still has limitations – it can't do routing based on multiple criteria.

In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit…) and facility (auth/cron/kern…).

That would give us a lot of flexibility – we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.

My English isn't great~ but with the help of a rough Youdao translation plus my own understanding, I roughly got what the passage above is saying.

An example: if it's a system error, send the message to A; if it's a MySQL error, send it to B. For B, receiving just the MySQL errors can already be done with selective receiving (exchange type = direct) using the key 'error'. But now B has a new requirement: it doesn't want every error message, only particular ones. Filtering further within one kind of message is exactly what the finer-grained topic filtering is for.

send side:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')  # exchange type: topic

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

receive side:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()


 

 


6. RPC (Remote Procedure Call)

Here is a definition of RPC I found on Baidu (it's really just like the FTP program I wrote earlier: the client sends a command and the server returns the corresponding result):

RPC uses a client/server model. The requesting program is the client and the service provider is the server. The client sends a call message containing the procedure's parameters to the server process and then waits for the reply. On the server side, the process sleeps until a call message arrives; when one does, the server extracts the parameters, computes the result, sends the reply, and waits for the next call. Finally, the client receives the reply, obtains the result, and execution of the calling program continues.

Now the main part: RPC communication. It was pretty hard when I first learned it, but once it clicked I found the idea behind it very enlightening, and the example code is excellent!!

After the server receives the client's message it invokes the callback function, performs the task, and then needs to send the corresponding result back to the client. But how does the server return the message to the client? And if several clients are connected to the server, how does the server know which client to send it to??

The RPC server listens on rpc_queue by default. It obviously can't put the messages meant for the client onto rpc_queue as well (rpc_queue carries the data going from client to server).

A reasonable plan is for the server to open another queue and return the message to the corresponding client through it. But that raises another problem: if the server creates the queue, the client certainly doesn't know its queue_name — and without even knowing the name, how is the client supposed to receive anything??

The solution:

When the client sends the command it also tells the server: once the task is done, return the result through this particular queue. The client then just listens on that queue.

client side:

import pika
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()
        # create a randomly named queue to listen on for the returned result
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue   # the reply queue's name

        self.channel.basic_consume(self.on_response,  # call on_response as soon as a reply arrives
                                   no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):  # callback
        # commands can take different amounts of time: command 1 may be sent before command 2,
        # yet command 2's result may come back first; without this check the client could
        # mistake command 2's result for command 1's
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None    # the message returned after the command executes
        self.corr_id = str(uuid.uuid4())   # identifies which command a reply belongs to
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',  # the client sends the command to rpc_queue
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,  # return the result to this queue
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()  # check the queue for data (non-blocking)
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

server side:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)  # compute the result for the message received from the client

    ch.basic_publish(exchange='',   # send the result back to the props.reply_to queue (declared by the client when it sent the command)
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),  # each command carries its own random correlation_id
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge the request message


channel.basic_qos(prefetch_count=1)  # fair dispatch
channel.basic_consume(on_request,    # call on_request as soon as a message arrives
                      queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

 


Install the pika Python module:

pip install pika
or
easy_install pika
or from source:

https://pypi.python.org/pypi/pika

2. Implementing the simplest queue communication

Sender:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      # open a channel (messages are sent inside the channel)

channel.queue_declare(queue='lzl')    # declare the queue

channel.basic_publish(exchange='',
                      routing_key='lzl',  # routing_key is the queue name
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      # close the connection

Receiver:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch, method, properties, body):
    print(ch, method, properties)
    # ch: <pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>  the channel object in memory
    # method: <Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',  delivery details
    #          'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    # properties: <BasicProperties>
    print("Received %r" % body)

channel.basic_consume(callback,      # when a message arrives, call callback to handle it
                      queue="lzl",
                      no_ack=True)   # no ack is returned; the message leaves the queue whether or not it was fully processed locally
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # start receiving messages

Note: connecting from Windows to RabbitMQ running on a Linux host will fail with an error unless you supply a username and password.
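A minimal sketch of supplying credentials, assuming a RabbitMQ user has already been created on the server (the host address, user name and password below are placeholders):

import pika

# placeholder credentials; create the user on the server first (e.g. with rabbitmqctl add_user)
credentials = pika.PlainCredentials('lzl', 'lzl123')

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='192.168.1.10',   # the Linux host running RabbitMQ
                              credentials=credentials))
channel = connection.channel()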

3. RabbitMQ round-robin message dispatch

Start the message producer first, then start several consumers. Send a few messages from the producer and you'll see them handed out to the consumers one by one in turn.

In this mode RabbitMQ by default distributes the producer's (p) messages fairly among the consumers (c), one after another, much like load balancing.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      # open a channel (messages are sent inside the channel)

channel.queue_declare(queue='lzl')    # declare the queue

channel.basic_publish(exchange='',
                      routing_key='lzl',  # routing_key is the queue name
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      # close the connection

pubulish.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch, method, properties, body):
    print(ch, method, properties)
    # ch: the channel object in memory
    # method: delivery details (consumer_tag, delivery_tag, exchange, redelivered, routing_key)
    # properties: <BasicProperties>
    print("Received %r" % body)

channel.basic_consume(callback,     # when a message arrives, call callback to handle it
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # start receiving messages

consume.py

Running pubulish.py and consume.py gives the fair round-robin dispatch described above. But what happens if c1 crashes right after receiving a message? How does RabbitMQ handle that? Let's simulate it.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      # open a channel (messages are sent inside the channel)

channel.queue_declare(queue='lzl')    # declare the queue

channel.basic_publish(exchange='',
                      routing_key='lzl',  # routing_key is the queue name
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      # close the connection

publish.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch, method, properties, body):
    print("->>", ch, method, properties)
    time.sleep(15)              # simulate processing time
    print("Received %r" % body)

channel.basic_consume(callback,     # when a message arrives, call callback to handle it
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # start receiving messages

consume.py

A time.sleep was added to consume.py's callback to simulate processing time. Running this shows that if c1 suddenly dies after receiving a message but before finishing it, the message has already disappeared from the queue — RabbitMQ deleted it on delivery. If the program requires that a message only be removed from the queue once it has been fully processed, we need to adjust the code.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      # open a channel (messages are sent inside the channel)

channel.queue_declare(queue='lzl')    # declare the queue

channel.basic_publish(exchange='',
                      routing_key='lzl',  # routing_key is the queue name
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      # close the connection

publish.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch, method, properties, body):
    print("->>", ch, method, properties)
    # time.sleep(15)            # simulate processing time
    print("Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,     # when a message arrives, call callback to handle it
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # start receiving messages
consume.py

By removing no_ack=True from the receiver in consume.py and adding ch.basic_ack(delivery_tag=method.delivery_tag) in the callback, a message is no longer removed from the queue until it has been fully processed.

To check the number of messages in the queues, use rabbitmqctl list_queues.

4. Message persistence

If the RabbitMQ server goes down while messages are in transit, you'll find the previous message queue no longer exists. That's when we need message persistence: it keeps the queue from disappearing when the server goes down, so it is preserved permanently.

Sender:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      # open a channel (messages are sent inside the channel)

channel.queue_declare(queue='lzl', durable=True)    # durable queue

channel.basic_publish(exchange='',
                      routing_key='lzl',  # routing_key is the queue name
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2     # persistent message
                      )
)
print("Sent 'Hello,World!'")
connection.close()      # close the connection

Receiver:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl', durable=True)

def callback(ch, method, properties, body):
    print("->>", ch, method, properties)
    time.sleep(15)              # simulate processing time
    print("Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,     # when a message arrives, call callback to handle it
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # start receiving messages

5. Fair dispatch

If RabbitMQ simply dealt messages to the consumers in order, without considering consumer load, a consumer on a poorly-configured machine could pile up more messages than it can handle while a well-configured consumer stays idle. To solve this, configure prefetch=1 on each consumer side, which tells RabbitMQ not to send this consumer a new message until it has finished processing the current one.

channel.basic_qos(prefetch_count=1)

With message persistence + fair dispatch:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      # open a channel (messages are sent inside the channel)

channel.queue_declare(queue='lzl', durable=True)    # durable queue

channel.basic_publish(exchange='',
                      routing_key='lzl',  # routing_key is the queue name
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2     # persistent message
                      )
)
print("Sent 'Hello,World!'")
connection.close()      # close the connection

pubulish.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl', durable=True)

def callback(ch, method, properties, body):
    print("->>", ch, method, properties)
    time.sleep(15)              # simulate processing time
    print("Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,     # when a message arrives, call callback to handle it
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # start receiving messages

consume.py

6. Publish\Subscribe (message publish/subscribe)

The previous examples were basically one-to-one sending and receiving: a message could only be sent to the designated queue. Sometimes you want the message to be received by all of the queues, like a broadcast — that's when you need the exchange.

An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

An exchange is declared with a type, which determines which queues qualify to receive a message:

fanout: every queue bound to this exchange receives the message
direct: the single queue determined by the routingKey and the exchange receives the message
topic: every queue whose binding key matches the routingKey (which can now be a pattern) receives the message

headers: the message headers decide which queues receive the message

Pattern symbols: # matches zero or more words, * matches exactly one word.

    Example: #.a matches a.a, aa.a, aaa.a, etc.
             *.a matches a.a, b.a, c.a, etc.

Note: with a binding key of # and an exchange type of topic, the behaviour is equivalent to fanout.

1) fanout — receiving all broadcasts: a broadcast is real-time. If no consumer is listening at that moment, the message is simply lost. Here the consumer's no_ack no longer matters, because fanout doesn't care whether you have finished processing; messages that have already been sent are never redelivered. Remember: broadcasts are real-time.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',   # a broadcast needs no queue declared
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

publish.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)  # no queue name is given, so rabbit assigns a random one;
                                                # exclusive=True deletes the queue once the consumer using it disconnects
queue_name = result.method.queue

channel.queue_bind(exchange='logs',         # bind to the exchange to receive the data it forwards
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

consume.py

② Receiving messages selectively: direct
As with fanout, no_ack has to be set to True here, otherwise the data in the queue will not be cleared (although it will not be re-delivered either).

RabbitMQ also supports routing by keyword: the queue is bound to the exchange with a routing key, the sender publishes the message to the exchange with a routing key, and the exchange uses that key to decide which queue the message should be delivered to.


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

publish.py


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

consume.py
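
Assuming the two scripts above are saved as publish.py and consume.py (following the labels; the severities below are only examples), a typical run would be:

python consume.py info warning error        # this consumer binds and receives all three severities
python publish.py error "disk is almost full"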

③ Finer-grained message filtering: topic

Although using the direct exchange improved our system, it still has limitations – it can't do routing based on multiple criteria.

In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit…) and facility (auth/cron/kern…).

That would give us a lot of flexibility – we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

publish.py


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

consume.py
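
As with direct, assuming the topic scripts are saved as publish.py and consume.py, the binding keys (with wildcards) are passed on the command line; the keys below are only examples:

python consume.py "kern.*" "*.critical"     # kernel logs of any severity, plus critical logs from any facility
python publish.py kern.critical "A critical kernel error"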

 

RPC (Remote Procedure Call): two-way communication

To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:


rpc client:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika
import uuid,time


class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, # run on_response whenever a message arrives
                                   no_ack=True,     # no ack confirmation needed
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:    # check the correlation id (used like a verification code)
            self.response = body


    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        print(self.corr_id)
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,    # queue the reply should be sent back to
                                       correlation_id=self.corr_id,     # send the uuid as the verification code
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()   # non-blocking version of start_consuming
            print("no messages")
            time.sleep(0.5)     # only here to make the polling visible
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()    # instantiate the client
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)       # invoke the call method
print(" [.] Got %r" % response)

rpc server:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,    # send the reply to the queue named in the request
                     properties=pika.BasicProperties(correlation_id=
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


#channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,
                      queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()
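
If several copies of this RPC server are started, the commented-out channel.basic_qos(prefetch_count=1) line can be uncommented so that requests are spread fairly across them, exactly as in the fair-dispatch example in section 5.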

 
