# GIL
GIL: the Global Interpreter Lock (CPython). In Python, each thread corresponds to one C-level (OS) thread.
The GIL ensures that at any given moment only one thread executes on one CPU; multiple threads cannot be mapped onto multiple CPUs to run in parallel.
The interpreter releases the GIL based on the amount of bytecode executed and on time slices, and a thread also releases it voluntarily when it performs an IO operation.
Run the following:
```python
import threading, multiprocessing

def loop():
    x = 0
    while True:
        x = x ^ 1

# start one busy-loop thread per CPU core and watch overall CPU usage
for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()
```
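Running the loop above, total CPU usage stays at roughly one core's worth no matter how many threads are started, because only the thread holding the GIL executes bytecode. A minimal sketch (not from the original) of getting real parallelism by swapping threads for processes, each with its own interpreter and its own GIL:

```python
import multiprocessing

def loop():
    x = 0
    while True:
        x = x ^ 1

if __name__ == '__main__':
    # each process has its own interpreter and its own GIL,
    # so CPU-bound work can occupy every core
    for _ in range(multiprocessing.cpu_count()):
        p = multiprocessing.Process(target=loop)
        p.start()
```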
A second example: two threads modify a shared counter without any lock.

```python
import threading

total = 0

def add():
    global total
    for _ in range(100000):
        total += 1

def dec():
    global total
    for _ in range(100000):
        total -= 1

thread_add = threading.Thread(target=add)
thread_dec = threading.Thread(target=dec)
thread_add.start()
thread_dec.start()
thread_add.join()
thread_dec.join()
print(total)  # not guaranteed to be 0: the GIL does not make += / -= atomic
```
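The printed total is not guaranteed to be 0 because `total += 1` compiles to several bytecode instructions, and the GIL can hand execution to the other thread between any two of them. A small sketch using the standard `dis` module to make those steps visible:

```python
import dis

total = 0

def add():
    global total
    total += 1

# prints a load / add / store sequence (exact opcode names vary by Python version);
# a thread switch between any two of these instructions can lose an update
dis.dis(add)
```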
# Ways to Implement Multithreading
Instantiate threading.Thread directly:
```python
import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html ended")

def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url ended")

thread1 = threading.Thread(target=get_detail_html, args=('',))
thread2 = threading.Thread(target=get_detail_url, args=('',))

start = time.time()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
end = time.time()
print(f'cost time: {end - start}')
```
Subclass threading.Thread and override the run method:
```python
import time
import threading

class GetDetailHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail html started")
        time.sleep(2)
        print("get detail html ended")

class GetDetailUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail url started")
        time.sleep(2)
        print("get detail url ended")

thread1 = GetDetailHtml('get_detail_html')
thread2 = GetDetailUrl('get_detail_url')

start = time.time()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
end = time.time()
print(f'cost time: {end - start}')
```
# Thread Synchronization
Using a lock prevents the inconsistencies caused by multiple threads modifying the same variable. The downsides of locks are that they cost performance and can easily lead to deadlocks.
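A minimal sketch of the basic pattern (names are illustrative, not from the original), using threading.Lock as a context manager so the lock is always released, even if the protected block raises:

```python
import threading

counter = 0
counter_lock = threading.Lock()

def safe_increment(n):
    global counter
    for _ in range(n):
        # "with" acquires on entry and releases on exit, even on exceptions
        with counter_lock:
            counter += 1

threads = [threading.Thread(target=safe_increment, args=(100000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # always 200000, because the increment is now serialized
```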
RLock: a reentrant lock. The same thread may call acquire repeatedly, as long as every acquire is matched by a release.
```python
import threading
from threading import RLock

total = 0
lock = RLock()

def foo(lock):
    # re-acquiring the lock held by the same thread is allowed with RLock
    lock.acquire()
    lock.release()

def add(lock):
    global total
    for _ in range(1000000):
        lock.acquire()
        foo(lock)
        total += 1
        lock.release()

def dec(lock):
    global total
    for _ in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()

thread_add = threading.Thread(target=add, args=(lock,))
thread_dec = threading.Thread(target=dec, args=(lock,))
thread_add.start()
thread_dec.start()
thread_add.join()
thread_dec.join()
print(total)
```
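For contrast, a small sketch (illustrative only) of why a plain Lock cannot be nested: a second acquire from the same thread blocks forever, while RLock keeps a per-thread recursion count:

```python
from threading import Lock, RLock

rlock = RLock()
rlock.acquire()
rlock.acquire()    # fine: same thread, recursion level is now 2
rlock.release()
rlock.release()    # must release as many times as acquired

lock = Lock()
lock.acquire()
# lock.acquire()   # would deadlock here: a plain Lock is not reentrant
lock.release()
```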
# Inter-thread Communication
The Queue class in the queue module can be used for communication between threads, and Queue is thread-safe.
```python
import time
import threading
from queue import Queue

detail_url_queue = Queue(maxsize=1000)

def get_detail_html(queue):
    """Crawl article detail pages."""
    while True:
        url = queue.get()
        print("get detail html started")
        time.sleep(2)
        print("get detail html ended")

def get_detail_url(queue):
    """Crawl the article list page and enqueue detail URLs."""
    while True:
        print("get detail url started")
        time.sleep(4)
        for i in range(20):
            queue.put(f'https://www.test.com/{i}')
        print("get detail url ended")

thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
thread_detail_url.start()

for _ in range(10):
    thread_detail_html = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
    thread_detail_html.start()
```
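Queue also tracks outstanding work via task_done() and join(), which is handy for shutting workers down cleanly. A minimal sketch (worker names are illustrative, not from the original):

```python
import threading
from queue import Queue

task_queue = Queue()

def worker():
    while True:
        item = task_queue.get()
        if item is None:           # sentinel: tells this worker to exit
            task_queue.task_done()
            break
        print(f'processing {item}')
        task_queue.task_done()     # mark this item as finished

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

for i in range(10):
    task_queue.put(i)
task_queue.join()                  # blocks until every put item has been task_done()

for _ in threads:
    task_queue.put(None)           # one sentinel per worker
for t in threads:
    t.join()
```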
# ThreadLocal
The global variable local_school is a ThreadLocal object. Every thread can read and write its student attribute without affecting the others. You can treat local_school as a global variable, but each attribute such as local_school.student is local to the thread: it can be read and written freely without interfering with other threads, and without managing any locks, because ThreadLocal handles that internally.
```python
import threading

local_school = threading.local()

def process_student():
    std = local_school.student
    print('hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    # each thread sees its own independent copy of local_school.student
    local_school.student = name
    process_student()

t1 = threading.Thread(target=process_thread, args=('python',), name='thread-a')
t2 = threading.Thread(target=process_thread, args=('ruby',), name='thread-b')
t1.start()
t2.start()
t1.join()
t2.join()
```
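Without threading.local, the manual alternative is a global mapping keyed by the current thread, which every function has to look up explicitly. A rough sketch of that approach, for contrast only (the dict and names are illustrative):

```python
import threading

# manual alternative: one entry per thread, keyed by the Thread object
global_dict = {}

def process_student():
    # every function must know to look up the current thread's entry
    std = global_dict[threading.current_thread()]
    print('hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    global_dict[threading.current_thread()] = name
    process_student()

t1 = threading.Thread(target=process_thread, args=('python',), name='thread-a')
t2 = threading.Thread(target=process_thread, args=('ruby',), name='thread-b')
t1.start()
t2.start()
t1.join()
t2.join()
```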
# Semaphore: Limiting the Number of Threads
```python
import threading
import time
from threading import Semaphore

class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print('get {} data'.format(self.url))
        self.sem.release()       # free a slot once this page is done

class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()   # blocks while 3 spiders are already running
            html_spider = HtmlSpider('http://www.test.com/{}'.format(i), self.sem)
            html_spider.start()

if __name__ == '__main__':
    sem = Semaphore(3)           # allow at most 3 HtmlSpider threads at a time
    url_producer = UrlProducer(sem)
    url_producer.start()
```
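Semaphore also works as a context manager, which avoids forgetting release() when a worker raises. A brief sketch of that style (function and URL names are illustrative):

```python
import threading
import time
from threading import Semaphore

sem = Semaphore(3)   # at most 3 downloads in flight

def fetch(url):
    with sem:                    # acquire on entry, release on exit
        print(f'fetching {url}')
        time.sleep(1)

threads = [threading.Thread(target=fetch, args=(f'http://www.test.com/{i}',))
           for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```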
# Condition: Condition-based Synchronization
```python
import threading
from threading import Condition

class StudentA(threading.Thread):
    def __init__(self, name, cond):
        super().__init__(name=name)
        self.cond = cond

    def run(self):
        with self.cond:
            print(f'{self.name}: Hello BBB')
            self.cond.notify()
            self.cond.wait()
            print(f'{self.name}: Where are you?')
            self.cond.notify()
            self.cond.wait()

class StudentB(threading.Thread):
    def __init__(self, name, cond):
        super().__init__(name=name)
        self.cond = cond

    def run(self):
        with self.cond:
            self.cond.wait()
            print(f'{self.name}: Hello AAA')
            self.cond.notify()
            self.cond.wait()
            print(f'{self.name}: I am in Shanghai.')
            self.cond.notify()

if __name__ == '__main__':
    cond = Condition()
    sb = StudentB('BBB', cond)
    sa = StudentA('AAA', cond)
    # BBB must start first so it is already waiting when AAA notifies
    sb.start()
    sa.start()
```
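When threads are waiting for a state rather than taking turns, Condition.wait_for(predicate) is usually clearer than raw wait()/notify(). A minimal producer/consumer sketch (names are illustrative, not from the original):

```python
import threading
from threading import Condition

cond = Condition()
items = []

def consumer():
    with cond:
        # wait_for re-checks the predicate each time the thread is woken up
        cond.wait_for(lambda: len(items) > 0)
        print('consumed', items.pop())

def producer():
    with cond:
        items.append('data')
        cond.notify()   # wake the consumer so it re-evaluates the predicate

c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start()
p.start()
c.join()
p.join()
```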
# Thread Pools: ThreadPoolExecutor
```python
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import time

def get_html(times):
    time.sleep(times)
    print('get {}'.format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]

# submit() returns a Future immediately; the pool runs at most 2 tasks at once
all_tasks = [executor.submit(get_html, url) for url in urls]

# block until every submitted task has finished
wait(all_tasks, return_when=ALL_COMPLETED)
print('main')
```
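Results can also be collected as each task finishes instead of waiting for the whole batch. A short sketch using as_completed and executor.map with the same get_html function (timings are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def get_html(times):
    time.sleep(times)
    return times

with ThreadPoolExecutor(max_workers=2) as executor:
    tasks = [executor.submit(get_html, t) for t in [3, 2, 4]]

    # yields each Future as soon as it completes, not in submission order
    for future in as_completed(tasks):
        print('as_completed got', future.result())

with ThreadPoolExecutor(max_workers=2) as executor:
    # map keeps the input order and yields results directly
    for result in executor.map(get_html, [3, 2, 4]):
        print('map got', result)
```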