Aller au contenu principal

Recette — Récupération horaire d'API

Toutes les heures, appeler une API REST tierce, transformer la charge utile JSON, écrire les lignes dans une cible SQL. Le motif derrière n'importe quelle charge de travail « récupérer depuis Stripe / GitHub / Salesforce / votre propre microservice ».

Cette recette combine les étapes http, python et sql_query — et montre le rôle de chacune.


Ce que vous construisez

ÉlémentValeur
DéclencheurCron — toutes les heures à la minute :15.
SourceUne API REST tierce qui renvoie du JSON.
CibleUne table SQL avec une clé d'upsert (rejouer une heure devient idempotent).
Étapes1 × http (récupération) + 1 × python (transformation + upsert) + 1 × sql_query (marquage de l'horodatage de synchronisation).
Nouvelles tentatives3 essais sur l'étape HTTP, backoff exponentiel (limitations de débit + 5xx transitoires).
AlertesSlack en cas d'échec ; avertissement au bout de 10 minutes.

L'exemple récupère un flux de releases GitHub et les upserte dans une table releases. La même structure s'applique à n'importe quel flux d'API paginé et borné dans le temps.


Prérequis

Ce qu'il fautComment
Un connecteur vers la base SQL cible.Le pool default du framework convient pour une première exécution.
Le bearer token de l'API dans le fichier d'environnement du framework sous GITHUB_TOKEN.Le framework recharge l'environnement à l'enregistrement.
Une table cible créée au préalable.Voir le SQL ci-dessous.

La table cible :

CREATE TABLE releases (
id BIGINT PRIMARY KEY,
repo TEXT NOT NULL,
tag TEXT NOT NULL,
name TEXT,
published_at TIMESTAMPTZ NOT NULL,
body TEXT,
synced_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE TABLE sync_state (
name TEXT PRIMARY KEY,
last_sync TIMESTAMPTZ
);

sync_state est une petite table de comptabilité — une ligne par flux de données, qui enregistre la dernière récupération réussie.


Étape 1 — Créer la coque de la tâche

Sur la page Nomaflow, cliquez sur + Nouvelle tâche.

ChampValeur
Idgithub-releases-hourly
DescriptionRécupération horaire des releases GitHub dans la table releases.
Tagsapi-pull, hourly, github
Activée

Planification :

ChampValeur
Cron15 * * * *
Fuseau horaireUTC (l'API utilise des horodatages UTC ; les aligner évite les surprises)

Le décalage :15 évite la cohue des tâches qui se déclenchent pile à l'heure partout ailleurs dans l'organisation.


Étape 2 — Paramètres partagés

Dans le bloc Shared params :

CléValeurPourquoi
repofblettner/fblettner.github.ioLe dépôt à récupérer.
target_connectordefaultLa cible de l'upsert.

Ces paramètres sont fusionnés dans les op_kwargs de chaque étape Python à l'exécution — et peuvent être remplacés par déclenchement dans la fenêtre Exécuter avec paramètres.


Étape 3 — La récupération HTTP

Cliquez sur + Ajouter une étapeHTTP.

ChampValeur
Nomfetch-releases
URLhttps://api.github.com/repos/${kw:repo}/releases?per_page=30
MéthodeGET
En-têtesAuthorization: Bearer ${env:GITHUB_TOKEN}
Accept: application/vnd.github+json
X-GitHub-Api-Version: 2022-11-28
Timeout60

${kw:repo} substitue le kwarg fusionné repo à l'exécution — un remplacement par déclenchement de repo se propage donc naturellement.

L'étape HTTP enregistre le corps de réponse dans sa sortie. L'étape suivante le lit.


Étape 4 — Nouvelle tentative sur l'étape HTTP

Dans le bloc Retry de la tâche :

ChampValeur
Essais3
Backoffexponential
Base seconds30

Attente de 30 s avant la première nouvelle tentative, 60 s avant la deuxième, 120 s avant la troisième. Cas pris en charge :

  • 429 limitation de débit côté GitHub.
  • 502 / 503 transitoires quand GitHub subit un incident.
  • Timeouts réseau lors d'à-coups du chemin sortant du framework.

La politique de nouvelle tentative s'applique à toutes les étapes, mais seule l'étape HTTP en tire vraiment parti. L'étape Python et l'écriture de l'état de synchronisation sont déterministes — les rejouer ne change rien au résultat.


Étape 5 — La transformation Python + upsert

Cliquez sur + Ajouter une étapePython.

ChampValeur
Nomupsert-releases
Callablereleases.sync:upsert_releases
Op kwargsvide (tout vient des paramètres partagés + de la sortie de l'étape précédente)
Timeout300

La fonction Python réside dans plugins/releases/sync.py :

from datetime import datetime, timezone
from liberty.jobs.types import StepContext

async def upsert_releases(
ctx: StepContext,
*,
target_connector: str,
repo: str,
**_,
) -> dict:
"""Read the previous step's HTTP body and upsert each release."""
# Fetch the HTTP step's output from the run context.
http_step = ctx.previous_step("fetch-releases")
if http_step.output["status"] != 200:
raise RuntimeError(
f"GitHub returned status {http_step.output['status']}"
)
releases = http_step.output["body"] # decoded JSON list

target = ctx.get_connector(target_connector)
inserted = 0
updated = 0
for r in releases:
result = await target.execute(
"""
INSERT INTO releases (id, repo, tag, name, published_at, body, synced_at)
VALUES (:id, :repo, :tag, :name, :pub, :body, now())
ON CONFLICT (id) DO UPDATE SET
tag = excluded.tag,
name = excluded.name,
published_at = excluded.published_at,
body = excluded.body,
synced_at = now()
RETURNING (xmax = 0) AS inserted
""",
params={
"id": r["id"],
"repo": repo,
"tag": r["tag_name"],
"name": r.get("name"),
"pub": r["published_at"],
"body": r.get("body"),
},
)
if result.first()["inserted"]:
inserted += 1
else:
updated += 1

ctx.log.info(f"upserted {len(releases)} releases · {inserted} new · {updated} updated")
return {"fetched": len(releases), "inserted": inserted, "updated": updated}

Trois points méritent attention :

MotifPourquoi
ctx.previous_step("fetch-releases").outputLecture de la sortie enregistrée de l'étape HTTP. Les étapes d'une même exécution partagent ainsi leur contexte.
ON CONFLICT (id) DO UPDATEUpsert idempotent. Rejouer l'heure ne crée pas de doublons.
La valeur de retourAtterrit dans la sortie de l'étape dans l'historique d'exécution. Les opérateurs qui lisent la chronologie voient « fetched: 30, inserted: 2, updated: 28 » au lieu de « succeeded ».

Étape 6 — Marquer l'horodatage de synchronisation

Cliquez sur + Ajouter une étapeSQL Query.

ChampValeur
Nommark-synced
Connecteur${kw:target_connector}
Requête(voir ci-dessous)
Timeout30
INSERT INTO sync_state (name, last_sync)
VALUES ('github-releases', now())
ON CONFLICT (name) DO UPDATE SET last_sync = now();

Cette étape reste volontairement minuscule — sa fonction est de consigner le succès dans une forme interrogeable. Un tableau de bord de supervision peut exécuter SELECT max(last_sync) FROM sync_state WHERE name = 'github-releases' et alerter si la valeur dépasse deux heures.


Étape 7 — Alertes

Dans le bloc Alerts :

ChampValeur
En cas d'échec
Sur exécution longue (minutes)10
Destinatairesvide

L'avertissement à 10 minutes capte une étape figée — l'exécution se termine en général en bien moins d'une minute, donc tout dépassement de dix minutes signale un problème.

Enregistrez la tâche.


Étape 8 — Test rapide

Cliquez sur ▶ Lancer maintenant. La fenêtre Exécuter avec paramètres s'ouvre (la tâche possède des paramètres partagés + plusieurs étapes). Validez sans modification.

Observez la page Détail d'exécution :

✓ fetch-releases · 430 ms · status=200 · body=30 items
✓ upsert-releases · 220 ms · fetched: 30 · inserted: 2 · updated: 28
✓ mark-synced · 12 ms · 1 row affected

À la première exécution, comptez 30 insertions (ou autant de releases qu'il en existe) et 0 mise à jour. À l'heure suivante, comptez 0 insertion et 30 mises à jour (les données n'ont pas changé).

En cas d'échec, voir Diagnostic.


Ce que vous avez construit

JOB github-releases-hourly
├── schedule: "15 * * * *" (UTC)
├── params: { repo: "fblettner/fblettner.github.io", target_connector: "default" }
├── retry: { attempts: 3, backoff: exponential, base_seconds: 30 }
├── alerts: { on_failure: true, on_long_run_minutes: 10 }
└── steps:
1. fetch-releases (http · GET · 60s)
2. upsert-releases (python · releases.sync:upsert_releases · 5m)
3. mark-synced (sql_query · default · 30s)

Chaque heure, les releases sont récupérées. La première heure réalise un chargement complet ; les heures suivantes mettent surtout à jour des lignes inchangées (peu coûteux, rapide). La table sync_state rend la supervision triviale. Un échec déclenche une notification Slack en quelques minutes.


Adapter à votre cas

Pagination

L'endpoint /releases de GitHub renvoie 30 éléments par défaut. Pour des flux plus volumineux, l'étape HTTP récupère une page ; l'étape Python boucle pour récupérer les suivantes :

async def upsert_paginated(ctx, *, repo, target_connector, **_):
fetched = 0
page = 1
while True:
url = f"https://api.github.com/repos/{repo}/releases?per_page=100&page={page}"
# Use httpx or the framework's HTTP client
items = await ctx.http.get(url, headers={"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"})
if not items: break
# ... upsert ...
fetched += len(items)
page += 1
return {"fetched": fetched, "pages": page - 1}

Pour des flux très volumineux, remplacez l'étape Python unique par une forme à deux étapes : une étape Python qui liste les numéros de page et les écrit dans une table de file d'attente ; une seconde étape Python qui traite un lot. La première tâche tourne toutes les heures, la seconde toutes les 5 minutes en consommant la file.

Récupérations incrémentales

Pour les API qui prennent en charge des filtres since=…, lisez la dernière valeur de sync_state.last_sync en début d'étape Python et passez-la à l'URL HTTP. La tâche devient une récupération delta au lieu d'une récupération complète — moins coûteuse, plus rapide, plus légère pour la limite de débit du service amont.

Transformations variées

L'étape Python sert à résoudre les divergences de forme. Motifs courants :

Forme sourceTransformation
JSON imbriqué (une release porte auteurs, assets, …).Aplatir — une ligne par release, colonnes dénormalisées. Ou deux tables — releases + release_assets avec une clé étrangère.
Snake_case côté cible, camelCase côté source.Mapper les clés explicitement dans l'upsert.
Dates source en timestamps Unix.Convertir en chaînes ISO en Python avant l'upsert.

Motifs d'authentification

AuthEn-têtes
Bearer tokenAuthorization: Bearer ${env:TOKEN}
Clé APIX-API-Key: ${env:KEY}
Basic authAuthorization: Basic <base64>
AWS Sig v4Utiliser une étape python — l'étape HTTP ne signe pas.
Flux OAuth refreshUtiliser une étape python — récupérer un token frais, puis appeler l'endpoint protégé.

Pièges fréquents

ErreurSymptômeCorrectif
Cron à 0 * * * *.La tâche entre en collision avec toutes les autres charges « chaque heure » de l'organisation.Décaler — 15 * * * * ou 27 * * * *.
Absence de clause ON CONFLICT.Rejouer l'heure explose sur des clés en doublon.Toujours upserter quand la source est rejouable.
Token d'API codé en dur.Le token fuite dans le contrôle de version / les résultats de recherche.Utiliser ${env:…} et placer le secret dans le fichier d'environnement.
Trop d'essais.Un 401 (token erroné) réessaie pendant 15 minutes.Plafonner à 3. Un 4xx ne réussit jamais sans intervention.
Absence de table sync_state.La supervision ne sait pas si la dernière synchronisation a réussi.Toujours inscrire un point de contrôle.

Pour la suite