26. Network et APIs

requests, httpx, aiohttp, sessions, timeouts, retry, authentification, REST APIs, téléchargement.


26.1 requests — le standard HTTP

pip install requests

GET

import requests
 
resp = requests.get("https://api.github.com/user", timeout=5)
resp.status_code          # 200
resp.ok                   # True
resp.headers              # dict des en-têtes
resp.json()               # dict depuis JSON
resp.text                 # contenu texte
resp.content              # contenu bytes

POST

resp = requests.post(
    "https://httpbin.org/post",
    json={"nom": "Alice", "âge": 30},
    timeout=5,
)

Paramètres

resp = requests.get(
    "https://api.github.com/search/repositories",
    params={"q": "python", "sort": "stars"},
)

Headers et authentification

headers = {"Authorization": "Bearer token123", "User-Agent": "MonApp"}
resp = requests.get("https://api.github.com/user", headers=headers)
 
# Basic Auth
from requests.auth import HTTPBasicAuth
resp = requests.get("https://api.example.com", auth=HTTPBasicAuth("user", "pass"))

Sessions

session = requests.Session()
session.headers.update({"User-Agent": "MonApp/1.0"})
 
# Réutilise les cookies et la connexion TCP
resp1 = session.get("https://httpbin.org/cookies/set?nom=Alice")
resp2 = session.get("https://httpbin.org/cookies")
# resp2.json()["cookies"]["nom"] == "Alice"

Téléchargement de fichiers

resp = requests.get("https://example.com/grand_fichier.bin", stream=True)
with open("sortie.bin", "wb") as f:
    for chunk in resp.iter_content(chunk_size=8192):
        f.write(chunk)

Gestion d’erreurs

try:
    resp = requests.get("https://api.github.com/unknown", timeout=5)
    resp.raise_for_status()  # lève HTTPError si 4xx/5xx
except requests.exceptions.Timeout:
    print("Timeout")
except requests.exceptions.ConnectionError:
    print("Connexion impossible")
except requests.exceptions.HTTPError as e:
    print(f"Erreur HTTP: {e}")

26.2 httpx — moderne, compatible async

pip install httpx

API synchrone (identique à requests)

import httpx
 
with httpx.Client() as client:
    resp = client.get("https://example.com", timeout=5)
    print(resp.status_code)
    print(resp.json())

API asynchrone

import httpx
import asyncio
 
async def fetch(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(url, timeout=5)
        return resp.json()
 
async def main():
    urls = [
        "https://api.github.com",
        "https://httpbin.org/get",
        "https://example.com",
    ]
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url) for url in urls]
        résultats = await asyncio.gather(*tasks)
        for resp in résultats:
            print(resp.url, resp.status_code)
 
asyncio.run(main())

Timeout granulaire

timeout = httpx.Timeout(5.0, connect=2.0, read=3.0, write=3.0)
client = httpx.Client(timeout=timeout)

Retry (httpx n’a pas de retry intégré)

from tenacity import retry, stop_after_attempt, wait_exponential
 
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
def fetch_avec_retry(url: str) -> dict:
    with httpx.Client() as client:
        resp = client.get(url, timeout=5)
        resp.raise_for_status()
        return resp.json()

26.3 aiohttp — async natif

pip install aiohttp
import aiohttp
import asyncio
 
async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as resp:
        return await resp.text()
 
async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "https://example.com")
        print(len(html))
 
    # Concurrence
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, f"https://api.github.com") for _ in range(10)]
        résultats = await asyncio.gather(*tasks)
 
asyncio.run(main())

Limiter les connexions simultanées

connector = aiohttp.TCPConnector(limit=10)  # max 10 connexions
async with aiohttp.ClientSession(connector=connector) as session:
    ...

26.4 Comparaison

BibliothèqueSyncAsyncSessionsRetry natifPopularité
requests❌ (tenacity)Très élevée
httpx❌ (tenacity)Élevée
aiohttpÉlevée
urllibStandard

26.5 REST API — design patterns

Client REST générique

import httpx
from typing import Any
 
class APIClient:
    def __init__(self, base_url: str, token: str | None = None):
        self.client = httpx.Client(base_url=base_url, timeout=10)
        if token:
            self.client.headers["Authorization"] = f"Bearer {token}"
 
    def get(self, endpoint: str, **params) -> dict[str, Any]:
        resp = self.client.get(endpoint, params=params)
        resp.raise_for_status()
        return resp.json()
 
    def post(self, endpoint: str, data: dict[str, Any] | None = None) -> dict[str, Any]:
        resp = self.client.post(endpoint, json=data)
        resp.raise_for_status()
        return resp.json()
 
    def close(self):
        self.client.close()
 
# Usage
api = APIClient("https://api.github.com", token="ghp_xxx")
repos = api.get("/search/repositories", q="python", sort="stars")
api.close()

Retry with exponential backoff

import time
from functools import wraps
 
def retry(max_attempts=3, base_delay=1.0):
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return f(*args, **kwargs)
                except (httpx.HTTPError, httpx.TimeoutException) as e:
                    if attempt == max_attempts - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    print(f"Tentative {attempt+1} échouée, nouvelle tentative dans {delay:.1f}s")
                    time.sleep(delay)
            return None
        return wrapper
    return decorator
 
class APIClientRobuste(APIClient):
    @retry(max_attempts=3)
    def get(self, endpoint, **params):
        return super().get(endpoint, **params)
 
    @retry(max_attempts=3)
    def post(self, endpoint, data=None):
        return super().post(endpoint, data)

26.6 WebSocket

pip install websockets
import asyncio
import websockets
 
async def écouter():
    async with websockets.connect("wss://echo.websocket.org") as ws:
        await ws.send("Hello")
        réponse = await ws.recv()
        print(réponse)
 
asyncio.run(écouter())

26.7 Bonnes pratiques

  • Toujours définir un timeout (évite les blocages infinis)
  • Utiliser les sessions pour réutiliser les connexions TCP
  • raise_for_status() pour détecter les erreurs HTTP
  • Streaming pour les gros fichiers (iter_content)
  • Retry avec exponential backoff pour les APIs
  • Limiter les connexions simultanées (asyncio.Semaphore, TCPConnector)
  • Fermer les sessions (client.close() ou async with)
  • Pour une thèse ML : httpx est le meilleur compromis (sync + async)

🔗 ← Retour au cours · ← précédent · Suivant →