Joblib并行化
2017-12-07 15:22阅读:
一般用法
Joblib提供了一个简单的帮助类来编写并行化的循环。其核心思想是把代码写成生成器表达式的样子,然会再将它转换为并行计算:
>>>
from math import sqrt
>>> [sqrt(i ** 2)
for i in range(10)]
[0.0, 1.0, 2.0,
3.0, 4.0, 5.0,
6.0, 7.0, 8.0,
9.0]
使用以下方式,可将计算分布到两个CPU上:
>>>
from math
import sqrt
>>> from joblib
import Parallel, delayed
>>>
Parallel(n_jobs=
2)(delayed(sqrt)(i **
2)
for i
in range(
10))
[
0.0,
1.0,
2.0,
3.0,
4.0,
5.0,
6.0,
7.0,
8.0,
9.0]
以上,
Parallel对象会创建一个进程池,以便在多进程中执行每一个列表项。函数
delayed是一个创建元组
(function, args, kwargs)的简单技巧。
注意
在Windows上使用joblib.Parallel的时候,要保护主循环以避免递归生成子进程
。换句话说,你的代码应该如下:
import .... def
function1(...): ... def
function2(...): ... ...
if __name__ == '__main__': # do
stuff with imports and functions defined about
...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
在if __name__ ==
'__main__'
块的外面除了imports和definitions不能有代码被执行。
使用多线程
默认情况下,Parallel使用Python的多进程模块(multiprocessing)来fork工作进程,以便任务可以在独立的CPU上同时执行。这对于一般的Python程序来说是合理的,但这会产生一些开销,即,输入输出数据需要被序列化到一个排队,才能在工作进程之间进行通信。
当然,如果你知道,你调用的函数是基于编译扩展的,且它在执行的大部分时间都会释放Python的全局解释器锁(GIL),那么此时使用多线程可能会更高效。
为了使用多线程,只需在构造Parallel的时候设置
backend='threading'即可:
>>>
Parallel(n_jobs=2, backend='threading')(
... delayed(sqrt)(i ** 2)
for i in range(10))
[0.0, 1.0, 2.0,
3.0, 4.0, 5.0,
6.0, 7.0, 8.0,
9.0]
在共享内存(memmaping)中操作数值型数据
默认情况下,当
n_jobs !=
1时,joblib使用Python标准库的多进程模块(multiprocessing)来创建真实的Python工作进程
。传递给Parallel调用的参数被序列化,并且会在每一个工作进程中重新创建。
这对于大型参数会成为一个问题,因为它们会被工作进程创建
n_jobs次。
这在使用numpy进行科学计算中经常发生。joblib.Parallel对大型数组提供了一个特别的处理方法就是自动dump它们到文件系统,并将引用传递给工作进程,然后让工作进程使用
numpy.ndarray的子类
numpy.memmap以内存映射的方式打开它们
。这使得所有工作进程可以共享一段数据(更准确的说是共享一段内存)。
注意
如果你的代码能释放GIL,那么使用backend='threading'会更高效。
自动将array转换为memmap
通过在数组的大小上配置一个阀值自动触发将array转换为memmap:
>>> import numpy as np
>>> from joblib import Parallel, delayed >>> from
joblib.pool import has_shareable_memory >>>
Parallel(n_jobs=2, max_nbytes=1e6)(
... delayed(has_shareable_memory)(np.ones(int(i)))
... for i in
[1e2, 1e4, 1e6]) [False,
False, True]
默认情况下,数据被dump到/dev/shm共享内存分区,如果它存在且可写
(在Linux上就是这样的)。否则将使用操作系统的临时文件夹。可以通过设置Parallel构造函数的参数
temp_folder来自定义临时数据文件的位置 。
设置
max_nbytes=None可禁用自动转换。
手动映射输入数据
为了更好地使用内存,你可以手动将数组dump成memmap,然后在fork工作进程之前从父进程中删除原数组。
让我们在父进程中创建一个大型数组:
>>> large_array
= np.ones(int(1e6))
然后,将它dump到本地文件,以便内存映射:
>>>
import tempfile >>>
import os >>> from
joblib import load, dump >>>
temp_folder = tempfile.mkdtemp() >>> filename
= os.path.join(temp_folder, 'joblib_test.mmap')
>>> if os.path.exists(filename):
os.unlink(filename) >>> _ = dump(large_array,
filename) >>> large_memmap = load(filename,
mmap_mode='r+')
此时,变量
large_memmap指向一个
numpy.memmap实例:
>>>
large_memmap.__class__.__name__, large_array.nbytes,
large_array.shape ('memmap', 8000000,
(1000000,)) >>>
np.allclose(large_array, large_memmap)
True
然后,我们就可以释放原来的数组了:
>>>
del large_array >>>
import gc >>> _ =
gc.collect()
large_memmap还可以被切片成小的memmap:
>>> small_memmap
= large_memmap[2:5]
>>> small_memmap.__class__.__name__,
small_memmap.nbytes, small_memmap.shape ('memmap',
24, (3,))
最后,对
np.ndarray视图的修改会被写回原来的内存映射文件:
>>> small_array
= np.asarray(small_memmap) >>>
small_array.__class__.__name__, small_array.nbytes,
small_array.shape ('ndarray', 24,
(3,))
所有这三个结构都指向相同的内存区域,且这段内存能够被工作进程直接使用:
>>>
Parallel(n_jobs=2, max_nbytes=None)( ...
delayed(has_shareable_memory)(a) ... for
a in [large_memmap, small_memmap, small_array]) [True,
True, True]
注意 这里我们使用
max_nbytes=None 来禁用
Parallel自动转换功能。实际上small_array也在工作进程的共享内存中。
将并行计算结果写入共享内存
如果你在主进程中使用
w+或
r+模式打开你的数据,那么工作进程就可以使用
r+模式来访问它们,因此,工作进程就能够直接将结果写入内存映射,从而避免了使用串行通信的方式来将结果返回给父进程的操作。
这里是一个例子,为并行进程预先创建
numpy.memmap数据结构:
'''Demo:在joblib.Parallel中`numpy.memmap`的使用
这个例子演示了如何为并行工作进程的输入和输出欲创建memmap数组。 程序的输出样例:: [Worker 93486] Sum for
row 0 is -1599.756454 [Worker 93487] Sum for row 1 is -243.253165
[Worker 93488] Sum for row 3 is 610.201883 [Worker 93489] Sum for
row 2 is 187.982005 [Worker 93489] Sum for row 7 is 326.381617
[Worker 93486] Sum for row 4 is 137.324438 [Worker 93489] Sum for
row 8 is -198.225809 [Worker 93487] Sum for row 5 is -1062.852066
[Worker 93488] Sum for row 6 is 1666.334107 [Worker 93486] Sum for
row 9 is -463.711714 Expected sums computed in the parent process:
[-1599.75645426 -243.25316471 187.98200458 610.20188337
137.32443803 -1062.85206633 1666.33410715 326.38161713
-198.22580876 -463.71171369] Actual sums computed by the worker
processes: [-1599.75645426 -243.25316471 187.98200458 610.20188337
137.32443803 -1062.85206633 1666.33410715 326.38161713
-198.22580876 -463.71171369] ''' import
tempfile import shutil import os
import numpy as np from
joblib import Parallel, delayed from
joblib import load, dump def
sum_row(input, output, i):
'''Compute the sum of a row in input and store it in
output''' sum_ = input[i, :].sum() print('[Worker %d]
Sum for row %d is %f' % (os.getpid(), i, sum_)) output[i] =
sum_ if __name__ == '__main__': rng =
np.random.RandomState(42) folder = tempfile.mkdtemp()
samples_name = os.path.join(folder, 'samples')
sums_name = os.path.join(folder, 'sums')
try: # Generate some data and an allocate an
output buffer samples = rng.normal(size=(10,
int(1e6))) # Pre-allocate a writeable shared
memory map as a container for the # results of the
parallel computation sums = np.memmap(sums_name,
dtype=samples.dtype, shape=samples.shape[0],
mode='w+') # Dump the input data to disk to free
the memory dump(samples, samples_name) # Release the
reference on the original in memory array and replace it
# by a reference to the memmap array so that the garbage
collector can # release the memory before forking.
gc.collect() is internally called # in Parallel just
before forking. samples = load(samples_name,
mmap_mode='r') # Fork the worker processes to
perform computation concurrently
Parallel(n_jobs=4)(delayed(sum_row)(samples, sums, i)
for i in
range(samples.shape[0])) # Compare the results
from the output buffer with the ground truth
print('Expected sums computed in the parent process:')
expected_result = samples.sum(axis=1)
print(expected_result) print('Actual sums computed by the
worker processes:') print(sums) assert
np.allclose(expected_result, sums) finally:
try: shutil.rmtree(folder) except:
print('Failed to delete: ' + folder)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
警告
因为numpy没有提供原子操作,所以并发中的工作进程写共享内存可能会破坏数据。而之前的例子中,每一个任务都只更新结果数组中对应的一项,因此没有这个风险。
最后,在完成计算后,不要忘记清理临时文件夹:
>>> import shutil
>>> try: ...
shutil.rmtree(temp_folder) ... except OSError:
... pass # this can sometimes fail under
Windows