| 网站首页 | 业界新闻 | 小组 | 威客 | 人才 | 下载频道 | 博客 | 代码贴 | 在线编程 | 编程论坛
欢迎加入我们,一同切磋技术
用户名:   
 
密 码:  
共有 2675 人关注过本帖
标题:python多进程编程
只看楼主 加入收藏
sweet6hero
Rank: 1
等 级:新手上路
帖 子:87
专家分:0
注 册:2013-6-9
结帖率:40%
收藏
已结贴  问题点数:20 回复次数:9 
python多进程编程
# -*- coding: UTF-8 -*-
'''
Created on 2013-9-3


'''
import os
import threading
import multiprocessing
import time


# worker function
def worker(sign, lock):
    '''使用Lock同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出。'''
    lock.acquire()
    '''Process.PID中保存有PID,如果进程还没有start(),则PID为None。'''
    print(sign, os.getpid())
    lock.release()



def proc1(pipe):
    pipe.send('hello')
    print('proc1 rec:',pipe.recv())

def proc2(pipe):
    print('proc2 rec:',pipe.recv())
    pipe.send('hello, too')
   

# input worker
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.time())
    queue.put(info)

# output worker
def outputQ(queue,lock):
    info = queue.get()
    lock.acquire()
    print (str(os.getpid()) + '(get):' + info)
    lock.release()

if __name__ == '__main__':
    '''所有Thread的PID都与主程序相同,而每个Process都有一个不同的PID。'''
    # 1、Multi-thread
    record = []
    lock  = threading.Lock()
    for i in range(5):
        thread = threading.Thread(target=worker,args=('thread',lock))
        thread.start()
        record.append(thread)
    for thread in record:
        thread.join()
    # Main
    print('Main:',os.getpid())
     
      
    #2、 Multi-process
    record = []
    lock = multiprocessing.Lock()
    for i in range(5):
        '''利用multiprocessing.Process对象来创建一个进程
        Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。
              此外multiprocessing包中也有Lock/Event/Semaphore/Condition类
               (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程'''
        process = multiprocessing.Process(target=worker,args=('process',lock))
        process.start()
        record.append(process)

    for process in record:
        process.join()
        
 
    # 3、Build a pipe
    '''Pipe可以是单向(half-duplex)pipe有一个,也可以是双向(duplex)pipe有两个。
             我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。'''
    pipe = multiprocessing.Pipe()
   
    # Pass an end of the pipe to process 1
    p1   = multiprocessing.Process(target=proc1, args=(pipe[0],))
    # Pass the other end of the pipe to process 2
    p2   = multiprocessing.Process(target=proc2, args=(pipe[1],))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
  
  
    #3、Build Queue
    '''Queue允许多个进程放入,多个进程从队列取出对象。
        Queue使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。'''
    record1 = []   # store input processes
    record2 = []   # store output processes
    lock  = multiprocessing.Lock()    # To prevent messy print
    queue = multiprocessing.Queue(3)
   
    '''一些进程使用put()在Queue中放入字符串,这个字符串中包含PID和时间。
             另一些进程从Queue中取出,并打印自己的PID以及get()的字符串。'''
    # input processes
    for i in range(10):
        process = multiprocessing.Process(target=inputQ,args=(queue,))
        process.start()
        record1.append(process)
   
    # output processes
    for i in range(10):
        process = multiprocessing.Process(target=outputQ,args=(queue,lock))
        process.start()
        record2.append(process)
   
    for p in record1:
        p.join()
   
    queue.close()  # No more object will come, close the queue
   
    for p in record2:
        p.join()
  
搜索更多相关主题的帖子: python import hello start 
2013-09-03 14:39
sweet6hero
Rank: 1
等 级:新手上路
帖 子:87
专家分:0
注 册:2013-6-9
收藏
得分:0 
# -*- coding: UTF-8 -*-
'''
Created on 2013-8-8

'''

#创建自己的线程类,必要时重写threading.Thread类的方法
import threading, time
import thread
from Queue import Queue
import random


#1、用 thread.start_new_thread() 创建线程   
def timer(no, interval):  
    cnt = 0  
    while cnt<10:  
        print 'Thread:(%d) Time:%s/n'%(no, time.ctime())  
        time.sleep(interval)  
        cnt+=1  
    thread.exit_thread()  
   
   
   
   
