11. Programmation Asynchrone (Tokio)
L’async en Rust permet de gérer des milliers de connexions simultanées avec un seul thread. Tokio est le runtime standard : scheduler, I/O réseau, timers, canaux asynchrones. Utile pour la communication entre workers dans un système distribué.
11.1 async/await — Les Bases
use tokio::net::TcpStream;
// Une fonction async retourne un Future
// Elle ne fait RIEN tant qu'on ne l'a pas awaitée
async fn read_gradient(addr: &str) -> Result<Vec<f64>, std::io::Error> {
let mut stream = TcpStream::connect(addr).await?;
// ... lire depuis le stream
Ok(vec![])
}
// Appel
async fn main() {
let grad = read_gradient("127.0.0.1:8080").await.unwrap();
}Runtime Tokio
#[tokio::main] // macro qui crée le runtime
async fn main() {
// Ici on est dans un runtime multi-thread
let result = compute_async().await;
}
// Équivalent manuel :
fn main() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("dans le runtime");
});
}11.2 Tâches Concurrentes
tokio::spawn — lancer des tâches en parallèle
use tokio::task;
async fn aggregate_worker(id: usize, grads: Vec<f64>) -> f64 {
// Simulation : calcul lent
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
grads.iter().sum()
}
#[tokio::main]
async fn main() {
let mut handles = vec![];
for i in 0..10 {
let grads = vec![i as f64; 1000];
let handle = tokio::spawn(async move {
aggregate_worker(i, grads).await
});
handles.push(handle);
}
// Attendre toutes les tâches
for handle in handles {
let result = handle.await.unwrap();
println!("résultat: {result}");
}
}JoinSet — gérer un groupe de tâches
use tokio::task::JoinSet;
let mut set = JoinSet::new();
for i in 0..10 {
set.spawn(async move { compute(i) });
}
while let Some(res) = set.join_next().await {
match res {
Ok(val) => println!("succès: {val}"),
Err(e) => println!("échec: {e}"),
}
}11.3 Canaux Asynchrones
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<Vec<f64>>(32); // buffer de 32
// Worker : envoie les gradients
let worker = tokio::spawn(async move {
for i in 0..10 {
let grad = vec![i as f64; 100];
tx.send(grad).await.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
// Serveur : reçoit et agrège
let server = tokio::spawn(async move {
let mut all_grads = vec![];
while let Some(grad) = rx.recv().await {
all_grads.push(grad);
if all_grads.len() == 10 {
println!("prêt pour agrégation!");
break;
}
}
});
worker.await.unwrap();
server.await.unwrap();
}Broadcast (un-émetteur, plusieurs récepteurs)
use tokio::sync::broadcast;
let (tx, _) = broadcast::channel::<Vec<f64>>(16);
// Chaque worker reçoit le même gradient agrégé
for id in 0..5 {
let mut rx = tx.subscribe();
tokio::spawn(async move {
while let Ok(agg_grad) = rx.recv().await {
println!("worker {id} a reçu le gradient agrégé");
}
});
}11.4 Communication Réseau (TCP)
// Serveur asynchrone
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, addr) = listener.accept().await?;
println!("nouveau worker: {addr}");
tokio::spawn(async move {
let mut buf = vec![0u8; 1024];
loop {
let n = socket.read(&mut buf).await.unwrap();
if n == 0 { break; } // connexion fermée
// Traiter les données reçues
socket.write_all(b"ACK").await.unwrap();
}
});
}
}11.5 Timeouts
use tokio::time::{timeout, Duration};
async fn slow_worker() -> Vec<f64> {
tokio::time::sleep(Duration::from_secs(10)).await;
vec![1.0, 2.0]
}
#[tokio::main]
async fn main() {
// Timeout de 1 seconde
match timeout(Duration::from_secs(1), slow_worker()).await {
Ok(grads) => println!("reçu: {grads:?}"),
Err(_) => println!("worker trop lent, timeout!"),
}
}11.6 Exemple : Agrégateur Distribué
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
struct DistributedAggregator {
n_workers: usize,
f: usize,
grads: Arc<Mutex<HashMap<usize, Vec<f64>>>>,
tx: mpsc::Sender<(usize, Vec<f64>)>,
}
impl DistributedAggregator {
fn new(n_workers: usize, f: usize) -> (Self, mpsc::Receiver<(usize, Vec<f64>)>) {
let (tx, rx) = mpsc::channel(64);
let agg = Self {
n_workers,
f,
grads: Arc::new(Mutex::new(HashMap::new())),
tx,
};
(agg, rx)
}
async fn run(mut rx: mpsc::Receiver<(usize, Vec<f64>)>) {
let grads = Arc::clone(&rx.grads); // ⛔ non, on utilise l'agrégateur
'outer: loop {
let batch_start = std::time::Instant::now();
let mut batch = Vec::new();
// Collecter n_workers - f gradients ou timeout
while batch.len() < rx.n_workers - rx.f {
match timeout(Duration::from_millis(500), rx.recv()).await {
Ok(Some((id, grad))) => {
batch.push((id, grad));
}
Ok(None) => break 'outer, // canal fermé
Err(_) => break, // timeout : on agrège avec ce qu'on a
}
}
// Agrégation
let result = median(&batch.iter().map(|(_, g)| g).cloned().collect::<Vec<_>>());
println!("batch aggrégé en {:?}", batch_start.elapsed());
}
}
async fn submit_gradient(&self, worker_id: usize, grad: Vec<f64>) {
self.tx.send((worker_id, grad)).await.unwrap();
}
}11.7 Résumé
| Concept | Usage | Analogue synchrone |
|---|---|---|
async fn | Déclare une fonction asynchrone | fn |
.await | Attend un Future sans bloquer le thread | block_on |
tokio::spawn | Lance une tâche concurrente | thread::spawn |
mpsc | Canal multiple-producteur, unique-consommateur | sync::mpsc |
broadcast | Un émetteur, N récepteurs | — |
TcpListener | Serveur TCP asynchrone | std::net::TcpListener |
timeout | Limite temporelle sur une opération | — |
JoinSet | Groupe de tâches | JoinHandle |