2023年8月2日发(作者:)
Pythonmultiprocessing使⽤详解multiprocessing包是Python中的多进程管理包。与类似,它可以利⽤s对象来创建⼀个进程。该进程可以运⾏在Python程序内部编写的函数。该Process对象与Thread对象的⽤法相同,也有start(), run(), join()的⽅法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),⽤以同步进程,其⽤法与threading包中的同名类⼀致。所以,multiprocessing的很⼤⼀部份与threading使⽤同⼀套API,只不过换到了多进程的情境。但在使⽤这些共享API的时候,我们要注意以下⼏点:在UNIX平台上,当某个进程终结之后,该进程需要被其⽗进程调⽤wait,否则进程成为僵⼫进程(Zombie)。所以,有必要对每个Process对象调⽤join()⽅法 (实际上等同于wait)。对于多线程来说,由于只有⼀个进程,所以不存在此必要性。multiprocessing提供了threading包中没有的IPC(⽐如Pipe和Queue),效率上更⾼。应优先考虑Pipe和Queue,避免使⽤Lock/Event/Semaphore/Condition等同步⽅式 (因为它们占据的不是⽤户进程的资源)。多进程应该避免共享资源。在多线程中,我们可以⽐较容易地共享资源,⽐如使⽤全局变量或者传递参数。在多进程情况下,由于每个进程有⾃⼰独⽴的内存空间,以上⽅法并不合适。此时我们可以通过共享内存和Manager的⽅法来共享资源。但这样做提⾼了程序的复杂度,并因为同步的需要⽽降低了程序的效率。中保存有PID,如果进程还没有start(),则PID为None我们可以从下⾯的程序中看到Thread对象和Process对象在使⽤上的相似性与结果上的不同。各个线程和进程都做⼀件事:打印PID。但问题是,所有的任务在打印的时候都会向同⼀个标准输出(stdout)输出。这样输出的字符会混合在⼀起,⽆法阅读。使⽤Lock同步,在⼀个任务输出完成之后,再允许另⼀个任务输出,可以避免多个任务同时向终端输出import osimport threadingimport multiprocessing# Mainprint('Main:', ())# worker functiondef worker(sign, lock): e() print(sign, ()) e()# Multi-threadrecord = []lock = ()# Multi-processrecord = []lock = ()if __name__ == '__main__': for i in range(5): thread = (target=worker, args=('thread', lock)) () (thread) for thread in record: ()
for i in range(5): process = s(target=worker, args=('process', lock)) () (process)
for process in record: ()Main: 10012thread 10012thread 10012thread 10012thread 10012thread 10012Main: 6052process 6052Main: 8080Main: 4284Main: 7240process 8080process 4284process 7240Main: 10044process 10044Pipe和Queue正如我们在Linux多线程中介绍的管道PIPE和消息队列message queue,multiprocessing包中有Pipe类和Queue类来分别⽀持这两种IPC机制。Pipe和Queue可以⽤来传送常见的对象。1. Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过(duplex=False)创建单向管道 (默认为双向)。⼀个进程从PIPE⼀端输⼊对象,然后被PIPE另⼀端的进程接收,单向管道只允许管道⼀端的进程输⼊,⽽双向管道则允许从两端输⼊。下⾯的程序展⽰了Pipe的使⽤:import multiprocessing as muldef proc1(pipe): ('hello') print('proc1 rec:', ())def proc2(pipe): print('proc2 rec:', ()) ('hello, too')# Build a pipepipe = ()if __name__ == '__main__': # Pass an end of the pipe to process 1 p1 = s(target=proc1, args=(pipe[0],)) # Pass the other end of the pipe to process 2 p2 = s(target=proc2, args=(pipe[1],)) () () () ()proc2 rec: helloproc1 rec: hello, too这⾥的Pipe是双向的。Pipe对象建⽴的时候,返回⼀个含有两个元素的表,每个元素代表Pipe的⼀端(Connection对象)。我们对Pipe的某⼀端调⽤send()⽅法来传送对象,在另⼀端使⽤recv()来接收。2. Queue与Pipe相类似,都是先进先出的结构。但Queue允许多个进程放⼊,多个进程从队列取出对象。Queue使⽤(maxsize)创建,maxsize表⽰队列中可以存放对象的最⼤数量。下⾯的程序展⽰了Queue的使⽤:import osimport multiprocessingimport time#==================# input workerdef inputQ(queue): info = str(()) + '(put):' + str(()) (info)# output workerdef outputQ(queue,lock): info = () e() print (str(()) + ' get: ' + info) e()#===================# Mainrecord1 = [] # store input processesrecord2 = [] # store output processeslock = () # To prevent messy printqueue = (3)if __name__ == '__main__': # input processes for i in range(10): process = s(target=inputQ,args=(queue,)) () (process)
# output processes for i in range(10): process = s(target=outputQ,args=(queue,lock)) () (process)
for p in record1: ()
() # No more object will come, close the queue
for p in record2: ()8572 get: 6300(put):1555486924.36762268136 get: 3464(put):1555486924.4126259576 get: 9660(put):1555486924.51263076936 get: 5064(put):1555486924.597635510652 get: 8688(put):1555486924.59763556992 get: 10988(put):1555486924.75264456548 get: 6836(put):1555486924.74564433504 get: 7284(put):1555486924.76664548652 get: 4960(put):1555486924.853650310868 get: 460(put):1555486924.8606508⼀些进程使⽤put()在Queue中放⼊字符串,这个字符串中包含PID和时间。另⼀些进程从Queue中取出,并打印⾃⼰的PID以及get()的字符串。进程池进程池 (Process Pool)可以创建多个进程。这些进程就像是随时待命的⼠兵,准备执⾏任务(程序)。⼀个进程池中可以容纳多个待命的进程。import multiprocessing as muldef f(x): return x ** 2if __name__ == '__main__': pool = (5) rel = (f, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) print(rel)[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]我们创建了⼀个容许5个进程的进程池 (Process Pool) 。Pool运⾏的每个进程都执⾏f()函数。我们利⽤map()⽅法,将f()函数作⽤到表的每个元素上。这与built-in的map()函数类似,只是这⾥⽤5个进程并⾏处理。如果进程运⾏结束后,还有需要处理的元素,那么的进程会被⽤于重新运⾏f()函数。除了map()⽅法外,Pool还有下⾯的常⽤⽅法。apply_async(func,args) 从进程池中取出⼀个进程执⾏func,args为func的参数。它将返回⼀个AsyncResult的对象,你可以对该对象调⽤get()⽅法以获得结果。close() 进程池不再创建新的进程join() wait进程池中的全部进程。必须对Pool先调⽤close()⽅法才能join。共享内存实例代码:import multiprocessing# Value/Arraydef func1(a, arr): = 3.14 for i in range(len(arr)): arr[i] = 0 = 0if __name__ == '__main__': num = ('d', 1.0) # num=0 arr = ('i', range(10)) # arr=range(10) p = s(target=func1, args=(num, arr)) () () print () print (arr[:])0.0[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]这⾥我们实际上只有主进程和Process对象代表的进程。我们在主进程的内存空间中创建共享的内存,也就是Value和Array两个对象。对象Value被设置成为双精度数(d), 并初始化为1.0。⽽Array则类似于C中的数组,有固定的类型(i, 也就是整数)。在Process进程中,我们修改了Value和Array对象。回到主程序,打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。ManagerManager是通过共享进程的⽅式共享数据。Manager管理的共享数据类型有:Value、Array、dict、list、Lock、Semaphore等等,同时Manager还可以共享类的实例对象。实例代码:from multiprocessing import Process,Managerdef func1(shareList,shareValue,shareDict,lock): with lock: +=1 shareDict[1]='1' shareDict[2]='2' for i in xrange(len(shareList)): shareList[i]+=1if __name__ == '__main__': manager=Manager() list1=([1,2,3,4,5]) dict1=() array1=('i',range(10)) value1=('i',1) lock=() proc=[Process(target=func1,args=(list1,value1,dict1,lock)) for i in xrange(20)] for p in proc: () for p in proc: () print list1 print dict1 print array1 print value1[21, 22, 23, 24, 25]{1: '1', 2: '2'}array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])Value('i', 21)
发布者:admin,转转请注明出处:http://www.yc00.com/xiaochengxu/1690958505a473004.html
评论列表(0条)