11. Programmation Asynchrone (Tokio)

L’async en Rust permet de gérer des milliers de connexions simultanées avec un seul thread. Tokio est le runtime standard : scheduler, I/O réseau, timers, canaux asynchrones. Utile pour la communication entre workers dans un système distribué.


11.1 async/await — Les Bases

use tokio::net::TcpStream;
 
// Une fonction async retourne un Future
// Elle ne fait RIEN tant qu'on ne l'a pas awaitée
async fn read_gradient(addr: &str) -> Result<Vec<f64>, std::io::Error> {
    let mut stream = TcpStream::connect(addr).await?;
    // ... lire depuis le stream
    Ok(vec![])
}
 
// Appel
async fn main() {
    let grad = read_gradient("127.0.0.1:8080").await.unwrap();
}

Runtime Tokio

#[tokio::main]  // macro qui crée le runtime
async fn main() {
    // Ici on est dans un runtime multi-thread
    let result = compute_async().await;
}
 
// Équivalent manuel :
fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("dans le runtime");
    });
}

11.2 Tâches Concurrentes

tokio::spawn — lancer des tâches en parallèle

use tokio::task;
 
async fn aggregate_worker(id: usize, grads: Vec<f64>) -> f64 {
    // Simulation : calcul lent
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    grads.iter().sum()
}
 
#[tokio::main]
async fn main() {
    let mut handles = vec![];
 
    for i in 0..10 {
        let grads = vec![i as f64; 1000];
        let handle = tokio::spawn(async move {
            aggregate_worker(i, grads).await
        });
        handles.push(handle);
    }
 
    // Attendre toutes les tâches
    for handle in handles {
        let result = handle.await.unwrap();
        println!("résultat: {result}");
    }
}

JoinSet — gérer un groupe de tâches

use tokio::task::JoinSet;
 
let mut set = JoinSet::new();
 
for i in 0..10 {
    set.spawn(async move { compute(i) });
}
 
while let Some(res) = set.join_next().await {
    match res {
        Ok(val) => println!("succès: {val}"),
        Err(e) => println!("échec: {e}"),
    }
}

11.3 Canaux Asynchrones

use tokio::sync::mpsc;
 
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Vec<f64>>(32);  // buffer de 32
 
    // Worker : envoie les gradients
    let worker = tokio::spawn(async move {
        for i in 0..10 {
            let grad = vec![i as f64; 100];
            tx.send(grad).await.unwrap();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    });
 
    // Serveur : reçoit et agrège
    let server = tokio::spawn(async move {
        let mut all_grads = vec![];
        while let Some(grad) = rx.recv().await {
            all_grads.push(grad);
            if all_grads.len() == 10 {
                println!("prêt pour agrégation!");
                break;
            }
        }
    });
 
    worker.await.unwrap();
    server.await.unwrap();
}

Broadcast (un-émetteur, plusieurs récepteurs)

use tokio::sync::broadcast;
 
let (tx, _) = broadcast::channel::<Vec<f64>>(16);
 
// Chaque worker reçoit le même gradient agrégé
for id in 0..5 {
    let mut rx = tx.subscribe();
    tokio::spawn(async move {
        while let Ok(agg_grad) = rx.recv().await {
            println!("worker {id} a reçu le gradient agrégé");
        }
    });
}

11.4 Communication Réseau (TCP)

// Serveur asynchrone
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
 
    loop {
        let (mut socket, addr) = listener.accept().await?;
        println!("nouveau worker: {addr}");
 
        tokio::spawn(async move {
            let mut buf = vec![0u8; 1024];
            loop {
                let n = socket.read(&mut buf).await.unwrap();
                if n == 0 { break; }  // connexion fermée
                // Traiter les données reçues
                socket.write_all(b"ACK").await.unwrap();
            }
        });
    }
}

11.5 Timeouts

use tokio::time::{timeout, Duration};
 
async fn slow_worker() -> Vec<f64> {
    tokio::time::sleep(Duration::from_secs(10)).await;
    vec![1.0, 2.0]
}
 
#[tokio::main]
async fn main() {
    // Timeout de 1 seconde
    match timeout(Duration::from_secs(1), slow_worker()).await {
        Ok(grads) => println!("reçu: {grads:?}"),
        Err(_) => println!("worker trop lent, timeout!"),
    }
}

11.6 Exemple : Agrégateur Distribué

use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
 
struct DistributedAggregator {
    n_workers: usize,
    f: usize,
    grads: Arc<Mutex<HashMap<usize, Vec<f64>>>>,
    tx: mpsc::Sender<(usize, Vec<f64>)>,
}
 
impl DistributedAggregator {
    fn new(n_workers: usize, f: usize) -> (Self, mpsc::Receiver<(usize, Vec<f64>)>) {
        let (tx, rx) = mpsc::channel(64);
        let agg = Self {
            n_workers,
            f,
            grads: Arc::new(Mutex::new(HashMap::new())),
            tx,
        };
        (agg, rx)
    }
 
    async fn run(mut rx: mpsc::Receiver<(usize, Vec<f64>)>) {
        let grads = Arc::clone(&rx.grads); // ⛔ non, on utilise l'agrégateur
        'outer: loop {
            let batch_start = std::time::Instant::now();
            let mut batch = Vec::new();
 
            // Collecter n_workers - f gradients ou timeout
            while batch.len() < rx.n_workers - rx.f {
                match timeout(Duration::from_millis(500), rx.recv()).await {
                    Ok(Some((id, grad))) => {
                        batch.push((id, grad));
                    }
                    Ok(None) => break 'outer,  // canal fermé
                    Err(_) => break,  // timeout : on agrège avec ce qu'on a
                }
            }
 
            // Agrégation
            let result = median(&batch.iter().map(|(_, g)| g).cloned().collect::<Vec<_>>());
            println!("batch aggrégé en {:?}", batch_start.elapsed());
        }
    }
 
    async fn submit_gradient(&self, worker_id: usize, grad: Vec<f64>) {
        self.tx.send((worker_id, grad)).await.unwrap();
    }
}

11.7 Résumé

ConceptUsageAnalogue synchrone
async fnDéclare une fonction asynchronefn
.awaitAttend un Future sans bloquer le threadblock_on
tokio::spawnLance une tâche concurrentethread::spawn
mpscCanal multiple-producteur, unique-consommateursync::mpsc
broadcastUn émetteur, N récepteurs
TcpListenerServeur TCP asynchronestd::net::TcpListener
timeoutLimite temporelle sur une opération
JoinSetGroupe de tâchesJoinHandle

🔗 Voir aussi