Aller au contenu principal

Recette — Synchronisation planifiée de base

La charge de travail Nomaflow la plus courante : toutes les nuits, copier un ensemble de tables d'une base opérationnelle vers une base de reporting. Deux étapes SQL Copy, une politique de nouvelle tentative, une notification Slack en sortie.

Cette recette représente le modèle à adapter pour chaque cas d'usage « rafraîchir la table X de la base A vers la base B ».


Ce que vous construisez

ÉlémentValeur
DéclencheurCron — tous les jours à 02 h 00 heure de Paris.
SourceUn PostgreSQL opérationnel avec les tables customers et orders.
CibleUn PostgreSQL de reporting avec reporting.customers et reporting.orders.
Étapes2 × sql_copy (une par table) + 1 × http (webhook Slack).
Nouvelles tentatives2 essais, attente fixe de 60 secondes.
AlertesEn cas d'échec, notification sur le canal Slack d'astreinte.

Résultat final : chaque matin, les tables de reporting sont fraîches ; en cas d'échec de la synchronisation, l'astreinte reçoit un message Slack en quelques minutes.


Prérequis

Ce qu'il fautComment l'obtenir
Un connecteur source vers la base opérationnelle.Paramètres → Connecteurs → + Nouveau — pointez vers la base opérationnelle.
Un connecteur cible vers la base de reporting.Idem — pointez vers la base de reporting. Le pool default du framework convient pour un test rapide.
Une URL de webhook Slack, stockée en variable d'environnement.L'administrateur Slack fournit l'URL ; ajoutez SLACK_WEBHOOK_URL au fichier d'environnement du framework.

Les deux connecteurs doivent répondre en vert dans Paramètres → Connecteurs → Tester la connexion.


Étape 1 — Créer la coque de la tâche

Sur la page Nomaflow, cliquez sur + Nouvelle tâche.

ChampValeur
Idreporting-nightly-sync
DescriptionRafraîchissement nocturne de la base de reporting à partir de la base opérationnelle.
Tagsetl, nightly, reporting
Activée

Planification :

ChampValeur
Cron0 2 * * *
Fuseau horaireEurope/Paris

L'aperçu en direct doit afficher prochaine : demain 02 h 00 · le lendemain 02 h 00 · le surlendemain 02 h 00. Enregistrez.


Étape 2 — Ajouter la copie de customers

Cliquez sur + Ajouter une étapeSQL Copy.

ChampValeur
Nomcopy-customers
Connecteur SourceVotre connecteur source (par exemple ops-db).
Schéma Sourcepublic
Table Sourcecustomers
Connecteur CibleVotre connecteur cible (par exemple reporting-db).
Schéma Ciblereporting
Table Ciblecustomers
Modeoverwrite
Taille de lot10000 (par défaut)
Timeout1800 (30 min — généreux pour une table quotidienne)

Enregistrez l'étape.

Pourquoi le mode overwrite ? Il écrit d'abord dans une table tampon, puis bascule de manière atomique. La table customers du reporting n'est jamais vide en cours d'exécution — les opérateurs qui consultent les écrans de reporting à 02 h 00 min 30 voient encore les données de la veille jusqu'à la bascule, puis celles du jour.


Étape 3 — Ajouter la copie de orders

Même structure qu'à l'étape 2 avec orders à la place de customers. Portez le timeout à 3600 (1 h) — orders est en général plus volumineuse que customers, accordez-lui de la marge.

L'éditeur de tâche affiche désormais deux étapes sql_copy dans l'ordre. Enregistrez.


Étape 4 — Ajouter la notification de succès

Cliquez sur + Ajouter une étapeHTTP.

ChampValeur
Nomnotify-slack
URL${env:SLACK_WEBHOOK_URL}
MéthodePOST
En-têtesContent-Type: application/json
Corps{ "text": "✅ reporting-nightly-sync succeeded" }
Timeout30

Enregistrez l'étape. Le corps est rédigé en JSON ; l'étape HTTP sérialise le dictionnaire en JSON au moment de l'envoi.

