我花了几个小时来尝试并行化我的数字运算代码,但是当我这样做时它只会变慢.不幸的是,当我尝试将其减少到下面的示例时,问题就消失了,我真的不想在这里发布整个程序.所以问题是:在这类程序中我应该避免哪些陷阱?

(注意:Unutbu的答案在底部后跟进.)

以下是情况:

>它是关于一个模块,它定义了一个包含大量内部数据的类BigData.在该示例中,存在一个插值函数列表ff;在实际程序中,还有更多,例如ffA [k],ffB [k],ffC [k].
>计算将被归类为“令人尴尬的并行”:可以一次在较小的数据块上完成工作.在示例中,这是do_chunk().
>在我的实际程序中,示例中显示的方法将导致最差的性能:每个块约1秒(在单个线程中完成实际计算时间的0.1秒左右).因此,对于n = 50,do_single()将在5秒内运行,do_multi()将在55秒内运行.
>我还尝试通过将xi和yi数组切割成连续的块并迭代每个块中的所有k值来分解工作.这工作得更好一点.现在,无论是使用1,2,3或4个线程,总执行时间都没有差别.但当然,我希望看到实际的加速!
>这可能是相关的:Multiprocessing.Pool makes Numpy matrix multiplication slower.但是,在程序的其他地方,我使用多处理池进行更加孤立的计算:一个看起来像def do_chunk(array1,array2,array3)的函数(未绑定到类)并对该数组进行仅限numpy计算.在那里,有显着的速度提升.
> cpu使用率随预期的并行进程数量而变化(三个线程的cpu使用率为300%).

#!/usr/bin/python2.7

import numpy as np,time,sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline

_tm=0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg,tm-_tm))
    _tm = tm

