Recipe — Hourly API pull

Every hour, hit a third-party REST API, transform the JSON payload, write rows into a SQL target. The pattern behind any "pull from Stripe / GitHub / Salesforce / your-own-microservice" workload.

This recipe mixes http, python and sql_query steps — and shows where each one shines.


What you build

| Piece | Value |
| --- | --- |
| Trigger | Cron, every hour at minute :15. |
| Source | A third-party REST API returning JSON. |
| Target | A SQL table with an upsert key (so re-running the hour is idempotent). |
| Steps | 1 × http (fetch) + 1 × python (transform + upsert) + 1 × sql_query (mark sync timestamp). |
| Retry | 3 attempts on the HTTP step, exponential backoff (rate-limits + transient 5xx). |
| Alerts | Slack on failure; warn after 10 minutes. |

The example pulls a feed of GitHub releases and upserts them into a releases table. The same shape applies to any paginated, time-bounded API feed.


Prerequisites

| You need | How |
| --- | --- |
| A connector to the target SQL DB. | The framework's default pool works for a first run. |
| The API's bearer token in the framework's env file as GITHUB_TOKEN. | The framework reloads env on save. |
| A target table created upfront. | See the SQL below. |

The target table:

CREATE TABLE releases (
id BIGINT PRIMARY KEY,
repo TEXT NOT NULL,
tag TEXT NOT NULL,
name TEXT,
published_at TIMESTAMPTZ NOT NULL,
body TEXT,
synced_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE TABLE sync_state (
name TEXT PRIMARY KEY,
last_sync TIMESTAMPTZ
);

sync_state is a tiny bookkeeping table — one row per data feed, recording the last successful pull.


Step 1 — Create the job shell

On the Nomaflow page, click + New Job.

| Field | Value |
| --- | --- |
| Id | github-releases-hourly |
| Description | Hourly pull of GitHub releases into the releases table. |
| Tags | api-pull, hourly, github |
| Enabled | on |

Schedule:

| Field | Value |
| --- | --- |
| Cron | 15 * * * * |
| Timezone | UTC (the API uses UTC timestamps; matching them avoids surprises) |

The :15 offset avoids the rush of jobs that fire on the hour everywhere else in the building.
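
To make the schedule concrete, here is a minimal sketch of what "every hour at minute :15" means in terms of fire times. The helper name `next_fire` is hypothetical and is not part of the framework; it only illustrates how the expression `15 * * * *` resolves when evaluated in UTC.

```python
from datetime import datetime, timedelta, timezone


def next_fire(now: datetime, minute: int = 15) -> datetime:
    """Return the first instant strictly after `now` that falls on `minute` past the hour."""
    candidate = now.replace(minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # This hour's slot has already passed (or is exactly now): use the next hour.
        candidate += timedelta(hours=1)
    return candidate


if __name__ == "__main__":
    started = datetime(2024, 1, 1, 10, 40, tzinfo=timezone.utc)
    print(next_fire(started).isoformat())
```

A run that starts at 10:40 UTC therefore waits for the 11:15 slot, and a run evaluated exactly at 10:15 is scheduled for 11:15 rather than firing twice.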


Step 2 — Shared params

In the Shared params block:

| Key | Value | Why |
| --- | --- | --- |
| repo | fblettner/fblettner.github.io | Which repo to fetch. |
| target_connector | default | Where to upsert. |

These get merged into every Python step's op_kwargs at runtime — and they're overridable per-fire in the Run-with-parameters modal.


Step 3 — The HTTP fetch

Click + Add step, then choose HTTP.

| Field | Value |
| --- | --- |
| Name | fetch-releases |
| URL | https://api.github.com/repos/${kw:repo}/releases?per_page=30 |
| Method | GET |
| Headers | Authorization: Bearer ${env:GITHUB_TOKEN} |
| | Accept: application/vnd.github+json |
| | X-GitHub-Api-Version: 2022-11-28 |
| Timeout | 60 |

The ${kw:repo} substitutes the merged kwarg repo at runtime — so a per-fire override of repo flows through naturally.
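
The substitution can be pictured with a small sketch. This is not the framework's resolver; `render` and its regular expression are assumptions made for illustration, covering only the `${kw:…}` and `${env:…}` forms used in this recipe. It also shows a per-fire override winning over the shared param.

```python
import re

# Matches ${kw:name} and ${env:NAME}; the exact grammar is an assumption.
_PLACEHOLDER = re.compile(r"\$\{(kw|env):([A-Za-z_][A-Za-z0-9_]*)\}")


def render(template: str, kwargs: dict, env: dict) -> str:
    """Replace ${kw:…} from the merged kwargs and ${env:…} from the environment."""

    def replace(match: re.Match) -> str:
        source = kwargs if match.group(1) == "kw" else env
        return str(source[match.group(2)])  # raises KeyError on an unknown name

    return _PLACEHOLDER.sub(replace, template)


shared = {"repo": "fblettner/fblettner.github.io", "target_connector": "default"}
per_fire = {"repo": "example-org/example-repo"}  # hypothetical override
merged = {**shared, **per_fire}  # per-fire values take precedence

url = render("https://api.github.com/repos/${kw:repo}/releases?per_page=30", merged, {})
```

Raising on an unknown name, rather than leaving the placeholder in the URL, is a design choice of this sketch: a typo fails the step immediately instead of producing a confusing 404 from the API.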

The HTTP step records the response body on its output. The next step reads it.


Step 4 — Retry on the HTTP step

In the job's Retry block:

| Field | Value |
| --- | --- |
| Attempts | 3 |
| Backoff | exponential |
| Base seconds | 30 |

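With a 30-second base the wait doubles on each retry: 30s before the first retry, 60s before the second, 120s before the third (if Attempts counts the initial try, only the first two waits apply). The policy catches: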

  • 429 rate-limit from GitHub.
  • Transient 502 / 503 when GitHub is having an incident.
  • Network timeouts when the framework's outbound path hiccups.

The retry policy applies to every step, but the HTTP step is the only one likely to benefit. The Python step and the sync-state write are deterministic — retrying them won't change the outcome.
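
The doubling schedule can be reproduced with a few lines of arithmetic. This is a sketch under the assumption that the multiplier is 2 and that no jitter is added; `backoff_delays` is a hypothetical helper, not a framework function.

```python
def backoff_delays(retries: int, base_seconds: float = 30) -> list[float]:
    """Wait before retry n (0-based) is base_seconds * 2**n."""
    return [base_seconds * 2 ** n for n in range(retries)]


delays = backoff_delays(3)   # waits before the 1st, 2nd and 3rd retry
total_wait = sum(delays)     # worst-case time spent waiting, in seconds
```

With a base of 30 seconds, three retries add up to 210 seconds of waiting, which stays comfortably inside the 10-minute long-run warning.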


Step 5 — The Python transform + upsert

Click + Add step, then choose Python.

| Field | Value |
| --- | --- |
| Name | upsert-releases |
| Callable | releases.sync:upsert_releases |
| Op kwargs | empty (everything comes from shared params + the previous step's output) |
| Timeout | 300 |

The Python function lives in plugins/releases/sync.py:

from datetime import datetime, timezone
from liberty.jobs.types import StepContext

async def upsert_releases(
    ctx: StepContext,
    *,
    target_connector: str,
    repo: str,
    **_,
) -> dict:
    """Read the previous step's HTTP body and upsert each release."""
    # Fetch the HTTP step's output from the run context.
    http_step = ctx.previous_step("fetch-releases")
    if http_step.output["status"] != 200:
        raise RuntimeError(
            f"GitHub returned status {http_step.output['status']}"
        )
    releases = http_step.output["body"]  # decoded JSON list

    target = ctx.get_connector(target_connector)
    inserted = 0
    updated = 0
    for r in releases:
        result = await target.execute(
            """
            INSERT INTO releases (id, repo, tag, name, published_at, body, synced_at)
            VALUES (:id, :repo, :tag, :name, :pub, :body, now())
            ON CONFLICT (id) DO UPDATE SET
                tag = excluded.tag,
                name = excluded.name,
                published_at = excluded.published_at,
                body = excluded.body,
                synced_at = now()
            RETURNING (xmax = 0) AS inserted
            """,
            params={
                "id": r["id"],
                "repo": repo,
                "tag": r["tag_name"],
                "name": r.get("name"),
                "pub": r["published_at"],
                "body": r.get("body"),
            },
        )
        if result.first()["inserted"]:
            inserted += 1
        else:
            updated += 1

    ctx.log.info(f"upserted {len(releases)} releases · {inserted} new · {updated} updated")
    return {"fetched": len(releases), "inserted": inserted, "updated": updated}

Three things to notice:

| Pattern | Why |
| --- | --- |
| ctx.previous_step("fetch-releases").output | Read the HTTP step's recorded output. Steps inside a run share context this way. |
| ON CONFLICT (id) DO UPDATE | Idempotent upsert. Re-running the hour doesn't create duplicates. |
| The return value | Lands on the step's output in the run history. Operators reading the timeline see "fetched: 30, inserted: 2, updated: 28" instead of "succeeded". |
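
The idempotency claim is easy to check locally. The sketch below uses an in-memory SQLite database as a stand-in for the real target (an assumption: the recipe's target speaks PostgreSQL, and SQLite needs version 3.24 or later for this syntax) and a trimmed-down releases table. The sample rows and the `load` helper are made up for illustration.

```python
import sqlite3

UPSERT = """
INSERT INTO releases (id, repo, tag, name)
VALUES (:id, :repo, :tag, :name)
ON CONFLICT (id) DO UPDATE SET
    tag = excluded.tag,
    name = excluded.name
"""


def load(conn: sqlite3.Connection, rows: list) -> int:
    """Upsert every row, then return the table's row count."""
    conn.executemany(UPSERT, rows)
    conn.commit()
    return conn.execute("SELECT count(*) FROM releases").fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE releases ("
    "id INTEGER PRIMARY KEY, repo TEXT NOT NULL, tag TEXT NOT NULL, name TEXT)"
)
rows = [
    {"id": 1, "repo": "example/repo", "tag": "v1.0", "name": "First"},
    {"id": 2, "repo": "example/repo", "tag": "v1.1", "name": "Second"},
]

first = load(conn, rows)    # initial pull inserts both rows
second = load(conn, rows)   # re-running the same hour changes nothing

rows[0]["tag"] = "v1.0.1"   # upstream edited a release
load(conn, rows)
tag = conn.execute("SELECT tag FROM releases WHERE id = 1").fetchone()[0]
```

The row count stays the same across re-runs, while an upstream edit overwrites the stored row in place.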

Step 6 — Mark the sync timestamp

Click + Add step, then choose SQL Query.

| Field | Value |
| --- | --- |
| Name | mark-synced |
| Connector | ${kw:target_connector} |
| Query | (see below) |
| Timeout | 30 |

INSERT INTO sync_state (name, last_sync)
VALUES ('github-releases', now())
ON CONFLICT (name) DO UPDATE SET last_sync = now();

This step is tiny on purpose — its job is to record success in a queryable way. A monitoring dashboard can SELECT max(last_sync) FROM sync_state WHERE name = 'github-releases' and alert if the value is more than two hours old.
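
The staleness rule a dashboard would apply to that query result can be sketched as follows. The function name `is_stale` and the choice to treat a missing row as stale are assumptions of this example; the two-hour threshold comes from the paragraph above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(
    last_sync: Optional[datetime],
    now: datetime,
    max_age: timedelta = timedelta(hours=2),
) -> bool:
    """True when the feed has never synced or the last sync is older than max_age."""
    if last_sync is None:
        return True  # no sync_state row yet: treat as stale so the alert fires
    return now - last_sync > max_age


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
fresh = is_stale(datetime(2024, 1, 1, 11, 15, tzinfo=timezone.utc), now)
stale = is_stale(datetime(2024, 1, 1, 9, 15, tzinfo=timezone.utc), now)
```

Two hours gives the hourly job one full missed run of slack before anyone is paged.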


Step 7 — Alerts

In the Alerts block:

| Field | Value |
| --- | --- |
| On failure | on |
| On long run minutes | 10 |
| Recipients | empty |

The 10-minute warning catches a step that's hung — the run typically finishes in well under a minute, so anything past ten is a sign something's wrong.

Save the job.


Step 8 — Smoke test

Click ▶ Run now. The Run-with-parameters modal opens (because the job has shared params + multiple steps). Submit without changes.

Watch the Run detail page:

✓ fetch-releases · 430 ms · status=200 · body=30 items
✓ upsert-releases · 220 ms · fetched: 30 · inserted: 2 · updated: 28
✓ mark-synced · 12 ms · 1 row affected

The first time, expect 30 inserts (or however many releases exist) and 0 updates. On the next hour, expect 0 inserts and 30 updates (because the data didn't change).

If anything fails, see Troubleshoot.


What you've built

JOB github-releases-hourly
├── schedule: "15 * * * *" (UTC)
├── params: { repo: "fblettner/fblettner.github.io", target_connector: "default" }
├── retry: { attempts: 3, backoff: exponential, base_seconds: 30 }
├── alerts: { on_failure: true, on_long_run_minutes: 10 }
└── steps:
    1. fetch-releases (http · GET · 60s)
    2. upsert-releases (python · releases.sync:upsert_releases · 5m)
    3. mark-synced (sql_query · default · 30s)

Every hour, releases get pulled in. The first hour is full-table; subsequent hours mostly update unchanged rows (cheap, fast). The sync_state table makes monitoring trivial. A failure pings Slack within minutes.


Adapt to your case

Pagination

GitHub's /releases endpoint returns 30 items per page by default. For feeds with more rows than one page holds, let the Python step do the fetching and loop over pages until one comes back empty:

import os

async def upsert_paginated(ctx, *, repo, target_connector, **_):
    """Walk the releases feed page by page until an empty page comes back."""
    headers = {"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"}
    fetched = 0
    page = 1
    while True:
        url = f"https://api.github.com/repos/{repo}/releases?per_page=100&page={page}"
        # Use httpx or the framework's HTTP client; ctx.http.get is assumed
        # here to return the decoded JSON list.
        items = await ctx.http.get(url, headers=headers)
        if not items:
            break
        # ... upsert ...
        fetched += len(items)
        page += 1
    return {"fetched": fetched, "pages": page - 1}

For very large feeds, split the work into two jobs: one whose Python step lists the page numbers and writes them to a queue table, and one whose Python step processes a batch from that queue. The first job runs hourly; the second runs every 5 minutes and drains the queue.

Incremental pulls

For APIs that support since=… filters, read the last sync_state.last_sync value at the top of the Python step and pass it to the HTTP URL. The job becomes a delta pull instead of a full pull — cheaper, faster, lighter on the upstream's rate limit.
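
A delta URL can be assembled along these lines. This is a sketch: the base URL is a placeholder, `build_delta_url` is a hypothetical helper, and the parameter name `since` and its ISO 8601 UTC format are assumptions to verify against the upstream API's documentation.

```python
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlencode


def build_delta_url(base_url: str, last_sync: Optional[datetime], per_page: int = 100) -> str:
    """Add since=<last_sync> when a checkpoint exists; otherwise request a full pull."""
    params = {"per_page": per_page}
    if last_sync is not None:
        # Normalise to UTC and use the compact "Z" form many APIs expect.
        params["since"] = last_sync.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return f"{base_url}?{urlencode(params)}"


full = build_delta_url("https://api.example.com/items", None)
delta = build_delta_url(
    "https://api.example.com/items",
    datetime(2024, 1, 1, 10, 15, tzinfo=timezone.utc),
)
```

Falling back to a full pull when no checkpoint exists keeps the very first run, and any run after the sync_state row is deleted, correct without special handling.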

Different transforms

The Python step is where shape mismatch gets resolved. Common patterns:

| Source shape | Transform |
| --- | --- |
| Nested JSON (a release has authors, assets, …). | Flatten: one row per release, denormalised columns. Or use two tables: releases + release_assets with a foreign key. |
| Snake_case in target, camelCase in source. | Map keys explicitly in the upsert. |
| Source dates as Unix timestamps. | Convert to ISO strings in the Python before the upsert. |
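
The three transforms in the table can be combined in one small function. The payload shape below (`releaseId`, `tagName`, `publishedAt`, a nested `author`) is invented for the example and does not describe any particular API; `KEY_MAP`, `unix_to_iso` and `to_row` are hypothetical names.

```python
from datetime import datetime, timezone

# Explicit source-key to target-column mapping (camelCase to snake_case).
KEY_MAP = {"releaseId": "id", "tagName": "tag", "publishedAt": "published_at"}


def unix_to_iso(seconds: float) -> str:
    """Convert a Unix timestamp to an ISO 8601 string in UTC."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()


def to_row(item: dict) -> dict:
    """Rename keys, convert the timestamp, and flatten the nested author."""
    row = {target: item[source] for source, target in KEY_MAP.items()}
    row["published_at"] = unix_to_iso(row["published_at"])
    # Flatten: keep only the author's login as a denormalised column.
    row["author_login"] = (item.get("author") or {}).get("login")
    return row


row = to_row(
    {"releaseId": 7, "tagName": "v1.0", "publishedAt": 0, "author": {"login": "octo"}}
)
```

An explicit mapping is more typing than an automatic case converter, but an unexpected new field in the source then gets ignored instead of breaking the upsert.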

Authentication patterns

| Auth | Headers |
| --- | --- |
| Bearer token | Authorization: Bearer ${env:TOKEN} |
| API key | X-API-Key: ${env:KEY} |
| Basic auth | Authorization: Basic <base64> |
| AWS Sig v4 | Use a python step; the HTTP step doesn't sign. |
| OAuth refresh flow | Use a python step: fetch a fresh token, then call the protected endpoint. |
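
For the Basic auth row, the `<base64>` part is the Base64 encoding of `username:password`. A minimal sketch of producing that header value, with placeholder credentials and a hypothetical helper name:

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Build the Authorization header for HTTP Basic auth."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


header = basic_auth_header("user", "pass")
```

Base64 is an encoding, not encryption, so the resulting value is as sensitive as the password itself and belongs in the env file like any other secret.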

Common pitfalls

| Mistake | Symptom | Fix |
| --- | --- | --- |
| Cron at 0 * * * *. | Job collides with every other "every hour" workload in the building. | Offset the minute: 15 * * * * or 27 * * * *. |
| No ON CONFLICT clause. | Re-running the hour blows up on duplicate keys. | Always upsert when pulling from a re-readable source. |
| Hard-coded API token. | Token leaks into version control / search results. | Use ${env:…} and put the secret in the env file. |
| Retry attempts too high. | A 401 (bad token) keeps retrying for 15 minutes. | Cap at 3. Apart from 429, a 4xx never succeeds on retry without intervention. |
| No sync_state table. | Monitoring can't tell whether the last sync succeeded. | Always mark a checkpoint. |
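
The retry pitfall comes down to which status codes are worth a second attempt. The classification below is a sketch based on the cases listed in Step 4 (429 and transient 5xx); `is_retryable` is a hypothetical helper, and whether the framework's retry block can filter by status code is not something this recipe states.

```python
def is_retryable(status: int) -> bool:
    """429 and 5xx may clear up on their own; other 4xx need a human."""
    return status == 429 or 500 <= status <= 599


decisions = {code: is_retryable(code) for code in (200, 401, 404, 429, 502, 503)}
```

A Python step that does its own fetching could use a check like this to fail fast on a 401 instead of burning through every retry attempt.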

What's next