多线程使用场景:
由于 Python 虚拟机是单线程(GIL)的原因,只有线程在执行I/O密集型的应用时才能更好的发挥 Python的并发行,计算密集型应用只需做轮询
同步原语:
当任意数量的线程可以访问临界区的代码,但在给定时刻只有一个线程可以通过时,才使用同步机制。
- 锁:最简单、最低级的机制
- 信号量:用于多线程竞争有限资源的情况
问题:
问题一:
import thread
报错
解决方法:import _thread
建议:不要使用thread模块(有缺点),建议使用threading模块
问题二:
学习过程中,遇到这样一行代码:
loops=(randrange(2,5) for x in xrange(randrange(3,7)))
不知道什么意思,后面明白了,它的作用是:生成3-6个(个数由randrange(3,7)
控制)随机数,随机数的值为2-4(由randrange(2,5)
控制)。
问题三:
下面代码不清楚什么作用:
class CleanOutputSet(set):
def __str__(self):
return ', '.join(x for x in self)
后来,查阅资料知道其作用是:自定义一个集合类,重写—str方法,将默认输出改变为将其所有元素按照逗号分隔的字符串
锁示例
问题二和问题三的完整代码:
#python 3.6
from atexit import register
from random import randrange
from threading import Thread,Lock,currentThread
from time import sleep,ctime
#自定义一个集合类,重写—__str__方法,将默认输出改变为将其所有元素按照逗号分隔的字符串
class CleanOutputSet(set):
def __str__(self):
return ', '.join(x for x in self)
lock = Lock()#锁
#随机数量的线程(3~6个),每个线程暂停2~4秒
loops = (randrange(2,5) for x in range(randrange(3,7)))
remaining = CleanOutputSet()#自定义集合类的实例
def loop(nsec):
myname = currentThread().name#获得当前线程的名称
lock.acquire()#获取锁,阻止其他线程进入到临界区
remaining.add(myname)#将线程名添加到集合中
print('[%s] Started %s'% (ctime(),myname))
lock.release()
sleep(nsec)
lock.acquire()
remaining.remove(myname)#从集合中删除当前线程
print('[%s] Completed %s (%d secs)' % (ctime(),myname,nsec))
print(' (remaining: %s)' % (remaining or 'NONE'))
lock.release()
#main函数前面添加‘_’是为了不在其他地方使用而导入。_main只能在命令行模式下才能执行
def _main():
for pause in loops:
Thread(target=loop,args=(pause,)).start()#循环派生并执行每个线程
#装饰器,注册_atexit()函数,使得解释器在脚本退出的时候执行此函数
@register
def _atexit():
print('ALL DONE AT:',ctime())
if __name__ == '__main__':
_main()
输出:
C:\Users\Satan\AppData\Local\Programs\Python\Python36\python.exe "E:/PyCharm Project/MT/mtsleepF.py"
[Tue May 14 20:38:59 2019] Started Thread-1
[Tue May 14 20:38:59 2019] Started Thread-2
[Tue May 14 20:38:59 2019] Started Thread-3
[Tue May 14 20:38:59 2019] Started Thread-4
[Tue May 14 20:39:01 2019] Completed Thread-2 (2 secs)
(remaining: Thread-1, Thread-3, Thread-4)
[Tue May 14 20:39:01 2019] Completed Thread-1 (2 secs)
(remaining: Thread-3, Thread-4)
[Tue May 14 20:39:02 2019] Completed Thread-4 (3 secs)
(remaining: Thread-3)
[Tue May 14 20:39:03 2019] Completed Thread-3 (4 secs)
(remaining: NONE)
ALL DONE AT: Tue May 14 20:39:03 2019
threading.currentThread()从2.6版本开始重命名为threading.current_thread(),不过为了保持后向兼容性,旧的写法仍然保留了下来
信号量示例
信号量是一个计数器,当资源消耗时递减,当资源释放时递增。
#使用锁和信号量模拟一个糖果机
#python 3.6
from atexit import register
from random import randrange
from threading import BoundedSemaphore,Lock,Thread#增加了信号量
from time import sleep,ctime
#3个全局变量
lock = Lock()#锁
MAX = 5 #表示库存糖果最大值的常量
candytray = BoundedSemaphore(MAX)#‘糖果托盘’,一个信号量
#向库存中添加糖果。这段代码是一个临界区,输出用户的行动,并在糖果超过最大库存的时候给出警告
def refill():
lock.acquire()
print('重装糖果……')
try:
candytray.release()
except ValueError:
print('满了,跳过')
else:
print('成功')
lock.release()
#购买糖果。也是一个临界区,效果和refill函数相反
def buy():
lock.acquire()
print('购买糖果中………')
#检查是否所有的资源都已经消费完。
#计数器的值不能小于0,所以这个调用一般会在计数器再次增加之前被阻塞。传入非阻塞标志False,让调用不再阻塞,而在应当阻塞的时候返回一个false,表示没有更多资源了。
if candytray.acquire(False):
print('成功')
else:
print('空,跳过')
lock.release()
#模拟糖果机的所有者
def producer(loops):
for i in range(loops):
refill()
sleep(randrange(3))
#模拟消费者
def consumer(loops):
for i in range(loops):
buy()
sleep(randrange(3))
#_main表示从命令行执行
def _main():
print('开始于:',ctime())
nloops = randrange(2,6)
print('糖果机(一共 %s 个槽)' % MAX)
#创建消费者和所有者线程
#其中消费者线程中,增加了额外的操作,用于随机给出正偏差,使得消费者真正消费的糖果数可能会比供应者放入机器的更多;否则代码永远不会进入消费者尝试从空机器购买糖果的情况
Thread(target = consumer,args = (randrange(nloops,nloops+MAX+2),)).start()#
Thread(target = producer,args = (nloops,)).start()
#注册退出函数
@register
def _atexit():
print('结束于:',ctime())
if __name__ == '__main__':
_main()
输出:
C:\Users\Satan\AppData\Local\Programs\Python\Python36\python.exe "E:/PyCharm Project/MT/candy.py"
开始于: Tue May 14 23:03:14 2019
糖果机(一共 5 个槽)
购买糖果中………
成功
重装糖果……
成功
购买糖果中………
成功
购买糖果中………
成功
重装糖果……
成功
购买糖果中………
成功
重装糖果……
成功
购买糖果中………
成功
购买糖果中………
成功
购买糖果中………
成功
购买糖果中………
成功
购买糖果中………
空,跳过
结束于: Tue May 14 23:03:25 2019
threading模块包括两种信号量类:Semaphore和BoundedSemaphore。
- Semaphore是一个计数器,当资源消耗时递减(调用acquire),计数器会减1;当资源释放是递增(调用release),计数器会加1。计数器的值不会小于0;当等于0的时候,再调用acquire会阻塞,直到其他线程调用release为止。可以认为信号量代表他们的资源可用或不可用。
- BoundedSemaphore也是一个计数器,它额外功能是返回一个新的有界信号量对象,计数器的值永远不会超过它的初始值,换句话说,它可以防范其中信号量释放次数多于获得次数的异常用例。