网络部分大整理
一: 基础概念
并发编程 进程 进程是计算机中最小的资源分配单位 进程与进程之间数据隔离,执行过程异步 为什么会出现进程的概念? 合理利用cpu,提高用户体验 多个进程是可以同时利用多个cpu的 可以实现并行的效果 僵尸进程 进程 状态码 Z/z 僵尸进程 linux 在主进程中控制子进程的方法 子进程对象 = Process(target,args) 在创建的这一刻根本就没有通知操作系统 子进程对象.start() 通知操作系统,开启子进程,异步非阻塞 子进程对象.terminate() 通知操作系统,结束子进程,异步非阻塞 子进程对象.is_alive() 查看子进程是否还活着 子进程对象.join() 阻塞,直到子进程结束 子进程对象.join(timeout = 10) 阻塞最多10s,期间子进程如果结束就结束阻塞,如果没结束10s之后也结束阻塞 守护进程 守护进程是一个子进程 守护进程会在主进程代码结束之后才结束 为什么会这样? 由于主进程必须要回收所有的子进程的资源 所以主进程必须在子进程结束之后才能结束 而守护进程就是为了守护主进程存在的 不能守护到主进程结束,就只能退而求其次,守护到代码结束了 守护到主进程的代码结束,意味着如果有其他子进程没有结束,守护进程无法继续守护 解决方案 : 在主进程中加入对其他子进程的join操作,来保证守护进程可以守护所有主进程和子进程的执行 如何设置守护进程 子进程对象.daemon = True 这句话写在start之前 锁 为什么要用锁? 由于多个进程的并发,导致很多数据的操作都在同时进行 所以就有可能产生多个进程同时操作 : 文件\数据库 中的数据 导致数据不安全 所以给某一段修改数据的程序加上锁,就可以控制这段代码永远不会被多个进程同时执行 保证了数据的安全 Lock 锁(互斥锁) 锁实际上是把你的某一段程序变成同步的了,降低了程序运行的速度,为了保证数据的安全性 没有数据安全的效率都是耍流氓 二:
1/信号量 保证一段代码同一时刻只能有n个进程执行 流量控制 from multiprocessing import Semaphore(是类)
import timeimport randomfrom multiprocessing import Process,Semaphoredef ktv(name,sem): sem.acquire() print("%s走进了ktv"%name) time.sleep(random.randint(5,10)) print("%s走出了ktv" % name) sem.release()if __name__ == '__main__': sem = Semaphore(4) for i in range(100): p = Process(target=ktv,args = ('name%s'%i,sem)) p.start()
2/ 事件 Event(类) from multiprocessing import Event
# 红绿灯 一种实际问题 演示 import timeimport randomfrom multiprocessing import Event,Processdef traffic_light(e): print('\033[1;31m红灯亮\033[0m') while True: time.sleep(2) if e.is_set(): # 如果当前是绿灯 print('\033[1;31m红灯亮\033[0m') # 先打印红灯亮 e.clear() # 再把灯改成红色 else : # 当前是红灯 print('\033[1;32m绿灯亮\033[0m') # 先打印绿灯亮 e.set() # 再把灯变绿色def car(e,carname): if not e.is_set(): print('%s正在等待通过'%carname) e.wait() print('%s正在通过'%carname)if __name__ == '__main__': e = Event() p = Process(target=traffic_light,args = (e,)) p.start() for i in range(100): time.sleep(random.randrange(0,3)) p = Process(target=car, args=(e,'car%s'%i)) p.start()
e = Event()e 事件对象事件本身就带着标识 : Falsewait 阻塞它的阻塞条件是 对象标识为False结束阻塞条件是 对象标识为True对象的标识相关的 :set 将对象的标识设置为Trueclear 将对象的标识设置为Falseis_set 查看对象的标识是否为TrueEvent事件放到进程中的代码一定不止一段这两个操作之间 存在同步关系一个操作去确认另一个操作的执行条件是否打成标识 控制wait是否阻塞的关键如何修改这个标识 : clear set如何查看这个标识 : is_set
3/ 进程间通信 IPC Inter-Process Communication
实现进程之间通信的两种机制: # 管道 Pipe from multiprocessing import pipe,Process # 队列 Queue from multiprocessing import Queue,Process
# 为什么队列为空 为满 这件事情不够准确 # q.qsize() 队列的大小 # q.full() 是否满了 满返回True # q.empty() 是否空了 空返回True
from multiprocessing import Queue,Processdef consumer(q): print( '子进程 :', q.get() )if __name__ == '__main__': q = Queue() p = Process(target=consumer,args=(q,)) p.start() q.put('hello,world')
# 生产者消费者模型import timefrom multiprocessing import Queue,Processdef producer(name,food,num,q): '''生产者''' for i in range(num): time.sleep(0.3) foodi = food + str(i) print('%s生产了%s'%(name,foodi)) q.put(foodi)def consumer(name,q): while True: food = q.get() # 等待接收数据 if food == None:break print('%s吃了%s'%(name,food)) time.sleep(1)if __name__ == '__main__': q = Queue(maxsize=10) p1 = Process(target=producer,args = ('宝元','泔水',20,q)) p2 = Process(target=producer,args = ('战山','鱼刺',10,q)) c1 = Process(target=consumer, args=('alex', q)) c2 = Process(target=consumer, args=('wusir', q)) p1.start() # 开始生产 p2.start() # 开始生产 c1.start() c2.start() p1.join() # 生产者结束生产了 p2.join() # 生产者结束生产了 q.put(None) # put None 操作永远放在所有的生产者结束生产之后 q.put(None) # 有几个消费者 就put多少个None #下面是 JoinableQueue
import time from multiprocessing import JoinableQueue,Process def consumer(name,q): while True: food = q.get() time.sleep(1) print('%s消费了%s'%(name,food)) q.task_done() def producer(name,food,num,q): '''生产者''' for i in range(num): time.sleep(0.3) foodi = food + str(i) print('%s生产了%s'%(name,foodi)) q.put(foodi) q.join() # 消费者消费完毕之后会结束阻塞 if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer, args=('宝元', '泔水', 20, q)) c1 = Process(target=consumer, args=('alex', q)) c2 = Process(target=consumer, args=('wusir', q)) c1.daemon = True c2.daemon = True p1.start() c1.start() c2.start() p1.join() # 消费者每消费一个数据会给队列发送一条信息 # 当每一个数据都被消费掉之后 joinablequeue的join阻塞行为就会结束 # 以上就是为什么我们要在生产完所有数据的时候发起一个q.join() # 随着生产者子进程的执行完毕,说明消费者的数据都消费完毕了 # 这个时候主进程中的p1.join结束 # 主进程的代码结束 # 守护进程也结束了
4/ Manager 类
from multiprocessing import Manager,Process,Lockdef work(d,lock): # with lock: #不加锁而操作共享的数据,肯定会出现数据错乱 # d['count']-=1 lock.acquire() d['count'] -= 1 lock.release()if __name__ == '__main__': lock=Lock() m = Manager() dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic) Manager是一个类 内部有一些数据类型能够实现进程之间的数据共享dict list这样的数据 内部的数字进行自加 自减 是会引起数据不安全的,这种情况下 需要我们手动加锁完成因此 我们一般情况下 不适用这种方式来进行进程之间的通信我们宁可使用Queue队列或者其他消息中间件 来实现消息的传递 保证数据的安全
5/ pool 进程池 (类)
什么是进程池? 有限的进程的池子为什么要用进程池? 任务很多 cpu个数*5个任务以上 为了节省创建和销毁进程的时间 和 操作系统的资源一般进程池中进程的个数: cpu的1-2倍 如果是高计算,完全没有io,那么就用cpu的个数 随着IO操作越多,可能池中的进程个数也可以相应增加向进程池中提交任务的三种方式 map 异步提交任务 简便算法 接收的参数必须是 子进程要执行的func,可迭代的(可迭代中的每一项都会作为参数被传递给子进程) 能够传递的参数是有限的,所以比起apply_async限制性比较强 apply 同步提交任务(你删了吧) apply_async 异步提交任务 能够传递比map更丰富的参数,但是比较麻烦 首先 apply_async提交的任务和主进程完全异步 可以通过先close进程池,再join进程池的方式,强制主进程等待进程池中任务的完成 也可以通过get获取返回值的方式,来等待任务的返回值 我们不能在apply_async提交任务之后直接get获取返回值 for i in range(100): ret = p.apply_async(func,args=(i,)) # 自动带join 异步的 apply_async异步提交任务 l.append(ret) for ret in l: print(ret.get())
异步方式向进程池提交任务并且获取返回值import timefrom multiprocessing import Pool # 池def func(i): i -= 1 time.sleep(1) return i**2# 你的池中打算放多少个进程,个数cpu的个数 * 1|2if __name__ == '__main__': p = Pool(5) l = [] for i in range(100): ret = p.apply_async(func,args=(i,)) # 自动带join 异步的 apply_async异步提交任务 l.append(ret) for ret in l: print(ret.get())
回调函数import osimport timeimport randomfrom multiprocessing import Pool # 池def func(i): # [2,1,1,5,0,0.2] i -= 1 time.sleep(random.uniform(0,2)) return i**2def back_func(args): print(args,os.getpid())if __name__ == '__main__': print(os.getpid()) p = Pool(5) l = [] for i in range(100): ret = p.apply_async(func,args=(i,),callback=back_func) # 5个任务 p.close() p.join()callback回调函数主动执行func,然后在func执行完毕之后的返回值,直接传递给back_func作为参数,调用back_func处理池中任务的返回值回调函数是由谁执行的? 主进程
5000个网页5个进程import reimport jsonfrom urllib.request import urlopenfrom multiprocessing import Pooldef get_page(i): ret = urlopen('https://movie.douban.com/top250?start=%s&filter='%i).read() ret = ret.decode('utf-8') return retdef parser_page(s): com = re.compile( '.*?.*? (?P\d+).*? (?P .*?)</span>' '.*? .*?(?P.*?)评价 ', re.S) ret = com.finditer(s) with open('file','a',encoding='utf-8') as f: for i in ret: dic = { "id": i.group("id"), "title": i.group("title"), "rating_num": i.group("rating_num"), "comment_num": i.group("comment_num"), } f.write(json.dumps(dic,ensure_ascii=False)+'\n')if __name__ == '__main__': p = Pool(5) count = 0 for i in range(10): p.apply_async(get_page,args=(count,),callback=parser_page) count += 25 p.close() p.join()import jsonwith open('file2','w',encoding='utf-8') as f: json.dump({'你好':'alex'},f,ensure_ascii=False)