Curiosity

SQL Database Connector

Cursor-paginate a relational table into the graph using Dapper on top of whichever ADO.NET provider matches your engine. The pattern is the same for SQL Server, PostgreSQL, MySQL/MariaDB, and SQLite: only the provider package and the connection string change, plus the occasional piece of SQL dialect.

Packages

Curiosity.Library on NuGet
Dapper on NuGet

Pick the provider package for your engine:

Microsoft.Data.SqlClient on NuGet

dotnet add package Curiosity.Library
dotnet add package Dapper
dotnet add package Microsoft.Data.SqlClient

Expected source shape

A Customers table with at minimum a stable primary key, an updated-at column for incremental sync, and the business fields:

CREATE TABLE Customers (
    customer_id  INT          PRIMARY KEY,
    name         NVARCHAR(200) NOT NULL,
    email        NVARCHAR(200),
    country      NVARCHAR(2),
    updated_at   DATETIME2    NOT NULL
);

Connector code (SQL Server example)

Program.cs
using System.Data;
using Curiosity.Library;
using Dapper;
using Microsoft.Data.SqlClient;

[Node]
public class Customer
{
    [Key]       public string         CustomerId { get; set; }
    [Property]  public string         Name       { get; set; }
    [Property]  public string         Email      { get; set; }
    [Property]  public string         Country    { get; set; }
    [Timestamp] public DateTimeOffset UpdatedAt  { get; set; }
}

// DTO matches the SELECT columns.
class CustomerRow
{
    public int      customer_id { get; set; }
    public string   name        { get; set; }
    public string   email       { get; set; }
    public string   country     { get; set; }
    public DateTime updated_at  { get; set; }
}

using var graph = Graph.Connect(
    endpoint:      Environment.GetEnvironmentVariable("CURIOSITY_ENDPOINT")!,
    token:         Environment.GetEnvironmentVariable("CURIOSITY_TOKEN")!,
    connectorName: "sql-customers");

await graph.CreateNodeSchemaAsync<Customer>();
graph.SetAutoCommitCost(everyNodes: 10_000);

var connString = Environment.GetEnvironmentVariable("SQL_CONNECTION")!;
using var conn = new SqlConnection(connString);
await conn.OpenAsync();

// Incremental sync: read everything updated since the last cursor.
var lastSync = ReadCheckpoint() ?? DateTimeOffset.MinValue;

const string sql = @"
    SELECT customer_id, name, email, country, updated_at
    FROM   Customers
    WHERE  updated_at > @lastSync
    ORDER BY updated_at ASC";

var ingested = 0;

// Stream rows: `buffered: false` keeps memory bounded. Dapper's QueryAsync<T> has no
// `buffered` argument, so this uses the synchronous Query<T> overload, which does.
foreach (var row in conn.Query<CustomerRow>(sql, new { lastSync }, buffered: false))
{
    graph.AddOrUpdate(new Customer
    {
        CustomerId = row.customer_id.ToString(),
        Name       = row.name,
        Email      = row.email,
        Country    = row.country,
        UpdatedAt  = new DateTimeOffset(DateTime.SpecifyKind(row.updated_at, DateTimeKind.Utc)),
    });
    ingested++;
}

await graph.CommitPendingAsync();

// Save the new cursor for the next run.
WriteCheckpoint(DateTimeOffset.UtcNow);

Console.WriteLine($"Ingested {ingested} customers since {lastSync:o}");

// --- Trivial file-based checkpoint helpers; replace with S3/DB in production. ---
DateTimeOffset? ReadCheckpoint()
{
    var p = "checkpoint.txt";
    return File.Exists(p) ? DateTimeOffset.Parse(File.ReadAllText(p)) : null;
}

void WriteCheckpoint(DateTimeOffset value) => File.WriteAllText("checkpoint.txt", value.ToString("o"));

How it works

Query<T> with buffered: false returns an IEnumerable<T> that streams from the database connection one row at a time, so memory stays flat. Dapper's QueryAsync<T> takes no buffered argument; to stream asynchronously, pass it a CommandDefinition built with flags: CommandFlags.None, or use QueryUnbufferedAsync<T> in Dapper 2.1 and later. The updated_at > @lastSync predicate makes the connector incremental: each run only pulls rows that changed since the last cursor.
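
The two unbuffered read styles can be compared side by side in a small sketch. It assumes the Microsoft.Data.Sqlite package and an in-memory SQLite database as a stand-in for the real source; the table contents and the variable names (demo, demoSql, syncCount, asyncCount) are illustrative and not part of the connector.

```csharp
using System;
using System.Linq;
using Dapper;
using Microsoft.Data.Sqlite;

// Assumption: in-memory SQLite stands in for the real source database.
using var demo = new SqliteConnection("Data Source=:memory:");
demo.Open();
demo.Execute("CREATE TABLE Customers (customer_id INTEGER PRIMARY KEY, name TEXT NOT NULL)");
demo.Execute(
    "INSERT INTO Customers (customer_id, name) VALUES (@Id, @Name)",
    Enumerable.Range(1, 5).Select(i => new { Id = i, Name = $"customer-{i}" }));

const string demoSql =
    "SELECT name FROM Customers WHERE customer_id > @after ORDER BY customer_id";

// Option 1: synchronous Query<T>. With buffered: false, rows are yielded
// lazily as the underlying data reader advances.
var syncCount = 0;
foreach (var name in demo.Query<string>(demoSql, new { after = 2 }, buffered: false))
    syncCount++;

// Option 2: QueryAsync<T> through a CommandDefinition. CommandFlags.None
// clears the default Buffered flag, so enumeration is deferred here too.
var unbuffered = new CommandDefinition(demoSql, new { after = 2 }, flags: CommandFlags.None);
var asyncCount = 0;
foreach (var name in await demo.QueryAsync<string>(unbuffered))
    asyncCount++;

Console.WriteLine($"{syncCount} rows via Query, {asyncCount} rows via QueryAsync");
```

In both options the reader stays open while the loop runs, so the connection cannot serve another query until enumeration finishes.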

SetAutoCommitCost(everyNodes: 10_000) ensures we flush to Curiosity in batches without holding the entire result set in graph buffers either.

The [Key] is the source primary key, stringified — exactly what Idempotency recommends.

Real-world ingest usually involves multiple tables. Define a [Node] per table and edge constants between them:

[Node] public class Customer  { [Key] public string CustomerId { get; set; } /* ... */ }
[Node] public class Order     { [Key] public string OrderId    { get; set; } /* ... */ }

public static class Edges
{
    public const string PlacedOrder = nameof(PlacedOrder);
    public const string OrderedBy   = nameof(OrderedBy);
}

Then in the order-ingest loop:

const string sql = @"
    SELECT o.order_id, o.customer_id, o.total, o.placed_at
    FROM   Orders o
    WHERE  o.updated_at > @lastSync";

foreach (var row in conn.Query<OrderRow>(sql, new { lastSync }, buffered: false))
{
    var order = graph.AddOrUpdate(new Order { /* ... */ });

    graph.Link(
        Node.FromKey(nameof(Customer), row.customer_id.ToString()),
        order,
        Edges.PlacedOrder,
        Edges.OrderedBy);
}

Node.FromKey(...) avoids a round-trip — the edge becomes active as soon as the Customer node is ingested (now or later). See Querying for when you'd fetch the linked node instead.

Provider differences

| Engine | Connection string | Parameter syntax | Notes |
|---|---|---|---|
| SQL Server | Server=...;Database=...;User Id=...;Password=...;TrustServerCertificate=true | @param | Use Microsoft.Data.SqlClient, not System.Data.SqlClient. |
| PostgreSQL | Host=...;Database=...;Username=...;Password=... | @param | Npgsql 6+ supports DateTimeOffset natively. |
| MySQL/MariaDB | Server=...;Database=...;User=...;Password=... | @param | MySqlConnector is the modern fork; prefer it over MySql.Data. |
| SQLite | Data Source=mydb.db | @param | Use parameterized LIMIT @batch OFFSET @offset for pagination instead of cursors. |

The rest of the code — Dapper.QueryAsync<T>, the connector loop, Curiosity.Library calls — is identical across providers.
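
The LIMIT/OFFSET paging mentioned for SQLite could look like the sketch below. It assumes the Microsoft.Data.Sqlite package and an in-memory database; the seven sample rows, the batch size of 3, and the names db, pageSql, and total are made up for illustration.

```csharp
using System;
using System.Linq;
using Dapper;
using Microsoft.Data.Sqlite;

// Assumption: an in-memory database with a few sample rows.
using var db = new SqliteConnection("Data Source=:memory:");
db.Open();
db.Execute("CREATE TABLE Customers (customer_id INTEGER PRIMARY KEY, name TEXT NOT NULL)");
db.Execute(
    "INSERT INTO Customers (customer_id, name) VALUES (@Id, @Name)",
    Enumerable.Range(1, 7).Select(i => new { Id = i, Name = $"customer-{i}" }));

// A stable ORDER BY is required so consecutive pages never overlap or skip rows.
const string pageSql = @"
    SELECT name
    FROM   Customers
    ORDER BY customer_id
    LIMIT @batch OFFSET @offset";

const int batch = 3;
var offset = 0;
var total = 0;

while (true)
{
    // Each page is small, so buffering it into a list is fine here.
    var page = db.Query<string>(pageSql, new { batch, offset }).AsList();
    if (page.Count == 0) break;

    // In the connector, each row would be mapped and passed to graph.AddOrUpdate.
    total += page.Count;
    offset += page.Count;
}

Console.WriteLine($"Read {total} rows in pages of up to {batch}");
```

OFFSET paging re-scans the skipped rows on every page, so it suits small to medium tables; on large ones, filtering on the last seen key (WHERE customer_id > @lastId) is the usual alternative.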

Notes & pitfalls

  • Time zones. DateTime from a SQL DATETIME2 is unspecified-kind. Force it to UTC (DateTime.SpecifyKind(..., DateTimeKind.Utc)) before constructing the DateTimeOffset, otherwise local-time offsets sneak in.
  • buffered: false is critical. The default buffered: true materializes the whole result set into a list before returning. On a 50M-row table that means OOM. Always pass buffered: false for streaming reads.
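  • Cursor robustness. Saving the wall-clock time at the end of the run as the next lastSync is almost correct: a row committed while ingest was running can carry an updated_at earlier than the saved value without having been read, so the next run skips it. Clock skew between the connector host and the database has the same effect. For strictness, store MAX(updated_at) over the rows you actually ingested.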
  • Idempotent re-runs. Because AddOrUpdate is keyed on the source primary key, replaying a window of updated_at is safe — the graph just gets the same writes again.
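  • Schema drift. Adding a column to the source is fine: the query selects named columns, and Dapper ignores any column the DTO does not declare. Removing a selected column breaks the query; take it out of the SELECT and either drop the DTO property or leave it nullable.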
  • Connection pool. ADO.NET pools connections per connection string. Keep a single IGraph and a single SqlConnection for the duration of the run.
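
The time-zone and cursor-robustness points above can be combined in a minimal sketch. The helper names ToUtcOffset and NextCheckpoint and the sample timestamps are hypothetical; in the connector the values would come from row.updated_at inside the ingest loop.

```csharp
using System;
using System.Collections.Generic;

// Mirrors the conversion in the connector loop: DATETIME2 values arrive with
// DateTimeKind.Unspecified, so the kind is pinned to UTC before wrapping.
DateTimeOffset ToUtcOffset(DateTime fromSql) =>
    new DateTimeOffset(DateTime.SpecifyKind(fromSql, DateTimeKind.Utc));

// Returns the checkpoint to save: the largest updated_at actually ingested,
// or the previous checkpoint when the run saw no rows.
DateTimeOffset NextCheckpoint(DateTimeOffset previous, IEnumerable<DateTime> ingestedUpdatedAt)
{
    var max = previous;
    foreach (var updatedAt in ingestedUpdatedAt)
    {
        var utc = ToUtcOffset(updatedAt);
        if (utc > max) max = utc;
    }
    return max;
}

// Hypothetical sample values standing in for row.updated_at.
var previous = new DateTimeOffset(2024, 1, 1, 0, 0, 0, TimeSpan.Zero);
var seen = new[]
{
    new DateTime(2024, 1, 2, 8, 30, 0),
    new DateTime(2024, 1, 3, 9, 0, 0),
    new DateTime(2024, 1, 2, 23, 59, 59),
};

var next = NextCheckpoint(previous, seen);
Console.WriteLine(next.ToString("o"));
```

With this approach the saved cursor never runs ahead of the data that was read. Rows that share the exact maximum timestamp but commit later can still slip past a strict > comparison; switching the predicate to >= replays the boundary rows, which is safe because AddOrUpdate is keyed on the source primary key.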

© 2026 Curiosity. All rights reserved.