一、背景

  最近在Azkaban的测试工作中,需要在测试环境下模拟线上的调度场景进行稳定性测试。故而重操python旧业,通过python编写脚本来构造类似线上的调度场景。在脚本编写过程中,碰到这样一个需求:要在测试环境创建10000个作业流。

  最开始的想法是在一个azkaban project下循环调用10000次create job接口(每个Flow只包含一个job)。由于azkaban它本身没有增加/删除作业流的接口,所有的作业流修改、增加、删除其实都是通过重新上传项目zip包实现的,相应地每次调猛犸前端的create job接口,实际上是在猛犸端对zip包的内容进行了重新的整合后再重新上传zip包到azkaban,整个过程可以拆解成如下过程:解压zip包获得zip包内容,变更zip包内的文件内容,重新打包zip包,上传到azkaban。因此,随着循环次数越往后,zip包包含的内容会越多,接口执行一次的时间就越长。实践发现,第一次调该接口的时间大致不到1秒,到循环1000次的时候接口调用一次的时间就达到了将近3秒。因此,如果指望一个循环10000次来构造该场景,显然要耗费巨大的时间。

  在此背景下, 自然而然地就想到用多进程/多线程的方式来处理该问题。

二、“多任务”的操作系统基础

  大家都知道,操作系统可以同时运行多个任务。比如你一边听音乐,一边聊IM,一边写博客等。现在的cpu大都是多核的,但即使是过去的单核cpu也是支持多任务并行执行。

  单核cpu执行多任务的原理:操作系统交替轮流地执行各个任务。先让任务1执行0.01秒,然后切换到任务2执行0.01秒,再切换到任务3执行0.01秒...这样往复地执行下去。由于cpu的执行速度非常快,所以使用者的主观感受就是这些任务在并行地执行。

  多核cpu执行多任务的原理:由于实际应用中,任务的数量往往远超过cpu的核数,所以操作系统实际上是把这些多任务轮流地调度到每个核心上执行。

  对于操作系统来说,一个应用就是一个进程。比如打开一个浏览器,它是一个进程;打开一个记事本,它是一个进程。每个进程有它特定的进程号。他们共享系统的内存资源。进程是操作系统分配资源的最小单位。

  而对于每一个进程而言,比如一个视频播放器,它必须同时播放视频和音频,就至少需要同时运行两个“子任务”,进程内的这些子任务就是通过线程来完成。线程是最小的执行单元。一个进程它可以包含多个线程,这些线程相互独立,同时又共享进程所拥有的资源。

三、Python多进程编程

  1. multiprocessing

  multiprocessing是Python提供的一个跨平台的多进程模块,通过它可以很方便地编写多进程程序,在不同的平台(Unix/Linux, Windows)都可以执行。

  下面就是使用multiprocessing编写多进程程序的代码:  

#!/usr/bin/python
# -*- coding: utf-8 -*
__author__ = 'zni.feng'
import  sys
reload (sys)
sys.setdefaultencoding('utf-8')

from multiprocessing import Process
import os
import time

#子进程fun
def child_projcess_fun(name):

    print 'Child process %s with processId %s starts.' % (name, os.getpid())

    time.sleep(3)

    print 'Child process %s with processId %s ends.' % (name, os.getpid())

if __name__ == "__main__":

    print 'Parent processId is: %s.' % os.getpid()

    p = Process(target = child_projcess_fun, args=('zni',))

    print 'Process starts'

    p.start() #开始进程

    p.join() #等待子进程结束后再继续往下执行

    print 'Process ends.'

程序的输出:

Parent processId is: 11076.
Process starts
Child process zni with processId 11077 starts.
Child process zni with processId 11077 ends.
Process ends.
[Finished in 3.1s]

  2. Pool

  某些情况下,我们希望批量创建多个子进程,或者给定子进程数的上限,避免无限地消耗系统的资源。通过Pool(进程池)的方式,就可以完成这项工作,下面是使用Pool的代码:

 #!/usr/bin/python

 # -*- coding: utf-8 -*

 __author__ = 'zni.feng'

 import  sys

 reload (sys)

 sys.setdefaultencoding('utf-8')

 from multiprocessing import Pool

 import os, time

 def child_process_test(name, sleep_time):

     print 'Child process %s with processId %s starts.' % (name, os.getpid())

     time.sleep(sleep_time)

     print 'Child process %s with processId %s ends.' % (name, os.getpid())

 if __name__ == "__main__":

     print 'Parent processId is: %s.' % os.getpid()

     p = Pool()  #进程池默认大小是cpu的核数

     #p = Pool(10) #生成一个容量为10的进程池,即最大同时执行10个子进程

     for i in range(5):

         p.apply_async(child_process_test, args=('zni_'+str(i), i+1,)) #p.apply_async向进程池提交目标请求

     print 'Child processes are running.'

     p.close()

     p.join() #用来等待进程池中的所有子进程结束再向下执行代码,必须在p.close()或者p.terminate()之后执行

     print 'All Processes end.'

