Aller au contenu principal

Jobs — Nomaflow

Nomaflow est l'ordonnanceur de jobs in-process du framework. Les jobs sont déclarés en TOML, se déclenchent sur planification cron ou à la demande, s'exécutent comme une séquence linéaire d'étapes typées et enregistrent leur historique d'exécution dans le pool default. L'ensemble du moteur se trouve dans le processus FastAPI — aucun worker séparé, aucun broker, aucun démon compagnon — de sorte que le même ./start.sh qui sert la SPA planifie et exécute aussi chaque job.

Nomaflow est destiné à la glu opérationnelle dont la plupart des installations ont besoin : ETL nocturnes, synchronisations horaires, imports LDAP, envois de rapports planifiés, reconstructions ponctuelles manuelles. Pour des charges qui demandent une exécution distribuée, du parallélisme à l'échelle de l'heure ou de la sémantique DAG, un orchestrateur externe reste l'outil approprié.


Vue d'ensemble

Nomaflow — comment un job s'exécuteDÉCLENCHEURcron · manuel · APIMOTEUR D'EXÉCUTIONséquence linéaire d'étapesTYPES D'ÉTAPEsql_query · sql_copy · python · http · ldap_syncENREGISTRER + DIFFUSERhistorique · queue de log · Socket.IOORDONNANCEUR — APScheduler dans le processus FastAPIcron triggers fire at the configured timemanual / API triggers go through the same dispatchretry / backoff policy applies on step failure (per job config)run rows persisted in ly2_job_runs + ly2_step_runs

Où il se situe

Nomaflow fait partie du binaire du framework — aucun processus séparé à installer, configurer ou superviser. Le même worker FastAPI qui sert l'API REST exécute l'ordonnanceur et l'exécuteur d'étape. Les implications :

  • La concurrence est bornée par le pool de threads du worker + la boucle d'événements asyncio. Une étape SQL attend sur la base, une étape HTTP attend sur le réseau ; les deux sont asynchrones et ne bloquent pas le reste du travail.
  • Le redémarrage interrompt les jobs en cours. Un déclenchement cron planifié en milieu d'étape est annulé via la politique de relance. Les jobs longs (à l'échelle de l'heure) doivent être conçus pour pointer un état intermédiaire ou être relançables sans risque.
  • Un ordonnanceur par processus. Faire tourner deux répliques du framework contre la même base déclencherait deux fois chaque cron ; l'ordonnanceur fourni utilise un verrou consultatif sur la ligne ly2_jobs_lock pour l'empêcher, mais les ops devraient quand même épingler les tâches d'ordonnancement à une réplique via [jobs] scheduler_enabled = true/false.

Pour les charges qui ne rentrent pas dans ces contraintes, l'API REST du framework permet de déclencher des jobs Nomaflow depuis un orchestrateur externe — Airflow, Dagster, ou même un cron + curl ordinaire. Le côté Liberty devient alors un « exécuteur d'étapes structuré avec une UI » plutôt qu'un ordonnanceur.


Un job minimal

plugins/billing/jobs.toml :

[[jobs]]
name = "billing-nightly-rebuild"
app = "billing"
schedule = "0 2 * * *" # cron : tous les jours à 02:00
timezone = "Europe/Paris"
enabled = true

[jobs.retry]
max_attempts = 3
backoff = "exponential"
initial_delay_seconds = 60

[[jobs.steps]]
name = "refresh-totals"
type = "sql_query"
connector = "billing"
query = "refresh-totals:write"

[[jobs.steps]]
name = "rebuild-vat"
type = "python"
callable = "billing.invoicing:rebuild_vat"
kwargs = { period = "${month.previous}" }

Le job se déclenche à 02:00 chaque jour. Les deux étapes s'exécutent dans l'ordre ; un échec sur refresh-totals déclenche la politique de relance sur cette étape, pas sur tout le job. L'historique d'exécution enregistre l'entrée, la sortie et le minutage de chaque étape.

jobs.toml décrit chaque champ. Types d'étape décrit ce que fait chaque type et les kwargs qu'il accepte.


Déclencher un job

DéclencheurSurfaceCas d'usage
Planification cronschedule = "0 2 * * *" dans jobs.toml. Cron standard à 5 champs avec un 6e optionnel pour les secondes.Travail de fond récurrent.
Manuel depuis l'UIParamètres → Jobs → ▶ Exécuter sur n'importe quel job. L'opérateur peut surcharger params pour cette unique exécution.Reconstructions ponctuelles, envois à la demande.
API RESTPOST /admin/jobs/<name>/run avec le JWT de l'opérateur. Le corps accepte des surcharges de params.Orchestrateurs externes, pipelines CI / CD.
CLIliberty-admin job run <name>.Scripts shell, travail ad-hoc des opérateurs.

Chaque déclencheur passe par le même dispatch — même moteur d'étape, même politique de relance, même enregistrement. La source du déclenchement est conservée sur la ligne d'exécution (triggered_by = "cron" | "user:alice" | "api" | "cli:bob").


