1.介绍
1.1 GIL:全局解释器锁,因为他的存在,python没有真正意义的多线程,在同一时刻,python进程中只有一个线程在执行。
1.2 GIL是CPython引入的特性,CPython是大部分Python默认的执行环境,所以通常所说的“Python存在GIL”实际上指的是CPython下的GIL,Jython便没有GIL
1.3 进程就好比工厂的车间,它代表CPU所能处理的单个任务,线程就好比车间里的工人。一个进程可以包括多个线程。车间的空间是工人们共享的,比如许多房间是每个工人都可以进出的。这象征一个进程的内存空间是共享的,每个线程都可以使用这些共享内存。可是,每间房间的大小不同,有些房间最多只能容纳一个人,比如厕所。里面有人的时候,其他人就不能进去了。这代表一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。一个防止他人进入的简单方法,就是门口加一把锁。先到的人锁上门,后到的人看到上锁,就在门口排队,等锁打开再进去。这就叫“互斥锁”(Mutual exclusion,缩写 Mutex),防止多个线程同时读写某一块内存区域。知乎上有一篇介绍线程、进程相关知识的文章讲得很好:线程进程解释
1.4 多进程multiprocessing,多线程threading,multiprocessing.dummy模块也是多线程
1.5 IO密集型使用多线程,threading,cpu密集型使用多进程multiprocessing。
1.6 由于gil的存在,同一时刻,只有一个线程在运行,所以python只有并发操作,没有并行
1.7 队列Queue()里集成了锁的基本设置,如果queue为空,则get元素的时候会被阻塞,直到队列里面被其他线程写入数据
- 代码 2.1 threading 2.1.1 继承threading.Thread类
import time
import threading
class MyThread(threading.Thread):
    """Worker thread that prints a numbered line and then sleeps, several times.

    Args:
        rounds: Number of lines to print (default 2).
        interval: Seconds to sleep after each line (default 1).
    """

    def __init__(self, rounds=2, interval=1):
        super(MyThread, self).__init__()
        self.rounds = rounds
        self.interval = interval

    def run(self):
        """Print ``thread <name>, @number: <i>`` once per round."""
        for i in range(self.rounds):
            # A parenthesised single-argument print produces the same output
            # on Python 2 and is valid syntax on Python 3.
            print('thread {}, @number: {}'.format(self.name, i))
            time.sleep(self.interval)
def main():
    """Start three MyThread workers and return without waiting for them.

    The workers are never joined, so "End Main threading" is printed while
    they are still running.
    """
    print("Start main threading")
    # Create three threads.
    threads = [MyThread() for _ in range(3)]
    # Start the three threads; there is no join(), so main() does not block.
    for t in threads:
        t.start()
    print("End Main threading")


if __name__ == '__main__':
    main()
结果:
Start main threading
thread Thread-1, @number: 0
thread Thread-2, @number: 0
End Main threading
thread Thread-3, @number: 0
thread Thread-1, @number: 1thread Thread-2, @number: 1
thread Thread-3, @number: 1
[Finished in 2.1s]
启动三个线程,主线程结束了,其他线程还没有结束。
如果想要主线程结束时其他线程也随之结束,则对每个线程设置 setDaemon(True)(等价于 daemon = True)。
class MyThread(threading.Thread):
    """Worker thread that prints a numbered line and then sleeps, several times.

    Args:
        rounds: Number of lines to print (default 2).
        interval: Seconds to sleep after each line (default 1).
    """

    def __init__(self, rounds=2, interval=1):
        super(MyThread, self).__init__()
        self.rounds = rounds
        self.interval = interval

    def run(self):
        """Print ``thread <name>, @number: <i>`` once per round."""
        for i in range(self.rounds):
            # A parenthesised single-argument print produces the same output
            # on Python 2 and is valid syntax on Python 3.
            print('thread {}, @number: {}'.format(self.name, i))
            time.sleep(self.interval)
def main():
    """Start three daemon MyThread workers and return immediately.

    The workers are marked as daemon threads and are not joined, so they do
    not keep the program alive once the main thread finishes.
    """
    print("Start main threading")
    # Create three threads.
    threads = [MyThread() for _ in range(3)]
    # Start the three threads.
    for t in threads:
        # The `daemon` attribute is the supported spelling; setDaemon() is
        # deprecated in recent Python 3 releases.
        t.daemon = True
        t.start()
    print("End Main threading")


