博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
多进程
阅读量:6197 次
发布时间:2019-06-21

本文共 10062 字,大约阅读时间需要 33 分钟。

 

#单进程#!/usr/bin/python# -*- coding: utf-8 -*-from multiprocessing import Process,Poolimport time,osdef fs(name,seconds):  print '%s will sleep: %d s,Now is %s ,fs pid is: %s' % (name,seconds,time.ctime(),os.getpid())  time.sleep(seconds)  print 'Now is: %s' % (time.ctime())  return 'fs' + nameif __name__ == '__main__':  print 'main Pname is %s' % (os.getpid())  for i in range(1,11):    p=Process(target=fs,args=(str(i),2))    p.start()    p.join()  print 'main end: ' + str(os.getpid())

 

#定义最大进程数量,Pool默认大小为CPU核心数量#!/usr/bin/python# -*- coding: utf-8 -*-from multiprocessing import Process,Poolimport time,osdef fs(name,seconds):  print '%s will sleep: %d s,Now is %s ,fs pid is: %s' % (name,seconds,time.ctime(),os.getpid())  time.sleep(seconds)  print 'Now is: %s' % (time.ctime())  return 'fs' + nameif __name__ == '__main__':  print 'main Pname is %s' % (os.getpid())  p=Pool(processes=3)  #定义最多开启3个进程  for i in range(1,11):    p.apply_async(fs,args=(str(i),2))   #如果fs只接受一个参数,则写法为 p.apply_async(fs,args=(a1,))  p.close()  p.join()  print 'main end: ' + str(os.getpid())

 

#获取每个进程的执行结果from multiprocessing import Process,Poolimport time,osdef fs(name,seconds):  print '%s will sleep: %d s,Now is %s ,fs pid is: %s' % (name,seconds,time.ctime(),os.getpid())  time.sleep(seconds)  print 'Now is: %s' % (time.ctime())  return 'fs' + nameif __name__ == '__main__':  print 'main Pname is %s' % (os.getpid())  p=Pool(processes=3)  #result只能获取函数fs的return语句的结果,与fs中的print无关  result=[]  for i in range(1,11):    print 'i is: %d' % (i)    result.append(p.apply_async(fs,args=(str(i),2)))  p.close()  p.join()  for r in result:    print r.get()  print 'main end: ' + str(os.getpid())

 Win32平台添加如下代码,防止多进程崩溃

 

from multiprocessing import Process, freeze_supportif __name__ == '__main__':    freeze_support()

 

p.start()来启动子进程

p.join()方法来使得子进程运行结束后再执行父进程

 

示例:

ping多个域名:

def fping(ip):    import subprocess,sys    reload(sys)    sys.setdefaultencoding('utf-8')    sc = subprocess.Popen(['ping.exe',ip,'-n','2'],shell=True,stdout=subprocess.PIPE)    while sc.poll() == None:        sclines = sc.stdout.readlines()        for l in sclines:            #return l.strip().decode('GBK')            print l.strip().decode('GBK')if __name__ == '__main__':    from multiprocessing import Process,Pool,freeze_support,Queue    freeze_support()    ips=['www.baidu.com','www.163.com','www.sina.com.cn','www.cctv.com','www.xin.com','apollo.youxinpai.com']    p=Pool(processes=4)     for ip in ips:        p.apply_async(fping,args=(ip,))    p.close()    p.join()

函数fping通过使用return取得返回结果:

def fping(ip):    import subprocess,sys    reload(sys)    sys.setdefaultencoding('utf-8')    sc = subprocess.Popen(['ping.exe',ip,'-n','1'],shell=True,stdout=subprocess.PIPE)    while sc.poll() == None:        sclines = sc.stdout.readlines()        prs =''        for l in sclines:            #return l.strip().decode('GBK')            prd = l.strip().decode('GBK') +'\n'            prs += prd        return prs.strip()print fping('www.baidu.com')

 

https://docs.python.org/2/library/multiprocessing.html

共享内存变量 multiprocessing.Queue/Array,效率高于manager()

from multiprocessing import Process, Queuedef f(q):    q.put([42, None, 'hello'])if __name__ == '__main__':    q = Queue()    p = Process(target=f, args=(q,)) #q可以传递到函数中,但通过applay_async的方式传递不进去。另这个Queue()有大小限制,大了的话程序就假死。    p.start()    print q.get()    # prints "[42, None, 'hello']"     p.join()

 