Types d'étape en un paragraphe

  • sql_query — exécute une requête SQL nommée (lecture ou écriture) sur un connecteur. Le nombre de lignes du résultat est enregistré.
  • sql_copy — diffuse des lignes d'un pool vers un autre, avec coercition de types et bascule de table atomique. Utile pour l'ETL d'une base opérationnelle vers un entrepôt de reporting.
  • python — appelle une fonction Python dans liberty-apps/plugins/. La porte de sortie pour tout ce qui ne rentre pas dans les étapes déclaratives.
  • http — appelle un endpoint HTTP / API, passe la réponse à l'étape suivante.
  • ldap_sync — récupère un sous-arbre d'annuaire, mappe les attributs via un bloc de configuration et fait un upsert vers un connecteur. Remplace les scripts LDAP sur mesure que la plupart des installations finissent par écrire.

Chacun est documenté sous Types d'étape avec la référence complète des kwargs.


Historique d'exécution

Chaque exécution de job produit :

  • Une ligne d'exécution dans ly2_job_runs — id, nom du job, started_at, finished_at, statut, triggered_by, instantané des params.
  • Une ligne d'exécution d'étape par étape dans ly2_step_runs — id d'exécution, nom de l'étape, type, started_at, finished_at, statut, instantané d'entrée, instantané de sortie, message d'erreur.
  • Un flux de log dans ly2_job_logs — chaque appel log.info() / log.warning() / log.error() depuis un appel d'étape, plus les événements structurés propres au framework.

La page Paramètres → Jobs → Exécutions parcourt l'historique ; le tiroir Détail d'exécution affiche la timeline par étape, les entrées et sorties, la queue de log (diffusée en direct via Socket.IO tant que l'exécution est en cours).

StatutSignification
runningL'exécution est en cours.
succeededChaque étape a retourné avec succès.
failedUne étape a levé après épuisement de toutes les relances.
abortedUn opérateur a cliqué sur Abandonner — chaque étape en cours est annulée.
skippedLe job devait se déclencher mais son exécution précédente n'était pas terminée (une seule à la fois par job).

La rétention est configurée sous [jobs] history_days dans app.toml (90 jours par défaut) ; les exécutions plus anciennes sont purgées par un job de nettoyage intégré qui se déclenche une fois par jour à 03:00.


Supervision en direct

Un opérateur connecté sur la page Jobs reçoit :

  • Une ligne qui apparaît dans le panneau En cours dès qu'une exécution démarre.
  • La liste des étapes qui se remplit en temps réel à mesure que chaque étape transite par runningsucceeded / failed.
  • La queue de log diffusée ligne par ligne — le même contenu qu'un tail -f sur le serveur, simplement routé via Socket.IO.

La diffusion est passive — fermer le navigateur n'annule pas l'exécution. Rouvrir la page reprend l'état en direct depuis le suivi en mémoire.

Pour les opérateurs qui préfèrent le shell, liberty-admin job logs --follow <run-id> fait la même chose contre le bus d'événements côté serveur.


Quand utiliser Nomaflow plutôt qu'un orchestrateur externe

Préférer Nomaflow quand…Préférer un orchestrateur externe quand…
Les charges sont portées par l'installation et ne traversent pas plusieurs services.Les charges traversent de nombreux services et demandent une vue globale.
L'ensemble du pipeline finit en quelques minutes.Une seule étape prend des heures.
Cron + étapes linéaires + relance suffisent.Une sémantique DAG, une expansion dynamique de tâches ou du calcul distribué est nécessaire.
Un seul outil, une seule UI, un seul flux de log est souhaité.Airflow, Dagster ou Prefect tournent déjà pour tout le reste.
À défaut, il faudrait écrire à la main un script Python + un timer systemd.À défaut, il faudrait écrire à la main un CronJob Kubernetes avec des sidecars.

La plupart des installations se trouvent dans la colonne de gauche ; la colonne de droite commence à compter quand une seule charge dépasse les limites d'un seul serveur.


Conseils et bonnes pratiques

  • Épingler les tâches d'ordonnancement à une seule réplique. Sur une installation multi-réplique, définir [jobs] scheduler_enabled = false sur chaque réplique sauf une. Le verrou consultatif empêche le double-déclenchement même quand cela est oublié, mais le réglage explicite rend la topologie évidente dans les logs.
  • Ne pas placer de travail long dans une étape python. Un calcul de 90 minutes in-process bloque le worker ; déléguer à une file ou à un processus séparé et laisser l'étape le déclencher.
  • Enregistrer ce qui a été fait, pas ce qui aurait dû l'être. Une étape qui retourne {"rows_affected": N} rend l'historique d'exécution exploitable ; une étape qui retourne None est une boîte noire à 3 h du matin.
  • Utiliser dry_run sur chaque étape destructrice. Un kwarg booléen qui bascule l'étape en mode comptage seul permet de déboguer un job depuis l'UI sans muter les données.
  • Ne pas se reposer sur le cron seul pour le travail critique. Si un échec d'exécution doit alerter quelqu'un, router les événements d'échec de Nomaflow vers votre système d'alerte (Slack, OpsGenie, …) via une étape http qui poste sur le webhook.

Pour aller plus loin