20. Concurrence Avancée
threading,multiprocessing,concurrent.futures, GIL,ProcessPoolExecutor, shared memory,queue.Queue, synchronisation,subprocess.
20.1 Threads vs Processes vs Async
| Mécanisme | Parallélisme | GIL | Mémoire | Usage |
|---|---|---|---|---|
threading | Concurrence (I/O) | Bloqué par le GIL | Partagée | I/O réseau, fichiers |
multiprocessing | Parallélisme (CPU) | Pas de GIL | Isolée (sauf shared mem) | Calcul lourd |
asyncio | Concurrence (I/O) | Un seul thread | Partagée | I/O haute performance |
20.2 threading
Création basique
import threading
def travail(nom):
print(f"Thread {nom} démarre")
threads = []
for i in range(5):
t = threading.Thread(target=travail, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join() # attendre la finClasse Thread personnalisée
class MonThread(threading.Thread):
def __init__(self, nom):
super().__init__(name=nom)
self.résultat = None
def run(self):
self.résultat = sum(range(10_000_000))
t = MonThread("calc")
t.start()
t.join()
print(t.résultat)Verrous (Lock)
from threading import Lock
compteur = 0
verrou = Lock()
def incrémenter():
global compteur
for _ in range(100_000):
with verrou:
compteur += 1
threads = [threading.Thread(target=incrémenter) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(compteur) # 1_000_000 (correct grâce au verrou)RLock — verrou réentrant
from threading import RLock
verrou = RLock() # même thread peut acquérir plusieurs fois
def f():
with verrou:
g()
def g():
with verrou: # OK avec RLock, bloquerait avec Lock
passSemaphore — nombre limité d’accès
from threading import Semaphore
sémaphore = Semaphore(3) # max 3 threads simultanés
def accès_ressource_limitée():
with sémaphore:
...Event — signal entre threads
from threading import Event
prêt = Event()
def travailleur():
print("En attente du signal")
prêt.wait() # bloque jusqu'au set()
print("Go !")
t = threading.Thread(target=travailleur)
t.start()
import time; time.sleep(1)
prêt.set()ThreadPoolExecutor
Voir section 20.4.
20.3 queue.Queue — file thread-safe
from queue import Queue
from threading import Thread
q = Queue()
def producteur():
for i in range(10):
q.put(i)
q.put(None) # signal de fin
def consommateur():
while True:
item = q.get()
if item is None:
break
print(f"Traitement: {item}")
q.task_done()
Thread(target=producteur).start()
Thread(target=consommateur).start()20.4 concurrent.futures
API unifiée threads/process :
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
def tâche(n):
time.sleep(0.1)
return n ** 2
# Par threads (I/O-bound)
with ThreadPoolExecutor(max_workers=4) as pool:
futurs = [pool.submit(tâche, i) for i in range(10)]
résultats = [f.result() for f in futurs]
# Version courte avec map
with ThreadPoolExecutor(max_workers=4) as pool:
résultats = list(pool.map(tâche, range(10)))
# Par processes (CPU-bound)
with ProcessPoolExecutor(max_workers=4) as pool:
résultats = list(pool.map(tâche, range(10)))as_completed — traiter au fur et à mesure :
from concurrent.futures import as_completed
with ProcessPoolExecutor() as pool:
futurs = {pool.submit(tâche, i): i for i in range(10)}
for futur in as_completed(futurs):
index = futurs[futur]
try:
résultat = futur.result()
print(f"Tâche {index} terminée: {résultat}")
except Exception as e:
print(f"Tâche {index} échouée: {e}")wait — attendre un sous-ensemble
from concurrent.futures import wait, FIRST_COMPLETED
futurs = [pool.submit(tâche, i) for i in range(10)]
terminés, _ = wait(futurs, return_when=FIRST_COMPLETED)20.5 multiprocessing
Pool
from multiprocessing import Pool
def carré(x):
return x ** 2
with Pool(processes=4) as pool:
résultats = pool.map(carré, range(100))
résultats_async = pool.map_async(carré, range(100))
print(résultats_async.get())Process
from multiprocessing import Process
def travail(nom):
print(f"Process {nom} dans {__name__}")
if __name__ == "__main__":
p = Process(target=travail, args=("A",))
p.start()
p.join()Value et Array — mémoire partagée
from multiprocessing import Process, Value, Array
def incrémenter(compteur):
with compteur.get_lock():
compteur.value += 1
if __name__ == "__main__":
compteur = Value("i", 0) # 'i' = int signé
procs = [Process(target=incrémenter, args=(compteur,)) for _ in range(10)]
for p in procs: p.start()
for p in procs: p.join()
print(compteur.value)Queue inter-process
from multiprocessing import Process, Queue
def producteur(q):
q.put("data")
def consommateur(q):
print(q.get())
if __name__ == "__main__":
q = Queue()
Process(target=producteur, args=(q,)).start()
Process(target=consommateur, args=(q,)).start()Pipe — communication directe
from multiprocessing import Process, Pipe
def envoyeur(conn):
conn.send(["data", 42])
conn.close()
if __name__ == "__main__":
parent, enfant = Pipe()
p = Process(target=envoyeur, args=(enfant,))
p.start()
print(parent.recv())
p.join()Manager — partage objets Python complexes
from multiprocessing import Process, Manager
def travailleur(d, lst):
d["modifié"] = True
lst.append(42)
if __name__ == "__main__":
with Manager() as manager:
d = manager.dict()
lst = manager.list()
p = Process(target=travailleur, args=(d, lst))
p.start()
p.join()
print(d, lst)20.6 GIL — Global Interpreter Lock
Le GIL permet à un seul thread d’exécuter du bytecode Python à la fois.
Conséquences :
threading: pas de parallélisme CPU (sauf I/O-bound)multiprocessing: contourne le GIL (process séparés)- Les extensions C (NumPy, PyTorch) relâchent le GIL pendant les calculs
Contournements :
# I/O-bound → threading
# CPU-bound → multiprocessing
# Mixte → ProcessPoolExecutor pour le CPU + asyncio/threading pour l'I/O20.7 subprocess — lancer des commandes
import subprocess
# Commande simple
résultat = subprocess.run(["ls", "-l"], capture_output=True, text=True)
print(résultat.stdout)
print(résultat.returncode)
# Avec shell (attention sécurité)
subprocess.run("echo hello | wc -c", shell=True)
# Timeout
try:
subprocess.run(["sleep", "10"], timeout=2)
except subprocess.TimeoutExpired:
print("Timeout")Communication :
proc = subprocess.Popen(
["python3", "-c", "print(input()*2)"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True,
)
stdout, _ = proc.communicate(input="hello\n")
print(stdout) # hellohello\n20.8 Bonnes pratiques
ThreadPoolExecutorI/O-bound,ProcessPoolExecutorCPU-bound- Toujours protéger les sections critiques avec
Lock - Éviter l’état partagé quand possible (préférer
Queue) - Utiliser
if __name__ == "__main__"avecmultiprocessing - Managers pour partager des structures complexes
as_completedpour le parallélisme avec retours progressifs