平行化
歷史上,CPython 受全域直譯器鎖(GIL)限制,一次只允許單一執行緒驅動 Python 直譯器。這使得 Python 的執行緒對CPU-bound 任務不太合適,並常迫使開發者接受多行程的額外負擔。
Rust 很適合多執行緒程式碼,而像 rayon 這樣的函式庫能讓你以最小成本運用安全的平行化。Python::detach 方法可讓 Python 直譯器在 Rust 工作進行時處理其他工作。
若要在應用程式中啟用完整平行化,也可考慮使用自 Python 3.14 起支援的自由執行緒 Python。
Python GIL 下的平行化
我們來看看 word-count 範例,其中的 search 函式使用 rayon 軟體箱以平行方式計算字數。
#![allow(dead_code)]
use pyo3::prelude::*;
// 這些特徵讓我們可以使用 `par_lines` 和 `map`。
use rayon::str::ParallelString;
use rayon::iter::ParallelIterator;
/// 計算某行中 needle 出現次數,不區分大小寫
fn count_line(line: &str, needle: &str) -> usize {
let mut total = 0;
for word in line.split(' ') {
if word == needle {
total += 1;
}
}
total
}
#[pyfunction]
fn search(contents: &str, needle: &str) -> usize {
contents
.par_lines()
.map(|line| count_line(line, needle))
.sum()
}
假設你有一個耗時的 Rust 函式,想要平行執行多次。以下以字數統計的序列版作為範例:
#![allow(dead_code)]
fn count_line(line: &str, needle: &str) -> usize {
let mut total = 0;
for word in line.split(' ') {
if word == needle {
total += 1;
}
}
total
}
fn search_sequential(contents: &str, needle: &str) -> usize {
contents.lines().map(|line| count_line(line, needle)).sum()
}
要讓此函式平行執行,可以使用 Python::detach 暫時釋放 GIL,讓其他 Python 執行緒得以運作。接著我們提供一個暴露給 Python 執行環境的函式,將 search_sequential 放在傳給 Python::detach 的閉包中,以實現真正的平行化:
#![allow(dead_code)]
use pyo3::prelude::*;
fn count_line(line: &str, needle: &str) -> usize {
let mut total = 0;
for word in line.split(' ') {
if word == needle {
total += 1;
}
}
total
}
fn search_sequential(contents: &str, needle: &str) -> usize {
contents.lines().map(|line| count_line(line, needle)).sum()
}
#[pyfunction]
fn search_sequential_detached(py: Python<'_>, contents: &str, needle: &str) -> usize {
py.detach(|| search_sequential(contents, needle))
}
現在 Python 執行緒可使用多個 CPU 核心,解決了 Python 多執行緒通常只適合 I/O-bound 任務的限制:
from concurrent.futures import ThreadPoolExecutor
from word_count import search_sequential_detached
executor = ThreadPoolExecutor(max_workers=2)
future_1 = executor.submit(
word_count.search_sequential_detached, contents, needle
)
future_2 = executor.submit(
word_count.search_sequential_detached, contents, needle
)
result_1 = future_1.result()
result_2 = future_2.result()
基準測試
讓我們對 word-count 範例做基準測試,確認 PyO3 確實解鎖了平行化。
我們使用 pytest-benchmark 來測試四個字數統計函式:
- 純 Python 版本
- Rust 平行版本
- Rust 序列版本
- Rust 序列版本在兩個 Python 執行緒中執行兩次
The benchmark script can be found in the PyO3 GitHub repository, and we can run nox in the word-count folder to benchmark these functions.
雖然基準測試結果會因機器而異,但相對結果應與下圖(2020 年中)相近:
-------------------------------------------------------------------------------------------------- benchmark: 4 tests -------------------------------------------------------------------------------------------------
Name (time in ms) Min Max Mean StdDev Median IQR Outliers OPS Rounds Iterations
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_word_count_rust_parallel 1.7315 (1.0) 4.6495 (1.0) 1.9972 (1.0) 0.4299 (1.0) 1.8142 (1.0) 0.2049 (1.0) 40;46 500.6943 (1.0) 375 1
test_word_count_rust_sequential 7.3348 (4.24) 10.3556 (2.23) 8.0035 (4.01) 0.7785 (1.81) 7.5597 (4.17) 0.8641 (4.22) 26;5 124.9457 (0.25) 121 1
test_word_count_rust_sequential_twice_with_threads 7.9839 (4.61) 10.3065 (2.22) 8.4511 (4.23) 0.4709 (1.10) 8.2457 (4.55) 0.3927 (1.92) 17;17 118.3274 (0.24) 114 1
test_word_count_python_sequential 27.3985 (15.82) 45.4527 (9.78) 28.9604 (14.50) 4.1449 (9.64) 27.5781 (15.20) 0.4638 (2.26) 3;5 34.5299 (0.07) 35 1
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
可以看到 Python 執行緒版本沒有比 Rust 序列版本慢太多,這代表相較於單一 CPU 核心的執行,速度翻倍。
在 Rust 執行緒間共享 Python 物件
在上述範例中,我們為底層 Rust 函式建立了 Python 介面,並利用 Python 的 threading 模組平行執行該函式。也可以在 Rust 中啟動執行緒,取得 GIL 並操作 Python 物件。不過在這些情況下必須小心,避免寫出會與 GIL 死結的程式碼。
- 注意:此範例用來示範如何釋放並重新取得 GIL 以避免死結。除非啟動的執行緒之後會釋放 GIL,或你使用的是 CPython 的自由執行緒建置,否則用
rayon平行化會在整個執行期間取得並持有 GIL 的程式碼,並不會帶來多執行緒的速度提升。
在下方範例中,我們共享一個使用 pyclass 巨集定義的使用者 ID 物件 Vec,並啟動執行緒,透過 rayon 的平行迭代器以判斷條件將資料集合處理成 Vec 的布林值:
use pyo3::prelude::*;
// 這些特徵讓我們可以使用 int_par_iter 和 map
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
#[pyclass]
struct UserID {
id: i64,
}
let allowed_ids: Vec<bool> = Python::attach(|outer_py| {
let instances: Vec<Py<UserID>> = (0..10).map(|x| Py::new(outer_py, UserID { id: x }).unwrap()).collect();
outer_py.detach(|| {
instances.par_iter().map(|instance| {
Python::attach(|inner_py| {
instance.borrow(inner_py).id > 5
})
}).collect()
})
});
assert!(allowed_ids.into_iter().filter(|b| *b).count() == 4);
請注意這裡同時有 outer_py 與 inner_py 兩個 Python token。在執行緒之間共享 Python token 是不允許的,執行緒必須各自附加到直譯器,才能存取 Python 物件包裹的資料。
此外,此範例使用 Python::detach 包裹透過 rayon 啟動 OS 執行緒的程式碼。若未使用 detach,rayon 工作執行緒會在取得 GIL 時阻塞,而擁有 GIL 的執行緒會無限等待 rayon 執行緒結果。呼叫 detach 可讓收集工作執行緒結果的執行緒釋放 GIL。只要會啟動工作執行緒,就應呼叫 detach,尤其在工作執行緒需要取得 GIL 的情況下,以避免死結。