多进程

进度之间是相互独立的,python是开发银行进度的时候,是开发银行的是原生进度。进度是尚未GIL锁的,而且不设有锁的概念,进程之间的数据式不能够共享的,而线程是足以的。

当四线程成立完结之后,start并未了当时运维,依然需求和别的线程抢CPU的身价,只是
时间不够长。
经过之间的通讯分为三种,queue和pipe

写一个game 循环
game
loop是各种游戏的中央.它不停的收获用户输入,更新游戏状态,渲染游戏结果到显示器上.网络电子游艺分为客户端和服务端两部分.两边的loop通过互连网连接起来.平日状态下,客户端获取用户输入,发送到服务端,服务端管理总结数据,更新游戏者状态,发送结果个客户端.举例游戏者只怕游戏物体的地方.分外重大的是,不要把客户端和服务端的成效混淆了,假使没有丰富的说辞的话.
假诺在客户端做游戏总括,那么分歧的客户端13分轻便就不联合了.

进程

壹、进度的概念

用muliprocessing那几个包中的Process来定义多进度,跟定义多线程类似

from multiprocessing import Process   # 导入进程模块
import time


def run(name):
    time.sleep(2)
    print("hello", name)

if __name__ == "__main__":
    p_obj_list = list()    # 存放进程对象
    for i in range(10):    # 启动10个进程
        p = Process(target=run, args=("QQ{0}".format(i),))  # 产生一个进程实例
        p.start()   # 启动进程
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()   # 等待进程结果
 1 import multiprocessing
 2 def foo(q):
 3     q.put([1,'hello',True])
 4 if __name__=='__main__':
 5     q=multiprocessing.Queue()#通过multiprocessing建立一个队列
 6     p=multiprocessing.Process(target=foo,args=(q,))
 7   #用multiprocessing在调用Process,建立一个子进程,定义函数名,将q作为参数传到foo函数,
 8     #foo函数就可以通过这个参数来与主进程做交互了。
 9     p.start()#激活这个子进程
10     print(q.get())#主进程

A game loop iteration is often called a tick. Tick is an event
meaning that current game loop iteration is over and the data for the
next frame(s) is ready.

一.含义:Computer中的程序关于某数码集结上的贰遍运营活动,是系统开展资源分配和调节的骨干单位。说白了就是贰个先后的实施实例。

执行八个主次正是3个历程,举例您打开浏览器看到本身的博客,浏览器本人是2个软件程序,你此时展开的浏览器正是三个进度。

 

二、进程中参与线程

from multiprocessing import Process
import time,threading


def thread_run(name):   # 定义线程执行的方法
    print("{0}:{1}".format(name, threading.get_ident()))  # thread.get_ident ()返回当前线程的标识符,标识符是一个非零整数


def run(name):
    time.sleep(2)
    print("hello", name)
    t = threading.Thread(target=thread_run, args=(name,))   # 嵌入线程
    t.start()   # 执行线程


if __name__ == "__main__":
    p_obj_list = list()
    for i in range(10):
        p = Process(target=run, args=("QQ{0}".format(i),))
        p.start()
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()

上边函数通过multiprocessing的queue来达成进程间通讯。

在下多少个事例中,我们写贰个客户端,那么些客户端通过WebSocket连接服务器,同时运营多个简约的loop,接受输入发送给服务器,回显音信.Client
source code is located
here.

二.经过的特色

  • ### 多少个进程里能够有多少个子进度

  • ### 新的长河的创设是完全拷贝整个主进度

  • ### 进度里能够包含线程

  • ### 进程之间(包涵主进程和子进度)不存在多少共享,互相通讯(浏览器和python之间的数码不能够互通的),要通讯则要借助队列,管道之类的

 

三、老爹和儿子进程

每一个子进度都以由一个父进度运营的,各样程序也是有三个父进度

from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  # 获得父进程ID
    print('process id:', os.getpid())  # 获得子进程ID
    print("\n\n")


def f(name):
    info('\033[31;1m function f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1m main process line\033[0m')
    p = Process(target=f, args=('QQ',))
    p.start()
    p.join()

  

 

 

 1 from multiprocessing import  Pipe,Process
 2 def foo(sk):
 3     sk.send('hello world')#通过管道sk发送内容
 4     print(sk.recv())#打印接收到的内容
 5 if __name__ == '__main__':
 6     sock,conn=Pipe()#定义一个管道的两头
 7     p=Process(target=foo,args=(sock,))#由于上面已经通过multiprocessing导入了Process,
 8     # 所以这里直接就可以创建一个子进程,并将sock(管道的一头)作为参数给foo函数
 9     p.start()#激活这个进程
