PostgreSQL / MySQL recipe
Source: PostgresSample/ · production-style relational source with keyset pagination + watermark sync so it scales to large tables and re-runs cheaply.
Owns in the academic graph: research grants, funding agencies, faculty PIs.
What it teaches
- Keyset pagination instead of `OFFSET`: each page starts with an index seek on the cursor column, so its cost does not grow with page depth, and the scan is naturally incremental.
- Watermark sync: persist the high-water mark after each commit; the next run resumes from there.
- A driver-agnostic source: change the URL scheme and the same code talks to PostgreSQL or MySQL.
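One way the URL-scheme switch could work is sketched below. `DbDriver` and `DbUrl.DriverFor` are hypothetical names introduced for illustration, not the recipe's actual types; the sketch only assumes that the connection string is a URL whose scheme is `postgres` or `mysql`, as in the configuration for this recipe.

```csharp
using System;

// Hypothetical helper: maps a connection URL's scheme to a driver family.
public enum DbDriver { PostgreSql, MySql }

public static class DbUrl
{
    public static DbDriver DriverFor(string url)
    {
        // System.Uri parses unknown schemes generically and lower-cases Scheme.
        var uri = new Uri(url);
        return uri.Scheme switch
        {
            "postgres" => DbDriver.PostgreSql,
            "mysql" => DbDriver.MySql,
            _ => throw new NotSupportedException(
                $"Unsupported database URL scheme: {uri.Scheme}")
        };
    }
}
```

A caller would pick the concrete connection type once, from `DbUrl.DriverFor(url)`, and keep the paging and watermark code unchanged.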
Keyset pagination + watermark
    public const string PagedSql =
        "SELECT id, title, amount_usd, ..., updated_at FROM grants " +
        "WHERE updated_at > @startKey " +
        "ORDER BY updated_at ASC " +
        "LIMIT @pageSize";

    // Resume from the last persisted key, or from the epoch on the first run.
    var mark = new SqlServerSource.Watermark(watermarkPath);
    var startKey = mark.Read(fallback: "1970-01-01T00:00:00Z");
    var lastSeen = startKey;
    var count = 0;

    foreach (var row in db.StreamPaged(
        GrantsIngest.PagedSql,
        startKey,
        GrantsIngest.Map,
        keyOf: r => r.UpdatedAt.ToString("o"),
        pageSize: pageSize))
    {
        GrantsIngest.Ingest(graph, row);
        lastSeen = row.UpdatedAt.ToString("o");
        if (++count % 500 == 0) await graph.CommitPendingAsync(); // commit in batches of 500
    }

    await graph.CommitPendingAsync();
    mark.Write(lastSeen); // Watermark advances only after commit succeeds.
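The watermark store used above is not shown in this recipe. As a rough sketch of what a file-backed one might look like, the hypothetical `FileWatermark` class below mirrors the `Read(fallback:)` and `Write` calls; the real `Watermark` type may behave differently. It assumes .NET Core 3.0 or later for the `File.Move` overload with `overwrite`.

```csharp
using System;
using System.IO;

// Hypothetical stand-in for the recipe's Watermark type; the real class may differ.
public sealed class FileWatermark
{
    private readonly string _path;

    public FileWatermark(string path) => _path = path;

    // Returns the stored key, or the fallback when nothing has been written yet.
    public string Read(string fallback)
    {
        if (!File.Exists(_path)) return fallback;
        var text = File.ReadAllText(_path).Trim();
        return text.Length == 0 ? fallback : text;
    }

    // Writes to a temp file, then renames it over the target, so a crash
    // mid-write leaves the previous watermark in place instead of a partial file.
    public void Write(string key)
    {
        var dir = Path.GetDirectoryName(Path.GetFullPath(_path));
        if (!string.IsNullOrEmpty(dir)) Directory.CreateDirectory(dir);
        var tmp = _path + ".tmp";
        File.WriteAllText(tmp, key);
        File.Move(tmp, _path, overwrite: true);
    }
}
```

Writing the file only after the final commit means a failed run re-reads some rows on the next attempt, so the ingest step should tolerate seeing the same row twice.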
Configuration
| Variable | Purpose | Default |
|---|---|---|
| `RECIPE_DB_URL` | `postgres://user:pass@host:5432/db` or `mysql://…` | (required) |
| `RECIPE_PAGE_SIZE` | Rows per page | `1000` |
| `RECIPE_WATERMARK_PATH` | Watermark file | `data/.watermark` |
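A minimal sketch of loading these variables with the documented defaults follows. `RecipeConfig` is a hypothetical holder type, and the validation rules (rejecting a missing URL or a non-positive page size) are assumptions rather than the recipe's documented behaviour.

```csharp
using System;

// Hypothetical settings holder; variable names and defaults come from the table above.
public sealed record RecipeConfig(string DbUrl, int PageSize, string WatermarkPath)
{
    public static RecipeConfig FromEnvironment()
    {
        var url = Environment.GetEnvironmentVariable("RECIPE_DB_URL");
        if (string.IsNullOrWhiteSpace(url))
            throw new InvalidOperationException("RECIPE_DB_URL is required.");

        // Fall back to 1000 rows per page when the variable is unset or empty.
        var rawSize = Environment.GetEnvironmentVariable("RECIPE_PAGE_SIZE");
        var pageSize = 1000;
        if (!string.IsNullOrWhiteSpace(rawSize) &&
            (!int.TryParse(rawSize, out pageSize) || pageSize <= 0))
            throw new InvalidOperationException(
                "RECIPE_PAGE_SIZE must be a positive integer.");

        var path = Environment.GetEnvironmentVariable("RECIPE_WATERMARK_PATH");
        if (string.IsNullOrWhiteSpace(path)) path = "data/.watermark";

        return new RecipeConfig(url, pageSize, path);
    }
}
```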
Reuse notes
- Use a monotonic indexed column for `updated_at` to make the pagination cheap.
- For strict ordering, use a composite cursor `(updated_at, id)` to defeat clock-skew ties.
- Hard deletes don't appear in `updated_at` queries; combine with CDC (Kafka recipe) when you need them.
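The composite-cursor note can be illustrated with a small in-memory sketch. The SQL constant shows one possible query shape; the parameter names `@lastUpdatedAt` and `@lastId`, the `GrantRow` record, and the `CompositeKeyset` class are all illustrative assumptions, not part of the recipe. The LINQ filter mirrors that `WHERE` clause so the paging logic can be exercised without a database.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative row type holding only the two cursor columns.
public sealed record GrantRow(long Id, DateTime UpdatedAt);

public static class CompositeKeyset
{
    // Query shape the in-memory filter below mirrors (parameter names are assumptions).
    public const string PagedSql =
        "SELECT id, updated_at FROM grants " +
        "WHERE updated_at > @lastUpdatedAt " +
        "   OR (updated_at = @lastUpdatedAt AND id > @lastId) " +
        "ORDER BY updated_at ASC, id ASC " +
        "LIMIT @pageSize";

    // Returns the rows that sort strictly after the (lastUpdatedAt, lastId) cursor.
    public static List<GrantRow> NextPage(
        IEnumerable<GrantRow> table, DateTime lastUpdatedAt, long lastId, int pageSize) =>
        table
            .Where(r => r.UpdatedAt > lastUpdatedAt ||
                        (r.UpdatedAt == lastUpdatedAt && r.Id > lastId))
            .OrderBy(r => r.UpdatedAt).ThenBy(r => r.Id)
            .Take(pageSize)
            .ToList();

    // Walks the whole table page by page, advancing both halves of the cursor.
    public static List<GrantRow> ReadAll(IEnumerable<GrantRow> table, int pageSize)
    {
        var result = new List<GrantRow>();
        var lastUpdatedAt = DateTime.MinValue;
        var lastId = long.MinValue;
        while (true)
        {
            var page = NextPage(table, lastUpdatedAt, lastId, pageSize);
            if (page.Count == 0) break;
            result.AddRange(page);
            lastUpdatedAt = page[^1].UpdatedAt;
            lastId = page[^1].Id;
        }
        return result;
    }
}
```

With a cursor on `updated_at` alone and a strict `>` comparison, rows that share the last timestamp of a page would be skipped when the tie straddles a page boundary; adding `id` as a tiebreaker gives every row a distinct position in the ordering.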