![]() |
#2
sweet6hero2013-09-03 14:48
# -*- coding: UTF-8 -*-
''' Created on 2013-8-8 ''' #创建自己的线程类,必要时重写threading.Thread类的方法 import threading, time import thread from Queue import Queue import random #1、用 thread.start_new_thread() 创建线程 def timer(no, interval): cnt = 0 while cnt<10: print 'Thread:(%d) Time:%s/n'%(no, time.ctime()) time.sleep(interval) cnt+=1 thread.exit_thread() #2、线程共享数据时候要用线程锁 count = 0 class Counter(threading.Thread): def __init__(self, lock, threadName): '''@summary: 初始化对象。 @param lock: 琐对象。 @param threadName: 线程名称。 ''' super(Counter, self).__init__(name = threadName) #注意:一定要显式的调用父类的初始 化函数。 self.lock = lock def run(self): '''@summary: 重写父类run方法,在线程启动后执行该方法内的代码。 ''' global count self.lock.acquire() for i in xrange(10000): count = count + 1 print count self.lock.release() def stop(self): self.thread_stop = True #3、线程通信(条件变量)共享product数据 # 商品 product = None # 条件变量 con = threading.Condition() # 生产者方法 def produce(): global product if con.acquire(): while True : if product is None : print 'produce...' product = 'anything' # 通知消费者,商品已经生产 con.notify() # 等待通知 con.wait() time.sleep( 2 ) #con.release() # 消费者方法 def consume(): global product if con.acquire(): while True : if product is not None : print 'consume...' product = None # 通知生产者,商品已经没了 con.notify() # 等待通知 con.wait() time.sleep( 2 ) #con.release() #4、同步队列,一个队列的同步实现。 '''myqueue.put(10) 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block 为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full 异常。 将一个值从队列中取出myqueue.get() 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为1。如果队列为空且block 为1,get()就使调用线程暂停,直至有项目可用。如果block为0,队列将引发Empty异常。''' # Producer thread class Producer(threading.Thread): def __init__(self, threadname, queue): threading.Thread.__init__(self, name = threadname) self.sharedata = queue def run(self): for i in range(20): print self.getName(),'adding',i,'to queue' self.sharedata.put(i) time.sleep(random.randrange(10)/10.0) print self.getName(),'Finished' # Consumer thread class Consumer(threading.Thread): def __init__(self, threadname, queue): threading.Thread.__init__(self, name = threadname) self.sharedata = queue def run(self): for i in range(20): print self.getName(),'got a value:',self.sharedata.get() time.sleep(random.randrange(10)/10.0) print self.getName(),'Finished' if __name__=='__main__': #1、用 thread.start_new_thread() 创建线程 thread.start_new_thread(timer, (1,1)) thread.start_new_thread(timer, (2,2)) #2、线程共享数据count时候要用线程锁 lock = threading.Lock() for i in range(5): #setDaemon(True)主线程退出子线程也将结束 t= Counter(lock, "thread-" + str(i)) t.setDaemon(True) t.start() time.sleep(2) #确保线程都执行完毕 #2、线程通信(条件变量) t2 = threading.Thread(target = consume) t1 = threading.Thread(target = produce) t2.start() t1.start() #3、同步队列 queue = Queue() producer = Producer('Producer', queue) consumer = Consumer('Consumer', queue) print 'Starting threads ...' producer.start() consumer.start() producer.join() consumer.join() |
# -*- coding: UTF-8 -*-
'''
Created on 2013-9-3
'''
import os
import threading
import multiprocessing
import time
# worker function
def worker(sign, lock):
'''使用Lock同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出。'''
lock.acquire()
'''Process.PID中保存有PID,如果进程还没有start(),则PID为None。'''
print(sign, os.getpid())
lock.release()
def proc1(pipe):
pipe.send('hello')
print('proc1 rec:',pipe.recv())
def proc2(pipe):
print('proc2 rec:',pipe.recv())
pipe.send('hello, too')
# input worker
def inputQ(queue):
info = str(os.getpid()) + '(put):' + str(time.time())
queue.put(info)
# output worker
def outputQ(queue,lock):
info = queue.get()
lock.acquire()
print (str(os.getpid()) + '(get):' + info)
lock.release()
if __name__ == '__main__':
'''所有Thread的PID都与主程序相同,而每个Process都有一个不同的PID。'''
# 1、Multi-thread
record = []
lock = threading.Lock()
for i in range(5):
thread = threading.Thread(target=worker,args=('thread',lock))
thread.start()
record.append(thread)
for thread in record:
thread.join()
# Main
print('Main:',os.getpid())
#2、 Multi-process
record = []
lock = multiprocessing.Lock()
for i in range(5):
'''利用multiprocessing.Process对象来创建一个进程
Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。
此外multiprocessing包中也有Lock/Event/Semaphore/Condition类
(这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程'''
process = multiprocessing.Process(target=worker,args=('process',lock))
process.start()
record.append(process)
for process in record:
process.join()
# 3、Build a pipe
'''Pipe可以是单向(half-duplex)pipe有一个,也可以是双向(duplex)pipe有两个。
我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。'''
pipe = multiprocessing.Pipe()
# Pass an end of the pipe to process 1
p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
# Pass the other end of the pipe to process 2
p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
#3、Build Queue
'''Queue允许多个进程放入,多个进程从队列取出对象。
Queue使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。'''
record1 = [] # store input processes
record2 = [] # store output processes
lock = multiprocessing.Lock() # To prevent messy print
queue = multiprocessing.Queue(3)
'''一些进程使用put()在Queue中放入字符串,这个字符串中包含PID和时间。
另一些进程从Queue中取出,并打印自己的PID以及get()的字符串。'''
# input processes
for i in range(10):
process = multiprocessing.Process(target=inputQ,args=(queue,))
process.start()
record1.append(process)
# output processes
for i in range(10):
process = multiprocessing.Process(target=outputQ,args=(queue,lock))
process.start()
record2.append(process)
for p in record1:
p.join()
queue.close() # No more object will come, close the queue
for p in record2:
p.join()