Jobs — Nomaflow
Nomaflow est l'ordonnanceur de jobs in-process du framework. Les jobs sont déclarés en TOML, se déclenchent sur planification cron ou à la demande, s'exécutent comme une séquence linéaire d'étapes typées et enregistrent leur historique d'exécution dans le pool default. L'ensemble du moteur se trouve dans le processus FastAPI — aucun worker séparé, aucun broker, aucun démon compagnon — de sorte que le même ./start.sh qui sert la SPA planifie et exécute aussi chaque job.
Nomaflow est destiné à la glu opérationnelle dont la plupart des installations ont besoin : ETL nocturnes, synchronisations horaires, imports LDAP, envois de rapports planifiés, reconstructions ponctuelles manuelles. Pour des charges qui demandent une exécution distribuée, du parallélisme à l'échelle de l'heure ou de la sémantique DAG, un orchestrateur externe reste l'outil approprié.
Vue d'ensemble
Où il se situe
Nomaflow fait partie du binaire du framework — aucun processus séparé à installer, configurer ou superviser. Le même worker FastAPI qui sert l'API REST exécute l'ordonnanceur et l'exécuteur d'étape. Les implications :
- La concurrence est bornée par le pool de threads du worker + la boucle d'événements asyncio. Une étape SQL attend sur la base, une étape HTTP attend sur le réseau ; les deux sont asynchrones et ne bloquent pas le reste du travail.
- Le redémarrage interrompt les jobs en cours. Un déclenchement cron planifié en milieu d'étape est annulé via la politique de relance. Les jobs longs (à l'échelle de l'heure) doivent être conçus pour pointer un état intermédiaire ou être relançables sans risque.
- Un ordonnanceur par processus. Faire tourner deux répliques du framework contre la même base déclencherait deux fois chaque cron ; l'ordonnanceur fourni utilise un verrou consultatif sur la ligne
ly2_jobs_lockpour l'empêcher, mais les ops devraient quand même épingler les tâches d'ordonnancement à une réplique via[jobs] scheduler_enabled = true/false.
Pour les charges qui ne rentrent pas dans ces contraintes, l'API REST du framework permet de déclencher des jobs Nomaflow depuis un orchestrateur externe — Airflow, Dagster, ou même un cron + curl ordinaire. Le côté Liberty devient alors un « exécuteur d'étapes structuré avec une UI » plutôt qu'un ordonnanceur.
Un job minimal
plugins/billing/jobs.toml :
[[jobs]]
name = "billing-nightly-rebuild"
app = "billing"
schedule = "0 2 * * *" # cron : tous les jours à 02:00
timezone = "Europe/Paris"
enabled = true
[jobs.retry]
max_attempts = 3
backoff = "exponential"
initial_delay_seconds = 60
[[jobs.steps]]
name = "refresh-totals"
type = "sql_query"
connector = "billing"
query = "refresh-totals:write"
[[jobs.steps]]
name = "rebuild-vat"
type = "python"
callable = "billing.invoicing:rebuild_vat"
kwargs = { period = "${month.previous}" }
Le job se déclenche à 02:00 chaque jour. Les deux étapes s'exécutent dans l'ordre ; un échec sur refresh-totals déclenche la politique de relance sur cette étape, pas sur tout le job. L'historique d'exécution enregistre l'entrée, la sortie et le minutage de chaque étape.
jobs.toml décrit chaque champ. Types d'étape décrit ce que fait chaque type et les kwargs qu'il accepte.
Déclencher un job
| Déclencheur | Surface | Cas d'usage |
|---|---|---|
| Planification cron | schedule = "0 2 * * *" dans jobs.toml. Cron standard à 5 champs avec un 6e optionnel pour les secondes. | Travail de fond récurrent. |
| Manuel depuis l'UI | Paramètres → Jobs → ▶ Exécuter sur n'importe quel job. L'opérateur peut surcharger params pour cette unique exécution. | Reconstructions ponctuelles, envois à la demande. |
| API REST | POST /admin/jobs/<name>/run avec le JWT de l'opérateur. Le corps accepte des surcharges de params. | Orchestrateurs externes, pipelines CI / CD. |
| CLI | liberty-admin job run <name>. | Scripts shell, travail ad-hoc des opérateurs. |
Chaque déclencheur passe par le même dispatch — même moteur d'étape, même politique de relance, même enregistrement. La source du déclenchement est conservée sur la ligne d'exécution (triggered_by = "cron" | "user:alice" | "api" | "cli:bob").
Types d'étape en un paragraphe
sql_query— exécute une requête SQL nommée (lecture ou écriture) sur un connecteur. Le nombre de lignes du résultat est enregistré.sql_copy— diffuse des lignes d'un pool vers un autre, avec coercition de types et bascule de table atomique. Utile pour l'ETL d'une base opérationnelle vers un entrepôt de reporting.python— appelle une fonction Python dansliberty-apps/plugins/. La porte de sortie pour tout ce qui ne rentre pas dans les étapes déclaratives.http— appelle un endpoint HTTP / API, passe la réponse à l'étape suivante.ldap_sync— récupère un sous-arbre d'annuaire, mappe les attributs via un bloc de configuration et fait un upsert vers un connecteur. Remplace les scripts LDAP sur mesure que la plupart des installations finissent par écrire.
Chacun est documenté sous Types d'étape avec la référence complète des kwargs.
Historique d'exécution
Chaque exécution de job produit :
- Une ligne d'exécution dans
ly2_job_runs— id, nom du job, started_at, finished_at, statut, triggered_by, instantané des params. - Une ligne d'exécution d'étape par étape dans
ly2_step_runs— id d'exécution, nom de l'étape, type, started_at, finished_at, statut, instantané d'entrée, instantané de sortie, message d'erreur. - Un flux de log dans
ly2_job_logs— chaque appellog.info()/log.warning()/log.error()depuis un appel d'étape, plus les événements structurés propres au framework.
La page Paramètres → Jobs → Exécutions parcourt l'historique ; le tiroir Détail d'exécution affiche la timeline par étape, les entrées et sorties, la queue de log (diffusée en direct via Socket.IO tant que l'exécution est en cours).
| Statut | Signification |
|---|---|
running | L'exécution est en cours. |
succeeded | Chaque étape a retourné avec succès. |
failed | Une étape a levé après épuisement de toutes les relances. |
aborted | Un opérateur a cliqué sur Abandonner — chaque étape en cours est annulée. |
skipped | Le job devait se déclencher mais son exécution précédente n'était pas terminée (une seule à la fois par job). |
La rétention est configurée sous [jobs] history_days dans app.toml (90 jours par défaut) ; les exécutions plus anciennes sont purgées par un job de nettoyage intégré qui se déclenche une fois par jour à 03:00.
Supervision en direct
Un opérateur connecté sur la page Jobs reçoit :
- Une ligne qui apparaît dans le panneau En cours dès qu'une exécution démarre.
- La liste des étapes qui se remplit en temps réel à mesure que chaque étape transite par
running→succeeded/failed. - La queue de log diffusée ligne par ligne — le même contenu qu'un
tail -fsur le serveur, simplement routé via Socket.IO.
La diffusion est passive — fermer le navigateur n'annule pas l'exécution. Rouvrir la page reprend l'état en direct depuis le suivi en mémoire.
Pour les opérateurs qui préfèrent le shell, liberty-admin job logs --follow <run-id> fait la même chose contre le bus d'événements côté serveur.
Quand utiliser Nomaflow plutôt qu'un orchestrateur externe
| Préférer Nomaflow quand… | Préférer un orchestrateur externe quand… |
|---|---|
| Les charges sont portées par l'installation et ne traversent pas plusieurs services. | Les charges traversent de nombreux services et demandent une vue globale. |
| L'ensemble du pipeline finit en quelques minutes. | Une seule étape prend des heures. |
| Cron + étapes linéaires + relance suffisent. | Une sémantique DAG, une expansion dynamique de tâches ou du calcul distribué est nécessaire. |
| Un seul outil, une seule UI, un seul flux de log est souhaité. | Airflow, Dagster ou Prefect tournent déjà pour tout le reste. |
| À défaut, il faudrait écrire à la main un script Python + un timer systemd. | À défaut, il faudrait écrire à la main un CronJob Kubernetes avec des sidecars. |
La plupart des installations se trouvent dans la colonne de gauche ; la colonne de droite commence à compter quand une seule charge dépasse les limites d'un seul serveur.
Conseils et bonnes pratiques
- Épingler les tâches d'ordonnancement à une seule réplique. Sur une installation multi-réplique, définir
[jobs] scheduler_enabled = falsesur chaque réplique sauf une. Le verrou consultatif empêche le double-déclenchement même quand cela est oublié, mais le réglage explicite rend la topologie évidente dans les logs. - Ne pas placer de travail long dans une étape
python. Un calcul de 90 minutes in-process bloque le worker ; déléguer à une file ou à un processus séparé et laisser l'étape le déclencher. - Enregistrer ce qui a été fait, pas ce qui aurait dû l'être. Une étape qui retourne
{"rows_affected": N}rend l'historique d'exécution exploitable ; une étape qui retourneNoneest une boîte noire à 3 h du matin. - Utiliser
dry_runsur chaque étape destructrice. Un kwarg booléen qui bascule l'étape en mode comptage seul permet de déboguer un job depuis l'UI sans muter les données. - Ne pas se reposer sur le cron seul pour le travail critique. Si un échec d'exécution doit alerter quelqu'un, router les événements d'échec de Nomaflow vers votre système d'alerte (Slack, OpsGenie, …) via une étape
httpqui poste sur le webhook.
Pour aller plus loin
jobs.toml— la référence TOML complète.- Types d'étape — ce que fait chaque étape.
- Exécutions et supervision — la page d'historique d'exécution, le flux de log en direct, le parcours d'abandon.
- Apps et Plugins → Plugins — écrire l'appel Python derrière une étape
python.