26. Network et APIs
requests, httpx, aiohttp, sessions, timeouts, retry, authentification, REST APIs, téléchargement.
26.1 requests — le standard HTTP
pip install requests
GET
import requests
resp = requests.get( "https://api.github.com/user" , timeout = 5 )
resp.status_code # 200
resp.ok # True
resp.headers # dict des en-têtes
resp.json() # dict depuis JSON
resp.text # contenu texte
resp.content # contenu bytes
POST
resp = requests.post(
"https://httpbin.org/post" ,
json = { "nom" : "Alice" , "âge" : 30 },
timeout = 5 ,
)
Paramètres
resp = requests.get(
"https://api.github.com/search/repositories" ,
params = { "q" : "python" , "sort" : "stars" },
)
headers = { "Authorization" : "Bearer token123" , "User-Agent" : "MonApp" }
resp = requests.get( "https://api.github.com/user" , headers = headers)
# Basic Auth
from requests.auth import HTTPBasicAuth
resp = requests.get( "https://api.example.com" , auth = HTTPBasicAuth( "user" , "pass" ))
Sessions
session = requests.Session()
session.headers.update({ "User-Agent" : "MonApp/1.0" })
# Réutilise les cookies et la connexion TCP
resp1 = session.get( "https://httpbin.org/cookies/set?nom=Alice" )
resp2 = session.get( "https://httpbin.org/cookies" )
# resp2.json()["cookies"]["nom"] == "Alice"
Téléchargement de fichiers
resp = requests.get( "https://example.com/grand_fichier.bin" , stream = True )
with open ( "sortie.bin" , "wb" ) as f:
for chunk in resp.iter_content( chunk_size = 8192 ):
f.write(chunk)
Gestion d’erreurs
try :
resp = requests.get( "https://api.github.com/unknown" , timeout = 5 )
resp.raise_for_status() # lève HTTPError si 4xx/5xx
except requests.exceptions.Timeout:
print ( "Timeout" )
except requests.exceptions.ConnectionError:
print ( "Connexion impossible" )
except requests.exceptions.HTTPError as e:
print ( f "Erreur HTTP: { e } " )
26.2 httpx — moderne, compatible async
pip install httpx
API synchrone (identique à requests)
import httpx
with httpx.Client() as client:
resp = client.get( "https://example.com" , timeout = 5 )
print (resp.status_code)
print (resp.json())
API asynchrone
import httpx
import asyncio
async def fetch (url: str ) -> dict :
async with httpx.AsyncClient() as client:
resp = await client.get(url, timeout = 5 )
return resp.json()
async def main ():
urls = [
"https://api.github.com" ,
"https://httpbin.org/get" ,
"https://example.com" ,
]
async with httpx.AsyncClient() as client:
tasks = [client.get(url) for url in urls]
résultats = await asyncio.gather( * tasks)
for resp in résultats:
print (resp.url, resp.status_code)
asyncio.run(main())
Timeout granulaire
timeout = httpx.Timeout( 5.0 , connect = 2.0 , read = 3.0 , write = 3.0 )
client = httpx.Client( timeout = timeout)
Retry (httpx n’a pas de retry intégré)
from tenacity import retry, stop_after_attempt, wait_exponential
@retry ( stop = stop_after_attempt( 3 ), wait = wait_exponential( multiplier = 1 , min = 1 , max = 10 ))
def fetch_avec_retry (url: str ) -> dict :
with httpx.Client() as client:
resp = client.get(url, timeout = 5 )
resp.raise_for_status()
return resp.json()
26.3 aiohttp — async natif
pip install aiohttp
import aiohttp
import asyncio
async def fetch (session: aiohttp.ClientSession, url: str ) -> str :
async with session.get(url) as resp:
return await resp.text()
async def main ():
async with aiohttp.ClientSession() as session:
html = await fetch(session, "https://example.com" )
print ( len (html))
# Concurrence
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, f "https://api.github.com" ) for _ in range ( 10 )]
résultats = await asyncio.gather( * tasks)
asyncio.run(main())
Limiter les connexions simultanées
connector = aiohttp.TCPConnector( limit = 10 ) # max 10 connexions
async with aiohttp.ClientSession( connector = connector) as session:
...
26.4 Comparaison
Bibliothèque Sync Async Sessions Retry natif Popularité requests✅ ❌ ✅ ❌ (tenacity) Très élevée httpx✅ ✅ ✅ ❌ (tenacity) Élevée aiohttp❌ ✅ ✅ ❌ Élevée urllib✅ ❌ ✅ ❌ Standard
26.5 REST API — design patterns
Client REST générique
import httpx
from typing import Any
class APIClient :
def __init__ (self, base_url: str , token: str | None = None ):
self .client = httpx.Client( base_url = base_url, timeout = 10 )
if token:
self .client.headers[ "Authorization" ] = f "Bearer { token } "
def get (self, endpoint: str , ** params) -> dict[ str , Any]:
resp = self .client.get(endpoint, params = params)
resp.raise_for_status()
return resp.json()
def post (self, endpoint: str , data: dict[ str , Any] | None = None ) -> dict[ str , Any]:
resp = self .client.post(endpoint, json = data)
resp.raise_for_status()
return resp.json()
def close (self):
self .client.close()
# Usage
api = APIClient( "https://api.github.com" , token = "ghp_xxx" )
repos = api.get( "/search/repositories" , q = "python" , sort = "stars" )
api.close()
Retry with exponential backoff
import time
from functools import wraps
def retry (max_attempts = 3 , base_delay = 1.0 ):
def decorator (f):
@wraps (f)
def wrapper ( * args, ** kwargs):
for attempt in range (max_attempts):
try :
return f( * args, ** kwargs)
except (httpx.HTTPError, httpx.TimeoutException) as e:
if attempt == max_attempts - 1 :
raise
delay = base_delay * ( 2 ** attempt)
print ( f "Tentative { attempt + 1} échouée, nouvelle tentative dans { delay :.1f } s" )
time.sleep(delay)
return None
return wrapper
return decorator
class APIClientRobuste ( APIClient ):
@retry ( max_attempts = 3 )
def get (self, endpoint, ** params):
return super ().get(endpoint, ** params)
@retry ( max_attempts = 3 )
def post (self, endpoint, data = None ):
return super ().post(endpoint, data)
26.6 WebSocket
pip install websockets
import asyncio
import websockets
async def écouter ():
async with websockets.connect( "wss://echo.websocket.org" ) as ws:
await ws.send( "Hello" )
réponse = await ws.recv()
print (réponse)
asyncio.run(écouter())
26.7 Bonnes pratiques
Toujours définir un timeout (évite les blocages infinis)
Utiliser les sessions pour réutiliser les connexions TCP
raise_for_status() pour détecter les erreurs HTTP
Streaming pour les gros fichiers (iter_content)
Retry avec exponential backoff pour les APIs
Limiter les connexions simultanées (asyncio.Semaphore, TCPConnector)
Fermer les sessions (client.close() ou async with)
Pour une thèse ML : httpx est le meilleur compromis (sync + async)
🔗 ← Retour au cours · ← précédent · Suivant →