#2、线程共享数据时候要用线程锁   
count = 0
class Counter(threading.Thread):  
    def __init__(self, lock, threadName):  
        '''@summary: 初始化对象。
         
         @param lock: 琐对象。
          @param threadName: 线程名称。
        '''  
        super(Counter, self).__init__(name = threadName)  #注意:一定要显式的调用父类的初始  化函数。  
        self.lock = lock  
      
    def run(self):  
        '''@summary: 重写父类run方法,在线程启动后执行该方法内的代码。
        '''  
        global count  
        self.lock.acquire()  
        for i in xrange(10000):  
            count = count + 1
            print count   
        self.lock.release()
    def stop(self):  
        self.thread_stop = True

      



#3、线程通信(条件变量)共享product数据   
# 商品
product = None
# 条件变量
con = threading.Condition()
  
# 生产者方法
def produce():
    global product
     
    if con.acquire():
        while True :
            if product is None :
                print 'produce...'
                product = 'anything'
                 
                # 通知消费者,商品已经生产
                con.notify()
            
            # 等待通知
            con.wait()
            time.sleep( 2 )
    #con.release()
  
# 消费者方法
def consume():
    global product
     
    if con.acquire():
        while True :
            if product is not None :
                print 'consume...'
                product = None
                 
                # 通知生产者,商品已经没了
                con.notify()
            
            # 等待通知
            con.wait()
            time.sleep( 2 )
    #con.release()
  
 
 
  
#4、同步队列,一个队列的同步实现。
'''myqueue.put(10) 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block

为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full

异常。

 将一个值从队列中取出myqueue.get() 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为1。如果队列为空且block

为1,get()就使调用线程暂停,直至有项目可用。如果block为0,队列将引发Empty异常。'''
# Producer thread

class Producer(threading.Thread):

    def __init__(self, threadname, queue):

        threading.Thread.__init__(self, name = threadname)

        self.sharedata = queue

    def run(self):

        for i in range(20):

            print self.getName(),'adding',i,'to queue'

            self.sharedata.put(i)

            time.sleep(random.randrange(10)/10.0)

            print self.getName(),'Finished'

 

# Consumer thread

class Consumer(threading.Thread):

    def __init__(self, threadname, queue):

        threading.Thread.__init__(self, name = threadname)

        self.sharedata = queue

    def run(self):

        for i in range(20):

            print self.getName(),'got a value:',self.sharedata.get()

            time.sleep(random.randrange(10)/10.0)

            print self.getName(),'Finished'

 
  

   
if __name__=='__main__':
    #1、用 thread.start_new_thread() 创建线程   
    thread.start_new_thread(timer, (1,1))   
    thread.start_new_thread(timer, (2,2))
   
     
   
    #2、线程共享数据count时候要用线程锁   
    lock = threading.Lock()  
    for i in range(5):
        #setDaemon(True)主线程退出子线程也将结束
        t= Counter(lock, "thread-" + str(i))
        t.setDaemon(True)   
        t.start()  
    time.sleep(2)   #确保线程都执行完毕
     
        
   
   
    #2、线程通信(条件变量)
    t2 = threading.Thread(target = consume)
    t1 = threading.Thread(target = produce)
    t2.start()
    t1.start()
   
   
   
   
    #3、同步队列
    queue = Queue()  
    producer = Producer('Producer', queue)
    consumer = Consumer('Consumer', queue)
    print 'Starting threads ...'
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

  

  
2013-09-03 14:48
sweet6hero
Rank: 1
等 级:新手上路
帖 子:87
专家分:0
注 册:2013-6-9
收藏
得分:0 
# -*- coding: UTF-8 -*-
'''
Created on 2013-8-12

'''   
import Queue   
import threading   
import time   
   
class WorkManager(object):   
    def __init__(self,lock=None, work_num=1000,thread_num=2):
        #工作队列   
        self.work_queue = Queue.Queue()
        #线程队列   
        self.threads = []
        self.lock =lock  
        #初始化工作队列和线程队列   
        self.__init_work_queue(work_num)   
        self.__init_thread_pool(thread_num)
        
   
    """   
        初始化thread_num个线程 ,每个线程关联到同一个工组队列,放入线程队列中  
    """   
    def __init_thread_pool(self,thread_num):
        #创建 thread_num个线程放入线程队列   
        for i in range(thread_num):   
            self.threads.append(Work(self.work_queue,self.lock))   
   
    """   
        初始化工作队列,将工作放入到工组队列   
    """   
    def __init_work_queue(self, jobs_num):
        #将工作放入工作队列   
        for i in range(jobs_num):   
            self.add_job(do_job, i)  
 
   
    """   
        添加一项工作到工作队列  
    """   
    def add_job(self, func, *args):   
        self.work_queue.put((func, list(args)))#任务入队,Queue内部实现了同步机制
        
   
    """   
        等待所有线程运行完毕 ,判断线程是否激活状态是则调用  
    """      
    def wait_allcomplete(self):   
        for item in self.threads:   
            if item.isAlive():
                item.join()   
   
