搞定python多线程和多进程

xiaoxiao2021-02-28  121

1 概念梳理: 1.1 线程

1.1.1 什么是线程

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。一个线程是一个execution context(执行上下文),即一个cpu执行时所需要的一串指令。

1.1.2 线程的工作方式

假设你正在读一本书,没有读完,你想休息一下,但是你想在回来时恢复到当时读的具体进度。有一个方法就是记下页数、行数与字数这三个数值,这些数值就是execution context。如果你的室友在你休息的时候,使用相同的方法读这本书。你和她只需要这三个数字记下来就可以在交替的时间共同阅读这本书了。

线程的工作方式与此类似。CPU会给你一个在同一时间能够做多个运算的幻觉,实际上它在每个运算上只花了极少的时间,本质上CPU同一时刻只干了一件事。它能这样做就是因为它有每个运算的execution context。就像你能够和你朋友共享同一本书一样,多任务也能共享同一块CPU。

1.2 进程

一个程序的执行实例就是一个进程。每一个进程提供执行程序所需的所有资源。(进程本质上是资源的集合)

一个进程有一个虚拟的地址空间、可执行的代码、操作系统的接口、安全的上下文(记录启动该进程的用户和权限等等)、唯一的进程ID、环境变量、优先级类、最小和最大的工作空间(内存空间),还要有至少一个线程。

每一个进程启动时都会最先产生一个线程,即主线程。然后主线程会再创建其他的子线程。

与进程相关的资源包括:

内存页(同一个进程中的所有线程共享同一个内存空间) 文件描述符(e.g. open sockets) 安全凭证(e.g.启动该进程的用户ID) 1.3 进程与线程区别

1.同一个进程中的线程共享同一内存空间,但是进程之间是独立的。 2.同一个进程中的所有线程的数据是共享的(进程通讯),进程之间的数据是独立的。 3.对主线程的修改可能会影响其他线程的行为,但是父进程的修改(除了删除以外)不会影响其他子进程。 4.线程是一个上下文的执行指令,而进程则是与运算相关的一簇资源。 5.同一个进程的线程之间可以直接通信,但是进程之间的交流需要借助中间代理来实现。 6.创建新的线程很容易,但是创建新的进程需要对父进程做一次复制。 7.一个线程可以操作同一进程的其他线程,但是进程只能操作其子进程。 8.线程启动速度快,进程启动速度慢(但是两者运行速度没有可比性)。

2 多线程 2.1 线程常用方法

方法 注释 start() 线程准备就绪,等待CPU调度 setName() 为线程设置名称 getName() 获取线程名称 setDaemon(True) 设置为守护线程 join() 逐个执行每个线程,执行完毕后继续往下执行 run() 线程被cpu调度后自动执行线程对象的run方法,如果想自定义线程类,直接重写run方法就行了 2.1.1 Thread类

1.普通创建方式

import threading import time

def run(n): print(“task”, n) time.sleep(1) print(‘2s’) time.sleep(1) print(‘1s’) time.sleep(1) print(‘0s’) time.sleep(1)

t1 = threading.Thread(target=run, args=(“t1”,)) t2 = threading.Thread(target=run, args=(“t2”,)) t1.start() t2.start()

“”” task t1 task t2 2s 2s 1s 1s 0s 0s “”” 2.继承threading.Thread来自定义线程类 其本质是重构Thread类中的run方法

import threading import time

class MyThread(threading.Thread): def init(self, n): super(MyThread, self).init() # 重构run函数必须要写 self.n = n

def run(self): print("task", self.n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1)

if name == “main“: t1 = MyThread(“t1”) t2 = MyThread(“t2”)

t1.start() t2.start()

2.1.2 计算子线程执行的时间

注:sleep的时候是不会占用cpu的,在sleep的时候操作系统会把线程暂时挂起。

join() #等此线程执行完后,再执行其他线程或主线程 threading.current_thread() #输出当前线程 import threading import time

def run(n): print(“task”, n,threading.current_thread()) #输出当前的线程 time.sleep(1) print(‘3s’) time.sleep(1) print(‘2s’) time.sleep(1) print(‘1s’)

strat_time = time.time()

t_obj = [] #定义列表用于存放子线程实例

for i in range(3): t = threading.Thread(target=run, args=(“t-%s” % i,)) t.start() t_obj.append(t)

“”” 由主线程生成的三个子线程 task t-0

