Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

平行化

歷史上,CPython 受全域直譯器鎖(GIL)限制,一次只允許單一執行緒驅動 Python 直譯器。這使得 Python 的執行緒對CPU-bound 任務不太合適,並常迫使開發者接受多行程的額外負擔。

Rust 很適合多執行緒程式碼,而像 rayon 這樣的函式庫能讓你以最小成本運用安全的平行化。Python::detach 方法可讓 Python 直譯器在 Rust 工作進行時處理其他工作。

若要在應用程式中啟用完整平行化,也可考慮使用自 Python 3.14 起支援的自由執行緒 Python

Python GIL 下的平行化

我們來看看 word-count 範例,其中的 search 函式使用 rayon 軟體箱以平行方式計算字數。

#![allow(dead_code)]
use pyo3::prelude::*;

// 這些特徵讓我們可以使用 `par_lines` 和 `map`。
use rayon::str::ParallelString;
use rayon::iter::ParallelIterator;

/// 計算某行中 needle 出現次數,不區分大小寫
fn count_line(line: &str, needle: &str) -> usize {
    let mut total = 0;
    for word in line.split(' ') {
        if word == needle {
            total += 1;
        }
    }
    total
}

#[pyfunction]
fn search(contents: &str, needle: &str) -> usize {
    contents
        .par_lines()
        .map(|line| count_line(line, needle))
        .sum()
}

假設你有一個耗時的 Rust 函式,想要平行執行多次。以下以字數統計的序列版作為範例:

#![allow(dead_code)]
fn count_line(line: &str, needle: &str) -> usize {
    let mut total = 0;
    for word in line.split(' ') {
        if word == needle {
            total += 1;
        }
    }
    total
}

fn search_sequential(contents: &str, needle: &str) -> usize {
    contents.lines().map(|line| count_line(line, needle)).sum()
}

要讓此函式平行執行,可以使用 Python::detach 暫時釋放 GIL,讓其他 Python 執行緒得以運作。接著我們提供一個暴露給 Python 執行環境的函式,將 search_sequential 放在傳給 Python::detach 的閉包中,以實現真正的平行化:

#![allow(dead_code)]
use pyo3::prelude::*;

fn count_line(line: &str, needle: &str) -> usize {
    let mut total = 0;
    for word in line.split(' ') {
        if word == needle {
            total += 1;
        }
    }
    total
}

fn search_sequential(contents: &str, needle: &str) -> usize {
   contents.lines().map(|line| count_line(line, needle)).sum()
}
#[pyfunction]
fn search_sequential_detached(py: Python<'_>, contents: &str, needle: &str) -> usize {
    py.detach(|| search_sequential(contents, needle))
}

現在 Python 執行緒可使用多個 CPU 核心,解決了 Python 多執行緒通常只適合 I/O-bound 任務的限制:

from concurrent.futures import ThreadPoolExecutor
from word_count import search_sequential_detached

executor = ThreadPoolExecutor(max_workers=2)

future_1 = executor.submit(
    word_count.search_sequential_detached, contents, needle
)
future_2 = executor.submit(
    word_count.search_sequential_detached, contents, needle
)
result_1 = future_1.result()
result_2 = future_2.result()

基準測試

讓我們對 word-count 範例做基準測試,確認 PyO3 確實解鎖了平行化。

我們使用 pytest-benchmark 來測試四個字數統計函式:

  1. 純 Python 版本
  2. Rust 平行版本
  3. Rust 序列版本
  4. Rust 序列版本在兩個 Python 執行緒中執行兩次

The benchmark script can be found in the PyO3 GitHub repository, and we can run nox in the word-count folder to benchmark these functions.

雖然基準測試結果會因機器而異,但相對結果應與下圖(2020 年中)相近:

-------------------------------------------------------------------------------------------------- benchmark: 4 tests -------------------------------------------------------------------------------------------------
Name (time in ms)                                          Min                Max               Mean            StdDev             Median               IQR            Outliers       OPS            Rounds  Iterations
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_word_count_rust_parallel                           1.7315 (1.0)       4.6495 (1.0)       1.9972 (1.0)      0.4299 (1.0)       1.8142 (1.0)      0.2049 (1.0)         40;46  500.6943 (1.0)         375           1
test_word_count_rust_sequential                         7.3348 (4.24)     10.3556 (2.23)      8.0035 (4.01)     0.7785 (1.81)      7.5597 (4.17)     0.8641 (4.22)         26;5  124.9457 (0.25)        121           1
test_word_count_rust_sequential_twice_with_threads      7.9839 (4.61)     10.3065 (2.22)      8.4511 (4.23)     0.4709 (1.10)      8.2457 (4.55)     0.3927 (1.92)        17;17  118.3274 (0.24)        114           1
test_word_count_python_sequential                      27.3985 (15.82)    45.4527 (9.78)     28.9604 (14.50)    4.1449 (9.64)     27.5781 (15.20)    0.4638 (2.26)          3;5   34.5299 (0.07)         35           1
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

可以看到 Python 執行緒版本沒有比 Rust 序列版本慢太多,這代表相較於單一 CPU 核心的執行,速度翻倍。

在 Rust 執行緒間共享 Python 物件

在上述範例中,我們為底層 Rust 函式建立了 Python 介面,並利用 Python 的 threading 模組平行執行該函式。也可以在 Rust 中啟動執行緒,取得 GIL 並操作 Python 物件。不過在這些情況下必須小心,避免寫出會與 GIL 死結的程式碼。

  • 注意:此範例用來示範如何釋放並重新取得 GIL 以避免死結。除非啟動的執行緒之後會釋放 GIL,或你使用的是 CPython 的自由執行緒建置,否則用 rayon 平行化會在整個執行期間取得並持有 GIL 的程式碼,並不會帶來多執行緒的速度提升。

在下方範例中,我們共享一個使用 pyclass 巨集定義的使用者 ID 物件 Vec,並啟動執行緒,透過 rayon 的平行迭代器以判斷條件將資料集合處理成 Vec 的布林值:

use pyo3::prelude::*;

// 這些特徵讓我們可以使用 int_par_iter 和 map
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

#[pyclass]
struct UserID {
    id: i64,
}

let allowed_ids: Vec<bool> = Python::attach(|outer_py| {
    let instances: Vec<Py<UserID>> = (0..10).map(|x| Py::new(outer_py, UserID { id: x }).unwrap()).collect();
    outer_py.detach(|| {
        instances.par_iter().map(|instance| {
            Python::attach(|inner_py| {
                instance.borrow(inner_py).id > 5
            })
        }).collect()
    })
});
assert!(allowed_ids.into_iter().filter(|b| *b).count() == 4);

請注意這裡同時有 outer_pyinner_py 兩個 Python token。在執行緒之間共享 Python token 是不允許的,執行緒必須各自附加到直譯器,才能存取 Python 物件包裹的資料。

此外,此範例使用 Python::detach 包裹透過 rayon 啟動 OS 執行緒的程式碼。若未使用 detachrayon 工作執行緒會在取得 GIL 時阻塞,而擁有 GIL 的執行緒會無限等待 rayon 執行緒結果。呼叫 detach 可讓收集工作執行緒結果的執行緒釋放 GIL。只要會啟動工作執行緒,就應呼叫 detach,尤其在工作執行緒需要取得 GIL 的情況下,以避免死結。