class Work(threading.Thread):   
    def __init__(self, work_queue,lock=None,threadname=None):   
        threading.Thread.__init__(self,threadname)   
        self.work_queue = work_queue
        self.lock=lock   
        self.start()   
   
    def run(self):   
        #死循环,从而让创建的线程在一定条件下关闭退出   
        while True:   
            try:
                self.lock.acquire()      
                do, args = self.work_queue.get(block=False)#任务异步出队,Queue内部实现了同步机制
                self.lock.release()   
                do(args)   
                self.work_queue.task_done()#通知系统任务完成   
            except:   
                break   


   
#具体要做的任务   
def do_job(args):   
    time.sleep(0.1)#模拟处理时间   
    print threading.current_thread(),"ssss"
   
def do_job1(args):
    for a in args:
        print a
 
 
   
if __name__ == '__main__':   
    start = time.time()
    lock=threading.Lock()   
    work_manager =  WorkManager(lock,10, 2)#或者work_manager =  WorkManager(10000, 20)
    work_manager.add_job(do_job1,{'1':"ss"},2,3)   
    work_manager.wait_allcomplete()   
    end = time.time()   
    print "cost all time: %s" % (end-start)  

  

  
2013-09-03 14:49
yuccn
Rank: 16Rank: 16Rank: 16Rank: 16
来 自:何方
等 级:版主
威 望:167
帖 子:6814
专家分:42393
注 册:2010-12-16
收藏
得分:10 
只能帮顶 了

我行我乐
公众号:逻辑客栈
我的博客:
https://blog.yuccn. net
2013-09-05 08:08
wp231957
Rank: 20Rank: 20Rank: 20Rank: 20Rank: 20
来 自:神界
等 级:贵宾
威 望:423
帖 子:13688
专家分:53332
注 册:2012-10-18
收藏
得分:10 
以下是引用yuccn在2013-9-5 08:08:11的发言:

只能帮顶 了

DO IT YOURSELF !
2013-09-05 09:12
sweet6hero
Rank: 1
等 级:新手上路
帖 子:87
专家分:0
注 册:2013-6-9
收藏
得分:0 
import threadpool


def time_process_disaster(data):
    """
        function of the theard
    """
    scalr_conn = data["scalr_conn"]
    print scalr_conn

def time_disaster(data):
    """
        function of the theard
    """
    scalr_conn = data["scalr_conn"]
    print "scalr_conn"
     
class TimeProcessDnsDisaster:
    def __init__(self):
        self.pool = threadpool.ThreadPool(2)
    def job_function(self):
        data=[]  
        for i in range(100):
            data.append({"scalr_conn":i})
        requests = threadpool.makeRequests(time_process_disaster,data)
        print requests.__len__()
        [self.pool.putRequest(req) for req in requests]
        self.pool.wait()

class TimeDisaster:
    def __init__(self):
        self.pool = threadpool.ThreadPool(2)
    def job_function(self):
        data=[]  
        for i in range(100):
            data.append({"scalr_conn":i})
        requests = threadpool.makeRequests(time_disaster,[data[0]])
        request = threadpool.makeRequests(time_disaster,[data[2]])
        print requests.__len__()
        [self.pool.putRequest(req) for req in requests]
        [self.pool.putRequest(req) for req in request]
        self.pool.wait()
        
if __name__ == "__main__":
    t= TimeProcessDnsDisaster()
    t.job_function()
   
    q=TimeDisaster()
    q.job_function()
   
    print t.pool,q.pool
2013-10-30 11:14
sweet6hero
Rank: 1
等 级:新手上路
帖 子:87
专家分:0
注 册:2013-6-9
收藏
得分:0 
#!/usr/bin/python
_DEBUG=True
def debug_demo(val):
    if _DEBUG == True:
        import pdb
        pdb.set_trace()
    if val <= 1600 :
        print "level 1"
        print 0
    elif val <= 3500 :
        print "level 2"
        print (val - 1600) * 0.05
    elif val <= 6500 :
        print "level 3"
        print (val - 3500) * 0.10 + (3500-1600) * 0.05
    else:
        print "level 4"
        print (val - 6500) * 0.20 + (6500-3500) * 0.10 + (3500-1600) * 0.05


#~def debug_demo                 

