使用 @jit
实现自动并行化
设置 jit()
的 parallel 选项会启用一个 Numba 转换过程,该过程尝试自动并行化并执行其他优化(部分)函数。目前,此功能仅适用于 CPU。
用户定义函数内部的一些操作,例如将标量值添加到数组中,已知具有并行语义。用户程序可能包含许多此类操作,尽管每个操作都可以单独并行化,但由于缓存行为不佳,这种方法通常性能不佳。相反,通过自动并行化,Numba尝试在用户程序中识别此类操作,并将相邻的操作融合在一起,以形成一个或多个自动并行运行的内核。该过程是完全自动化的,无需修改用户程序,这与Numba的 vectorize()
或 guvectorize()
机制形成对比,后者需要手动努力来创建并行内核。
支持的操作
在本节中,我们列出了所有具有并行语义的数组操作,并尝试对其进行并行化。
所有由 案例研究-数组表达式 支持的 numba 数组操作,包括 Numpy 数组之间的常见算术函数,以及数组和标量之间的操作,以及 Numpy 的 ufuncs。它们通常被称为 逐元素 或 逐点 数组操作:
一元运算符:
+
-
~
二元运算符:
+
-
*
/
/?
%
|
>>
^
<<
&
**
//
比较运算符:
==
!=
<
<=
>
>=
Numpy ufuncs 在 nopython 模式 中支持。
用户定义的
DUFunc
通过vectorize()
。
Numpy 的归约函数
sum
、prod
、min
、max
、argmin
和argmax
。此外,还有数组数学函数mean
、var
和std
。Numpy 数组创建函数
zeros
、ones
、arange
、linspace
,以及多个随机函数(rand、randn、ranf、random_sample、sample、random、standard_normal、chisquare、weibull、power、geometric、exponential、poisson、rayleigh、normal、uniform、beta、binomial、f、gamma、lognormal、laplace、randint、triangular)。Numpy
dot
函数用于矩阵和向量之间,或两个向量之间的运算。在所有其他情况下,使用 Numba 的默认实现。当操作数的维度与大小匹配时,上述操作也支持多维数组。Numpy 在混合维度或大小的数组之间的广播的完整语义不受支持,也不支持在选定维度上的归约。
数组赋值,其中目标是通过切片或布尔数组选择的数组,并且被赋值的值是标量或另一个选择,其中切片范围或位数组被推断为兼容。
functools
的reduce
操作符支持在 1D Numpy 数组上指定并行归约,但初始值参数是必需的。
显式并行循环
代码转换传递的另一个特性(当 parallel=True
时)是支持显式并行循环。可以使用 Numba 的 prange
代替 range
来指定一个循环可以并行化。用户需要确保循环没有跨迭代依赖性,除了支持的归约操作。
如果在循环体中,一个变量通过支持的二元函数/运算符使用其先前的值进行更新,则会自动推断出归约操作。以下函数/运算符受支持:+=
、+
、-=
、-
、*=
、*
、/=
、/
、max()
、min()
。对于受支持的运算符(即不是``max``和``min``函数),归约的初始值会自动推断。请注意,//=``运算符不受支持,因为在一般情况下,结果取决于除数的应用顺序。然而,如果所有除数都是整数,那么程序员可能能够将
//=``归约重写为``*=``归约,然后在并行区域之后进行一次单次地板除法,其中除数是累积乘积。对于``max``和``min``函数,归约变量应在进入``prange``循环之前持有标识值。这种方式的归约操作支持标量和任意维度的数组。
下面的示例展示了一个带有缩减的并行循环(A
是一个一维的 Numpy 数组):
from numba import njit, prange
@njit(parallel=True)
def prange_test(A):
s = 0
# Without "parallel=True" in the jit-decorator
# the prange statement is equivalent to range
for i in prange(A.shape[0]):
s += A[i]
return s
以下示例演示了对二维数组进行的产品缩减:
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def two_d_array_reduction_prod(n):
shp = (13, 17)
result1 = 2 * np.ones(shp, np.int_)
tmp = 2 * np.ones_like(result1)
for i in prange(n):
result1 *= tmp
return result1
备注
在使用 Python 的 range
来引发循环时,Numba 将归纳变量类型化为有符号整数。当 parallel=False
时,Numba 的 prange
也是如此。然而,对于 parallel=True
,如果范围可以识别为严格正数,归纳变量的类型将是 uint64
。uint64
归纳变量的影响通常在涉及它和有符号整数的操作中最明显。根据 Numba 的类型强制规则,这种情况通常会导致操作产生浮点结果类型。
备注
只有具有单一入口块和单一出口块的 prange 循环才能被转换为并行运行。循环中的异常控制流,例如断言,可能会生成多个出口块,导致循环无法并行运行。如果出现这种情况,Numba 将发出警告,指示哪个循环无法并行化。
然而,在将数组切片或元素减少时,如果由切片或索引指定的元素被多个并行线程同时写入,则应小心。编译器可能无法检测到这种情况,从而导致竞争条件。
以下示例演示了这种情况,其中并行for循环执行中的竞争条件导致返回值不正确:
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def prange_wrong_result(x):
n = x.shape[0]
y = np.zeros(4)
for i in prange(n):
# accumulating into the same element of `y` from different
# parallel iterations of the loop results in a race condition
y[:] += x[i]
return y
如下例所示,累加元素被明确指定:
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def prange_wrong_result(x):
n = x.shape[0]
y = np.zeros(4)
for i in prange(n):
# accumulating into the same element of `y` from different
# parallel iterations of the loop results in a race condition
y[i % 4] += x[i]
return y
而执行整个数组归约是可以的:
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def prange_ok_result_whole_arr(x):
n = x.shape[0]
y = np.zeros(4)
for i in prange(n):
y += x[i]
return y
如下所示,在并行缩减循环外部创建切片引用:
from numba import njit, prange
import numpy as np
@njit(parallel=True)
def prange_ok_result_outer_slice(x):
n = x.shape[0]
y = np.zeros(4)
z = y[:]
for i in prange(n):
z += x[i]
return y
示例
在本节中,我们给出一个示例,展示此功能如何帮助并行化逻辑回归:
@numba.jit(nopython=True, parallel=True)
def logistic_regression(Y, X, w, iterations):
for i in range(iterations):
w -= np.dot(((1.0 / (1.0 + np.exp(-Y * np.dot(X, w))) - 1.0) * Y), X)
return w
我们不会讨论算法的细节,而是专注于这个程序在自动并行化下的行为:
输入
Y
是一个大小为N
的向量,X
是一个N x D
的矩阵,而w
是一个大小为D
的向量。函数体是一个迭代循环,用于更新变量
w
。循环体由一系列向量和矩阵操作组成。内部
dot
操作生成一个大小为N
的向量,随后是一系列算术操作,这些操作要么是在一个标量和大小为N
的向量之间进行,要么是在两个大小均为N
的向量之间进行。外部的
dot
生成一个大小为D
的向量,然后对变量w
进行原地数组减法。通过自动并行化,所有生成大小为
N
的数组的操作都被融合在一起,成为一个单独的并行内核。这包括内部的dot
操作以及其后的所有逐点数组操作。外部的
dot
操作生成一个不同维度的结果数组,并且不会与上述内核融合。
在这里,利用并行硬件的唯一要求是为 jit()
设置 parallel 选项,而不需要修改 logistic_regression
函数本身。如果我们使用 guvectorize()
提供一个等效的并行实现,这将需要进行广泛的更改,重写代码以提取可以并行化的内核计算,这既繁琐又具有挑战性。
不支持的操作
本节包含一个非详尽的常见但目前不支持的功能列表:
修改列表不是线程安全的
在
prange
并行区域中,对容器类型(例如列表、集合和字典)的并发写操作不是线程安全的,例如:@njit(parallel=True) def invalid(): z = [] for i in prange(10000): z.append(i) return z
上述操作极有可能导致损坏或访问冲突,因为容器在变异时需要线程安全,但此功能尚未实现。
归纳变量不与线程ID关联
在使用
prange
基础循环诱导的归纳变量时,结合get_num_threads
作为确保安全写入预设大小容器的手段是无效的,例如:@njit(parallel=True) def invalid(): n = get_num_threads() z = [0 for _ in range(n)] for i in prange(100): z[i % n] += i return z
上述方法有时可能会看似有效,但这只是侥幸。无法保证哪些索引被分配给哪些执行线程,也无法保证循环迭代的执行顺序。
诊断
备注
目前并非所有的并行变换和函数都能通过代码生成过程进行跟踪。偶尔可能会缺少一些循环或变换的诊断信息。
对于 jit()
的 parallel 选项,可以生成关于装饰代码自动并行化过程中所进行转换的诊断信息。这些信息可以通过两种方式访问,第一种是设置环境变量 NUMBA_PARALLEL_DIAGNOSTICS
,第二种是调用 parallel_diagnostics()
,这两种方法提供相同的信息并打印到 STDOUT
。诊断信息中的详细程度由一个介于1到4之间的整数参数控制,1表示最不详细,4表示最详细。例如:
@njit(parallel=True)
def test(x):
n = x.shape[0]
a = np.sin(x)
b = np.cos(a * a)
acc = 0
for i in prange(n - 2):
for j in prange(n - 1):
acc += b[i] + b[j + 1]
return acc
test(np.arange(10))
test.parallel_diagnostics(level=4)
生成:
================================================================================
======= Parallel Accelerator Optimizing: Function test, example.py (4) =======
================================================================================
Parallel loop listing for Function test, example.py (4)
--------------------------------------|loop #ID
@njit(parallel=True) |
def test(x): |
n = x.shape[0] |
a = np.sin(x)---------------------| #0
b = np.cos(a * a)-----------------| #1
acc = 0 |
for i in prange(n - 2):-----------| #3
for j in prange(n - 1):-------| #2
acc += b[i] + b[j + 1] |
return acc |
--------------------------------- Fusing loops ---------------------------------
Attempting fusion of parallel loops (combines loops with similar properties)...
Trying to fuse loops #0 and #1:
- fusion succeeded: parallel for-loop #1 is fused into for-loop #0.
Trying to fuse loops #0 and #3:
- fusion failed: loop dimension mismatched in axis 0. slice(0, x_size0.1, 1)
!= slice(0, $40.4, 1)
----------------------------- Before Optimization ------------------------------
Parallel region 0:
+--0 (parallel)
+--1 (parallel)
Parallel region 1:
+--3 (parallel)
+--2 (parallel)
--------------------------------------------------------------------------------
------------------------------ After Optimization ------------------------------
Parallel region 0:
+--0 (parallel, fused with loop(s): 1)
Parallel region 1:
+--3 (parallel)
+--2 (serial)
Parallel region 0 (loop #0) had 1 loop(s) fused.
Parallel region 1 (loop #3) had 0 loop(s) fused and 1 loop(s) serialized as part
of the larger parallel loop (#3).
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
---------------------------Loop invariant code motion---------------------------
Instruction hoisting:
loop #0:
Failed to hoist the following:
dependency: $arg_out_var.10 = getitem(value=x, index=$parfor__index_5.99)
dependency: $0.6.11 = getattr(value=$0.5, attr=sin)
dependency: $expr_out_var.9 = call $0.6.11($arg_out_var.10, func=$0.6.11, args=[Var($arg_out_var.10, example.py (7))], kws=(), vararg=None)
dependency: $arg_out_var.17 = $expr_out_var.9 * $expr_out_var.9
dependency: $0.10.20 = getattr(value=$0.9, attr=cos)
dependency: $expr_out_var.16 = call $0.10.20($arg_out_var.17, func=$0.10.20, args=[Var($arg_out_var.17, example.py (8))], kws=(), vararg=None)
loop #3:
Has the following hoisted:
$const58.3 = const(int, 1)
$58.4 = _n_23 - $const58.3
--------------------------------------------------------------------------------
为了帮助不熟悉在使用 parallel 选项时所进行的转换的用户,并协助理解后续章节,以下定义提供:
- 循环融合
循环融合 是一种技术,通过在某些条件下将具有相同边界的循环合并,生成一个主体更大的循环(旨在提高数据局部性)。
- 循环序列化
当任意数量的
prange
驱动的循环出现在另一个prange
驱动的循环内部时,会发生循环序列化。在这种情况下,所有prange
循环中最外层的循环并行执行,而任何内部的prange
循环(嵌套或其他)都被视为标准的range
循环。本质上,不会发生嵌套并行。
- 循环不变代码移动
循环不变代码移动 是一种优化技术,它分析循环以寻找可以移出循环体而不改变循环执行结果的语句,这些语句随后被“提升”出循环以节省重复计算。
- 分配提升
分配提升是一种循环不变代码移动的特殊情况,由于一些常见的 NumPy 分配方法的设计,这是可能的。这种技术的解释最好通过一个例子来驱动:
@njit(parallel=True) def test(n): for i in prange(n): temp = np.zeros((50, 50)) # <--- Allocate a temporary array with np.zeros() for j in range(50): temp[j, j] = i # ...do something with temp
在内部,这被转换为大致如下:
@njit(parallel=True) def test(n): for i in prange(n): temp = np.empty((50, 50)) # <--- np.zeros() is rewritten as np.empty() temp[:] = 0 # <--- and then a zero initialisation for j in range(50): temp[j, j] = i # ...do something with temp
然后提升之后:
@njit(parallel=True) def test(n): temp = np.empty((50, 50)) # <--- allocation is hoisted as a loop invariant as `np.empty` is considered pure for i in prange(n): temp[:] = 0 # <--- this remains as assignment is a side effect for j in range(50): temp[j, j] = i # ...do something with temp
可以看出,
np.zeros
分配被拆分为一次分配和一次赋值,然后将分配从i
循环中提升出来,这样生成的代码更高效,因为分配只发生一次。
并行诊断报告部分
报告分为以下几个部分:
- 代码注释
这是第一部分,包含带有循环的装饰函数的源代码,这些循环具有被识别和枚举的并行语义。源代码右侧的
loop #ID
列与识别出的并行循环对齐。从示例中,#0
是np.sin
,#1
是np.cos
,而#2
和#3
是prange()
:Parallel loop listing for Function test, example.py (4) --------------------------------------|loop #ID @njit(parallel=True) | def test(x): | n = x.shape[0] | a = np.sin(x)---------------------| #0 b = np.cos(a * a)-----------------| #1 acc = 0 | for i in prange(n - 2):-----------| #3 for j in prange(n - 1):-------| #2 acc += b[i] + b[j + 1] | return acc |
值得注意的是,循环ID是按照它们被发现的顺序进行枚举的,这不一定与源代码中的顺序相同。此外,还应注意到并行变换使用了一个静态计数器来进行循环ID索引。因此,由于内部优化/变换使用了相同的计数器,循环ID索引可能不会从0开始,这些优化/变换对用户是不可见的。
- 融合循环
本节描述了尝试融合发现的循环,并指出哪些成功,哪些失败。在融合失败的情况下,会给出原因(例如:依赖于其他数据)。从示例中:
--------------------------------- Fusing loops --------------------------------- Attempting fusion of parallel loops (combines loops with similar properties)... Trying to fuse loops #0 and #1: - fusion succeeded: parallel for-loop #1 is fused into for-loop #0. Trying to fuse loops #0 and #3: - fusion failed: loop dimension mismatched in axis 0. slice(0, x_size0.1, 1) != slice(0, $40.4, 1)
可以看出,尝试了循环
#0
和#1
的融合,并且这一操作成功了(两者都基于x
的相同维度)。在#0
和#1
成功融合之后,尝试了#0``(现在包括融合的 ``#1
循环)和#3
之间的融合。这一融合失败了,因为存在循环维度不匹配的问题,#0
的大小是x.shape
,而#3
的大小是x.shape[0] - 2
。
- 优化前
本节展示了代码中并行区域的结构,在任何优化发生之前,但循环与其最终的并行区域相关联(这是为了使优化前后的输出可以直接比较)。如果存在无法融合的循环,则可能存在多个并行区域,在这种情况下,每个区域内的代码将并行执行,但每个并行区域将按顺序运行。从示例中:
Parallel region 0: +--0 (parallel) +--1 (parallel) Parallel region 1: +--3 (parallel) +--2 (parallel)
正如 Fusing loops 部分所暗示的,代码中必然存在两个并行区域。第一个包含循环
#0
和#1
,第二个包含#3
和#2
,所有循环都被标记为parallel
,因为尚未进行优化。
- 优化后
本节展示了代码在优化后并行区域的结构。同样,并行区域按其对应的循环进行编号,但这次合并或序列化的循环会被特别标注,并提供一个总结。例如:
Parallel region 0: +--0 (parallel, fused with loop(s): 1) Parallel region 1: +--3 (parallel) +--2 (serial) Parallel region 0 (loop #0) had 1 loop(s) fused. Parallel region 1 (loop #3) had 0 loop(s) fused and 1 loop(s) serialized as part of the larger parallel loop (#3).
可以注意到,并行区域 0 包含循环
#0
,并且在 融合循环 部分中可以看到,循环#1
被融合到循环#0
中。还可以注意到,并行区域 1 包含循环#3
,并且循环#2``(内部的 ``prange()
)已被序列化,以便在循环#3
的主体中执行。
- 循环不变代码移动
本节展示了每次循环后,优化发生的情况:
未能提升的指令及其失败原因(依赖/不纯)。
被提升的指令。
任何可能发生的分配提升。
从示例中:
Instruction hoisting: loop #0: Failed to hoist the following: dependency: $arg_out_var.10 = getitem(value=x, index=$parfor__index_5.99) dependency: $0.6.11 = getattr(value=$0.5, attr=sin) dependency: $expr_out_var.9 = call $0.6.11($arg_out_var.10, func=$0.6.11, args=[Var($arg_out_var.10, example.py (7))], kws=(), vararg=None) dependency: $arg_out_var.17 = $expr_out_var.9 * $expr_out_var.9 dependency: $0.10.20 = getattr(value=$0.9, attr=cos) dependency: $expr_out_var.16 = call $0.10.20($arg_out_var.17, func=$0.10.20, args=[Var($arg_out_var.17, example.py (8))], kws=(), vararg=None) loop #3: Has the following hoisted: $const58.3 = const(int, 1) $58.4 = _n_23 - $const58.3
首先需要注意的是,这些信息是为高级用户准备的,因为它涉及到正在转换的函数的 Numba IR 。例如,示例源代码中的表达式
a * a
部分翻译为 IR 中的表达式$arg_out_var.17 = $expr_out_var.9 * $expr_out_var.9
,这显然不能从loop #0
中提升出来,因为它不是循环不变的!而在loop #3
中,表达式$const58.3 = const(int, 1)
来自源代码b[j + 1]
,数字1
显然是一个常量,因此可以提升出循环。
调度
默认情况下,Numba 将并行区域的迭代分成大致相等大小的块,并将每个块分配给每个配置的线程。(参见 设置线程数)。这种调度方法等同于 OpenMP 的静态调度,没有指定块大小,并且适用于每次迭代所需的工作量几乎恒定的情况。相反,如果每次迭代所需的工作量,如下面的 prange
循环所示,变化很大,那么这种静态调度方法可能会导致负载不平衡和更长的执行时间。
1from numba import (njit,
2 prange,
3 )
4import numpy as np
5
6@njit(parallel=True)
7def func1():
8 n = 100
9 vals = np.empty(n)
10 # The work in each iteration of the following prange
11 # loop is proportional to its index.
12 for i in prange(n):
13 cur = i + 1
14 for j in range(i):
15 if cur % 2 == 0:
16 cur //= 2
17 else:
18 cur = cur * 3 + 1
19 vals[i] = cur
20 return vals
21
22result = func1()
在这种情况下,Numba 提供了一种机制来控制并行区域(即块大小)的多少次迭代进入每个块。Numba 然后计算所需块的数量,该数量等于迭代次数除以块大小,并截断为最接近的整数。所有这些块的大小大致相等。Numba 然后如上所述将一个这样的块分配给每个配置的线程,当一个线程完成一个块时,Numba 将下一个可用的块分配给该线程。这种调度方法类似于 OpenMP 的动态调度选项,具有指定的块大小。(请注意,只有当底层 Numba 线程后端,线程层,也能够进行动态调度时,Numba 才能支持这种并行区域的动态调度。目前,只有 tbb
后端能够进行动态调度,因此如果要从这种块大小选择机制中获得任何性能优势,则需要使用该后端。)为了最小化执行时间,程序员必须选择一个块大小,该大小在较小块大小带来的更好负载均衡和较大块大小带来的较少调度开销之间取得平衡。有关块大小的内部实现的更多详细信息,请参见 并行块大小详情。
在一个块中的并行区域的迭代次数被存储为一个线程局部变量,并且可以使用 numba.set_parallel_chunksize()
进行设置。这个函数接受一个整数参数,其值必须大于或等于0。值为0是默认值,指示Numba使用上述的静态调度方法。大于0的值指示Numba使用该值作为上述动态调度方法中的块大小。numba.set_parallel_chunksize()
返回块大小的先前值。这个线程局部变量的当前值被用作该线程调用的所有后续并行区域的块大小。然而,在进入一个并行区域时,Numba将执行该并行区域的每个线程的块大小线程局部变量设置回默认值0,因为嵌套的并行区域不太可能需要相同的块大小。如果同一个线程用于执行顺序区域和并行区域,那么该线程的块大小变量在并行区域开始时被设置为0,并在退出并行区域时恢复到其原始值。这种行为在下面的例子中的 func1
中得到了展示,即在 prange
并行区域内部报告的块大小是0,但在并行区域外部是4。请注意,如果由于任何原因(例如,设置 parallel=False
)``prange`` 没有并行执行,那么在非并行prange内部报告的块大小将被报告为4。这种行为可能最初对程序员来说是违反直觉的,因为它与其他语言中线程局部变量的典型行为不同。如果程序员希望为可能启动的任何嵌套并行区域指定块大小,程序员可以在执行并行区域的线程中使用本节中描述的块大小API。可以通过调用 numba.get_parallel_chunksize()
获取并行块大小的当前值。这两个函数都可以从标准Python中使用,也可以在Numba JIT编译的函数中使用,如下所示。两个 func1
的调用都将以4的块大小执行,而 func2
将使用8的块大小。
1from numba import (njit,
2 prange,
3 set_parallel_chunksize,
4 get_parallel_chunksize,
5 )
6
7@njit(parallel=True)
8def func1(n):
9 acc = 0
10 print(get_parallel_chunksize()) # Will print 4.
11 for i in prange(n):
12 print(get_parallel_chunksize()) # Will print 0.
13 acc += i
14 print(get_parallel_chunksize()) # Will print 4.
15 return acc
16
17@njit(parallel=True)
18def func2(n):
19 acc = 0
20 # This version gets the previous chunksize explicitly.
21 old_chunksize = get_parallel_chunksize()
22 set_parallel_chunksize(8)
23 for i in prange(n):
24 acc += i
25 set_parallel_chunksize(old_chunksize)
26 return acc
27
28# This version saves the previous chunksize as returned
29# by set_parallel_chunksize.
30old_chunksize = set_parallel_chunksize(4)
31result1 = func1(12)
32result2 = func2(12)
33result3 = func1(12)
34set_parallel_chunksize(old_chunksize)
由于这种保存和恢复的习惯用法非常常见,Numba 提供了 parallel_chunksize()
with 子句上下文管理器来简化这种习惯用法。如下所示,这个 with 子句可以从标准 Python 和 Numba JIT 编译的函数中调用。与其他 Numba 上下文管理器一样,请注意在作为 Numba JIT 编译函数一部分的上下文管理块中不支持引发异常。
1from numba import njit, prange, parallel_chunksize
2
3@njit(parallel=True)
4def func1(n):
5 acc = 0
6 for i in prange(n):
7 acc += i
8 return acc
9
10@njit(parallel=True)
11def func2(n):
12 acc = 0
13 with parallel_chunksize(8):
14 for i in prange(n):
15 acc += i
16 return acc
17
18with parallel_chunksize(4):
19 result1 = func1(12)
20 result2 = func2(12)
21 result3 = func1(12)
请注意,这些设置块大小的函数仅在使用 parallel 选项进行 Numba 自动并行化时有效。块大小规范对 vectorize()
装饰器或 guvectorize()
装饰器没有影响。
参见
并行JIT选项, 并行常见问题解答