实测:在python2.7、mac os下,运行以下代码可能会产生脏数据。但是在python3中就不一定会出现下面的问题。

import threading import time

def run(n): global num num += 1

num = 0 t_obj = []

for i in range(20000): t = threading.Thread(target=run, args=(“t-%s” % i,)) t.start() t_obj.append(t)

for t in t_obj: t.join()

print “num:”, num “”” 产生脏数据后的运行结果: num: 19999 “”” 2.5 互斥锁(mutex)

为了方式上面情况的发生,就出现了互斥锁(Lock)

import threading import time

def run(n): lock.acquire() #获取锁 global num num += 1 lock.release() #释放锁

lock = threading.Lock() #实例化一个锁对象

num = 0 t_obj = []

for i in range(20000): t = threading.Thread(target=run, args=(“t-%s” % i,)) t.start() t_obj.append(t)

for t in t_obj: t.join()

print “num:”, num 2.6 递归锁

RLcok类的用法和Lock类一模一样,但它支持嵌套,,在多个锁没有释放的时候一般会使用使用RLcok类。

import threading import time

gl_num = 0

lock = threading.RLock()

def Func(): lock.acquire() global gl_num gl_num +=1 time.sleep(1) print gl_num lock.release()

for i in range(10): t = threading.Thread(target=Func) t.start() 2.7 信号量(BoundedSemaphore类)

互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

import threading import time

def run(n): semaphore.acquire() #加锁 time.sleep(1) print(“run the thread:%s\n” % n) semaphore.release() #释放

num = 0 semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行

for i in range(22): t = threading.Thread(target=run, args=(“t-%s” % i,)) t.start()

while threading.active_count() != 1: pass # print threading.active_count() else: print(‘—–all threads done—–’) 2.8 事件(Event类) python线程的事件用于主线程控制其他线程的执行,事件是一个简单的线程同步对象,其主要提供以下几个方法:

方法 注释 clear 将flag设置为“False” set 将flag设置为“True” is_set 判断是否设置了flag wait 会一直监听flag,如果没有检测到flag就一直处于阻塞状态 事件处理的机制:全局定义了一个“Flag”,当flag值为“False”,那么event.wait()就会阻塞,当flag值为“True”,那么event.wait()便不再阻塞。

利用Event类模拟红绿灯

import threading import time

event = threading.Event()

def lighter(): count = 0 event.set() #初始值为绿灯 while True: if 5 < count <=10 : event.clear() # 红灯,清除标志位 print(“\33[41;1mred light is on…\033[0m”) elif count > 10: event.set() # 绿灯,设置标志位 count = 0 else: print(“\33[42;1mgreen light is on…\033[0m”)

time.sleep(1) count += 1

def car(name): while True: if event.is_set(): #判断是否设置了标志位 print(“[%s] running…”%name) time.sleep(1) else: print(“[%s] sees red light,waiting…”%name) event.wait() print(“[%s] green light is on,start going…”%name)

light = threading.Thread(target=lighter,) light.start()

car = threading.Thread(target=car,args=(“MINI”,)) car.start() 2.9 条件(Condition类)

使得线程等待,只有满足某条件时,才释放n个线程

2.10 定时器(Timer类)

定时器,指定n秒后执行某操作

from threading import Timer

def hello(): print(“hello, world”)

t = Timer(1, hello) t.start() # after 1 seconds, “hello, world” will be printed 3 多进程 在linux中,每个进程都是由父进程提供的。每启动一个子进程就从父进程克隆一份数据,但是进程之间的数据本身是不能共享的。

from multiprocessing import Process import time def f(name): time.sleep(2) print(‘hello’, name)

if name == ‘main‘: p = Process(target=f, args=(‘bob’,)) p.start() p.join() from multiprocessing import Process import os

def info(title): print(title) print(‘module name:’, name) print(‘parent process:’, os.getppid()) #获取父进程id print(‘process id:’, os.getpid()) #获取自己的进程id print(“\n\n”)

