20. Concurrence Avancée

threading, multiprocessing, concurrent.futures, GIL, ProcessPoolExecutor, shared memory, queue.Queue, synchronisation, subprocess.


20.1 Threads vs Processes vs Async

MécanismeParallélismeGILMémoireUsage
threadingConcurrence (I/O)Bloqué par le GILPartagéeI/O réseau, fichiers
multiprocessingParallélisme (CPU)Pas de GILIsolée (sauf shared mem)Calcul lourd
asyncioConcurrence (I/O)Un seul threadPartagéeI/O haute performance

20.2 threading

Création basique

import threading
 
def travail(nom):
    print(f"Thread {nom} démarre")
 
threads = []
for i in range(5):
    t = threading.Thread(target=travail, args=(i,))
    threads.append(t)
    t.start()
 
for t in threads:
    t.join()  # attendre la fin

Classe Thread personnalisée

class MonThread(threading.Thread):
    def __init__(self, nom):
        super().__init__(name=nom)
        self.résultat = None
 
    def run(self):
        self.résultat = sum(range(10_000_000))
 
t = MonThread("calc")
t.start()
t.join()
print(t.résultat)

Verrous (Lock)

from threading import Lock
 
compteur = 0
verrou = Lock()
 
def incrémenter():
    global compteur
    for _ in range(100_000):
        with verrou:
            compteur += 1
 
threads = [threading.Thread(target=incrémenter) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
 
print(compteur)  # 1_000_000 (correct grâce au verrou)

RLock — verrou réentrant

from threading import RLock
 
verrou = RLock()  # même thread peut acquérir plusieurs fois
 
def f():
    with verrou:
        g()
 
def g():
    with verrou:  # OK avec RLock, bloquerait avec Lock
        pass

Semaphore — nombre limité d’accès

from threading import Semaphore
 
sémaphore = Semaphore(3)  # max 3 threads simultanés
 
def accès_ressource_limitée():
    with sémaphore:
        ...

Event — signal entre threads

from threading import Event
 
prêt = Event()
 
def travailleur():
    print("En attente du signal")
    prêt.wait()  # bloque jusqu'au set()
    print("Go !")
 
t = threading.Thread(target=travailleur)
t.start()
import time; time.sleep(1)
prêt.set()

ThreadPoolExecutor

Voir section 20.4.

20.3 queue.Queue — file thread-safe

from queue import Queue
from threading import Thread
 
q = Queue()
 
def producteur():
    for i in range(10):
        q.put(i)
    q.put(None)  # signal de fin
 
def consommateur():
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Traitement: {item}")
        q.task_done()
 
Thread(target=producteur).start()
Thread(target=consommateur).start()

20.4 concurrent.futures

API unifiée threads/process :

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
 
def tâche(n):
    time.sleep(0.1)
    return n ** 2
 
# Par threads (I/O-bound)
with ThreadPoolExecutor(max_workers=4) as pool:
    futurs = [pool.submit(tâche, i) for i in range(10)]
    résultats = [f.result() for f in futurs]
 
# Version courte avec map
with ThreadPoolExecutor(max_workers=4) as pool:
    résultats = list(pool.map(tâche, range(10)))
 
# Par processes (CPU-bound)
with ProcessPoolExecutor(max_workers=4) as pool:
    résultats = list(pool.map(tâche, range(10)))

as_completed — traiter au fur et à mesure :

from concurrent.futures import as_completed
 
with ProcessPoolExecutor() as pool:
    futurs = {pool.submit(tâche, i): i for i in range(10)}
    for futur in as_completed(futurs):
        index = futurs[futur]
        try:
            résultat = futur.result()
            print(f"Tâche {index} terminée: {résultat}")
        except Exception as e:
            print(f"Tâche {index} échouée: {e}")

wait — attendre un sous-ensemble

from concurrent.futures import wait, FIRST_COMPLETED
 
futurs = [pool.submit(tâche, i) for i in range(10)]
terminés, _ = wait(futurs, return_when=FIRST_COMPLETED)

20.5 multiprocessing

Pool

from multiprocessing import Pool
 
def carré(x):
    return x ** 2
 
with Pool(processes=4) as pool:
    résultats = pool.map(carré, range(100))
    résultats_async = pool.map_async(carré, range(100))
    print(résultats_async.get())

Process

from multiprocessing import Process
 
def travail(nom):
    print(f"Process {nom} dans {__name__}")
 
if __name__ == "__main__":
    p = Process(target=travail, args=("A",))
    p.start()
    p.join()

Value et Array — mémoire partagée

from multiprocessing import Process, Value, Array
 
def incrémenter(compteur):
    with compteur.get_lock():
        compteur.value += 1
 
if __name__ == "__main__":
    compteur = Value("i", 0)  # 'i' = int signé
    procs = [Process(target=incrémenter, args=(compteur,)) for _ in range(10)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(compteur.value)

Queue inter-process

from multiprocessing import Process, Queue
 
def producteur(q):
    q.put("data")
 
def consommateur(q):
    print(q.get())
 
if __name__ == "__main__":
    q = Queue()
    Process(target=producteur, args=(q,)).start()
    Process(target=consommateur, args=(q,)).start()

Pipe — communication directe

from multiprocessing import Process, Pipe
 
def envoyeur(conn):
    conn.send(["data", 42])
    conn.close()
 
if __name__ == "__main__":
    parent, enfant = Pipe()
    p = Process(target=envoyeur, args=(enfant,))
    p.start()
    print(parent.recv())
    p.join()

Manager — partage objets Python complexes

from multiprocessing import Process, Manager
 
def travailleur(d, lst):
    d["modifié"] = True
    lst.append(42)
 
if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        lst = manager.list()
        p = Process(target=travailleur, args=(d, lst))
        p.start()
        p.join()
        print(d, lst)

20.6 GIL — Global Interpreter Lock

Le GIL permet à un seul thread d’exécuter du bytecode Python à la fois.

Conséquences :

  • threading : pas de parallélisme CPU (sauf I/O-bound)
  • multiprocessing : contourne le GIL (process séparés)
  • Les extensions C (NumPy, PyTorch) relâchent le GIL pendant les calculs

Contournements :

# I/O-bound → threading
# CPU-bound → multiprocessing
# Mixte → ProcessPoolExecutor pour le CPU + asyncio/threading pour l'I/O

20.7 subprocess — lancer des commandes

import subprocess
 
# Commande simple
résultat = subprocess.run(["ls", "-l"], capture_output=True, text=True)
print(résultat.stdout)
print(résultat.returncode)
 
# Avec shell (attention sécurité)
subprocess.run("echo hello | wc -c", shell=True)
 
# Timeout
try:
    subprocess.run(["sleep", "10"], timeout=2)
except subprocess.TimeoutExpired:
    print("Timeout")

Communication :

proc = subprocess.Popen(
    ["python3", "-c", "print(input()*2)"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
)
stdout, _ = proc.communicate(input="hello\n")
print(stdout)  # hellohello\n

20.8 Bonnes pratiques

  • ThreadPoolExecutor I/O-bound, ProcessPoolExecutor CPU-bound
  • Toujours protéger les sections critiques avec Lock
  • Éviter l’état partagé quand possible (préférer Queue)
  • Utiliser if __name__ == "__main__" avec multiprocessing
  • Managers pour partager des structures complexes
  • as_completed pour le parallélisme avec retours progressifs

🔗 ← Retour au cours · ← précédent · Suivant →