La tâche compte maintenant trois étapes ; l'étape HTTP ne s'exécute que si les deux SQL Copies ont réussi (les étapes s'enchaînent dans l'ordre ; un échec arrête l'exécution).


Étape 5 — Ajouter la politique de nouvelle tentative

Dans le bloc Retry de l'éditeur :

ChampValeur
Essais2
Backofffixed
Base seconds60

Cette politique s'applique par étape — si copy-orders échoue une fois sur un soubresaut réseau transitoire, l'exécuteur attend 60 secondes puis réessaie. Deux essais offrent une seule nouvelle tentative, ce qui suffit pour les défaillances transitoires sans masquer un vrai bug.


Étape 6 — Ajouter l'alerte d'échec

Dans le bloc Alerts :

ChampValeur
En cas d'échec
Sur exécution longue (minutes)90 (alerter si la tâche tourne encore au bout de 90 minutes — normalement elle se termine en 10)
Destinatairesvide (utiliser les valeurs par défaut du framework)

L'alerte sur échec est routée par le canal de notification configuré globalement dans le framework. Voir Notifications pour le câblage.

Enregistrez l'ensemble de la tâche.


Étape 7 — Test rapide

N'attendez pas 02 h 00 pour vérifier que tout fonctionne. Dans le catalogue de tâches :

  1. Cliquez sur ▶ Lancer maintenant sur reporting-nightly-sync.
  2. La fenêtre Exécuter avec paramètres s'ouvre, car la tâche compte plusieurs étapes (et non parce qu'elle a des paramètres — elle n'en a pas).
  3. Cliquez sur ▶ Exécuter sans rien modifier.
  4. L'exécution apparaît dans le catalogue en RUNNING. Cliquez sur le badge pour ouvrir le Détail d'exécution.
  5. Regardez les trois étapes passer au vert.
  6. À la réussite de la troisième étape, votre canal Slack reçoit le message ✅.

En cas d'échec, voir Diagnostic.


Ce que vous avez construit

JOB reporting-nightly-sync
├── schedule: "0 2 * * *" (Europe/Paris)
├── retry: { attempts: 2, backoff: fixed, base_seconds: 60 }
├── alerts: { on_failure: true, on_long_run_minutes: 90 }
└── steps:
1. copy-customers (sql_copy · overwrite · 30 min timeout)
2. copy-orders (sql_copy · overwrite · 1 h timeout)
3. notify-slack (http · POST · 30s timeout)

Chaque nuit à 02 h 00 heure de Paris, les tables customers et orders du reporting sont rafraîchies de manière atomique. Le canal Slack est informé du succès ; l'astreinte est alertée en cas d'échec.


Adapter à votre cas

Plus de tables

Ajoutez des étapes sql_copy avant notify-slack. Elles s'enchaînent dans l'ordre ; la notification de succès n'est émise que si toutes les copies réussissent. Pour 20 tables, l'historique d'exécution reste lisible — la chronologie affiche 21 lignes, une par étape.

Si la liste de tables devient assez longue pour rendre l'éditeur encombrant, envisagez :

MotifQuand
Regrouper les tables par fréquence.Une tâche daily + une tâche weekly qui copie les dimensions peu volatiles.
Regrouper les tables par source.Une tâche par base source.
Utiliser une étape Python qui pilote une boucle.Une étape qui lit une configuration et copie N tables. Perte de la chronologie par table ; gain en compacité.

Mode de copie différent

Motif sourceMode
Base opérationnelle, cible remplacée intégralement chaque nuit.overwrite (cette recette).
Motif snapshot — garder les lignes d'hier, ajouter celles d'aujourd'hui.append.
Dimensions à évolution lente — mettre à jour les lignes existantes, insérer les nouvelles.upsert (la cible doit avoir une clé primaire).

Routage d'alerte différent

Remplacez l'étape notify-slack par :

DestinationType d'étapeNotes
E-mailhttp vers une passerelle SMTP, ou une étape python utilisant le canal de notification du framework.La voie python représente la voie canonique — voir Notifications.
Microsoft Teamshttp vers un webhook entrant Teams.La forme du corps diffère de celle de Slack.
Webhook génériquehttp vers votre endpoint.Charge utile auto-descriptive.

Ajouter une assertion sur le nombre de lignes

Vérification de sûreté courante : interrompre la tâche si le nombre de lignes source du jour diffère fortement de celui de la veille (signal que le pipeline source s'est cassé en amont). Insérez une étape python avant les copies :

# plugins/reporting/sanity.py
async def assert_row_counts(ctx, *, threshold_percent: float = 50.0, **_) -> dict:
source = ctx.get_connector("ops-db")
target = ctx.get_connector("reporting-db")
src_count = await source.scalar("select count(*) from public.customers")
tgt_count = await target.scalar("select count(*) from reporting.customers")
delta = abs(src_count - tgt_count) / max(tgt_count, 1) * 100
ctx.log.info(f"source={src_count} · target={tgt_count} · delta={delta:.1f}%")
if delta > threshold_percent:
raise RuntimeError(f"row-count delta {delta:.1f}% > threshold {threshold_percent}%")
return {"source_rows": src_count, "target_rows": tgt_count, "delta_percent": delta}

Câblée comme étape python avec op_kwargs = { threshold_percent: 50 }. Si la source du jour compte moitié moins de lignes que la veille, l'étape échoue et l'astreinte est alertée — avant que des données erronées n'atterrissent dans la base de reporting.


Pièges fréquents

ErreurSymptômeCorrectif
Planification en UTC, heure de Paris attendue.La tâche se déclenche à 03 h 00 / 04 h 00 locales.Réglez le fuseau sur Europe/Paris.
Trop d'essais.Une requête mal configurée réessaie pendant 15 minutes avant de tomber.Plafonnez à 2 ou 3 essais.
Étape HTTP avec URL de webhook codée en dur.L'URL finit dans le contrôle de version / les résultats de recherche.Utilisez ${env:SLACK_WEBHOOK_URL} et placez le secret dans le fichier d'environnement.
Aucune alerte.Un échec silencieux que personne ne remarque pendant des jours.Activez toujours alerts.on_failure = true.
Mode append sur une tâche de rafraîchissement.La cible grossit sans fin ; les requêtes ralentissent.Utilisez overwrite pour les motifs de rafraîchissement.

Pour la suite