10     print(conn.recv())#打印接收到的内容,conn是管道的另一头
11     conn.send('hi son')#通过管道发送内容

3.1

Example 3.1 source
code

大家选用aiohttp来创造多个game
server.这几个库能够创设asyncio的client和server.这几个库的益处是还要帮忙http请求和websocket.所以服务器就不需求把结果管理成html了.
来看一下server如何运营:

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


async def wshandler(request):
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    app["sockets"].append(ws)

    while 1:
        msg = await ws.receive()
        if msg.tp == web.MsgType.text:
            print("Got message %s" % msg.data)
            ws.send_str("Pressed key code: {}".format(msg.data))
        elif msg.tp == web.MsgType.close or\
             msg.tp == web.MsgType.error:
            break

    app["sockets"].remove(ws)
    print("Closed connection")
    return ws

async def game_loop(app):
    while 1:
        for ws in app["sockets"]:
            ws.send_str("game loop says: tick")
        await asyncio.sleep(2)


app = web.Application()
app["sockets"] = []

asyncio.ensure_future(game_loop(app))

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

其一代码就不翻译了,

三.历程和线程之间的界别

  • ### 线程共享地址空间,而经过之间有互相独立的空中

  • ### 线程之间数据互通,互相操作,而经过不可能

  • ### 新的线程比新的进程创制轻松,比开进程的成本小多数

  • ### 主线程能够影响子线程,而主进程不能影响子进度

 

 

进度间数据交互与共享

知道区别进程之间内部存款和储蓄器是不共享的,要想达成七个进度间的通讯供给选择multiprocessing库中的queue(队列)模块,这几个multiprocessing库中的queue模块跟单纯的queue库是区别样的。进度导入前者(那里的queue是特地为经过之间的通信设计的)不不可信,导入后者(那里的queue首假诺线程间数据交互)出错。

地方代码通过Pipe来贯彻三个进程间的通讯。

三.贰 有请求才开端loop

地方的例证,server是不停的loop.以往改成有请求才loop.
同时,server上或许存在多个room.1个player创立了三个session(一场较量依旧二个别本?),其余的player能够参与.

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


async def wshandler(request):
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    app["sockets"].append(ws)

    if app["game_is_running"] == False:
        asyncio.ensure_future(game_loop(app))
    while 1:
        msg = await ws.receive()
        if msg.tp == web.MsgType.text:
            print("Got message %s" % msg.data)
            ws.send_str("Pressed key code: {}".format(msg.data))
        elif msg.tp == web.MsgType.close or\
             msg.tp == web.MsgType.error:
            break

    app["sockets"].remove(ws)
    print("Closed connection")

    return ws

async def game_loop(app):
    app["game_is_running"] = True
    while 1:
        for ws in app["sockets"]:
            ws.send_str("game loop says: tick")
        if len(app["sockets"]) == 0:
            break
        await asyncio.sleep(2)
    app["game_is_running"] = False


app = web.Application()

app["sockets"] = []
app["game_is_running"] = False

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

肆.在python中,进度与线程的用法就只是名字差别,使用的主意也是没多大差别

一、线程访问queue

import queue,threading


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把这个q传给了子线程
    p = threading.Thread(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
[66, None, 'hello word']
 1 from multiprocessing import  Manager,Process
 2 def foo(l,i):#收到参数,l是Mlist,i是循环的i
 3     l.append(i*i)#将i平方添加到Mlist
 4 if __name__=='__main__':
 5     manager=Manager()
 6     Mlist=manager.list([11,22,33])#定义一个列表
 7 
 8     l=[]
 9     for i in range(5):#创建5个子进程
10         p=Process(target=foo,args=(Mlist,i))#定义一个进程,将Mlist和i作为参数传到foo
11         p.start()#激活这个进程,执行foo函数
12         l.append(p)#将5个进程添加到l这个列表
13     for i in l:
14         i.join()#循环这个列表,然后将每个进程join
15     print(Mlist)#当所有的子进程都结束,运行主进程

3.3 管理task

直白操作task对象.未有人的时候,能够cancel掉task.
注意!:
This cancel()
call tells scheduler not to pass execution to this coroutine anymore and
sets its state tocancelled
which then can be checked by cancelled()
method. And here is one caveat worth to mention: when you have external
references to a task object and exception happens in this task, this
exception will not be raised. Instead, an exception is set to this task
and may be checked by exception()
method. Such silent fails are not useful when debugging a code. Thus,
you may want to raise all exceptions instead. To do so you need to call
result()
method of unfinished task explicitly. This can be done in a callback:

若果想要cancel掉,也不想触发exception,那么就检查一下canceled状态.
app["game_loop"].add_done_callback(lambda t: t.result() if not t.cancelled() else None)

伍.简短实例

一)创设3个简便的多进度:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

