Threads, Processes, and Locks

1. Introduction

1.1 GIL: the Global Interpreter Lock. Because of it, Python has no multithreading in the true sense: at any given moment, only one thread in a Python process is executing.

1.2 The GIL is a feature introduced by CPython, and CPython is the default interpreter for most Python installations, so "Python has a GIL" really means "CPython has a GIL". Jython, for example, has no GIL.

1.3 A process is like a workshop in a factory: it represents a single task the CPU can work on. A thread is like a worker in that workshop, and one process can contain several threads. The workshop's space is shared by the workers; many rooms can be entered by any of them, which stands for a process's memory space being shared so that every thread can use it. However, rooms differ in size, and some can hold only one person at a time, such as the toilet: when someone is inside, nobody else can enter. This stands for one thread using a piece of shared memory while the other threads must wait for it to finish before they can touch that memory. A simple way to keep others out is to put a lock on the door: the first person to arrive locks it, and whoever comes later sees the lock and queues at the door until it opens. This is a "mutex" (mutual exclusion), which prevents multiple threads from reading and writing the same memory region at the same time. A Zhihu post on threads and processes explains this very well: 线程进程解释

1.4 Use the multiprocessing module for multiple processes and the threading module for multiple threads; the multiprocessing.dummy module is also thread-based (it wraps threads in the multiprocessing API), as sketched below.
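
As a small illustration (not from the original post; the fetch function and URL list below are made up for the example), multiprocessing.dummy exposes the same Pool API as multiprocessing but backs it with threads, which suits IO-bound jobs:

    from multiprocessing.dummy import Pool as ThreadPool
    import urllib2

    # hypothetical URL list, just for illustration
    urls = ["http://yahoo.com", "http://ibm.com", "http://apple.com"]

    def fetch(url):
        # each call runs inside one of the pool's worker threads
        return len(urllib2.urlopen(url, timeout=10).read())

    pool = ThreadPool(3)            # 3 worker threads, same API as multiprocessing.Pool
    sizes = pool.map(fetch, urls)   # blocks until every URL has been fetched
    pool.close()
    pool.join()
    print sizes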

1.5 Use multithreading (threading) for IO-bound work and multiprocessing (multiprocessing) for CPU-bound work.

1.6 Because of the GIL, only one thread runs at any given moment, so threads in Python give you concurrency, not parallelism; true parallelism requires multiple processes, as the sketch below shows.
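
A minimal sketch of the difference, assuming a made-up CPU-bound function count_down(): two threads take roughly as long as running the function twice in a row, while two processes can actually run in parallel:

    import time
    import threading
    import multiprocessing

    def count_down(n):
        # pure CPU work; the GIL is held almost the whole time
        while n > 0:
            n -= 1

    def timed(workers):
        start = time.time()
        for w in workers: w.start()
        for w in workers: w.join()
        return time.time() - start

    if __name__ == '__main__':
        N = 10000000
        print '2 threads  :', timed([threading.Thread(target=count_down, args=(N,)) for _ in range(2)])
        print '2 processes:', timed([multiprocessing.Process(target=count_down, args=(N,)) for _ in range(2)])
        # the thread version is about as slow as calling count_down(N) twice in a row,
        # while the process version uses two cores and finishes in roughly half that time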

1.7 Queue.Queue() has the necessary locking built in: if the queue is empty, get() blocks until another thread writes data into the queue, as the sketch below shows.
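
A tiny sketch of that blocking behaviour (the consumer function and the one-second sleep are invented for illustration):

    import time
    import threading
    import Queue

    q = Queue.Queue()

    def consumer():
        print 'consumer waiting on get()...'
        item = q.get()              # blocks here until the producer puts something
        print 'consumer got:', item

    t = threading.Thread(target=consumer)
    t.start()
    time.sleep(1)                   # the consumer is blocked during this second
    q.put('hello')                  # this put() unblocks the consumer
    t.join()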

2. Code

2.1 threading

2.1.1 Subclassing threading.Thread

    import time
    import threading
    
    class MyThread(threading.Thread):
        def __init__(self):
            super(MyThread,self).__init__()
        def run(self):
            for i in range(2):
                print 'thread {}, @number: {}'.format(self.name, i)
                time.sleep(1)
    
    def main():
        print "Start main threading"
        # create three threads
        threads = [MyThread() for i in range(3)]
        # start the three threads
        for t in threads:
            t.start()
        print "End Main threading"
        
    if __name__ == '__main__':
        main()
        

Results:


    Start main threading
    thread Thread-1, @number: 0
    thread Thread-2, @number: 0
     End Main threading
    thread Thread-3, @number: 0
    thread Thread-1, @number: 1thread Thread-2, @number: 1
    thread Thread-3, @number: 1
    
    [Finished in 2.1s]
Three threads are started, and the main thread finishes while the other threads are still running.
If you want the other threads to end together with the main thread, mark them as daemons with setDaemon(True):

    class MyThread(threading.Thread):
        def __init__(self):
            super(MyThread,self).__init__()
        def run(self):
            for i in range(2):
                print 'thread {}, @number: {}'.format(self.name, i)
                time.sleep(1)
    
    def main():
        print "Start main threading"
        # create three threads
        threads = [MyThread() for i in range(3)]
        # mark each thread as a daemon, then start it
        for t in threads:
            t.setDaemon(True)
            t.start() 
        print "End Main threading"
    if __name__ == '__main__':
        main()

Results:


    Start main threading
    thread Thread-1, @number: 0
    thread Thread-2, @number: 0
     thread Thread-3, @number: 0
    End Main threading
    [Finished in 0.1s]
If you want the main thread to finish only after all the other threads have finished, block the main thread with join():

    class MyThread(threading.Thread):
        def __init__(self):
            super(MyThread,self).__init__()
        def run(self):
            for i in range(2):
                print 'thread {}, @number: {}'.format(self.name, i)
                time.sleep(1)
    
    def main():
        print "Start main threading"
        # create three threads
        threads = [MyThread() for i in range(3)]
        # start the three threads
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print "End Main threading"
    if __name__ == '__main__':
        main()

Results:


    Start main threading
    thread Thread-1, @number: 0
    thread Thread-2, @number: 0
    thread Thread-3, @number: 0
    thread Thread-1, @number: 1thread Thread-2, @number: 1
    thread Thread-3, @number: 1
    End Main threading
    [Finished in 2.1s]

If several threads read and modify shared data in a short window of time, the data can easily end up inconsistent. To prevent that, protect the shared resource with a lock: the lock forces the threads to access the data one at a time and keeps it consistent.
For example, a counter:

    class MyThread(threading.Thread):
        def __init__(self,mutex):
            super(MyThread,self).__init__()
            self.mutex = mutex
        def run(self):
            global num
            self.mutex.acquire()
            for i in range(2):
                num += 1
                print 'thread {}, @number: {},@num:{}'.format(self.name, i,num)
                time.sleep(1)
            self.mutex.release()
    mutex = threading.Lock()
    num = 0
    def main():
        print "Start main threading"
        # create three threads that share the same lock
        threads = [MyThread(mutex) for i in range(3)]
        # start the three threads
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print "End Main threading"
    if __name__ == '__main__':
        main()

Results:

    Start main threading
    thread Thread-1, @number: 0,@num:1
    thread Thread-1, @number: 1,@num:2
    thread Thread-2, @number: 0,@num:3
    thread Thread-2, @number: 1,@num:4
    thread Thread-3, @number: 0,@num:5
    thread Thread-3, @number: 1,@num:6
    End Main threading
    [Finished in 6.1s]

The Queue module already takes care of locking: if the queue is empty, get() blocks until another thread writes data into it:


    import Queue
    import threading

    hosts = ["http://yahoo.com", "http://amazon.com",
             "http://ibm.com", "http://apple.com"]
    queue = Queue.Queue()

    class ThreadUrl(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue
        def run(self):
            while True:
                try:
                    # non-blocking get so the thread can exit once the queue is drained
                    host = self.queue.get(block=False)
                except Queue.Empty:
                    break
                print 'thread {}, host {}'.format(self.name, host)
                # signal to the queue that this job is done
                self.queue.task_done()

    def main():
        for host in hosts:
            queue.put(host)
        threads = [ThreadUrl(queue) for i in range(2)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    main()

Results:

    thread Thread-1, host http://yahoo.com
     thread Thread-2, host http://amazon.com
    thread Thread-1, host http://ibm.com
    thread Thread-2, host http://apple.com
    [Finished in 0.1s]

queue.task_done() signals to the queue that a task previously fetched with get() has been completed.
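
task_done() pairs with Queue.join(): join() returns only after every item that was put() has received a matching task_done(). A minimal sketch, with a made-up worker function:

    import threading
    import Queue

    q = Queue.Queue()

    def worker():
        while True:
            item = q.get()          # blocks until an item is available
            print 'processed', item
            q.task_done()           # one task_done() per get()

    t = threading.Thread(target=worker)
    t.setDaemon(True)               # let the program exit even though worker() loops forever
    t.start()

    for i in range(5):
        q.put(i)
    q.join()                        # returns only after task_done() was called 5 times
    print 'all items processed'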

2.1.2 Using a threading.Thread object directly


    import threading

    def func(num):
        total = 0
        while num:
            total += num
            num -= 1
        print total, '\n'

    num = 10
    def main():
        # pass the target function and its arguments instead of subclassing Thread
        threads = [threading.Thread(target=func, args=(num,)) for i in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    main()
Results:

    55
    55
    55
    55
    55
    [Finished in 0.1s]

2.2 multiprocessing


    import multiprocessing
    import time
     
    def func(msg):
        for i in xrange(3):
            print msg
            time.sleep(1)
        return "done " + msg
     
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=4)
        result = []
        for i in xrange(10):
            msg = "hello %d" %(i)
            # apply_async submits the task and returns an AsyncResult right away
            result.append(pool.apply_async(func, (msg, )))
        pool.close()   # no more tasks may be submitted after close()
        pool.join()    # wait for all worker processes to finish
        for res in result:
            print res.get()
        print "Sub-process(es) done."
        

After all tasks have been submitted, remember to call close() and then join() to reclaim the worker processes; close() tells the pool that no more tasks are coming, and join() waits for the workers to exit.
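
For simple cases, pool.map() is an alternative to apply_async() that blocks and returns the results directly; a small sketch with a placeholder square() task:

    import multiprocessing

    def square(x):
        return x * x

    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=4)
        print pool.map(square, range(10))   # blocks until all results are ready
        pool.close()                        # no more tasks will be submitted
        pool.join()                         # wait for the worker processes to exit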

3. A generic multithreading template

    #!/usr/bin/env python
    #coding=utf8

    import Queue
    import threading
    import time
    import pdb
    import sys
    import urllib2
    import json
    import os
    import cookielib
    import traceback

    reload(sys)
    sys.setdefaultencoding('utf8')

    class WorkManager(object):
        def __init__(self, work_num=1000, thread_num=2):
            self.work_queue = Queue.Queue()
            self.threads = []
            self.__init_work_queue(work_num)
            self.__init_thread_pool(thread_num)

        """
            Initialize the worker threads
        """
        def __init_thread_pool(self, thread_num):
            for i in range(thread_num):
                self.threads.append(Work(self.work_queue, i))

        """
            Initialize the work queue
        """
        def __init_work_queue(self, jobs_num):
            for i in range(jobs_num):
                url = '***'+str(i)
                self.add_job(do_job, url)
        """
            Add one job to the queue
        """
        def add_job(self, func, *args):
            self.work_queue.put((func, list(args)))  # enqueue a task; Queue handles the locking internally

        """
            Check how many tasks are left in the queue
        """
        def check_queue(self):
            return self.work_queue.qsize()
        """
            Wait for all threads to finish
        """
        def wait_allcomplete(self):
            for item in self.threads:
                if item.isAlive(): item.join()

    class Work(threading.Thread):
        def __init__(self, work_queue, i):
            threading.Thread.__init__(self)
            self.work_queue = work_queue
            self.num = i
            self.start()

        def run(self):
            # loop until the queue is drained, then let the thread exit
            while True:
                if self.work_queue.empty():
                    print 'queue is empty, worker exiting'
                    break
                try:
                    do, args = self.work_queue.get(block=False)  # non-blocking dequeue; Queue handles the locking internally
                    do(args)
                    self.work_queue.task_done()  # tell the queue this task is finished
                except Exception,e:
                    traceback.print_exc()
                    print 'worker error:', str(e)
                    break

    # the actual task to run
    def do_job(args, retries=3, Connection='Connection'):
        # pdb.set_trace()
        headers = {Connection: 'keep-alive', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.155 Safari/537.36'}
        url = args[0]
        print threading.current_thread(), list(args)

        req  = urllib2.Request(url, headers = headers)
        try:
            page = urllib2.urlopen(req, timeout=100).read()
        except Exception,e:
            # print str(e)
            # print 'timed out, retrying;', retries, 'attempts left'
            if retries:
                page = do_job(args, retries = retries-1)
            else:
                page = '0'
        time.sleep(0.1)  # simulate processing time
        return page


    if __name__ == '__main__':
        start = time.time()
        work_manager = WorkManager(10, 2)  # or work_manager = WorkManager(10000, 20)
        work_manager.wait_allcomplete()
        print 'remaining queue size:', work_manager.check_queue()
        end = time.time()
        print "cost all time: %s" % (end-start)

Explanation

The WorkManager class is the manager: it owns the thread pool and the task queue. The Work class is an individual worker thread. WorkManager is given a fixed amount of work and a number of threads; each thread keeps taking tasks from the queue and executing them until the queue is empty. This pattern is mostly useful for fetching many pages in parallel.
Here, do_job() is the function that downloads a web page.


Copyright notice

Creative Commons License
This work is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License. Please credit the original source when reposting.