程序的输出:

Parent processId is: 5050.
Child processes are running.
Child process zni_0 with processId 5052 starts.
Child process zni_1 with processId 5053 starts.
Child process zni_2 with processId 5054 starts.
Child process zni_3 with processId 5055 starts.
Child process zni_0 with processId 5052 ends.
Child process zni_4 with processId 5052 starts.
Child process zni_1 with processId 5053 ends.
Child process zni_2 with processId 5054 ends.
Child process zni_3 with processId 5055 ends.
Child process zni_4 with processId 5052 ends.
All Processes end.
[Finished in 6.2s]

close()方法和terminate()方法的区别:

  close:关闭进程池,使之不能再添加新的进程。已经执行的进程会等待继续执行直到结束。

  terminate:强制终止线程池,正在执行的进程也会被强制终止。

  3. 进程间通信

  Python的multiprocessing模块提供了多种进程间通信的方式,如Queue、Pipe等。

  3.1 Queue、Lock

  Queue是multiprocessing提供的一个模块,它的数据结构就是"FIFO——first in first out"的队列,常用的方法有:put(object)入队;get()出队;empty()判断队列是否为空。

  Lock:当多个子进程对同一个queue执行写操作时,为了避免并发操作产生冲突,可以通过加锁的方式使得某个子进程对queue拥有唯一的写权限,其他子进程必须等待该锁释放后才能再开始执行写操作。

  下面就是使用Queue进行进程间通信的代码:在父进程里创建两个子进程,分别实现对queue的读和写操作

 #!/usr/bin/python

 # -*- coding: utf-8 -*

 __author__ = 'zni.feng'

 import  sys

 reload (sys)

 sys.setdefaultencoding('utf-8')

 from multiprocessing import Process, Queue, Lock

 import os, time, random

 #写数据进程

 def write(q, lock, name):

     print 'Child Process %s starts' % name

     #获得锁

     lock.acquire()

     for value in ['A' , 'B', 'C']:

         print 'Put %s to queue...' % value

         q.put(value)

         time.sleep(random.random())

     #释放锁

     lock.release()

     print 'Child Process %s ends' % name

 #读数据进程

 def read(q, lock, name):

     print 'Child Process %s starts' % name

     while True: #持续地读取q中的数据

         value =q.get()

         print 'Get %s from queue.' % value

     print 'Child Process %s ends' % name

 if __name__ == "__main__":

     #父进程创建queue,并共享给各个子进程

     q= Queue()

     #创建锁

     lock = Lock()

     #创建第一个“写”子进程

     pw = Process(target = write , args=(q, lock, 'WRITE', ))

     #创建“读”进程

     pr = Process(target = read, args=(q,lock, 'READ',))

     #启动子进程pw,写入:

     pw.start()

     #启动子进程pr,读取:

     pr.start()

     #等待pw结束:

     pw.join()

     #pr是个死循环,通过terminate杀死:

     pr.terminate()

     print 'Test finish.'

  程序的输出结果为:

Child Process WRITE starts
Put A to queue...
Child Process READ starts
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Child Process WRITE ends
Test finish.
[Finished in 2.0s]

  3.2 Pipe

  Pipe是另一种进程间通信的方式,俗称“管道”。它由两端组成,一端往管道里写入数据,另一端从管道里读取数据。
  下面就是使用Pipe通信的代码:

 #!/usr/bin/python

 # -*- coding: utf-8 -*

 __author__ = 'zni.feng'

 import  sys

 reload (sys)

 sys.setdefaultencoding('utf-8')

 from multiprocessing import Process, Pipe

 import os, time, random

 #发送数据进程

 def send(child_pipe, name):

     print 'Child Process %s starts' % name

     child_pipe.send('This is Mr.Ni')

     child_pipe.close()

     time.sleep(random.random())

     print 'Child Process %s ends' % name

 #接收数据进程

 def recv(parent_pipe, name):

     print 'Child Process %s starts' % name

     print parent_pipe.recv()

     time.sleep(random.random())

     print 'Child Process %s ends' % name

 if __name__ == "__main__":

     #创建管道

     parent,child = Pipe()

     #创建send进程

     ps = Process(target=send, args=(child, 'SEND'))

     #创建recv进程

     pr = Process(target=recv, args=(parent, 'RECEIVE'))

     #启动send进程

     ps.start()

     #等待send进程结束

     ps.join()

     #启动recv进程

     pr.start()

     #等待recv进程结束

     pr.join()

     print 'Test finish.'

  程序的输出结果如下:

