PostgreSQL / MySQL recipe

Source: PostgresSample/ · production-style relational source with keyset pagination + watermark sync so it scales to large tables and re-runs cheaply.

Owns in the academic graph: research grants, funding agencies, faculty PIs.

What it teaches

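  • Keyset pagination instead of OFFSET: each page is an index seek from the last key seen, so its cost does not grow with page depth (OFFSET has to scan and discard every skipped row), and the cursor makes the read naturally incremental.
  • Watermark sync: persist the high-water mark only after a commit succeeds; the next run resumes from there.
  • A driver-agnostic source: change the URL scheme and the same code talks to PostgreSQL or MySQL.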

flowchart LR
    Mark[(Watermark file)] -->|startKey| Q[Connector]
    Q -->|"SELECT … WHERE updated_at > @startKey ORDER BY updated_at LIMIT @pageSize"| DB[(SQL server)]
    DB -->|page of rows| Q
    Q -->|graph.AddOrUpdate / graph.Link| WS[(Workspace)]
    Q -->|advance watermark| Mark
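
The connector loop in the diagram can be sketched without a database. This is a minimal illustration and not the recipe's actual implementation: `KeysetPager`, `Stream`, and `FakeQuery` are hypothetical names, and an in-memory collection stands in for the SQL server.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class KeysetPager
{
    // Hypothetical helper: repeatedly asks fetchPage for rows whose key is
    // greater than the cursor (ordered ascending) until a short page arrives.
    public static IEnumerable<T> Stream<T>(
        Func<string, int, IReadOnlyList<T>> fetchPage,
        string startKey,
        Func<T, string> keyOf,
        int pageSize)
    {
        var cursor = startKey;
        while (true)
        {
            var page = fetchPage(cursor, pageSize);
            foreach (var row in page)
            {
                yield return row;
                cursor = keyOf(row); // the last row's key seeds the next page
            }
            if (page.Count < pageSize) yield break; // short page: nothing left
        }
    }

    // In-memory stand-in for
    // "WHERE key > @startKey ORDER BY key LIMIT @pageSize".
    public static IReadOnlyList<string> FakeQuery(
        IEnumerable<string> table, string startKey, int pageSize) =>
        table.Where(k => string.CompareOrdinal(k, startKey) > 0)
             .OrderBy(k => k, StringComparer.Ordinal)
             .Take(pageSize)
             .ToList();
}
```

Calling `KeysetPager.Stream<string>((k, n) => KeysetPager.FakeQuery(table, k, n), startKey, k => k, pageSize)` walks the whole table in key order. Passing a later `startKey` resumes after that key, which is what lets a persisted watermark turn the same loop into an incremental sync.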

Keyset pagination + watermark

public const string PagedSql =
    "SELECT id, title, amount_usd, ..., updated_at FROM grants " +
    "WHERE updated_at > @startKey " +
    "ORDER BY updated_at ASC " +
    "LIMIT @pageSize";

// Resume from the last persisted watermark, or from the epoch on the first run.
var mark     = new SqlServerSource.Watermark(watermarkPath);
var startKey = mark.Read(fallback: "1970-01-01T00:00:00Z");
var lastSeen = startKey;
var count    = 0;

foreach (var row in db.StreamPaged(
             GrantsIngest.PagedSql,
             startKey,
             GrantsIngest.Map,
             keyOf: r => r.UpdatedAt.ToString("o"),
             pageSize: pageSize))
{
    GrantsIngest.Ingest(graph, row);
    lastSeen = row.UpdatedAt.ToString("o");

    // Flush to the workspace every 500 rows to bound pending work.
    if (++count % 500 == 0) await graph.CommitPendingAsync();
}

// Commit the tail, then advance the watermark only after the commit succeeds.
await graph.CommitPendingAsync();
mark.Write(lastSeen);
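
The watermark class itself is not shown in this recipe. As a rough sketch of what a file-backed watermark with the same `Read(fallback)` / `Write` shape might look like, the hypothetical `FileWatermark` below writes to a temporary file and renames it over the target, so an interrupted write should leave the previous value in place. It assumes .NET Core 3.0 or later for the `File.Move` overload that takes `overwrite`.

```csharp
using System;
using System.IO;

// Hypothetical stand-in for the recipe's watermark type; the real class
// is not shown in this document and may differ.
public sealed class FileWatermark
{
    private readonly string _path;

    public FileWatermark(string path) => _path = path;

    // Returns the stored key, or the fallback when nothing has been written yet.
    public string Read(string fallback)
    {
        if (!File.Exists(_path)) return fallback;
        var text = File.ReadAllText(_path).Trim();
        return text.Length == 0 ? fallback : text;
    }

    // Writes to a temp file, then renames it over the target so a crash
    // mid-write does not leave a truncated watermark behind.
    public void Write(string key)
    {
        var dir = Path.GetDirectoryName(Path.GetFullPath(_path));
        if (!string.IsNullOrEmpty(dir)) Directory.CreateDirectory(dir);

        var tmp = _path + ".tmp";
        File.WriteAllText(tmp, key);
        File.Move(tmp, _path, overwrite: true);
    }
}
```

Because the watermark is written only after a successful commit, a crash between commit and write means some rows are read again on the next run; that is harmless as long as ingestion is an idempotent upsert.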

Configuration

Variable                 Purpose                                            Default
RECIPE_DB_URL            postgres://user:pass@host:5432/db or mysql://…     (required)
RECIPE_PAGE_SIZE         Rows per page                                      1000
RECIPE_WATERMARK_PATH    Watermark file                                     data/.watermark
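
One way these variables could be read is sketched below. `RecipeConfig` and `Load` are hypothetical names, the accepted scheme spellings are an assumption, and the recipe's real startup code may differ. The lookup is passed in as a delegate so the sketch can be exercised without touching the process environment.

```csharp
using System;

// Hypothetical settings record; variable names and defaults follow the
// configuration table, everything else is illustrative.
public sealed record RecipeConfig(string Driver, Uri DbUrl, int PageSize, string WatermarkPath)
{
    public static RecipeConfig Load(Func<string, string?> getEnv)
    {
        var rawUrl = getEnv("RECIPE_DB_URL");
        if (string.IsNullOrWhiteSpace(rawUrl))
            throw new InvalidOperationException("RECIPE_DB_URL is required.");

        var url = new Uri(rawUrl);

        // The URL scheme selects the driver (assumed spellings).
        var driver = url.Scheme switch
        {
            "postgres" or "postgresql" => "postgresql",
            "mysql" => "mysql",
            var other => throw new NotSupportedException($"Unsupported scheme '{other}'.")
        };

        // Fall back to the documented defaults when a value is missing or invalid.
        var pageSize = int.TryParse(getEnv("RECIPE_PAGE_SIZE"), out var n) && n > 0 ? n : 1000;

        var watermarkPath = getEnv("RECIPE_WATERMARK_PATH");
        if (string.IsNullOrWhiteSpace(watermarkPath)) watermarkPath = "data/.watermark";

        return new RecipeConfig(driver, url, pageSize, watermarkPath);
    }
}
```

In a real process this would be called as `RecipeConfig.Load(Environment.GetEnvironmentVariable)`.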

Reuse notes

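  • Page on an indexed column that only ever increases (updated_at here) so each page is a cheap index range scan.
  • For strict ordering, use a composite cursor (updated_at, id): rows that share a timestamp, whether from clock skew or batch updates, then have a total order, so none are skipped or repeated at a page boundary.
  • Hard deletes don't appear in updated_at queries; combine with CDC (Kafka recipe) when you need them.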
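
The composite cursor from the second note can be sketched as follows. The SQL uses the expanded form of the row comparison `(updated_at, id) > (@lastUpdatedAt, @lastId)`. The parameter names, the `Grant` record, and the in-memory `Page` method are assumptions made for illustration, not part of the recipe.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed record Grant(long Id, DateTime UpdatedAt);

public static class CompositeCursor
{
    // Illustrative query: ties on updated_at are broken by id.
    public const string PagedSql =
        "SELECT id, updated_at FROM grants " +
        "WHERE updated_at > @lastUpdatedAt " +
        "   OR (updated_at = @lastUpdatedAt AND id > @lastId) " +
        "ORDER BY updated_at ASC, id ASC " +
        "LIMIT @pageSize";

    // In-memory equivalent of PagedSql, used only to demonstrate the behaviour.
    public static List<Grant> Page(
        IEnumerable<Grant> table, DateTime lastUpdatedAt, long lastId, int pageSize) =>
        table.Where(g => g.UpdatedAt > lastUpdatedAt ||
                         (g.UpdatedAt == lastUpdatedAt && g.Id > lastId))
             .OrderBy(g => g.UpdatedAt).ThenBy(g => g.Id)
             .Take(pageSize)
             .ToList();

    // Reads every row page by page, carrying both cursor parts forward.
    public static List<long> ReadAll(IReadOnlyCollection<Grant> table, int pageSize)
    {
        var seen = new List<long>();
        var (lastUpdatedAt, lastId) = (DateTime.MinValue, long.MinValue);
        while (true)
        {
            var page = Page(table, lastUpdatedAt, lastId, pageSize);
            foreach (var g in page)
            {
                seen.Add(g.Id);
                (lastUpdatedAt, lastId) = (g.UpdatedAt, g.Id);
            }
            if (page.Count < pageSize) return seen;
        }
    }
}
```

With three rows sharing one timestamp and a page size of 2, a cursor on updated_at alone would move past the third tied row after the first page. The composite cursor picks it up at the start of the second page. For this query to stay cheap, the table would likely need an index on (updated_at, id), and the watermark would have to store both values.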
