Concurrency
Rust has a few different ways to do concurrency:
- OS Threads
- Channels
- Mutexes
Rust used to have green threads, but they were removed in favor of OS threads. The reason is their real cost: green threads require a runtime to schedule them, and that runtime has to be shipped with every application. This is acceptable for languages like Go, but not for Rust, which is often used for low-level applications. For that reason, green threads were removed by RFC 230.
OS Threads
Rust threads work much like threads in other languages. They are created with std::thread::spawn:
use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("Count in thread: {i}!");
            thread::sleep(Duration::from_millis(5));
        }
    });

    for i in 1..5 {
        println!("Main thread: {i}");
        thread::sleep(Duration::from_millis(5));
    }
}
- Spawned threads are all daemon threads: the main thread does not wait for them, and they are stopped when main returns.
- Thread panics are independent of each other: a panic in a spawned thread will not affect the main thread.
- Panics can carry a payload, which can be unpacked with downcast_ref.
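To observe a panic from the main thread, keep the JoinHandle returned by thread::spawn and call join(), which waits for the thread and returns Err with the panic payload if it panicked. A minimal sketch follows; the helper name panic_message is hypothetical, and it checks both &str and String because the payload type depends on how panic! was invoked.

```rust
use std::thread;

// Hypothetical helper: runs a panicking thread and returns its panic message, if any.
fn panic_message() -> Option<String> {
    let handle = thread::spawn(|| {
        panic!("something went wrong");
    });
    // join() blocks until the thread ends; Err carries the payload (Box<dyn Any + Send>).
    match handle.join() {
        Ok(_) => None,
        Err(payload) => {
            if let Some(msg) = payload.downcast_ref::<&str>() {
                Some(msg.to_string())
            } else if let Some(msg) = payload.downcast_ref::<String>() {
                Some(msg.clone())
            } else {
                None
            }
        }
    }
}

fn main() {
    // The panic stays inside the spawned thread; main keeps running.
    println!("payload: {:?}", panic_message());
    println!("main thread is still running");
}
```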
Normally, a spawned thread cannot borrow from the surrounding scope, so this fails to compile:
use std::thread;

fn foo() {
    let s = String::from("Hello");
    thread::spawn(|| {
        println!("Length: {}", s.len());
    });
}

fn main() {
    foo();
}
// Compiling playground v0.0.1 (/playground)
// error[E0373]: closure may outlive the current function, but it borrows `s`, which is owned by the current function
// --> src/main.rs:5:19
// |
// 5 | thread::spawn(|| {
// | ^^ may outlive borrowed value `s`
// 6 | println!("Length: {}", s.len());
// | - `s` is borrowed here
// |
// note: function requires argument type to outlive `'static`
// --> src/main.rs:5:5
// |
// 5 | / thread::spawn(|| {
// 6 | | println!("Length: {}", s.len());
// 7 | | });
// | |______^
// help: to force the closure to take ownership of `s` (and any other referenced variables), use the `move` keyword
// |
// 5 | thread::spawn(move || {
// | ++++
//
// For more information about this error, try `rustc --explain E0373`.
// error: could not compile `playground` (bin "playground") due to previous error
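Applying the compiler's suggestion, a move closure takes ownership of s, so the thread no longer borrows from foo. A minimal sketch, assuming we want the length back in the caller (so the value is returned through join() rather than printed):

```rust
use std::thread;

fn foo() -> usize {
    let s = String::from("Hello");
    // `move` transfers ownership of `s` into the closure, so the new thread
    // owns the String instead of borrowing it from `foo`'s stack frame.
    let handle = thread::spawn(move || s.len());
    // join() waits for the thread and returns the closure's result.
    handle.join().unwrap()
}

fn main() {
    println!("Length: {}", foo());
}
```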
However, we can use scoped threads (std::thread::scope) to borrow from the surrounding scope:
use std::thread;

// Normal Rust borrowing rules apply: you can either borrow mutably by one thread, or immutably by any number of threads.
fn main() {
    let s = String::from("Hello");
    // thread::scope joins every thread spawned inside it before returning,
    // so the borrow of `s` cannot outlive `s`.
    thread::scope(|scope| {
        scope.spawn(|| {
            println!("Length: {}", s.len());
        });
    });
}
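Scoped threads can also split work over borrowed data or mutate a local variable, as long as the borrowing rules hold. This is a sketch with two hypothetical helpers: parallel_sum borrows a slice immutably from two threads, and append_in_thread lets a single thread borrow a Vec mutably.

```rust
use std::thread;

// Sums a slice by giving each half to its own scoped thread (immutable borrows).
fn parallel_sum(data: &[i32]) -> i32 {
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|s| {
        let h1 = s.spawn(|| left.iter().sum::<i32>());
        let h2 = s.spawn(|| right.iter().sum::<i32>());
        h1.join().unwrap() + h2.join().unwrap()
    })
}

// A single scoped thread borrows `v` mutably.
fn append_in_thread(mut v: Vec<i32>, x: i32) -> Vec<i32> {
    thread::scope(|s| {
        s.spawn(|| v.push(x));
    });
    // The scope has joined every thread, so `v` is usable again here.
    v
}

fn main() {
    println!("sum: {}", parallel_sum(&[1, 2, 3, 4, 5]));
    println!("vec: {:?}", append_in_thread(vec![1, 2, 3], 4));
}
```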
Channels
Channels have two ends, a Sender and a Receiver. They can be used to send messages between threads:
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    tx.send(10).unwrap();
    tx.send(20).unwrap();

    println!("Received: {:?}", rx.recv());
    println!("Received: {:?}", rx.recv());

    let tx2 = tx.clone();
    tx2.send(30).unwrap();
    println!("Received: {:?}", rx.recv());
}
- mpsc stands for multiple producer, single consumer. For that reason, the Sender can be cloned but the Receiver cannot.
- send() and recv() return Result. If send() returns Err, the Receiver has been dropped; if recv() returns Err, every Sender has been dropped and no messages remain. In both cases the channel is closed.
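Both failure cases can be observed directly. In this sketch (helper names are hypothetical), several producers share cloned Senders, the original Sender is dropped, and the receiving loop ends once recv() returns Err; the second helper shows send() failing after the Receiver is gone.

```rust
use std::sync::mpsc;
use std::thread;

// Collects one message from each of `n` producer threads.
fn collect_from_producers(n: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    for id in 0..n {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(id).unwrap();
        });
    }
    // Drop the original Sender so only the clones keep the channel open.
    drop(tx);
    let mut received = Vec::new();
    // recv() returns Err once all Senders are gone and the buffer is empty.
    while let Ok(id) = rx.recv() {
        received.push(id);
    }
    // Producers may finish in any order.
    received.sort();
    received
}

// send() fails once the Receiver has been dropped.
fn send_after_receiver_dropped() -> bool {
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx);
    tx.send(1).is_err()
}

fn main() {
    println!("{:?}", collect_from_producers(3));
    println!("send failed: {}", send_after_receiver_dropped());
}
```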
Unbounded Channels
By default, channels created with mpsc::channel are unbounded and asynchronous: send() never blocks.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let thread_id = thread::current().id();
        for i in 1..10 {
            tx.send(format!("Message {i}")).unwrap();
            println!("{thread_id:?}: sent Message {i}");
        }
        println!("{thread_id:?}: done");
    });
    thread::sleep(Duration::from_millis(100));

    for msg in rx.iter() {
        println!("Main: got {msg}");
    }
}
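Because an unbounded channel buffers without limit, a single thread can queue many messages before anything is received. A small sketch, assuming a hypothetical buffered_count helper; it uses Receiver::try_iter to drain only what is already buffered.

```rust
use std::sync::mpsc;

// Queues `n` messages before anything is received.
// Every send() returns immediately because the buffer grows as needed.
fn buffered_count(n: usize) -> usize {
    let (tx, rx) = mpsc::channel();
    for i in 0..n {
        tx.send(i).unwrap();
    }
    // try_iter() yields only the messages already buffered, without blocking.
    rx.try_iter().count()
}

fn main() {
    println!("buffered: {}", buffered_count(1_000));
}
```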
Bounded Channels
Bounded channels, created with mpsc::sync_channel, have a limited capacity. When the capacity is reached, send() blocks until there is space in the channel:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::sync_channel(3);

    thread::spawn(move || {
        let thread_id = thread::current().id();
        for i in 1..10 {
            tx.send(format!("Message {i}")).unwrap();
            println!("{thread_id:?}: sent Message {i}");
        }
        println!("{thread_id:?}: done");
    });
    thread::sleep(Duration::from_millis(100));

    for msg in rx.iter() {
        println!("Main: got {msg}");
    }
}
- A bounded channel with a size of zero is called a “rendezvous channel”. Every send blocks the current thread until another thread calls recv().
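The difference from an unbounded channel shows up with SyncSender::try_send, which reports a full buffer instead of blocking. This sketch (helpers are hypothetical) fills a capacity-3 channel that nobody reads, shows that a zero-capacity rendezvous channel accepts nothing while no receiver is waiting, and pairs a blocking send() with recv() in another thread.

```rust
use std::sync::mpsc::{self, TrySendError};
use std::thread;

// Counts how many messages fit into a bounded channel that nobody is reading.
fn fill_bounded(capacity: usize, attempts: usize) -> usize {
    // `_rx` (unlike `_`) keeps the Receiver alive until the function returns.
    let (tx, _rx) = mpsc::sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // try_send() returns Full instead of blocking like send() would.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => unreachable!("the receiver is still alive"),
        }
    }
    accepted
}

// With capacity 0, send() completes only when recv() takes the value.
fn rendezvous() -> i32 {
    let (tx, rx) = mpsc::sync_channel::<i32>(0);
    let handle = thread::spawn(move || {
        tx.send(42).unwrap();
    });
    let value = rx.recv().unwrap();
    handle.join().unwrap();
    value
}

fn main() {
    println!("capacity 3 accepted {}", fill_bounded(3, 10));
    println!("capacity 0 accepted {}", fill_bounded(0, 10));
    println!("rendezvous received {}", rendezvous());
}
```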
Send and Sync
Rust has two marker traits to describe thread safety: Send and Sync. If these traits are implemented for a type, it is safe to move (Send) or share by reference (Sync) values of that type between threads.
- Send: a type T is Send if it is safe to move a T value to another thread.
- Sync: a type T is Sync if it is safe to access a T from multiple threads at the same time (i.e. &T is Send).
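Since both traits are checked by the compiler, a common trick is a generic function with a trait bound that does nothing: calling it compiles only if the type qualifies. A minimal sketch with hypothetical helper names assert_send and assert_sync, illustrating the &T rule for Sync:

```rust
// Hypothetical helpers: a call compiles only if `T` satisfies the bound,
// so these checks happen entirely at compile time.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn check_definitions() -> bool {
    // A String can be moved to another thread (Send)...
    assert_send::<String>();
    // ...and shared by reference between threads (Sync).
    assert_sync::<String>();
    // Because i32 is Sync, a reference to it is Send.
    assert_sync::<i32>();
    assert_send::<&i32>();
    true
}

fn main() {
    println!("definitions hold: {}", check_definitions());
}
```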
Examples
Most of the basic types in Rust are Send and Sync:
i8, f32, bool, char, &str, …
(T1, T2), [T; N], &[T], struct { x: T }, …
String, Option<T>, Vec<T>, Box<T>, …
Arc<T>: Explicitly thread-safe via atomic reference count.
Mutex<T>: Explicitly thread-safe via internal locking.
AtomicBool, AtomicU8, …: Uses special atomic instructions.
The generic types are typically Send + Sync when the type parameters are Send + Sync.
There are also types that are Send but not Sync, or Sync but not Send:
Send + !Sync
These types can be moved to other threads, but they’re not thread-safe (i.e. they cannot be accessed from multiple threads at once), typically because of interior mutability:
mpsc::Sender<T> (since Rust 1.72, Sender<T> is also Sync when T: Send)
mpsc::Receiver<T>
Cell<T>
RefCell<T>
!Send + Sync
These types can be shared between threads, but they can’t be moved to other threads:
MutexGuard<T: Sync>: uses OS-level primitives which must be deallocated on the thread that created them.
!Send + !Sync
These types can’t be moved to other threads, and they can’t be shared between threads:
Rc<T>: each Rc<T> has a reference to an RcBox<T>, which contains a non-atomic reference count.
*const T, *mut T: Rust assumes raw pointers may have special concurrency considerations.
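The same compile-time helpers (hypothetical names, redefined here so the block stands alone) can confirm the categories above. Lines that would fail to compile are left commented out, with the reason next to them.

```rust
use std::cell::RefCell;
use std::sync::{Arc, Mutex, MutexGuard};

// Hypothetical helpers: a call compiles only if `T` satisfies the bound.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn check_examples() -> bool {
    // Send + Sync: Arc and Mutex are thread-safe when the inner type is.
    assert_send::<Arc<Mutex<Vec<i32>>>>();
    assert_sync::<Arc<Mutex<Vec<i32>>>>();

    // Send + !Sync: a RefCell may be moved to another thread...
    assert_send::<RefCell<i32>>();
    // ...but this line would not compile, because RefCell<i32> is not Sync:
    // assert_sync::<RefCell<i32>>();

    // !Send + Sync: a MutexGuard may be shared by reference...
    assert_sync::<MutexGuard<'static, i32>>();
    // ...but this line would not compile, because MutexGuard is not Send:
    // assert_send::<MutexGuard<'static, i32>>();

    // !Send + !Sync: this line would not compile, because Rc<i32> is not Send:
    // assert_send::<std::rc::Rc<i32>>();
    true
}

fn main() {
    println!("examples hold: {}", check_examples());
}
```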
Shared State
Rust has a few types to share state between threads:
- Arc<T>: atomic reference counting, which allows multiple threads to share ownership of a value.
- Mutex<T>: mutual exclusion, which allows only one thread at a time to access a value.
Arc
Arc is a thread-safe reference-counted pointer. It’s similar to Rc, but it uses atomic operations to ensure thread safety.
It gives read-only access to the inner value, and it can be cloned and shared between threads:
use std::thread;
use std::sync::Arc;

fn main() {
    let v = Arc::new(vec![10, 20, 30]);
    let mut handles = Vec::new();
    for _ in 1..5 {
        let v = Arc::clone(&v);
        handles.push(thread::spawn(move || {
            let thread_id = thread::current().id();
            println!("{thread_id:?}: {v:?}");
        }));
    }

    handles.into_iter().for_each(|h| h.join().unwrap());
    println!("v: {v:?}");
}
- Arc::clone() has the cost of the atomic operations that update the reference count, but after that, using the T is free.
- Arc<T> implements Clone whether or not T does. It implements Send and Sync if and only if T implements them both.
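The reference count can be observed with Arc::strong_count. In this sketch the hypothetical NotClone type does not implement Clone, yet the Arc around it can still be cloned and moved into a thread; once that thread finishes, its handle is dropped and the count goes back down.

```rust
use std::sync::Arc;
use std::thread;

// A hypothetical type that deliberately does not implement Clone.
struct NotClone(u32);

// Returns the strong count while a second handle exists, and after it is dropped.
fn strong_counts() -> (usize, usize) {
    let data = Arc::new(NotClone(7));
    // Cloning the Arc only bumps the atomic count; NotClone itself is never cloned.
    let thread_data = Arc::clone(&data);
    let during = Arc::strong_count(&data);
    let handle = thread::spawn(move || thread_data.0 * 2);
    assert_eq!(handle.join().unwrap(), 14);
    // The thread's Arc was dropped when its closure finished, so only `data` remains.
    let after = Arc::strong_count(&data);
    (during, after)
}

fn main() {
    println!("{:?}", strong_counts());
}
```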
Mutex
Mutex is a thread-safe wrapper around a value. It allows one thread to access the value at a time.
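use std::sync::Mutex;

fn main() {
    let v = Mutex::new(vec![10, 20, 30]);
    // lock() returns a MutexGuard; this temporary guard is dropped at the end of the statement.
    println!("v: {:?}", v.lock().unwrap());

    {
        let mut guard = v.lock().unwrap();
        guard.push(40);
    } // `guard` is dropped here, which releases the lock.

    println!("v: {:?}", v.lock().unwrap());
}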
- RwLock is the read-write lock counterpart: it allows either many readers or a single writer at a time.
- Mutex<T> implements both Send and Sync if and only if T implements Send.
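Arc and Mutex are usually combined: Arc provides shared ownership across threads, and Mutex provides exclusive access to the value. A minimal sketch with a hypothetical concurrent_push helper, where each thread appends its index to a shared Vec:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Each thread pushes its index into a shared Vec guarded by a Mutex.
fn concurrent_push(n: i32) -> Vec<i32> {
    let shared = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::new();
    for i in 0..n {
        let shared = Arc::clone(&shared);
        handles.push(thread::spawn(move || {
            // The guard returned by lock() releases the mutex when it is
            // dropped at the end of this statement.
            shared.lock().unwrap().push(i);
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    // Threads run in any order, so sort before returning.
    let mut result = shared.lock().unwrap().clone();
    result.sort();
    result
}

fn main() {
    println!("{:?}", concurrent_push(5));
}
```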