if __name__ == '__main__':
    main()
结果:
Start main threading
thread Thread-1, @number: 0
thread Thread-2, @number: 0
thread Thread-3, @number: 0
End Main threading
[Finished in 0.1s]
如果想等所有线程都结束之后主线程再结束,需要用 join() 阻塞主线程
class MyThread(threading.Thread):
    """Worker thread that prints a numbered line and then sleeps, several times.

    Args:
        rounds: Number of lines to print (default 2).
        interval: Seconds to sleep after each line (default 1).
    """

    def __init__(self, rounds=2, interval=1):
        super(MyThread, self).__init__()
        self.rounds = rounds
        self.interval = interval

    def run(self):
        """Print ``thread <name>, @number: <i>`` once per round."""
        for i in range(self.rounds):
            # A parenthesised single-argument print produces the same output
            # on Python 2 and is valid syntax on Python 3.
            print('thread {}, @number: {}'.format(self.name, i))
            time.sleep(self.interval)
def main():
    """Start three MyThread workers and block until all of them finish."""
    print("Start main threading")
    # Create three threads.
    threads = [MyThread() for _ in range(3)]
    # Start all three before joining any, so they run concurrently.
    for t in threads:
        t.start()
    # join() blocks the main thread until each worker has ended.
    for t in threads:
        t.join()
    print("End Main threading")


if __name__ == '__main__':
    main()
结果:
Start main threading
thread Thread-1, @number: 0
thread Thread-2, @number: 0
thread Thread-3, @number: 0
thread Thread-1, @number: 1thread Thread-2, @number: 1
thread Thread-3, @number: 1
End Main threading
[Finished in 2.1s]
如果多个线程在短时间内“同时”读取、修改共享资源里的数据,很可能造成数据不同步。为了避免线程不同步造成的数据不一致,可以对资源进行加锁,加锁可以使各个线程看到的数据保持同步。
比如计数器,
class MyThread(threading.Thread):
    """Thread that increments the module-level counter ``num`` under a lock.

    The lock is held for the whole loop, sleeps included, so the threads
    sharing one lock run their loops one after another.

    Args:
        mutex: Lock (any context manager) guarding the shared counter.
        rounds: Number of increments to perform (default 2).
        interval: Seconds to sleep after each increment (default 1).
    """

    def __init__(self, mutex, rounds=2, interval=1):
        super(MyThread, self).__init__()
        self.mutex = mutex
        self.rounds = rounds
        self.interval = interval

    def run(self):
        """Increment ``num`` ``rounds`` times, printing each new value."""
        global num
        # Use the lock handed to this instance, not whatever global happens to
        # be named `mutex`; `with` releases it even if the body raises.
        with self.mutex:
            for i in range(self.rounds):
                num += 1
                print('thread {}, @number: {},@num:{}'.format(self.name, i, num))
                time.sleep(self.interval)


# Lock shared by the example's threads, and the counter it protects.
mutex = threading.Lock()
num = 0
def main():
    """Run three lock-sharing MyThread workers and wait for all of them."""
    print("Start main threading")
    # Create three threads that all receive the same module-level lock.
    threads = [MyThread(mutex) for _ in range(3)]
    # Start the three threads.
    for t in threads:
        t.start()
    # Wait for every worker before printing the closing line.
    for t in threads:
        t.join()
    print("End Main threading")


if __name__ == '__main__':
    main()
结果:
Start main threading
thread Thread-1, @number: 0,@num:1
thread Thread-1, @number: 1,@num:2
thread Thread-2, @number: 0,@num:3
thread Thread-2, @number: 1,@num:4
thread Thread-3, @number: 0,@num:5
thread Thread-3, @number: 1,@num:6
End Main threading
[Finished in 6.1s]
Queue模块已经内置了对锁的操作,如果queue为空,则get元素的时候会被阻塞,直到队列里面被其他线程写入数据:
import Queue
import threading
import urllib2
import time
# URLs handed to the worker threads.
hosts = [
    "http://yahoo.com",
    "http://amazon.com",
    "http://ibm.com",
    "http://apple.com",
]