def func(name):
    time.sleep(1)
    print('hello',name,time.ctime())

ml = []
for i in range(3):
    p = multiprocessing.Process(target=func,args=('yang',))
    p.start()
    ml.append(p)

for i in ml:
    i.join() #注意这里,进程必须加join方法,不然会导致僵尸进程

  

运行结果:

金沙注册送58 1

 

不管怎么说,反正报错了,一样的代码,在python自带的IDLE里搜求:

金沙注册送58 2

从未任何事物就截止了。好的,那里要说下了,根据本人个人的通晓,当你用pycharm只怕IDLE时,pycharm恐怕IDLE在您的微机里自身也是1个经过,并且私下认可是主进度。所以在pycharm会报错,而在IDLE里运转就是空手,个人通晓,对不对目前不谈,前期学到子进度时再说。

 

消除办法便是,其余的不改变,加3个if __name == ‘__经过间通讯,进度锁和进度池的行使。main__’剖断就行:

金沙注册送58 3

 

诸如此类就消除了,好的,你未来得以回味到那句话了,经过与线程的用法就只是名字分歧,使用的法子也是没多大差别。不多说,自行体会。而运作结果看到的年华是一道的,那么那进程才是当真含义上的交互运营。

 

2)自定义类式进度

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

class myprocess(multiprocessing.Process):
    def __init__(self,name):
        super(myprocess,self).__init__()
        self.name = name

    def run(self):
        time.sleep(1)
        print('hello',self.name,time.ctime())

if __name__ == '__main__':
    ml = []
    for i in range(3):
        p = myprocess('yang')
        p.start()
        ml.append(p)

    for j in ml:
        j.join()

  

运维结果:

金沙注册送58 4

 

 

下一场setDaemon之类的办法和线程也是完全1致的。

 

叁)每叁个进度都有根进程,换句话,每三个进度都有父进程

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time,os

def info():
    print('mudule name:',__name__)
    print('parent process:',os.getppid()) #父进程号
    print('son process:',os.getpid())     #子进程号

if __name__ == '__main__':
    info()
    print('-----')
    p = multiprocessing.Process(target=info,args=[])
    p.start()
    p.join()

  

运营结果:

 

金沙注册送58 5

 

而查看本人本机的历程:

金沙注册送58 6

 

能够精通,620四就是pycharm,就是那时的根进度,而主进度就是自家那么些py文件(由__main__能够),接着再往下的子进度等等等的。

 

二、进度访问queue

from multiprocessing import Process
import queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把这个q传给了子线程
    p = Process(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
Traceback (most recent call last):
  File "C:/Users/dell/PycharmProjects/untitled/process/进程的定义.py", line 77, in <module>
    p.start()
  File "C:\Python36\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python36\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python36\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

上边代码通过Manger完成子进度间的通讯。

三.4 等待多个事件

Example 3.4 source
code
在无数情形下,供给在服务器管理客户端的handler中,
等待多少个事件.除了等候客户端的新闻,可能还索要等待不相同的音信产生.举例,
游戏1局的岁月到了,须要八个timer的能量信号.也许,须要别的进度的音信,只怕别的server的音信.(使用分布式音讯系统).
下边这么些事例使用了Condition.那里不保留全体的socket,而是在每一趟循环结束通过Condition.notify_all来通告.这么些应用pub/sub情势达成.
为了在二个handler中,等待八个事件,首先大家运用ensure_future来包装一下.

if not recv_task: 
  recv_task = asyncio.ensure_future(ws.receive())
if not tick_task: 
  await tick.acquire() 
  tick_task = asyncio.ensure_future(tick.wait())```

在我们调用Condition.call之前,我们需要获取一下锁.这个锁在调用了tick.wait之后就释放掉.这样其他的协程也可以用了.但是当我们得到一个notification, 会重新获取锁.所以我们在收到notification之后要release一下.

done, pending = await asyncio.wait( [recv_task, tick_task],
return_when=asyncio.FIRST_COMPLETED)“`
其1会阻塞住直到有1个职责完毕,那年会回去八个列表,完成的和如故在运作的.假诺task
is done,大家再设置为None,那样下一个循环里会再贰次创造.

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)



tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

async def game_loop():
    while 1:
        await tick.acquire()
        tick.notify_all()
        tick.release()
        await asyncio.sleep(1)

asyncio.ensure_future(game_loop())

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

(这些重中之重是asyncio.Condition的用法)

6.多进度间的通讯和数目共享

率先大家都曾经清楚进度之间是独立的,不能够互通,并且数据交互独立,而在事实上付出中,一定会遇见供给进程间通信的意况须求,那么大家怎么搞呢

有三种格局:

  • pipe
  • queue

1)使用queue通信

在二十多线程那里已经学过queue了,成立queue的章程,q =
queue.Queue(),这种创造是开创的线程queue,并不是进度queue。创立进度queue的艺术是:

金沙注册送58 7

 

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(q,name,age): #这里必须要把q对象作为参数传入才能实现进程之间通信
    q.put({'name':name,'age':age})

if __name__ == '__main__':
    q = multiprocessing.Queue() #创建进程queue对象
    ml = []
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(q,'yang',21))
        p.start()
        ml.append(p)
    print(q.get()) #获取queue信息
    print(q.get()) 
    print(q.get())
    for i in ml:
        i.join()

  

运作结果:

金沙注册送58 8

 

好的,已经因此queue实现通讯,那么细心的对象或者会想,此时的queue到底是同三个吗依旧copy的吗?开首测试,码如下:

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(q,name,age):
    q.put({'name':name,'age':age})
    print('id:',id(q))
if __name__ == '__main__':
    q = multiprocessing.Queue()
    ml = []
    print('id:',id(q))
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(q,'yang',21))
        p.start()
        ml.append(p)
    print(q.get())
    print(q.get())
    print(q.get())
    for i in ml:
        i.join()

  

在Windows平台运营结果:

金沙注册送58 9

 

Linux的ubuntu下是这么的:

金沙注册送58 10

 

那就倒霉怎么说了,作者个人的了然,线程和进度那类与Computer硬件(CPU,RAM)等有联系的都有不鲜明因素,姑且感觉在Windows平台里queue是copy的,在Linux里是同1个呢,并且据经验人员表示,在macbook上也是同二个。

 

还有个难点, 要是使用的queue是线程式的吧?

代码别的都没变,只改了此处:

金沙注册送58 11

 

结果:

金沙注册送58 12

 

虽说报错了,不过却有二个关键点,提醒的是无法pickle线程锁对象,也正是说刚才大家应用的queue是进度对象,所以能够pickle,注意了,那里正是关键点,使用了pickle,那么约等于说,在Windows平台里是copy的,假设不是copy,就不必要存在pickle对啊?直接拿来用正是呀,干嘛要pickle之后取的时候再反pickle呢对吗?

 

再看Linux下吧,由于Linux暗中同意是python2,所以模块包名稍微有点分歧

金沙注册送58 13

结果阻塞住了,可是后边的要么出来了,看到的id果然照旧一如既往的。

 

此地就有三点供给注意:(个人了然,如有误望指正)

1.进程里的确不能够使用线程式queue

二.Windows平台的进度式queue是copy的

3.Linux平台的线程式和进度式都以同多少个,不过借使在进程里使用线程式queue会阻塞住

但笔者个人认为copy更有安全性

 

2)使用pipe通信

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(conn):
    conn.send('约吗?')  #子进程发送数据
    print(conn.recv())  #接受数据,不能加参数1024之类的
    conn.close()        #子进程关闭连接
if __name__ == '__main__':
    parent_conn,son_conn = multiprocessing.Pipe() #创建pipe对象,父进程,子进程
    ml = []
    p = multiprocessing.Process(target=func,args=(son_conn,))
    p.start()
    print(parent_conn.recv())  #父进程接受数据,不能加参数1024之类的
    parent_conn.send('不约')    #发送数据
    p.join()                   #join方法是进程特有

 

  