import multiprocessingfrom multiprocessing import Process, Value, Array  def f(n, a):    n.value   = 3.14    a[0]      = 5if __name__ == '__main__':      num   = multiprocessing.Value('d', 0.0)    arr   = multiprocessing.Array('i', range(10))    #arr = Array('c', 'oaaaaaaaaaaaaaa') #字符串类型          p = multiprocessing.Process(target=f, args=(num, arr))    p.start()    p.join()          print num.value #返回3.14    print arr[:] #返回[5, 1, 2, 3, 4, 5, 6, 7, 8, 9]

 

使用共享变量list实现真正的多进程并发:

# -*- coding: UTF-8 -*-from multiprocessing import Process,Pool,freeze_support,Managerimport subprocess,sys,timereload(sys)sys.setdefaultencoding('utf-8')def fping(ip,ls):    sc = subprocess.Popen(['ping.exe',ip,'-n','1'],shell=True,stdout=subprocess.PIPE)    while sc.poll() == None:        sclines = sc.stdout.readlines()        ls.append(sclines)if __name__ == '__main__':    freeze_support()    #使用Manager()在多进程间共享变量ls    manager = Manager()    ls = []    ls = manager.list()    ips=['www.baidu.com','www.163.com','www.sina.com.cn','www.cctv.com','www.xin.com','apollo.youxinpai.com']        p=Pool(processes=4)  #定义进程数量    for ip in ips:        p.apply_async(fping,args=(ip,ls))     p.close()    p.join()        #将结果写入到文本文件中    fpingfile = 'e:\\ping.txt'    fw = open(fpingfile,'a')    for lrs in ls:        for lr in lrs:            fw.write(lr.strip().decode('GBK') + '\n')    fw.close()

共享变量使用dict:

def testfunc(key,value,ls):    ls[key]=value    print 'process id: ',os.getpid()if __name__ == '__main__':    freeze_support()    manager = Manager()    ls = Manager().dict()    p=Pool(processes=8)    for ll in range(10):        lld = ll+1        p.apply_async(testfunc,args=(ll,lld,ls))    p.close()    p.join()    print 'main end,main process id: ',os.getpid()    print ls返回:{0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6, 6: 7, 7: 8, 8: 9, 9: 10}

 共享变量使用value:

def testfunc(cc,ls,lock):    #lock.acquire() #效果同with lock写法    with lock:        ls.value +=cc    #lock.release()    print 'process id: ',os.getpid(),'  ',ls.valueif __name__ == '__main__':    freeze_support()    manager = Manager()    lock = manager.Lock() #使用multiprocessing中导入的Lock,经测试无法使用,定义为global,或者通过args传递均无法使用    ls = Manager().Value('tmp',0)    p=Pool() #processes=2    for ll in range(6):        p.apply_async(testfunc,args=(ll,ls,lock))    p.close()    p.join()    print 'main end,main process id: ',os.getpid()    print ls.value #返回15

 共享变量Queue的应用:(先进先出,测试的时候,如果不加锁,也能正常put进queue。因为多进程中的queue有安全机制,所以不用加lock)

# 写数据进程执行的代码:def write(q,lock,value):    #with lock:    lock.acquire() #加上锁    print 'Put %s to queue...' % value    time.sleep(0.2)            q.put(value)            lock.release() #释放锁  # 读数据进程执行的代码:def read(q):    while True:        #not q.empty():        #value = q.get_nowait()         try:       q.get(timeout=3) #3秒后还取不到数据则抛出Queue.empty。用该参数变相结束read进程。          print 'Get %s from queue.queue size is %s' % (value,q.qsize())          time.sleep(0.5)         except:           breakif __name__=='__main__':    freeze_support()    manager = Manager()    # 父进程创建Queue,并传给各个子进程:    q = manager.Queue()    lock = manager.Lock() #初始化一把锁    p = Pool()    for ll in range(10):        pw = p.apply_async(write,args=(q,lock,ll))          pr = p.apply_async(read,args=(q,)) #read并发多个进程。     p.close()    p.join()        print    print 'done'

 1.

if __name__ == '__main__':    #global q    freeze_support()    q=Manager().Queue()    p=Pool()    p2=Pool(2)    for ll in range(10):        pw = p.apply_async(write,args=(ll,q))    #read可以使用另一个进程池,也可以共享queue    p2.apply_async(read,args=(q,)) #read并发一个进程    p2.close()    p.close()    p.join()    p2.join()

2.

