博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
进程池
阅读量:5088 次
发布时间:2019-06-13

本文共 3960 字,大约阅读时间需要 13 分钟。

如何要有进程池呢?

当需要操作多个文件,或者多个任务的的时候,需要并发多进程来解决,但需要注意几点问题:

1,执行的任务远大于核数

2,操作系统不可能无限的开进程,开多了肯定会卡死

3,进程开的越多,效率越低

考虑到这么多问题,那么我们就来引入进程池的概念吧!

进程池:顾名思义,就是放进程的池子,但还是他会规定个数,然后在后面的进程会利用原有的进程来执行程序,提高效率。

定义一个进程池:

Pool([numprocess  [,initializer [, initargs]]]):创建进程池
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None3 initargs:是要传给initializer的参数组

主要方法:

1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()2 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。3    4 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成5 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

这里要强调的是:方法1,他是一种串行的执行方式,效率低,建议不要用

应用

from multiprocessing import Poolimport os,timedef work(n):    print('%s run' %os.getpid())    time.sleep(3)    return n**2if __name__ == '__main__':    p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务    res_l=[]    for i in range(10):        res=p.apply(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res        res_l.append(res)    print(res_l)
from multiprocessing import Poolimport os,timedef work(n):    print('%s run' %os.getpid())    time.sleep(3)    return n**2if __name__ == '__main__':    p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务    res_l=[]    for i in range(10):        res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res        res_l.append(res)    #异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了    p.close()    p.join()    for res in res_l:        print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
apply_asyn 并行 无阻塞

关于进程在套接字上的应用

#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())#开启6个客户端,会发现2个客户端处于等待状态#在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程from socket import *from multiprocessing import Poolimport osserver=socket(AF_INET,SOCK_STREAM)server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)server.bind(('127.0.0.1',8080))server.listen(5)def talk(conn,client_addr):    print('进程pid: %s' %os.getpid())    while True:        try:            msg=conn.recv(1024)            if not msg:break            conn.send(msg.upper())        except Exception:            breakif __name__ == '__main__':    p=Pool()    while True:        conn,client_addr=server.accept()        p.apply_async(talk,args=(conn,client_addr))        # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
服务端
from socket import *client=socket(AF_INET,SOCK_STREAM)client.connect(('127.0.0.1',8080))while True:    msg=input('>>: ').strip()    if not msg:continue    client.send(msg.encode('utf-8'))    msg=client.recv(1024)    print(msg.decode('utf-8'))
客户端

重要的知识来了:回调函数 关键字:callpack=函数名

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

from multiprocessing import Poolimport requestsimport jsonimport osdef get_page(url):    print('
<进程%s>
get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {
'url':url,'text':respone.text}def pasrse_page(res): print('
<进程%s>
parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res)if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了
回调函数的应用:模拟爬虫

 

转载于:https://www.cnblogs.com/niehaidong111/p/7445317.html

你可能感兴趣的文章
Enterprise Library - Data Access Application Block 6.0.1304
查看>>
重构代码 —— 函数即变量(Replace temp with Query)
查看>>
Bootstrap栅格学习
查看>>
程序员的数学
查看>>
聚合与组合
查看>>
jQuery如何获得select选中的值?input单选radio选中的值
查看>>
设计模式 之 享元模式
查看>>
如何理解汉诺塔
查看>>
洛谷 P2089 烤鸡【DFS递归/10重枚举】
查看>>
15 FFT及其框图实现
查看>>
Linux基本操作
查看>>
osg ifc ifccolumn
查看>>
C++ STL partial_sort
查看>>
3.0.35 platform 设备资源和数据
查看>>
centos redis 安装过程,解决办法
查看>>
IOS小技巧整理
查看>>
WebDriverExtensionsByC#
查看>>
我眼中的技术地图
查看>>
lc 145. Binary Tree Postorder Traversal
查看>>
sublime 配置java运行环境
查看>>