运转结果:

金沙注册送58 14

 

如此那般就联系上了,相信您开掘了,主干和前面包车型客车socket大致,不过唯1的不如是recv()方法不能够加参数,不信的话,你加来尝试

反观线程通讯,相信你会认为进度比线程更有益

 

本来pipe也足以有多少个:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

def func(conn):
    conn.send('约吗?')  #子进程发送数据
    print(conn.recv())
    conn.close()        #子进程关闭连接
if __name__ == '__main__':
    parent_conn,son_conn = multiprocessing.Pipe() #创建pipe对象,父进程,子进程
    ml = []
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(son_conn,))
        p.start()
        ml.append(p)
        print(parent_conn.recv())  #父进程接受数据,不能加参数1024之类的
        parent_conn.send('不约')
    for i in ml:
        i.join()

  

运转结果:

金沙注册送58 15

 

7.经过之间数据共享——manager

相比较轻松,就采取了经过里的manager对象下的顺序数据类型,其余的很简短的,小编就不注释了

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(l,d,num):
    l.append(num)
    d[num] = num

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        l = manager.list()
        d = manager.dict()
        ml = []
        for i in range(6):
            p = multiprocessing.Process(target=func,args=(l,d,i))
            p.start()
            ml.append(p)
        for i in ml:
            i.join()
        print('d:',d)
        print('l:',l)

  

运营结果:

金沙注册送58 16

 

如此是还是不是就得以完成了数额共享了?

 

好的,进程也剖析完了

 

三、进度访问`multiprocessing库中的Queue模块`