if __name__ == '__main__':    #global q    freeze_support()    q=Manager().Queue()    p=Pool()    p2=Pool(2)    for ll in range(10):        pw = p.apply_async(write,args=(ll,q))    #read可以另起一个进程,也可以共享queue    p1=Process(target=read,args=(q,)) #read一个进程    p1.start()    p1.join()

 3.

for i in range(3):        p = Process(target=write2,args=(i,q))        threads.append(p)        p.start()    for t in threads:        t.join()

 

 通过class派生类:

import multiprocessingclass Worker(multiprocessing.Process):
def run(self):        print 'In %s' % self.name        returnif __name__ == '__main__':    jobs = []    for i in range(5):        p = Worker()        jobs.append(p)        p.start()    for j in jobs:        j.join()

 

import multiprocessing, Queueimport osimport timefrom multiprocessing import Processfrom time import sleepfrom random import randintclass Producer(multiprocessing.Process):    def __init__(self, queue):        multiprocessing.Process.__init__(self)        self.queue = queue            def run(self):        for i in range(10):            value = self.queue.put(i)            print multiprocessing.current_process().name + 'is putting ' + str(i) + ' ' + str(os.getpid())            sleep(randint(1, 3))                class Consumer(multiprocessing.Process):    def __init__(self, queue):        multiprocessing.Process.__init__(self)        self.queue = queue            def run(self):        while True:            d = self.queue.get(timeout=1)            if d != None:                print multiprocessing.current_process().name + 'is getting ' + str(d) + ' ' +str(os.getpid())                sleep(randint(1, 4))                continue            else:                break                #create queuequeue = multiprocessing.Queue(10)       if __name__ == "__main__":    print 'start'    #create processes        threads = []    for i in range(3):        pp = Producer(queue)        pc = Consumer(queue)        threads.append(pp)        threads.append(pc)        pp.start()        pc.start()        for t in threads:        t.join()        返回:startProducer-1is putting 0 65620Producer-3is putting 0 49132Consumer-6is getting 0 58876Consumer-4is getting 0 55396Producer-5is putting 0 63544Consumer-2is getting 0 62548Producer-1is putting 1 65620Producer-3is putting 1 49132Consumer-6is getting 1 58876Producer-5is putting 1 63544Consumer-2is getting 1 62548Producer-3is putting 2 49132Producer-1is putting 2 65620Producer-3is putting 3 49132Consumer-6is getting 1 58876Consumer-4is getting 2 55396Producer-5is putting 2 63544Consumer-2is getting 2 62548Producer-5is putting 3 63544Producer-3is putting 4 49132Consumer-4is getting 3 55396Producer-5is putting 4 63544Consumer-2is getting 2 62548Producer-1is putting 3 65620Consumer-4is getting 3 55396Producer-3is putting 5 49132Consumer-6is getting 4 58876

 经测试:Manage().list()或Queue()在使用过程中效率远低于global变量。

 

 

 

 

多进程间共享变量:

http://www.tuicool.com/articles/ZZri22

http://www.cnblogs.com/itech/archive/2012/01/10/2318120.html

转载于:https://www.cnblogs.com/dreamer-fish/p/5132917.html

你可能感兴趣的文章
(喷血分享)利用.NET生成数据库表的创建脚本,类似SqlServer编写表的CREATE语句...
查看>>
Beauty Contest
查看>>
[ACM_模拟] POJ1068 Parencodings (两种括号编码转化 规律 模拟)
查看>>
黑苹果收集
查看>>
【转】Struts2 和 Spring MVC对比
查看>>
【Hibernate步步为营】--继承映射具体解释
查看>>
Android -- ImageLoader简析
查看>>
『一些同学学不好C语言,把罪责归于「因为教材是谭浩强写的」实在是很滑稽』吗?...
查看>>
mysql 索引相关知识
查看>>
自定义控件出现“loaded nib but the view outlet was not set”
查看>>
深信服笔试题(网络project师售后)
查看>>
我是一个线程(修订版)
查看>>
成都Uber优步司机奖励政策(4月4日)
查看>>
C语言+SDL2 图形化编程
查看>>
Oracle数据库之SQL基础(二)
查看>>
mybati之#与$的区别
查看>>
常用命令收集
查看>>
似懂非懂的localStorage和sessionStorage
查看>>
阿里移动技术峰会的一些体会 2015-07-04
查看>>
Visual Studio 连接 SQL Server 的connectionStringz和
查看>>