from multiprocessing import Process# 有个 url 列表 ,有5个 url ,一次请求是1秒,5个5秒# 要求1秒把 url 请求完,a = [] # 在进程中数据不共享,# 解决: 将其变成共享n = 1def fun(): global n n = 2 # 做用或只在子进程 if __name__ == '__main__': p = Process(target=fun) p.start() p.join() print(n) # 打印出的是 1 子进程的修改是无效的
引入 Manager 服务器进程 实现数据的共享,
from multiprocessing import Process,Manager# 此列要求在 linux 中def fun(d): d['a'] = 0 # 修改变量是有效的,manager = Manager()d = manager.dict() # 变成 manager 数据类型,实现共享if __name__ == '__main__': p = Process(target=fun,args=(d,)) p.start() p.join() print(d) # {'a': 0}
线程安全
from multiprocessing import Process,Managerfrom threading import Thread# 此列要求在 linux 中x = 0n = 1000000# GIL 锁,保证了同一时间只有一个线程在运行def fun1(n): global x for i in range(n): x +=1def fun2(n): global x for i in range(n): x -= 1if __name__ == '__main__': t1 = Thread(target=fun1,args=(n,)) t2 = Thread(target=fun2,args=(n,)) t1.start() t2.start() t1.join() t2.join() print(x) # 当 n 的值越大,x 的值越不定, 正确答案是 0 ,但实际 出来的 x 是不定的,
控制好共享数据
from multiprocessing import Process,Managerimport threadingimport time# lock = threading.Lock() # 实例 LOCK 锁lock = threading.RLock() # 实例 RLOCK 锁x = 0n = 1000000# GIL 锁,保证了同一时间只有一个线程在运行#def fun1(n): global x for i in range(n): try: lock.acquire() # 获取 x +=1 finally: # 上边不管对错,下边一定执行, lock.release() # 释放# 不管执行对错,必须释放锁 防止程序错误,赌死整个程序,def fun2(n): global x for i in range(n): try: lock.acquire() # 获取 x -= 1 finally: # 上边不管对错,下边一定执行, lock.release() # 释放if __name__ == '__main__': t1 = threading.Thread(target=fun1,args=(n,)) t2 = threading.Thread(target=fun2,args=(n,)) st = time.time() t1.start() t2.start() t1.join() t2.join() print(x) print( time.time()-st )# 原子操作:一步能完成,不会被打断,# LOCK 锁 ,在同一时间内,一把锁只能被一个线程被获取# RLOCK 锁,在同一个线程,可以我次获取,
# RLOCK 解决 嵌套 lock.acquire() # lock.acquire() 赌死程序 线程,进程安全队列,
import queue # 队列 线程中使用q = queue.Queue() # () 设置队列长度,0 无穷大,# 添加数据 put# 获取数据 getq.put(1) # # 添加数据q.put(2) # # 添加数据q.put(3) # # 添加数据print(q.get()) # 获取数据 1print(q.get()) # 获取数据 2print(q.get()) # 获取数据 3print(q.get()) # 获取数据 空q.put(1,block=False)# block=False 如果队列已经满,立刻抛出异常,# block=True 会一直等待,q.empty() # 是否为空q.full() # 是否潢q.qsize() # 队列长度q.task_done() # 任务结束q.join() # 等待完成# 内部有个计数器,每 put 一次,+1,# task_done() 一次,-1,# 只有其计数器 = 0 时解除阻塞,
生产者与消费者模式
# -*- coding: utf-8 -*-# 斌彬电脑# @Time : 2018/7/19 0019 6:26import queueimport threadingimport timeq = queue.Queue()def fun1(q): while True: data = q.get() # 只客读取 print(data) q.task_done()for i in range(10): t = threading.Thread(target=fun1, args=(q,)) t.start()while True: time.sleep(0.5) q.put(time.time()) # 只管添加 print('添加数据')