Child Process SEND starts
Child Process SEND ends
Child Process RECEIVE starts
This is Mr.Ni
Child Process RECEIVE ends
Test finish.
[Finished in 1.8s]

  


Python中的多进程与多线程(一)的更多相关文章

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. js中‘!.’是什么意思

  3. InnoDB 和 MyISAM 引擎恢复数据库,使用 .frm、.ibd文件恢复数据库

  4. Error: Cannot find module ‘node:util‘问题解决

    控制台 安装 Vue-Cli 最后一步出现 Error: Cannot find module 'node:util' 问题解决方案1.问题C:\Windows\System32>cnpm install -g @vue/cli@4.0.3internal/modules/cjs/loader.js:638 throw err; &nbs

  5. yarn的安装和使用(全网最详细)

    一、yarn的简介:Yarn是facebook发布的一款取代npm的包管理工具。二、yarn的特点:速度超快。Yarn 缓存了每个下载过的包,所以再次使用时无需重复下载。 同时利用并行下载以最大化资源利用率,因此安装速度更快。超级安全。在执行代码之前,Yarn 会通过算法校验每个安装包的完整性。超级可靠。使用详细、简洁的锁文件格式和明确的安装算法,Yarn 能够保证在不同系统上无差异的工作。三、y

  6. 前端环境 本机可切换node多版本 问题源头是node使用的高版本

    前言投降投降 重头再来 重装环境 也就分分钟的事 偏要折腾 这下好了1天了 还没折腾出来问题的源头是node 使用的高版本 方案那就用 本机可切换多版本最终问题是因为nodejs的版本太高,导致的node-sass不兼容问题,我的node是v16.14.0的版本,项目中用了"node-sass": "^4.7.2"版本,无法匹配当前的node版本根据文章的提

  7. 宝塔Linux的FTP连接不上的解决方法

    宝塔Linux的FTP连接不上的解决方法常见的几个可能,建议先排查。1.注意内网IP和外网IP2.检查ftp服务是否启动 (面板首页即可看到)3.检查防火墙20端口 ftp 21端口及被动端口39000 - 40000是否放行 (如是腾讯云/阿里云等还需检查安全组)4.是否主动/被动模式都不能连接5.新建一个用户看是否能连接6.修改ftp配置文件 将ForcePassiveIP前面的#去掉 将19

  8. 扩展element-ui el-upload组件,实现复制粘贴上传图片文件,带图片预览功能

  9. 微信小程序canvas实现水平、垂直居中效果

    这篇文章主要介绍了小程序中canvas实现水平、垂直居中效果,本文图文实例代码相结合给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下

  10. 使用HTML5做的导航条详细步骤

    这篇文章主要介绍了用HTML5做的导航条详细步骤,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

随机推荐

  1. 10 个Python中Pip的使用技巧分享

    众所周知,pip 可以安装、更新、卸载 Python 的第三方库,非常方便。本文小编为大家总结了Python中Pip的使用技巧,需要的可以参考一下

  2. python数学建模之三大模型与十大常用算法详情

    这篇文章主要介绍了python数学建模之三大模型与十大常用算法详情,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感想取得小伙伴可以参考一下

  3. Python爬取奶茶店数据分析哪家最好喝以及性价比

    这篇文章主要介绍了用Python告诉你奶茶哪家最好喝性价比最高,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧

  4. 使用pyinstaller打包.exe文件的详细教程

    PyInstaller是一个跨平台的Python应用打包工具,能够把 Python 脚本及其所在的 Python 解释器打包成可执行文件,下面这篇文章主要给大家介绍了关于使用pyinstaller打包.exe文件的相关资料,需要的朋友可以参考下

  5. 基于Python实现射击小游戏的制作

    这篇文章主要介绍了如何利用Python制作一个自己专属的第一人称射击小游戏,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起动手试一试

  6. Python list append方法之给列表追加元素

    这篇文章主要介绍了Python list append方法如何给列表追加元素,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

  7. Pytest+Request+Allure+Jenkins实现接口自动化

    这篇文章介绍了Pytest+Request+Allure+Jenkins实现接口自动化的方法,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  8. 利用python实现简单的情感分析实例教程

    商品评论挖掘、电影推荐、股市预测……情感分析大有用武之地,下面这篇文章主要给大家介绍了关于利用python实现简单的情感分析的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下

  9. 利用Python上传日志并监控告警的方法详解

    这篇文章将详细为大家介绍如何通过阿里云日志服务搭建一套通过Python上传日志、配置日志告警的监控服务,感兴趣的小伙伴可以了解一下

  10. Pycharm中运行程序在Python console中执行,不是直接Run问题

    这篇文章主要介绍了Pycharm中运行程序在Python console中执行,不是直接Run问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

返回
顶部