Concurrency

Concurrency happens whenever different parts of your program might execute at different times or out of order. In an embedded context, this includes:
- interrupt handlers, which run whenever the associated interrupt happens,
- various forms of multithreading, where your microprocessor regularly swaps between parts of your program,
- and in some systems, multi-core microprocessors, where each core can be independently running a different part of your program at the same time.
Since many embedded programs need to deal with interrupts, concurrency will usually come up sooner or later, and it's also where many subtle and difficult bugs can occur. Luckily, Rust provides a number of abstractions and safety guarantees to help us write correct code.
No Concurrency

The simplest concurrency for an embedded program is no concurrency: your software consists of a single main loop which just keeps running, and there are no interrupts at all. Sometimes this is perfectly suited to the problem at hand! Typically your loop will read some inputs, perform some processing, and write some outputs.
#[entry]
fn main() {
    let peripherals = setup_peripherals();
    loop {
        let inputs = read_inputs(&peripherals);
        let outputs = process(inputs);
        write_outputs(&peripherals, outputs);
    }
}
Since there's no concurrency, there's no need to worry about sharing data between parts of your program or synchronising access to peripherals. If you can get away with such a simple approach, this can be a great solution.
Global Mutable Data

Unlike non-embedded Rust, we will not usually have the luxury of creating heap allocations and passing references to that data into a newly-created thread. Instead, our interrupt handlers might be called at any time and must know how to access whatever shared memory we are using. At the lowest level, this means we must have _statically allocated_ mutable memory, which both the interrupt handler and the main code can refer to.

In Rust, such static mut variables are always unsafe to read or write, because without taking special care, you might trigger a race condition, where your access to the variable is interrupted halfway through by an interrupt which also accesses that variable.

For an example of how this behaviour can cause subtle errors in your code, consider an embedded program which counts rising edges of some input signal in each one-second period (a frequency counter):
static mut COUNTER: u32 = 0;

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            // DANGER - Not actually safe! Could cause data races.
            unsafe { COUNTER += 1 };
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    unsafe { COUNTER = 0; }
}
Each second, the timer interrupt sets the counter back to 0. Meanwhile, the main loop continually measures the signal, and increments the counter when it sees a change from low to high. We've had to use unsafe to access COUNTER, as it's static mut, and that means we're promising the compiler we won't cause any undefined behaviour. Can you spot the race condition? The increment on COUNTER is _not_ guaranteed to be atomic: in fact, on most embedded platforms, it will be split into a load, then the increment, then a store. If the interrupt fired after the load but before the store, the reset back to 0 would be ignored after the interrupt returns, and we would count twice as many transitions for that period.
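The lost-update scenario described above can be reproduced on a host machine. The sketch below (not embedded code; it is a hypothetical illustration, and the atomic type is used only so that sharing the static compiles) deliberately performs the increment as a separate load and store, just like the compiled `COUNTER += 1` would be on most embedded targets:

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;

static COUNTER: AtomicU32 = AtomicU32::new(0);

fn main() {
    let handles: Vec<_> = (0..2)
        .map(|_| {
            thread::spawn(|| {
                for _ in 0..100_000 {
                    // A non-atomic read-modify-write: load, increment, store.
                    // Another thread can run between the load and the store,
                    // overwriting (losing) that thread's increment.
                    let v = COUNTER.load(Ordering::Relaxed);
                    COUNTER.store(v + 1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // With 2 threads x 100_000 increments each we'd expect 200_000,
    // but lost updates typically leave the final count lower.
    println!("final count: {}", COUNTER.load(Ordering::Relaxed));
}
```

On an embedded system the "second thread" is the interrupt handler, but the interleaving problem is identical.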
Critical Sections

So, how can we deal with data races? A simple approach is to use _critical sections_, a context where interrupts are disabled. By wrapping the access to COUNTER in main in a critical section, we can be sure the timer interrupt will not fire until we're finished incrementing COUNTER:
static mut COUNTER: u32 = 0;

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            // New critical section ensures synchronised access to COUNTER
            cortex_m::interrupt::free(|_| {
                unsafe { COUNTER += 1 };
            });
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    unsafe { COUNTER = 0; }
}
In this example, we use cortex_m::interrupt::free, but other platforms will have similar mechanisms for executing code in a critical section. This is the same as disabling interrupts, running some code, and then re-enabling interrupts.

Note we didn't need to put a critical section inside the timer interrupt, for two reasons:
- Writing 0 to COUNTER can't be affected by a race since we don't read it
- It will never be interrupted by the main thread anyway

If COUNTER was being shared by multiple interrupt handlers that might _preempt_ each other, then each one might also require a critical section.

This solves our immediate problem, but we're still left writing a lot of unsafe code which we need to carefully reason about, and we might be using critical sections needlessly. Since each critical section temporarily pauses interrupt processing, there is an associated cost of some extra code size and higher interrupt latency and jitter (interrupts may take longer to be processed, and the time until they are processed will be more variable). Whether this is a problem depends on your system, but in general, we'd like to avoid it.

It's worth noting that while a critical section guarantees no interrupts will fire, it does not provide an exclusivity guarantee on multi-core systems! The other core could be happily accessing the same memory as your core, even without interrupts. You will need stronger synchronisation primitives if you are using multiple cores.
Atomic Access

On some platforms, special atomic instructions are available, which provide guarantees about read-modify-write operations. Specifically for Cortex-M: thumbv6 (Cortex-M0, Cortex-M0+) only provides atomic load and store instructions, while thumbv7 (Cortex-M3 and above) provides full Compare and Swap (CAS) instructions. These CAS instructions give an alternative to the heavy-handed disabling of all interrupts: we can attempt the increment, it will succeed most of the time, but if it was interrupted it will automatically retry the entire increment operation. These atomic operations are safe even across multiple cores.
use core::sync::atomic::{AtomicUsize, Ordering};

static COUNTER: AtomicUsize = AtomicUsize::new(0);

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            // Use `fetch_add` to atomically add 1 to COUNTER
            COUNTER.fetch_add(1, Ordering::Relaxed);
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    // Use `store` to write 0 directly to COUNTER
    COUNTER.store(0, Ordering::Relaxed)
}
This time COUNTER is a safe static variable. Thanks to the AtomicUsize type, COUNTER can be safely modified from both the interrupt handler and the main thread without disabling interrupts. When possible, this is a better solution, but it may not be supported on your platform.

A note on Ordering: this affects how the compiler and hardware may reorder instructions, and also has consequences for cache visibility. Assuming that the target is a single-core platform, Relaxed is sufficient and the most efficient choice in this particular case. Stricter ordering will cause the compiler to emit memory barriers around the atomic operations; depending on what you're using the atomic for, you may or may not need this! The precise details of the atomic model are complicated and best described elsewhere.

For more details on atomics and ordering, see the nomicon.
Abstractions, Send, and Sync

None of the above solutions are especially satisfactory. They require unsafe blocks which must be very carefully checked and are not ergonomic. Surely we can do better in Rust!

We can abstract our counter into a safe interface which can be safely used anywhere else in our code. For this example, we'll use the critical-section counter, but you could do something very similar with atomics.
use core::cell::UnsafeCell;
use cortex_m::interrupt;

// Our counter is just a wrapper around UnsafeCell<u32>, which is the heart
// of interior mutability in Rust. By using interior mutability, we can have
// COUNTER be `static` instead of `static mut`, but still able to mutate
// its counter value.
struct CSCounter(UnsafeCell<u32>);

const CS_COUNTER_INIT: CSCounter = CSCounter(UnsafeCell::new(0));

impl CSCounter {
    pub fn reset(&self, _cs: &interrupt::CriticalSection) {
        // By requiring a CriticalSection be passed in, we know we must
        // be operating inside a CriticalSection, and so can confidently
        // use this unsafe block (required to call UnsafeCell::get).
        unsafe { *self.0.get() = 0 };
    }

    pub fn increment(&self, _cs: &interrupt::CriticalSection) {
        unsafe { *self.0.get() += 1 };
    }
}

// Required to allow static CSCounter. See explanation below.
unsafe impl Sync for CSCounter {}

// COUNTER is no longer `mut` as it uses interior mutability;
// therefore it also no longer requires unsafe blocks to access it.
static COUNTER: CSCounter = CS_COUNTER_INIT;
#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            // No unsafe here!
            interrupt::free(|cs| COUNTER.increment(cs));
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    // We do need to enter a critical section here just to obtain a valid
    // cs token, even though we know no other interrupt could pre-empt
    // this one.
    interrupt::free(|cs| COUNTER.reset(cs));

    // We could use unsafe code to generate a fake CriticalSection if we
    // really wanted to, avoiding the overhead:
    // let cs = unsafe { interrupt::CriticalSection::new() };
}
We've moved our unsafe code to inside our carefully-planned abstraction, and now our application code does not contain any unsafe blocks.
This design requires that the application pass a CriticalSection token in: these tokens are only safely generated by interrupt::free, so by requiring one be passed in, we ensure we are operating inside a critical section, without having to actually do the lock ourselves. This guarantee is provided statically by the compiler: there won’t be any runtime overhead associated with cs. If we had multiple counters, they could all be given the same cs, without requiring multiple nested critical sections.
This also brings up an important topic for concurrency in Rust: the Send and Sync traits. To summarise the Rust book, a type is Send when it can safely be moved to another thread, while it is Sync when it can be safely shared between multiple threads. In an embedded context, we consider interrupts to be executing in a separate thread to the application code, so variables accessed by both an interrupt and the main code must be Sync.
For most types in Rust, both of these traits are automatically derived for you by the compiler. However, because CSCounter contains an UnsafeCell, it is not Sync, and therefore we could not make a static CSCounter: static variables must be Sync, since they can be accessed by multiple threads.
To tell the compiler we have taken care that the CSCounter is in fact safe to share between threads, we implement the Sync trait explicitly. As with the previous use of critical sections, this is only safe on single-core platforms: with multiple cores, you would need to go to greater lengths to ensure safety.
Mutexes
We’ve created a useful abstraction specific to our counter problem, but there are many common abstractions used for concurrency.
One such synchronisation primitive is a mutex, short for mutual exclusion. These constructs ensure exclusive access to a variable, such as our counter. A thread can attempt to lock (or acquire) the mutex, and either succeeds immediately, or blocks waiting for the lock to be acquired, or returns an error that the mutex could not be locked. While that thread holds the lock, it is granted access to the protected data. When the thread is done, it unlocks (or releases) the mutex, allowing another thread to lock it. In Rust, we would usually implement the unlock using the Drop trait to ensure it is always released when the mutex goes out of scope.
Using a mutex with interrupt handlers can be tricky: it is not normally acceptable for the interrupt handler to block, and it would be especially disastrous for it to block waiting for the main thread to release a lock, since we would then deadlock (the main thread will never release the lock because execution stays in the interrupt handler). Deadlocking is not considered unsafe: it is possible even in safe Rust.
To avoid this behaviour entirely, we could implement a mutex which requires a critical section to lock, just like our counter example. So long as the critical section must last as long as the lock, we can be sure we have exclusive access to the wrapped variable without even needing to track the lock/unlock state of the mutex.
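The essence of such a mutex is that it holds no lock state of its own: possession of a critical-section token is the lock. The host-side sketch below models this (all names here are hypothetical, and a process-wide std mutex stands in for disabling interrupts):

```rust
use std::cell::Cell;
use std::sync::Mutex as StdMutex;

// Zero-sized token: holding a reference to one proves you are inside `free`.
// (On embedded, cortex_m::interrupt::free provides the real token by
// disabling interrupts; here a process-wide std mutex stands in.)
pub struct CriticalSection { _private: () }

static CS_LOCK: StdMutex<()> = StdMutex::new(());

pub fn free<R>(f: impl FnOnce(&CriticalSection) -> R) -> R {
    let _guard = CS_LOCK.lock().unwrap();
    f(&CriticalSection { _private: () })
}

// A mutex in the cortex-m style: it simply refuses to hand out its
// contents without a CriticalSection token, so no lock/unlock state
// needs to be tracked at runtime.
pub struct CsMutex<T> { inner: T }

// SAFETY: `inner` is only reachable via `borrow`, which demands a token,
// and only one token can exist at a time because `free` serialises callers.
unsafe impl<T: Send> Sync for CsMutex<T> {}

impl<T> CsMutex<T> {
    pub const fn new(value: T) -> Self { CsMutex { inner: value } }

    // The returned reference is tied to the token's lifetime, so it
    // cannot escape the critical section.
    pub fn borrow<'cs>(&'cs self, _cs: &'cs CriticalSection) -> &'cs T {
        &self.inner
    }
}

static COUNTER: CsMutex<Cell<u32>> = CsMutex::new(Cell::new(0));

fn main() {
    free(|cs| COUNTER.borrow(cs).set(COUNTER.borrow(cs).get() + 1));
    let v = free(|cs| COUNTER.borrow(cs).get());
    assert_eq!(v, 1);
}
```

Because the reference is bounded by the token's lifetime, the compiler statically prevents it from outliving the critical section; no runtime bookkeeping is needed.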
This is in fact done for us in the cortex_m crate! We could have written our counter using it:
use core::cell::Cell;
use cortex_m::interrupt::{self, Mutex};

static COUNTER: Mutex<Cell<u32>> = Mutex::new(Cell::new(0));

#[entry]
fn main() -> ! {
    set_timer_1hz();
    let mut last_state = false;
    loop {
        let state = read_signal_level();
        if state && !last_state {
            interrupt::free(|cs|
                COUNTER.borrow(cs).set(COUNTER.borrow(cs).get() + 1));
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    // We still need to enter a critical section here to satisfy the Mutex.
    interrupt::free(|cs| COUNTER.borrow(cs).set(0));
}
We’re now using Cell, which along with its sibling RefCell is used to provide safe interior mutability. We’ve already seen UnsafeCell which is the bottom layer of interior mutability in Rust: it allows you to obtain multiple mutable references to its value, but only with unsafe code. A Cell is like an UnsafeCell but it provides a safe interface: it only permits taking a copy of the current value or replacing it, not taking a reference, and since it is not Sync, it cannot be shared between threads. These constraints mean it’s safe to use, but we couldn’t use it directly in a static variable as a static must be Sync.
So why does the example above work? The Mutex<T> implements Sync for any T which is Send — such as a Cell. It can do this safely because it only gives access to its contents during a critical section. We’re therefore able to get a safe counter with no unsafe code at all!
This is great for simple types like the u32 of our counter, but what about more complex types which are not Copy? An extremely common example in an embedded context is a peripheral struct, which generally is not Copy. For that, we can turn to RefCell.
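Before moving on, it is worth seeing RefCell's runtime check in action. The host-side sketch below (a hypothetical illustration) shows that the aliasing rules Cell sidesteps by copying are instead enforced dynamically by RefCell:

```rust
use std::cell::RefCell;

fn main() {
    let cell = RefCell::new(String::from("peripheral"));

    {
        // Any number of shared borrows may coexist.
        let a = cell.borrow();
        let b = cell.borrow();
        assert_eq!(*a, "peripheral");
        assert_eq!(*b, "peripheral");

        // An exclusive borrow while shared borrows are live is refused
        // at runtime (borrow_mut() would panic; try_borrow_mut() errors).
        assert!(cell.try_borrow_mut().is_err());
    }

    // Once the shared borrows are dropped, a mutable borrow succeeds.
    cell.borrow_mut().push_str(" (configured)");
    assert_eq!(*cell.borrow(), "peripheral (configured)");
}
```

This runtime bookkeeping is the extra overhead compared to Cell, and it is the price of handing out references to a non-Copy value such as a peripheral struct.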
Sharing Peripherals
Device crates generated using svd2rust and similar abstractions provide safe access to peripherals by enforcing that only one instance of the peripheral struct can exist at a time. This ensures safety, but makes it difficult to access a peripheral from both the main thread and an interrupt handler.
To safely share peripheral access, we can use the Mutex we saw before. We’ll also need to use RefCell, which uses a runtime check to ensure only one reference to a peripheral is given out at a time. This has more overhead than the plain Cell, but since we are giving out references rather than copies, we must be sure only one exists at a time.
Finally, we’ll also have to account for somehow moving the peripheral into the shared variable after it has been initialised in the main code. To do this we can use the Option type, initialised to None and later set to the instance of the peripheral.
use core::cell::RefCell;
use cortex_m::interrupt::{self, Mutex};
use stm32f4::stm32f405;

static MY_GPIO: Mutex<RefCell<Option<stm32f405::GPIOA>>> =
    Mutex::new(RefCell::new(None));

#[entry]
fn main() -> ! {
    // Obtain the peripheral singleton and configure it.
    // This example is from an svd2rust-generated crate, but
    // most embedded device crates will be similar.
    let dp = stm32f405::Peripherals::take().unwrap();
    let gpioa = &dp.GPIOA;

    // Some sort of configuration function.
    // Assume it sets PA0 to an input and PA1 to an output.
    configure_gpio(gpioa);

    // Store the GPIOA in the mutex, moving it.
    interrupt::free(|cs| MY_GPIO.borrow(cs).replace(Some(dp.GPIOA)));
    // We can no longer use `gpioa` or `dp.GPIOA`, and instead have to
    // access it via the mutex.

    // Be careful to enable the interrupt only after setting MY_GPIO:
    // otherwise the interrupt might fire while it still contains None,
    // and as-written (with `unwrap()`), it would panic.
    set_timer_1hz();
    let mut last_state = false;
    loop {
        // We'll now read state as a digital input, via the mutex
        let state = interrupt::free(|cs| {
            let gpioa = MY_GPIO.borrow(cs).borrow();
            gpioa.as_ref().unwrap().idr.read().idr0().bit_is_set()
        });

        if state && !last_state {
            // Set PA1 high if we've seen a rising edge on PA0.
            interrupt::free(|cs| {
                let gpioa = MY_GPIO.borrow(cs).borrow();
                gpioa.as_ref().unwrap().odr.modify(|_, w| w.odr1().set_bit());
            });
        }
        last_state = state;
    }
}

#[interrupt]
fn timer() {
    // This time in the interrupt we'll just clear PA1.
    interrupt::free(|cs| {
        // We can use `unwrap()` because we know the interrupt wasn't enabled
        // until after MY_GPIO was set; otherwise we should handle the
        // potential for a None value.
        let gpioa = MY_GPIO.borrow(cs).borrow();
        gpioa.as_ref().unwrap().odr.modify(|_, w| w.odr1().clear_bit());
    });
}
That’s quite a lot to take in, so let’s break down the important lines.
static MY_GPIO: Mutex<RefCell<Option<stm32f405::GPIOA>>> =
    Mutex::new(RefCell::new(None));
Our shared variable is now a Mutex around a RefCell which contains an Option. The Mutex ensures we only have access during a critical section, and therefore makes the variable Sync, even though a plain RefCell would not be Sync. The RefCell gives us interior mutability with references, which we’ll need to use our GPIOA. The Option lets us initialise this variable to something empty, and only later actually move the variable in. We cannot access the peripheral singleton statically, only at runtime, so this is required.
interrupt::free(|cs| MY_GPIO.borrow(cs).replace(Some(dp.GPIOA)));
Inside a critical section we can call borrow() on the mutex, which gives us a reference to the RefCell. We then call replace() to move our new value into the RefCell.
interrupt::free(|cs| {
    let gpioa = MY_GPIO.borrow(cs).borrow();
    gpioa.as_ref().unwrap().odr.modify(|_, w| w.odr1().set_bit());
});
Finally, we use MY_GPIO in a safe and concurrent fashion. The critical section prevents the interrupt firing as usual, and lets us borrow the mutex. The RefCell then gives us an &Option<GPIOA>, and tracks how long it remains borrowed - once that reference goes out of scope, the RefCell will be updated to indicate it is no longer borrowed.
Since we can’t move the GPIOA out of the &Option, we need to convert it to an &Option<&GPIOA> with as_ref(), which we can finally unwrap() to obtain the &GPIOA which lets us modify the peripheral.
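The same Option-in-RefCell dance can be exercised on a host, which makes the chain of conversions easier to see in isolation. In this sketch, `Gpio` is a hypothetical stand-in for a non-Copy peripheral struct:

```rust
use std::cell::RefCell;

// Stand-in for a peripheral struct that is neither Copy nor Clone.
struct Gpio { odr: u32 }

fn main() {
    // Starts empty, exactly like MY_GPIO before main stores the peripheral.
    let shared: RefCell<Option<Gpio>> = RefCell::new(None);

    // Move the initialised "peripheral" in.
    shared.replace(Some(Gpio { odr: 0 }));

    // borrow() yields a Ref<Option<Gpio>>; as_ref() turns the inner
    // &Option<Gpio> into Option<&Gpio>, and unwrap() gives the &Gpio.
    let borrowed = shared.borrow();
    let gpio: &Gpio = borrowed.as_ref().unwrap();
    assert_eq!(gpio.odr, 0);
    drop(borrowed);

    // For mutation, borrow_mut() + as_mut() gives Option<&mut Gpio>.
    shared.borrow_mut().as_mut().unwrap().odr = 1;
    assert_eq!(shared.borrow().as_ref().unwrap().odr, 1);
}
```

The embedded version is identical except that the RefCell is additionally wrapped in the cortex_m Mutex, so each of these steps must happen inside interrupt::free.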
If we need a mutable reference to a shared resource, then borrow_mut and deref_mut should be used instead. The following code shows an example using the TIM2 timer.
use core::cell::RefCell;
use core::ops::DerefMut;
use cortex_m::interrupt::{self, Mutex};
use cortex_m::asm::wfi;
use stm32f4::stm32f405;

// `Timer` is assumed to come from a HAL crate such as stm32f4xx-hal.
static G_TIM: Mutex<RefCell<Option<Timer<stm32f405::TIM2>>>> =
    Mutex::new(RefCell::new(None));

#[entry]
fn main() -> ! {
    let mut cp = cortex_m::Peripherals::take().unwrap();
    let dp = stm32f405::Peripherals::take().unwrap();

    // Some sort of timer configuration function.
    // Assume it configures the TIM2 timer, its NVIC interrupt,
    // and finally starts the timer.
    let tim = configure_timer_interrupt(&mut cp, dp);

    interrupt::free(|cs| {
        G_TIM.borrow(cs).replace(Some(tim));
    });

    loop {
        wfi();
    }
}

#[interrupt]
fn timer() {
    interrupt::free(|cs| {
        if let Some(ref mut tim) = G_TIM.borrow(cs).borrow_mut().deref_mut() {
            tim.start(1.hz());
        }
    });
}
Whew! This is safe, but it is also a little unwieldy. Is there anything else we can do?
RTIC
One alternative is the RTIC framework, short for Real Time Interrupt-driven Concurrency. It enforces static priorities and tracks accesses to static mut variables (“resources”) to statically ensure that shared resources are always accessed safely, without requiring the overhead of always entering critical sections and using reference counting (as in RefCell). This has a number of advantages such as guaranteeing no deadlocks and giving extremely low time and memory overhead.
The framework also includes other features like message passing, which reduces the need for explicit shared state, and the ability to schedule tasks to run at a given time, which can be used to implement periodic tasks. Check out the documentation for more information!
Real Time Operating Systems
Another common model for embedded concurrency is the real-time operating system (RTOS). While currently less well explored in Rust, they are widely used in traditional embedded development. Open source examples include FreeRTOS and ChibiOS. These RTOSs provide support for running multiple application threads which the CPU swaps between, either when the threads yield control (called cooperative multitasking) or based on a regular timer or interrupts (preemptive multitasking). An RTOS typically provides mutexes and other synchronisation primitives, and often interoperates with hardware features such as DMA engines.
At the time of writing, there are not many Rust RTOS examples to point to, but it’s an interesting area so watch this space!
Multiple Cores
It is becoming more common to have two or more cores in embedded processors, which adds an extra layer of complexity to concurrency. All the examples using a critical section (including the cortex_m::interrupt::Mutex) assume the only other execution thread is the interrupt thread, but on a multi-core system that’s no longer true. Instead, we’ll need synchronisation primitives designed for multiple cores (also called SMP, for symmetric multi-processing).
These typically use the atomic instructions we saw earlier, since the processing system will ensure that atomicity is maintained over all cores.
Covering these topics in detail is currently beyond the scope of this book, but the general patterns are the same as for the single-core case.