if __name__ == "__main__":

    debug_demo(4500)
2013-10-30 11:29
sweet6hero
Rank: 1
等 级:新手上路
帖 子:87
专家分:0
注 册:2013-6-9
收藏
得分:0 
import unittest

class Widget:  
    def __init__(self, size = (40, 40)):  
        self._size = size  
    def getSize(self):  
        return self._size  
    def resize(self, width, height):  
        if width == 0  or height < 0:  
            raise ValueError, "illegal size"  
        self._size = (width, height)  


class mytestproject1(unittest.TestCase):
    def testcase1(self):
        print 'testcase1'
        self.assertEquals(7/2,3)
    def testcase2(self):
        print 'testcase2'
        self.assertEquals("".join(['a','b',' c']),"abc")

#
class DefaultWidgetSizeTestCase(unittest.TestCase):
    def setUp(self):
        print "setup"
        self.widget=Widget((50,50))
         
    def tearDown(self):
        print 'teardown'
        self.widget=None

    def testrunTest1(self):
        print 'run1'
        assert self.widget.getSize()==(50,50),'incorrect default size'
     
    def testrunTest2(self):
        print 'run2'
        assert self.widget.getSize()==(50,50),'incorrect default size'

        
if __name__ == "__main__":
    '''#直接测试类中某个方法
    testcase=mytestproject1("testcase1")
    runner=unittest.TextTestRunner()
    runner.run(testcase)

    #使用聚合测试套件测试多个类的多个方法
    suite=unittest.TestSuite()
    suite.addTest(DefaultWidgetSizeTestCase("testrunTest1"))
    #suite=unittest.makeSuite(DefaultWidgetSizeTestCase,'testrunTest2')
    suite.addTest(mytestproject1("testcase1"))
    runner=unittest.TextTestRunner()
    runner.run(suite)'''
   
    #使用聚合测试套件测试多个类的多个方法
    suite1 = unittest.TestLoader().loadTestsFromTestCase(DefaultWidgetSizeTestCase)
    unittest.TextTestRunner(verbosity=2).run(suite1)
2013-10-30 11:30
sweet6hero
Rank: 1
等 级:新手上路
帖 子:87
专家分:0
注 册:2013-6-9
收藏
得分:0 
from xml.sax import *  
class UserDecodeHandler(ContentHandler):  
    users=None  
    map=None  
    temp=""  
    currenttag=None  
    user=None  
    def startDocument(self):  
        print "start xml document"  
         
    def endDocument(self):  
        print "end xml document"  
    def startElement(self,name,attrs):  
        if name=="users":  
            self.users=[]  
        elif name=="user":  
            self.user={"name":attrs['name']}  
        self.currenttag=name  
      
    def endElement(self,name):  
        if name=="user":  
            self.users.append(self.user)  
        elif name=="description":  
            self.user.update({"description":self.temp.strip()})  
            self.temp=""  
        self.currenttag=None  
    def characters(self,content):  
        self.temp+=content  
  
parser=make_parser()  
handler=UserDecodeHandler()  
parser.setContentHandler(handler)  
data=""  
with open("e:\s.xml") as file:  
    data=file.read().strip()
    print data  
import StringIO  
parser.parse(StringIO.StringIO(data))  
  
for item in handler.users:  
    print "======================="  
    for i in item.items():  
        key,value=i  
        print key,value.encode("gbk")
2013-10-30 11:32
sweet6hero
Rank: 1
等 级:新手上路
帖 子:87
专家分:0
注 册:2013-6-9
收藏
得分:0 
import Tkinter
import urllib
def resize(ev=None):
    label.config(font='Helvetica -%d bold' % scale.get())
   
top = Tkinter.Tk()
top.geometry('250x150')
label = Tkinter.Label(top, text='Hello World!',font='Helvetica -12 bold')
quit = Tkinter.Button(top, text='Hello World!',command=top.quit)
label.pack(expand=1)
scale = Tkinter.Scale(top, from_=10, to=40, command=resize)
scale.set(12)
scale.pack(expand=1)
quit.pack()
urllib.urlretrieve(r'http://www.baidu.com', localfile=r"e:\b.txt")
Tkinter.mainloop()
2013-10-30 11:34
快速回复:python多进程编程
数据加载中...
 
   



关于我们 | 广告合作 | 编程中国 | 清除Cookies | TOP | 手机版

编程中国 版权所有,并保留所有权利。
Powered by Discuz, Processed in 0.017298 second(s), 7 queries.
Copyright©2004-2024, BCCN.NET, All Rights Reserved