# Job queue shared by all worker threads.
queue = Queue.Queue()
class ThreadUrl(threading.Thread):
    """Worker thread that takes hosts from a queue and prints each one.

    Args:
        queue: Queue of host strings to consume.
    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """Consume hosts from ``self.queue`` until it is empty, then return."""
        # Imported here so the block works under both module names
        # (``Queue`` on Python 2, ``queue`` on Python 3).
        try:
            from Queue import Empty
        except ImportError:
            from queue import Empty

        while True:
            try:
                # Non-blocking get: an empty() check followed by a blocking
                # get() can hang forever if another worker takes the last
                # item in between the two calls.
                host = self.queue.get(block=False)
            except Empty:
                break
            print('thread {}, host {}'.format(self.name, host))
            # Signal to the queue that this job is done.
            self.queue.task_done()
def main():
    """Queue every host, then run two ThreadUrl workers to completion."""
    for url in hosts:
        queue.put(url)

    workers = []
    for _ in range(2):
        workers.append(ThreadUrl(queue))

    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


main()
结果:
thread Thread-1, host http://yahoo.com
thread Thread-2, host http://amazon.com
thread Thread-1, host http://ibm.com
thread Thread-2, host http://apple.com
[Finished in 0.1s]
queue.task_done()主要是发送队列中某个任务结束信号
2.1.2 使用threading.Thread对象
def func(num):
    """Print the sum 1 + 2 + ... + num, followed by a blank line.

    Args:
        num: Upper bound of the sum; a value <= 0 gives a sum of 0.
    """
    total = 0  # named `total` so the builtin sum() is not shadowed
    # `> 0` instead of plain truthiness, so a negative argument terminates
    # rather than counting down forever.
    while num > 0:
        total += num
        num -= 1
    # Same bytes as the Python 2 statement `print total, '\n'`.
    print('{} \n'.format(total))
# Argument passed to func() in every thread.
num = 10


def main():
    """Run func(num) in five threads and wait for all of them to finish."""
    workers = []
    for _ in range(5):
        workers.append(threading.Thread(target=func, args=(num,)))

    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


main()
结果:
55
55
55
55
55
[Finished in 0.1s]
2.2 multiprocessing
import multiprocessing
import time
def func(msg, repeat=3, interval=1):
    """Print ``msg`` repeatedly with a pause after each print.

    Args:
        msg: Text to print.
        repeat: Number of times to print it (default 3).
        interval: Seconds to sleep after each print (default 1).

    Returns:
        The string ``"done "`` followed by ``msg``.
    """
    # range() and the parenthesised print work on both Python 2 and 3.
    for _ in range(repeat):
        print(msg)
        time.sleep(interval)
    return "done " + msg
if __name__ == "__main__":
    # Four worker processes share the ten submitted tasks.
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in range(10):
        msg = "hello %d" % i
        # apply_async returns at once with a handle for the pending result.
        result.append(pool.apply_async(func, (msg,)))
    # No more tasks will be submitted; then wait for the workers to exit.
    pool.close()
    pool.join()
    for res in result:
        print(res.get())
    print("Sub-process(es) done.")
所有任务都执行完之后一定要记得用close()和join()回收进程.
3 通用的多线程代码
#!/usr/bin/env python
#coding=utf8
import Queue
import threading
import time
import pdb
import sys
import urllib2
import json
import os
import cookielib
import traceback
# Python 2 idiom: reload sys, then switch the interpreter's default string
# encoding to utf8.
reload(sys)
sys.setdefaultencoding('utf8')
class WorkManager(object):
    """Owns a job queue and the pool of Work threads that drain it.

    Args:
        work_num: Number of jobs to enqueue up front.
        thread_num: Number of worker threads to create.
    """

    def __init__(self, work_num=1000, thread_num=2):
        self.work_queue = Queue.Queue()
        self.threads = []
        # Fill the queue before creating the workers.
        self.__init_work_queue(work_num)
        self.__init_thread_pool(thread_num)

    def __init_thread_pool(self, thread_num):
        """Create ``thread_num`` Work threads bound to the shared queue."""
        for i in range(thread_num):
            self.threads.append(Work(self.work_queue, i))

    def __init_work_queue(self, jobs_num):
        """Enqueue ``jobs_num`` do_job tasks, each with its own URL."""
        for i in range(jobs_num):
            url = '***' + str(i)
            self.add_job(do_job, url)

    def add_job(self, func, *args):
        """Put one job on the queue as a ``(func, [args...])`` pair."""
        self.work_queue.put((func, list(args)))

    def check_queue(self):
        """Return the number of jobs currently in the queue."""
        return self.work_queue.qsize()

    def wait_allcomplete(self):
        """Block until every thread in ``self.threads`` has finished."""
        for worker in self.threads:
            # is_alive() replaces isAlive(), which no longer exists on
            # current Python 3.
            if worker.is_alive():
                worker.join()
class Work(threading.Thread):
    """Worker thread that runs jobs taken from a shared queue.

    The thread starts itself as soon as it is constructed.

    Args:
        work_queue: Queue of ``(callable, argument_list)`` pairs.
        i: Index of this worker, stored as ``self.num``.
    """

    def __init__(self, work_queue, i):
        threading.Thread.__init__(self)
        self.work_queue = work_queue
        self.num = i
        self.start()

    def run(self):
        """Run jobs until the queue is empty or a job fails.

        Each job is called as ``do(args)`` with the argument list passed as a
        single parameter. A failing job is reported and ends this worker, as
        before, but it is now marked done so the queue's bookkeeping stays
        consistent.
        """
        # Imported here so the block works under both module names
        # (``Queue`` on Python 2, ``queue`` on Python 3).
        try:
            from Queue import Empty
        except ImportError:
            from queue import Empty

        while True:
            try:
                # Non-blocking dequeue. An empty queue is the normal exit
                # condition, so it is handled here instead of being reported
                # as a thread error when another worker wins the race for the
                # last job.
                job = self.work_queue.get(block=False)
            except Empty:
                print('队列为空 程序退出')
                break
            try:
                do, args = job
                do(args)
            except Exception as e:
                traceback.print_exc()
                print('线程有误 ' + str(e))
                break
            finally:
                # Every dequeued job is marked done, even one that raised.
                self.work_queue.task_done()
# The concrete job each worker executes.
def do_job(args, retries=3, Connection='Connection'):
    """Download the URL in ``args[0]``, retrying on failure.

    The earlier recursive retry threw the fetched page away, forgot the
    ``Connection`` argument on retries, and repeated the log line and the
    sleep at every recursion level. This version retries in a loop.

    Args:
        args: Sequence whose first element is the URL to fetch.
        retries: Extra attempts allowed after the first one fails; values
            below zero are treated as zero.
        Connection: Header name that receives the value ``keep-alive``.

    Returns:
        The response body, or the string ``'0'`` if every attempt failed.
    """
    headers = {
        Connection: 'keep-alive',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/44.0.2403.155 Safari/537.36',
    }
    url = args[0]
    print('{} {}'.format(threading.current_thread(), list(args)))
    req = urllib2.Request(url, headers=headers)

    page = '0'  # result reported when no attempt succeeds
    attempts = 1 + max(int(retries or 0), 0)
    for _ in range(attempts):
        try:
            page = urllib2.urlopen(req, timeout=100).read()
            break
        except Exception:
            # Deliberately best-effort: any failure just moves on to the
            # next attempt.
            continue

    time.sleep(0.1)  # simulate processing time
    return page
if __name__ == '__main__':
    start = time.time()
    # 10 jobs on 2 threads; a larger run would be WorkManager(10000, 20).
    work_manager = WorkManager(10, 2)
    work_manager.wait_allcomplete()
    # Same output as the Python 2 statement, and valid on Python 3.
    print('剩余队列 {}'.format(work_manager.check_queue()))
    end = time.time()
    print("cost all time: %s" % (end - start))
解释
类WorkManager是一个管理者,管理线程池和任务队列,类Work是具体的一个线程。给WorkManager分配指定的任务量和线程数,每个线程都从任务队列中获取任务来执行,直到队列中没有任务。 多用在多线程并行抓取任务上。
这里的do_job()函数是一个抓取网页代码。