class BigData:
    def __init__(self,n):
        z = np.random.uniform(size=n*n*n).reshape((n,n,n))
        self.ff = []
        for i in range(n):
            f = RectBivariateSpline(np.arange(n),np.arange(n),z[i],kx=1,ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self,k,xi,yi):
        s = np.sum(np.exp(self.ff[k].ev(xi,yi)))
        sys.stderr.write(".")
        return s

    def do_multi(self,numproc,yi):
        procs = []
        pool = Pool(numproc)
        stopwatch('Pool setup')
        for k in range(self.n):
            p = pool.apply_async( _do_chunk_wrapper,(self,yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        sum = 0.0
        for k in range(self.n):
            # Edit/bugfix: replaced p.get by procs[k].get
            sum += np.sum(procs[k].get(timeout=30)) # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return sum

    def do_single(self,yi):
        sum = 0.0
        for k in range(self.n):
            sum += self.do_chunk(k,yi)
        stopwatch('\nAll in single process')
        return sum

def _do_chunk_wrapper(bd,yi): # must be outside class for apply_async to chunk
    return bd.do_chunk(k,yi)        

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi,yi = np.random.uniform(0,size=m*2).reshape((2,m))
    stopwatch('Initialized')
    bd.do_multi(2,yi)
    bd.do_multi(3,yi)
    bd.do_single(xi,yi)

输出:

Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds

计时采用Intel Core i3-3227 cpu,具有2个内核,4个线程,运行64位Linux.对于实际程序,多处理版本(池机制,即使只使用一个核心)比单进程版本慢10倍.

跟进

Unutbu的回答让我走上正轨.在实际的程序中,self被腌制成一个需要传递给工作进程的37到140 MB的对象.更糟糕的是,Python酸洗非常缓慢;酸洗本身花了几秒钟,这发生在传递给工人流程的每一块工作中.除了挑选和传递大数据对象之外,Linux中apply_async的开销非常小;对于一个小函数(添加几个整数参数),每个apply_async / get对只需0.2 ms.因此,以非常小的块分割工作本身并不是问题.所以,我将所有大数组参数作为索引传递给全局变量.为了cpu缓存优化,我保持小块大小.

全局变量存储在全局字典中;在设置工作池之后,将立即在父进程中删除这些条目.只有dict的密钥才会传送给工作人员.酸洗/ IPC唯一的大数据是工人创建的新数据.

#!/usr/bin/python2.7

import numpy as np,sys
from multiprocessing import Pool

_mproc_data = {}  # global storage for objects during multiprocessing.

class BigData:
    def __init__(self,size):
        self.blah = np.random.uniform(0,1,size=size)

    def do_chunk(self,yi):
        # do the work and return an array of the same shape as xi,yi
        zi = k*np.ones_like(xi)
        return zi

    def do_all_work(self,yi,num_proc):
        global _mproc_data
        mp_key = str(id(self))
        _mproc_data['bd'+mp_key] = self # BigData
        _mproc_data['xi'+mp_key] = xi
        _mproc_data['yi'+mp_key] = yi
        pool = Pool(processes=num_proc)
        # processes have Now inherited the global variabele; clean up in the parent process
        for v in ['bd','xi','yi']:
            del _mproc_data[v+mp_key]

        # setup indices for the worker processes (placeholder)
        n_chunks = 45
        n = len(xi)
        chunk_len = n//n_chunks
        i1list = np.arange(0,chunk_len)
        i2list = i1list + chunk_len
        i2list[-1] = n
        klist = range(n_chunks) # placeholder

        procs = []
        for i in range(n_chunks):
            p = pool.apply_async( _do_chunk_wrapper,(mp_key,i1list[i],i2list[i],klist[i]) )
            sys.stderr.write(".")
            procs.append(p)
        sys.stderr.write("\n")

        # allocate space for combined results
        zi = np.zeros_like(xi)

        # get data from workers and finish  
        for i,p in enumerate(procs):
            zi[i1list[i]:i2list[i]] = p.get(timeout=30) # timeout allows ctrl-C handling

        pool.close()
        pool.join()

        return zi

def _do_chunk_wrapper(key,i1,i2,k):
    """All arguments are small objects."""
    global _mproc_data
    bd = _mproc_data['bd'+key]
    xi = _mproc_data['xi'+key][i1:i2]
    yi = _mproc_data['yi'+key][i1:i2]
    return bd.do_chunk(k,yi)


if __name__ == "__main__":
    xi,yi = np.linspace(1,100,100001),np.linspace(1,100001)
    bd = BigData(int(1e7))
    bd.do_all_work(xi,4)

以下是速度测试的结果(同样,2个内核,4个线程),改变了工作进程的数量和块中的内存量(xi,zi数组切片的总字节数).这些数字是“每秒百万结果值”,但这对比较并不重要. “1 process”的行是带有完整输入数据的do_chunk的直接调用,没有任何子进程.

#Proc   125K    250K    500K   1000K   unlimited
1                                      0.82 
2       4.28    1.96    1.3     1.31 
3       2.69    1.06    1.06    1.07 
4       2.17    1.27    1.23    1.28

数据大小对内存的影响非常大. cpu具有3 MB共享L3缓存,每个核心具有256 KB L2缓存.请注意,计算还需要访问BigData对象的几MB内部数据.因此,我们从中学到的是进行这种速度测试很有用.对于这个程序,2个进程最快,其次是4个,3个是最慢的.

解决方法

尝试减少进程间通信.
在多处理模块中,通过队列完成所有(单机)进程间通信.通过队列传递的对象
被腌制.因此,尝试通过队列发送更少和/或更小的对象.

>不要通过队列发送自我,BigData的实例.它相当大,随着自我数据量的增加而变大:

In [6]: import pickle
In [14]: len(pickle.dumps(BigData(50)))
Out[14]: 1052187

一切
调用time pool.apply_async(_do_chunk_wrapper,yi)),
self在主进程中被腌制并在工作进程中被取消.该
len的大小(pickle.dumps(BigData(N)))增加N增加.
>让数据从全局变量中读取.在Linux上,您可以利用copy-on-Write.如Jan-Philip Gehrcke explains:

After fork(),parent and child are in an equivalent state. It would be stupid to copy the entire memory of the parent to another place in the RAM. That’s [where] the copy-on-write principle [comes] in. As long as the child does not change its memory state,it actually accesses the parent’s memory. Only upon modification,the corresponding bits and pieces are copied into the memory space of the child.

因此,您可以避免通过Queue传递BigData实例
通过简单地将实例定义为全局,bd = BigData(n),(正如您已经在做的那样)并在工作进程中引用它的值(例如_do_chunk_wrapper).它基本上等于从对pool.apply_async的调用中删除self:

p = pool.apply_async(_do_chunk_wrapper,(k_start,k_end,yi))

并将bd作为全局访问,并对do_chunk_wrapper的呼叫签名进行必要的附加更改.
>尝试将运行时间较长的函数func传递给pool.apply_async.
如果你有很多快速完成对pool.apply_async的调用,那么传递参数和通过队列返回值的开销将成为整个时间的重要部分.相反,如果你对pool.apply_async进行较少的调用,并在返回结果之前给每个func做更多工作,那么进程间通信将占总时间的一小部分.

下面,我修改了_do_chunk_wrapper以接受k_start和k_end参数,这样每次调用pool.apply_async都会在返回结果之前计算k的许多值的总和.

import math
import numpy as np
import time
import sys
import multiprocessing as mp
import scipy.interpolate as interpolate

_tm=0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm==0: _tm = tm; return
    print("%s: %.2f seconds" % (msg,n))
        self.ff = []
        for i in range(n):
            f = interpolate.RectBivariateSpline(
                np.arange(n),yi):
        n = self.n
        s = np.sum(np.exp(self.ff[k].ev(xi,yi)))
        sys.stderr.write(".")
        return s

    def do_chunk_of_chunks(self,k_start,yi):
        s = sum(np.sum(np.exp(self.ff[k].ev(xi,yi)))
                    for k in range(k_start,k_end))
        sys.stderr.write(".")
        return s

    def do_multi(self,yi):
        procs = []
        pool = mp.Pool(numproc)
        stopwatch('\nPool setup')
        ks = list(map(int,np.linspace(0,self.n,numproc+1)))
        for i in range(len(ks)-1):
            k_start,k_end = ks[i:i+2]
            p = pool.apply_async(_do_chunk_wrapper,yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        total = 0.0
        for k,p in enumerate(procs):
            total += np.sum(p.get(timeout=30)) # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        print(total)
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return total

    def do_single(self,yi):
        total = 0.0
        for k in range(self.n):
            total += self.do_chunk(k,yi)
        stopwatch('\nAll in single process')
        return total

def _do_chunk_wrapper(k_start,yi): 
    return bd.do_chunk_of_chunks(k_start,yi)

产量

Initialized: 0.15 seconds

Pool setup: 0.06 seconds
Jobs queued (2 processes): 0.00 seconds

First get() done: 6.56 seconds
83963796.0404
Jobs done: 0.55 seconds
..
Pool setup: 0.08 seconds
Jobs queued (3 processes): 0.00 seconds

First get() done: 5.19 seconds
83963796.0404
Jobs done: 1.57 seconds
...
All in single process: 12.13 seconds

与原始代码相比:

Initialized: 0.10 seconds
Pool setup: 0.03 seconds
Jobs queued (2 processes): 0.00 seconds

First get() done: 10.47 seconds
Jobs done: 0.00 seconds
..................................................
Pool setup: 0.12 seconds
Jobs queued (3 processes): 0.00 seconds

First get() done: 9.21 seconds
Jobs done: 0.00 seconds
..................................................
All in single process: 12.12 seconds

使用numpy / scipy最大限度地减少Python multiprocessing.Pool的开销的更多相关文章

  1. iOS:核心图像和多线程应用程序

    我试图以最有效的方式运行一些核心图像过滤器.试图避免内存警告和崩溃,这是我在渲染大图像时得到的.我正在看Apple的核心图像编程指南.关于多线程,它说:“每个线程必须创建自己的CIFilter对象.否则,你的应用程序可能会出现意外行为.”这是什么意思?我实际上是试图在后台线程上运行我的过滤器,所以我可以在主线程上运行HUD(见下文).这在coreImage的上下文中是否有意义?

  2. XCode 3.2 Ruby和Python模板

    在xcode3.2下,我的ObjectiveCPython/Ruby项目仍然可以打开更新和编译,但是你无法创建新项目.鉴于xcode3.2中缺少ruby和python的所有痕迹(即创建项目并添加新的ruby/python文件),是否有一种简单的方法可以再次安装模板?我发现了一些关于将它们复制到某个文件夹的信息,但我似乎无法让它工作,我怀疑文件夹的位置已经改变为3.2.解决方法3.2中的应用程序模板

  3. UIWebView stringByEvaluatingJavaScriptFromString在使用GCD调用时挂在iOS5.0 / 5.1上

    我在viewDidLoad中有以下代码,它在iOS4.3上正常工作,但它挂在iOS5/5.1上.在iOS5/5.1上,显示警告对话框,但无法关闭,UI线程冻结,OK按钮无法单击.这是一个bug吗?解决方法经过测试,我认为它是一个Bug,并改变使用的代码会解决

  4. ios – iPhone:一段时间后,所有动画都停止工作

    我最近有一些奇怪的行为.所有动画有时会突然停止工作.有时候一切顺利,其他时候就会发生.推送和弹出视图只是捕捉到位,UITableViewcellrow动画不起作用.该应用程序使用了很多后台线程,所以也许有东西在那里?我不能真正发布代码,因为我不知道问题在哪里.有人有同样的问题吗?解决方法你可以尝试在不同的后台线程中更新UI/animate吗?

  5. ios – 在后台线程上创建一个视图,在主线程中添加主视图

    我是C的新来的,来自.NET和java背景.所以我需要异步地创建一些UIwebviews,我在自己的队列中使用这个因为你会想象这会引发错误:那么如何在主线程上添加子视图?

  6. 在iOS模拟器上显示GMSMarkers时发生GMSThreadException

    我正在开发一个应用程序,在GMSMapView上显示大约200个GMSMarkers我尝试了2种方法来显示标记.Method1有点慢,但没有错误发生,但是,Method2在真实设备上运行顺畅但我在iOS模拟器上测试它时得到了GMSThreadException以下是问题:1.继续使用method2可以吗?任何帮助是赞赏OrzUPDATE1正如@ztan在下面回答的那样,我必须在主线程中完成所有这些,有没有比这更好的解决方案?

  7. ios – 在分离的线程问题中使用块的异步FB请求

    我正在使用IOSFacebookSDK3,我正在尝试使用更高效的方法.所以我想在单独的线程中管理一些请求.例如这个请求:>我正在使用这个在我的Feed上发布内容,我调用一个方法来自动加载此请求的内容,然后在方法中调用此块以启动请求.这个很好用.>问题是如果我不将此请求放在一个块中,那就不起作用了.此请求不起作用我想弄清楚,但我不明白是什么问题.在此先感谢您的帮助.解决方法我有一点这个问题.确保在主线程上分派代码.

  8. ios – UIAlertController有时会阻止UIRefreshControl隐藏

    编辑:这是我在代码中创建刷新控件的方法:解决方法我相信它真的与tableView在UIAlertController呈现之前没有滚动回来有关.我试图设置showUpdateInfo方法的延迟,这似乎有效.我猜当用户只拉一次它需要半秒钟来显示UIAlertController检查是否有帮助.这是我的代码如果有帮助,请告诉我.

  9. Swift之旅三函数与闭包

    用func来定义一个函数。试一试去掉day参数。你可以用嵌套函数来把又长又臭的代码组织一下。函数其实是闭包的一个特例。闭包内的代码可以访问到变量和函数必须是与闭包创建的范围是一致的,即便闭包是在另一个范围内执行——在讲嵌套函数时就说过这个例子了。试一试重写这个闭包,对所有奇数都返回0有几种方法可以更简明地写闭包。单行语句的闭包隐式返回语句中的值。作为最后一个参数传到函数里的闭包可以在括号后面马上出现。

  10. 在Swift中应用Grand Central Dispatch(上

    在这两篇教程中,你会学到GCD的来龙去脉。起步libdispatch是Apple所提供的在IOS和OSX上进行并发编程的库,而GCD正是它市场化的名字。Swift中的闭包和OC中的块类似甚至于他们几乎就是可交换使用的。但OC中的块可以安全的替换成Swift中的闭包。再一次,这完全取决于GCD。QoS等级表示了提交任务的意图,使得GCD可以决定如何制定优先级。QOS_CLASS_USER_INteraCTIVE:userinteractive等级表示任务需要被立即执行以提供好的用户体验。

随机推荐

  1. 10 个Python中Pip的使用技巧分享

    众所周知,pip 可以安装、更新、卸载 Python 的第三方库,非常方便。本文小编为大家总结了Python中Pip的使用技巧,需要的可以参考一下

  2. python数学建模之三大模型与十大常用算法详情

    这篇文章主要介绍了python数学建模之三大模型与十大常用算法详情,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感想取得小伙伴可以参考一下

  3. Python爬取奶茶店数据分析哪家最好喝以及性价比

    这篇文章主要介绍了用Python告诉你奶茶哪家最好喝性价比最高,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧

  4. 使用pyinstaller打包.exe文件的详细教程

    PyInstaller是一个跨平台的Python应用打包工具,能够把 Python 脚本及其所在的 Python 解释器打包成可执行文件,下面这篇文章主要给大家介绍了关于使用pyinstaller打包.exe文件的相关资料,需要的朋友可以参考下

  5. 基于Python实现射击小游戏的制作

    这篇文章主要介绍了如何利用Python制作一个自己专属的第一人称射击小游戏,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起动手试一试

  6. Python list append方法之给列表追加元素

    这篇文章主要介绍了Python list append方法如何给列表追加元素,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

  7. Pytest+Request+Allure+Jenkins实现接口自动化

    这篇文章介绍了Pytest+Request+Allure+Jenkins实现接口自动化的方法,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  8. 利用python实现简单的情感分析实例教程

    商品评论挖掘、电影推荐、股市预测……情感分析大有用武之地,下面这篇文章主要给大家介绍了关于利用python实现简单的情感分析的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下

  9. 利用Python上传日志并监控告警的方法详解

    这篇文章将详细为大家介绍如何通过阿里云日志服务搭建一套通过Python上传日志、配置日志告警的监控服务,感兴趣的小伙伴可以了解一下

  10. Pycharm中运行程序在Python console中执行,不是直接Run问题

    这篇文章主要介绍了Pycharm中运行程序在Python console中执行,不是直接Run问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

返回
顶部