def f(name): info(‘\033[31;1mfunction f\033[0m’) print(‘hello’, name)

if name == ‘main‘: info(‘\033[32;1mmain process line\033[0m’) p = Process(target=f, args=(‘bob’,)) p.start() p.join() 3.1 进程间通信

由于进程之间数据是不共享的,所以不会出现多线程GIL带来的问题。多进程之间的通信通过Queue()或Pipe()来实现

3.1.1 Queue()

使用方法跟threading里的queue差不多

from multiprocessing import Process, Queue

def f(q): q.put([42, None, ‘hello’])

if name == ‘main‘: q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints “[42, None, ‘hello’]” p.join() 3.1.2 Pipe()

Pipe的本质是进程之间的数据传递,而不是数据共享,这和socket有点像。pipe()返回两个连接对象分别表示管道的两端,每端都有send()和recv()方法。如果两个进程试图在同一时间的同一端进行读取和写入那么,这可能会损坏管道中的数据。

from multiprocessing import Process, Pipe

def f(conn): conn.send([42, None, ‘hello’]) conn.close()

if name == ‘main‘: parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints “[42, None, ‘hello’]” p.join() 3.2 Manager

通过Manager可实现进程间数据的共享。Manager()返回的manager对象会通过一个服务进程,来使其他进程通过代理的方式操作python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array.

from multiprocessing import Process, Manager

def f(d, l): d[1] = ‘1’ d[‘2’] = 2 d[0.25] = None l.append(1) print(l)

if name == ‘main‘: with Manager() as manager: d = manager.dict()

l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)

3.3 进程锁(进程同步)

数据输出的时候保证不同进程的输出内容在同一块屏幕正常显示,防止数据乱序的情况。 Without using the lock output from the different processes is liable to get all mixed up.

from multiprocessing import Process, Lock

def f(l, i): l.acquire() try: print(‘hello world’, i) finally: l.release()

if name == ‘main‘: lock = Lock()

for num in range(10): Process(target=f, args=(lock, num)).start()

3.4 进程池

由于进程启动的开销比较大,使用多进程的时候会导致大量内存空间被消耗。为了防止这种情况发生可以使用进程池,(由于启动线程的开销比较小,所以不需要线程池这种概念,多线程只会频繁得切换cpu导致系统变慢,并不会占用过多的内存空间)

进程池中常用方法: apply() 同步执行(串行) apply_async() 异步执行(并行) terminate() 立刻关闭进程池 join() 主进程等待所有子进程执行完毕。必须在close或terminate()之后。 close() 等待所有进程结束后,才关闭进程池。

from multiprocessing import Process,Pool import time

def Foo(i): time.sleep(2) return i+100

def Bar(arg): print(‘–>exec done:’,arg)

pool = Pool(5) #允许进程池同时放入5个进程

for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) #func子进程执行完后,才会执行callback,否则callback不执行(而且callback是由父进程来执行了) #pool.apply(func=Foo, args=(i,))

print(‘end’) pool.close() pool.join() #主进程等待所有子进程执行完毕。必须在close()或terminate()之后。 进程池内部维护一个进程序列,当使用时,去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。在上面的程序中产生了10个进程,但是只能有5同时被放入进程池,剩下的都被暂时挂起,并不占用内存空间,等前面的五个进程执行完后,再执行剩下5个进程。

4 补充:协程 线程和进程的操作是由程序触发系统接口,最后的执行者是系统,它本质上是操作系统提供的功能。而协程的操作则是程序员指定的,在python中通过yield,人为的实现并发处理。

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时。协程,则只使用一个线程,分解一个线程成为多个“微线程”,在一个线程中规定某个代码块的执行顺序。

协程的适用场景:当程序中存在大量不需要CPU的操作时(IO)。 常用第三方模块gevent和greenlet。(本质上,gevent是对greenlet的高级封装,因此一般用它就行,这是一个相当高效的模块。)

4.1 greenlet

from greenlet import greenlet

def test1(): print(12) gr2.switch() print(34) gr2.switch()

def test2(): print(56) gr1.switch() print(78)

gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() 实际上,greenlet就是通过switch方法在不同的任务之间进行切换。

4.2 gevent

from gevent import monkey; monkey.patch_all() import gevent import requests

def f(url): print(‘GET: %s’ % url) resp = requests.get(url) data = resp.text print(‘%d bytes received from %s.’ % (len(data), url))

gevent.joinall([ gevent.spawn(f, ‘https://www.python.org/‘), gevent.spawn(f, ‘https://www.yahoo.com/‘), gevent.spawn(f, ‘https://github.com/‘), ]) 通过joinall将任务f和它的参数进行统一调度,实现单线程中的协程。代码封装层次很高,实际使用只需要了解它的几个主要方法即可。

转载请注明原文地址: https://www.6miu.com/read-23711.html

最新回复(0)