Threads, Processes, and Locks

1. Introduction

1.1 GIL: the Global Interpreter Lock. Because of it, Python has no true multithreading: at any given moment, only one thread in a Python process is executing bytecode.

1.2 The GIL is a feature of CPython. Since CPython is the default interpreter for most Python installations, "Python has a GIL" really means "CPython has a GIL"; Jython, for example, has none.

1.3 A process is like a workshop in a factory: it represents a single task the CPU can work on. A thread is like a worker in that workshop, and one process can contain several threads. The workshop's space is shared by the workers; many rooms, for instance, are open to every worker. This mirrors how a process's memory space is shared, with every thread able to use that shared memory. Rooms differ in capacity, though; some, like the toilet, hold only one person at a time. While someone is inside, nobody else can enter. This is like a thread using a piece of shared memory: other threads must wait until it finishes before they can touch that memory. A simple way to keep others out is to put a lock on the door. The first to arrive locks it; latecomers see the lock and queue at the door until it opens. This is a "mutex" (mutual exclusion), which prevents multiple threads from reading and writing the same region of memory at once. A Zhihu post on threads and processes explains this well.

1.4 multiprocessing provides multiple processes and threading provides multiple threads; the multiprocessing.dummy module is also thread-based.

1.5 Use threads (threading) for I/O-bound work and processes (multiprocessing) for CPU-bound work.

1.6 Because of the GIL, only one thread runs at any given moment, so Python threads give you concurrency, not parallelism.

1.7 Queue() has locking built in: if the queue is empty, get() blocks until another thread puts data into it.
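
The blocking get() described in 1.7 can be sketched as follows. This is a minimal sketch in Python 3 syntax (where the module is named queue rather than Queue):

    import queue
    import threading
    import time

    q = queue.Queue()

    def producer():
        time.sleep(0.2)   # while we sleep, the main thread blocks on get()
        q.put('hello')

    t = threading.Thread(target=producer)
    t.start()
    item = q.get()        # blocks until the producer puts an item
    print(item)           # -> hello
    t.join()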

2 Code

2.1 threading

2.1.1 Subclassing threading.Thread

    import time
    import threading
    
    class MyThread(threading.Thread):
        def __init__(self):
            super(MyThread,self).__init__()
        def run(self):
            for i in range(2):
                print 'thread {}, @number: {}'.format(self.name, i)
                time.sleep(1)
    
    def main():
        print "Start main threading"
        # create three threads
        threads = [MyThread() for i in range(3)]
        # start the three threads
        for t in threads:
            t.start()
        print "End Main threading"
        
    if __name__ == '__main__':
        main()
        

Result:


    Start main threading
    thread Thread-1, @number: 0
    thread Thread-2, @number: 0
     End Main threading
    thread Thread-3, @number: 0
    thread Thread-1, @number: 1thread Thread-2, @number: 1
    thread Thread-3, @number: 1
    
    [Finished in 2.1s]
Three threads are started; the main thread finishes while the others are still running.
If you want the other threads to exit together with the main thread, mark them as daemons with setDaemon(True):

    class MyThread(threading.Thread):
        def __init__(self):
            super(MyThread,self).__init__()
        def run(self):
            for i in range(2):
                print 'thread {}, @number: {}'.format(self.name, i)
                time.sleep(1)
    
    def main():
        print "Start main threading"
        # create three threads
        threads = [MyThread() for i in range(3)]
        # start the three threads
        for t in threads:
            t.setDaemon(True)
            t.start() 
        print "End Main threading"
    if __name__ == '__main__':
        main()

Result:


    Start main threading
    thread Thread-1, @number: 0
    thread Thread-2, @number: 0
     thread Thread-3, @number: 0
    End Main threading
    [Finished in 0.1s]
If the main thread should exit only after all the other threads have finished, block the main thread with join():

    class MyThread(threading.Thread):
        def __init__(self):
            super(MyThread,self).__init__()
        def run(self):
            for i in range(2):
                print 'thread {}, @number: {}'.format(self.name, i)
                time.sleep(1)
    
    def main():
        print "Start main threading"
        # create three threads
        threads = [MyThread() for i in range(3)]
        # start the three threads
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print "End Main threading"
    if __name__ == '__main__':
        main()

Result:


    Start main threading
    thread Thread-1, @number: 0
    thread Thread-2, @number: 0
    thread Thread-3, @number: 0
    thread Thread-1, @number: 1thread Thread-2, @number: 1
    thread Thread-3, @number: 1
    End Main threading
    [Finished in 2.1s]

If several threads read and modify shared data at (nearly) the same time, the data can easily end up inconsistent. To avoid that, put a lock around the shared resource; the lock keeps the threads' access to the data synchronized.
For example, a counter:

    class MyThread(threading.Thread):
        def __init__(self,mutex):
            super(MyThread,self).__init__()
            self.mutex = mutex
        def run(self):
            global num
            self.mutex.acquire()
            for i in range(2):
                num += 1
                print 'thread {}, @number: {},@num:{}'.format(self.name, i, num)
                time.sleep(1)
            self.mutex.release()
    mutex = threading.Lock()
    num = 0
    def main():
        print "Start main threading"
        # create three threads
        threads = [MyThread(mutex) for i in range(3)]
        # start the three threads
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print "End Main threading"
    if __name__ == '__main__':
        main()

Result:

    Start main threading
    thread Thread-1, @number: 0,@num:1
    thread Thread-1, @number: 1,@num:2
    thread Thread-2, @number: 0,@num:3
    thread Thread-2, @number: 1,@num:4
    thread Thread-3, @number: 0,@num:5
    thread Thread-3, @number: 1,@num:6
    End Main threading
    [Finished in 6.1s]

The Queue module already handles the locking: if the queue is empty, get() blocks until another thread puts data into it:


    import Queue
    import threading
    import urllib2
    import time
    hosts = ["http://yahoo.com", "http://amazon.com",
    "http://ibm.com", "http://apple.com"]
    queue = Queue.Queue()
    class ThreadUrl(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue
        def run(self):
            while True:
                if self.queue.empty():
                    break
                host = self.queue.get()
                print 'thread {}, host {}'.format(self.name, host)
                # signal that this queue job is done
                self.queue.task_done()
    def main():
        for host in hosts:
            queue.put(host)
        threads = [ThreadUrl(queue) for i in range(2)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()  
    main()

Result:

    thread Thread-1, host http://yahoo.com
    thread Thread-2, host http://amazon.com
    thread Thread-1, host http://ibm.com
    thread Thread-2, host http://apple.com
    [Finished in 0.1s]

queue.task_done() signals that a task previously fetched from the queue has been completed.
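
task_done() pairs with Queue.join(): every put() increments a counter of unfinished tasks, every task_done() decrements it, and join() blocks until the counter reaches zero. A minimal sketch, using Python 3 names and an illustrative sentinel-based worker:

    import queue
    import threading

    q = queue.Queue()
    results = []

    def worker():
        while True:
            item = q.get()
            if item is None:      # sentinel value: stop the worker
                q.task_done()
                break
            results.append(item * 2)
            q.task_done()         # mark this unit of work as complete

    t = threading.Thread(target=worker)
    t.start()
    for n in [1, 2, 3]:
        q.put(n)
    q.join()                      # blocks until every put() has a matching task_done()
    q.put(None)                   # shut the worker down
    t.join()
    print(results)                # -> [2, 4, 6]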

2.1.2 Using a threading.Thread object directly


    def func(num):
        sum = 0
        while num:
            sum += num
            num -= 1
        print sum,'\n'
    num = 10
    def main():
        threads = [threading.Thread(target=func,args=(num,)) for i in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    main()
Result:

    55
    55
    55
    55
    55
    [Finished in 0.1s]

2.2 multiprocessing


    import multiprocessing
    import time
     
    def func(msg):
        for i in xrange(3):
            print msg
            time.sleep(1)
        return "done " + msg
     
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=4)
        result = []
        for i in xrange(10):
            msg = "hello %d" %(i)
            result.append(pool.apply_async(func, (msg, )))
        pool.close()
        pool.join()
        for res in result:
            print res.get()
        print "Sub-process(es) done."
        

After all tasks have run, remember to call close() and join() to reclaim the worker processes.
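
The same close()/join() pattern, as a minimal sketch in Python 3 syntax (the square function and pool size are illustrative):

    import multiprocessing

    def square(n):
        return n * n

    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=2)
        async_results = [pool.apply_async(square, (i,)) for i in range(5)]
        pool.close()   # no new tasks may be submitted after close()
        pool.join()    # wait for all worker processes to exit
        values = [r.get() for r in async_results]
        print(values)  # -> [0, 1, 4, 9, 16]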

3 A general-purpose multithreading template

    #!/usr/bin/env python
    # coding=utf8

    import Queue
    import threading
    import time
    import sys
    import urllib2
    import traceback

    reload(sys)
    sys.setdefaultencoding('utf8')

    class WorkManager(object):
        def __init__(self, work_num=1000, thread_num=2):
            self.work_queue = Queue.Queue()
            self.threads = []
            self.__init_work_queue(work_num)
            self.__init_thread_pool(thread_num)

        def __init_thread_pool(self, thread_num):
            """Initialize the worker threads."""
            for i in range(thread_num):
                self.threads.append(Work(self.work_queue, i))

        def __init_work_queue(self, jobs_num):
            """Initialize the work queue."""
            for i in range(jobs_num):
                url = '***' + str(i)
                self.add_job(do_job, url)

        def add_job(self, func, *args):
            """Enqueue one job; Queue implements the locking internally."""
            self.work_queue.put((func, list(args)))

        def check_queue(self):
            """Return how many tasks are left in the queue."""
            return self.work_queue.qsize()

        def wait_allcomplete(self):
            """Wait for all worker threads to finish."""
            for item in self.threads:
                if item.isAlive():
                    item.join()

    class Work(threading.Thread):
        def __init__(self, work_queue, i):
            threading.Thread.__init__(self)
            self.work_queue = work_queue
            self.num = i
            self.start()

        def run(self):
            # Loop until the queue is drained, then let the thread exit.
            while True:
                if self.work_queue.empty():
                    print 'queue is empty, worker exiting'
                    break
                try:
                    # Non-blocking dequeue; Queue implements the locking internally.
                    do, args = self.work_queue.get(block=False)
                    do(args)
                    self.work_queue.task_done()  # signal that this task is done
                except Exception, e:
                    traceback.print_exc()
                    print 'worker error', str(e)
                    break

    # The actual job: fetch a web page, retrying on failure.
    def do_job(args, retries=3):
        headers = {'Connection': 'keep-alive', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.155 Safari/537.36'}
        url = args[0]
        print threading.current_thread(), list(args)

        req = urllib2.Request(url, headers=headers)
        try:
            page = urllib2.urlopen(req, timeout=100).read()
        except Exception:
            if retries:
                page = do_job(args, retries=retries - 1)
            else:
                page = '0'
        time.sleep(0.1)  # simulate processing time
        return page

    if __name__ == '__main__':
        start = time.time()
        work_manager = WorkManager(10, 2)  # or WorkManager(10000, 20)
        work_manager.wait_allcomplete()
        print 'remaining queue size:', work_manager.check_queue()
        end = time.time()
        print "cost all time: %s" % (end - start)

Explanation

The WorkManager class is the manager: it owns the thread pool and the task queue. The Work class is a single worker thread. You give WorkManager a task count and a thread count; each thread pulls tasks off the queue and runs them until the queue is empty. This pattern is mostly used for parallel crawling.
Here, do_job() is code that fetches a web page.


Copyright notice

Creative Commons License
This work is licensed under the Creative Commons Attribution-NonCommercial 4.0 International License. Please cite the original source when republishing.