from multiprocessing import Process,Queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = Queue()   # 把这个q传给了子线程
    p = Process(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
[66, None, 'hello word']

父进程也正是克隆一个Q,把温馨的Q克隆了一份交给子进程,子进度这年往Q里面放了壹份数据,然后父进度又能实际的收获到。然而你克隆了1份是还是不是就和父进度未有关系了,为何还能够联系在1道啊?不过事实上:等于这三个Q里面包车型客车多少又把它体系化了,系列化到2个中级的地点,类似于翻译,然后反连串化给这么些父进程那边来了,其实那五个Q正是透过pickle来体系化的,不是2个着实的Q。

总结:八个线程之间能够修改一个数据,不加锁,或然就会出错。以往经过中的Queue,是完成了数量的传递,不是在修改同一份数据,只是达成2个进程的多少传给了其余二个进度。

金沙注册送58 17

三.5 和线程一齐使用

Example 3.5 source
code

其一例子,大家把asyncio的loop放到此外二个单独线程中.下面也说过了,因为python的GIL的安排,不容许同时运营四个code.所以使用十二线程来拍卖总计瓶颈的主题素材,并不是3个好主意.然后还有别的3个应用线程原因就是:
如若局部函数恐怕库不帮衬asyncio,那么就会阻塞住主线程的运营.那种情景下唯1的方法就是放在其它多个线程中.

要小心asyncio本身不是threadsafe的,不过提供了多少个函数.call_soon_threadsafe和run_coroutine_threadsafe.
当您运行那几个事例的时候,你会看出notify的线程id正是主线程的id,这是因为notify协程运维在主线程中,sleep运维在别的一个线程,所以不会阻塞住主线程.

import asyncio
from aiohttp import web

from concurrent.futures import ThreadPoolExecutor
import threading
from time import sleep


async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

def game_loop(asyncio_loop):
    print("Game loop thread id {}".format(threading.get_ident()))
    # a coroutine to run in main thread
    async def notify():
        print("Notify thread id {}".format(threading.get_ident()))
        await tick.acquire()
        tick.notify_all()
        tick.release()

    while 1:
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        # blocking the thread
        sleep(1)
        # make sure the task has finished
        task.result()

print("Main thread id {}".format(threading.get_ident()))

asyncio_loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
asyncio_loop.run_in_executor(executor, game_loop, asyncio_loop)

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

 

 

叁.陆 多进度和扩展scaling up

三个线程的server可以干活了,不过那一个server唯有二个cpu可用.
为了扩充,大家供给周转多个进度,每一种进程包蕴本人的eventloop.
所以我们需求经过之间通讯的方式.同时在嬉戏世界,日常会有雅量的计量(寻路什么的).那个任务平时不会不慢形成(二个tick内).
在协程中开始展览多量耗时的测算没风趣,因为会阻塞住新闻循环本人.所以在那种处境下,把大批量的计算交给此外的历程就很有不能缺少了
最简便易行的秘诀正是开发银行多少个单线程的server.然后,能够接纳haproxy那样的load
balancer,来把客户端的三番五次分散到差别的经过上去.进城之间的通讯有为数不少方法.一种是基于网络连接,也足以扩充到五个server.以往已经有许多贯彻了新闻和仓库储存系统的框架(基于asyncio).
比方:

aiomcache
for memcached client
aiozmq
for zeroMQ
aioredis
for Redis storage and pub/sub

还有任何的片段乱7八糟,在git上,大多数是aio打头.
使用网络音讯,能够丰裕实用的储存数据,可能交流音讯.不过一旦要拍卖多量实时的多少,而且有恢宏历程通信的气象,就不行了.在那种情景下,贰个更贴切的办法是选拔专门的职业的unix
pipe.asyncio has support for pipes and there is a very low-level
example of the server which uses
pipes
inaiohttp repository.
在这些例子中,大家选拔python的高等级次序的multiprocessing库来触发1个新的进度来进展总结,通过multiprocessing.Queue来开始展览进度间通讯.不幸的是,近年来的multiprocessing完成并不帮忙asyncio.所以阻塞的调用就会阻塞住event
loop.
那便是利用线程的最棒案例.因为大家在此外多少个线程运维multiprocessing的代码.看代码

import asyncio
from aiohttp import web

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Queue, Process
import os
from time import sleep


async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

def game_loop(asyncio_loop):
    # coroutine to run in main thread
    async def notify():
        await tick.acquire()
        tick.notify_all()
        tick.release()

    queue = Queue()

    # function to run in a different process
    def worker():
        while 1:
            print("doing heavy calculation in process {}".format(os.getpid()))
            sleep(1)
            queue.put("calculation result")

    Process(target=worker).start()

    while 1:
        # blocks this thread but not main thread with event loop
        result = queue.get()
        print("getting {} in process {}".format(result, os.getpid()))
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        task.result()

asyncio_loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
asyncio_loop.run_in_executor(executor, game_loop, asyncio_loop)

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

worker()在其它3个经过中运转.包蕴了有个别耗费时间总括,把结果放在queue中.获得结果过后,通告主线程的主eventloop那么些等待的client.这些事例十一分简陋,进度未有适用甘休,同时worker可能需求其余三个queue来输入数据.

Important! If you are going to run anotherasyncio
event loop in a different thread or sub-process created from main
thread/process, you need to create a loop explicitly, using
asyncio.new_event_金沙注册送58,loop()
, otherwise, it will not work.

4、通过Pipe()达成进程间的数额交互,manger达成多少共享

地点的例证是透过进程中的Queue,来打开数量共享的,其实还有1种格局完毕多中国少年共产党享,那就是管道,pipe,以及数据共享manger。

4.1、Pipe()函数

管道函数会再次来到由管道双方连日来的一组连接对象,该管道默许是双向的(双向的)。

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 发送消息给父进程
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # 接收子进程的消息
    p.join()

4.贰、接受反复和出殡和埋葬多次

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 发送消息给父进程
    conn.send("QQ")  # 发送消息给父进程
    print(conn.recv())   # 接收父进程的消息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())  # 接收两次
    parent_conn.send("微信")   # 发送给子进程
    p.join()

4.3、manger

manger能够产生多少间的共享。

from multiprocessing import Process, Manager
import os


def f(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()   # 声明一个字典,这个字典是用manger声明的,不是用dict()声明的
        # manger.dict()是用专门的语法生产一个可在多进程之间进行传递和共享的一个字典
        l = manager.list(range(5))  # 同样声明一个列表
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

线程修改同壹份数据的时候需求加锁,进度修改数据吧:不用加锁,因为这些manger已经帮你加锁了,它就暗许不容许四个经过同时修改1份数据。七个进度未有办法同时修改壹份数据,进度之间是单独的,它本身也要加锁,因为它把温馨的事物还要copy好几份,跟刚刚的不行Queue相同,copy13个字典最后合成二个字典

 

 

 

协程
协程,又叫微线程,实际上正是单线程,通过python语法,或模块来促成产出。
精神上正是二个历程1个线程。

进度锁和进度池的运用

金沙注册送58 18

1、进程锁

因此multiprocessing中的Lock模块来落成进度锁

from multiprocessing import Process,Lock   # 导入进程锁


def f(l, i):
    l.acquire()    # 加锁
    try:
        print("hello word", i)
    finally:
        l.release()   # 释放锁

if __name__ == "__main__":
    lock = Lock()     # 定义锁
    for num in range(10):
        Process(target=f, args=(lock, num,)).start()  # 把锁传入进程中

进程中不是互相独立的吧?为啥还要加锁:即便各样进程都以单身运作的,不过难点来了,它们共享壹块显示屏。这几个锁存在的意思正是显示屏共享。即便经过壹想着打字与印刷数据,而经过②想也想打印数据的状态,就有十分大大概乱套了,然后经过这些锁来决定,去打字与印刷的时候,那几个荧屏只有自个儿独占,导致显示器不会乱。

金沙注册送58 19

2、进程池apply和apply_saync

2.1、appley

一道施行,也便是串行实行的

from multiprocessing import Pool  # 导入进程池模块pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印进程号


if __name__ == "__main__":
    pool = Pool(processes=5)   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply(func=foo, args=(i,))   # 同步执行挂起进程
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

2.2、apply_saync

异步执行,也等于并行实施。

from multiprocessing import Pool  # 导入进程池模块pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印进程号


if __name__ == "__main__":
    pool = Pool(processes=5)   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply_async(func=foo, args=(i,))   # 采用异步方式执行foo函数
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

贰.3、异步下回调函数

程序实行落成之后,再回调过来推行那个Bar函数。

from multiprocessing import Process,Pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印子进程的进程号


def bar(arg):
    print('-->exec done:', arg, os.getpid())   # 打印进程号

if __name__ == "__main__":
    pool = Pool(processes=2)
    print("主进程", os.getpid())   # 主进程的进程号
    for i in range(3):
        pool.apply_async(func=foo, args=(i,), callback=bar)   # 执行回调函数callback=Bar
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

#执行结果
主进程 752
end
in process 2348
-->exec done: None 752
in process 8364
-->exec done: None 752
in process 2348
-->exec done: None 752

注:

  1. 回调函数表明fun=Foo干不完就不试行bar函数,等Foo实施完就去执行Bar
  2. 其2回调函数是主进度去调用的,而不是各种子进度去调用的。
  3. 回调函数的用处:

      比方说你从各类机器上备份完结,在回调函数中自动写2个剧本,说备份完结

  4. 回调函数是主进度调用的来头?

      假使是子进程去调用那一个回调函数,有几个子进度就有多少个三番五次,借使是主进度的话,只必要1次长连接就足以了,这么些功用就高了

  

 

上海教室是用yield落成了四个八个函数逇并发管理。

 1 from greenlet import greenlet#导入这个模块
 2 def foo():#定义一个函数
 3     print('ok1')#打印
 4     gr2.switch()#将程序切换到下面一个函数,按照名字切
 5     print('ok3')#打印
 6     gr2.switch()#将程序切换到下面一个函数,按照名字切
 7 def bar():
 8     print('ok2')#打印
 9     gr1.switch()#切到上面foo函数
10     print('ok4')
11 gr1=greenlet(foo)#实例化这个函数
12 gr2=greenlet(bar)
13 gr1.switch()#在外面写这个就执行了这个函数

由此greenlet模块的switch来贯彻协程的切换,greenlet模块必要手动去pycharm下载

 1 import gevent#导入这个模块
 2 def foo():
 3     print('running in foo')
 4     gevent.sleep(2)#打印之后睡一秒,模拟io操作
 5     print('switch to foo again')
 6 def bar():
 7     print('switch  to bar')
 8     gevent.sleep(1)#打印之后睡一秒,模拟io操作
 9     print('switch to bar again')
10 gevent.joinall([gevent.spawn(foo),gevent.spawn(bar)])
11 '''
12 这个程序的运行过程是,先执行foo函数,
13 打印之后遇到了IO操作,然后自动切换到下一个函数执行,
14 打印之后又遇到了IO操作,然后切回foo函数发现IO2秒还没有结束,
15 然后又切到了bar函数发现IO结束,打印,再切回foo函数打印
16 '''

上面代码通过gevent模块来兑现写成的IO时期机动切换达成产出的次序。
gevent需要从pycharm下载。

金沙注册送58 20

 

相关文章

网站地图xml地图