# SQL Database Connector
Cursor-paginate a relational table into the graph using Dapper on top of whichever ADO.NET provider matches your engine. The pattern is the same for SQL Server, PostgreSQL, MySQL/MariaDB, and SQLite — only the connection string and the parameter syntax change.
## Packages

- Curiosity.Library (NuGet)
- Dapper (NuGet)

Pick the provider package for your engine — for SQL Server:

- Microsoft.Data.SqlClient (NuGet)

```shell
dotnet add package Curiosity.Library
dotnet add package Dapper
dotnet add package Microsoft.Data.SqlClient
```
## Expected source shape

A `Customers` table with, at minimum, a stable primary key, an updated-at column for incremental sync, and the business fields:

```sql
CREATE TABLE Customers (
    customer_id INT PRIMARY KEY,
    name        NVARCHAR(200) NOT NULL,
    email       NVARCHAR(200),
    country     NVARCHAR(2),
    updated_at  DATETIME2 NOT NULL
);
```
## Connector code (SQL Server example)

```csharp
using System.Data;
using Curiosity.Library;
using Dapper;
using Microsoft.Data.SqlClient;

[Node]
public class Customer
{
    [Key] public string CustomerId { get; set; }
    [Property] public string Name { get; set; }
    [Property] public string Email { get; set; }
    [Property] public string Country { get; set; }
    [Timestamp] public DateTimeOffset UpdatedAt { get; set; }
}

// DTO matches the SELECT columns.
class CustomerRow
{
    public int customer_id { get; set; }
    public string name { get; set; }
    public string email { get; set; }
    public string country { get; set; }
    public DateTime updated_at { get; set; }
}
```
```csharp
using var graph = Graph.Connect(
    endpoint: Environment.GetEnvironmentVariable("CURIOSITY_ENDPOINT")!,
    token: Environment.GetEnvironmentVariable("CURIOSITY_TOKEN")!,
    connectorName: "sql-customers");

await graph.CreateNodeSchemaAsync<Customer>();
graph.SetAutoCommitCost(everyNodes: 10_000);

var connString = Environment.GetEnvironmentVariable("SQL_CONNECTION")!;
using var conn = new SqlConnection(connString);
await conn.OpenAsync();

// Incremental sync: read everything updated since the last cursor.
var lastSync = ReadCheckpoint() ?? DateTimeOffset.MinValue;

const string sql = @"
    SELECT customer_id, name, email, country, updated_at
    FROM Customers
    WHERE updated_at > @lastSync
    ORDER BY updated_at ASC";

var ingested = 0;

// Stream rows — Dapper's `buffered: false` keeps memory bounded.
// (Query<T>, not QueryAsync<T>: the async overloads have no `buffered` parameter.)
foreach (var row in conn.Query<CustomerRow>(sql, new { lastSync }, buffered: false))
{
    graph.AddOrUpdate(new Customer
    {
        CustomerId = row.customer_id.ToString(),
        Name = row.name,
        Email = row.email,
        Country = row.country,
        UpdatedAt = new DateTimeOffset(DateTime.SpecifyKind(row.updated_at, DateTimeKind.Utc)),
    });
    ingested++;
}

await graph.CommitPendingAsync();

// Save the new cursor for the next run.
WriteCheckpoint(DateTimeOffset.UtcNow);
Console.WriteLine($"Ingested {ingested} customers since {lastSync:o}");

// --- Trivial file-based checkpoint helpers; replace with S3/DB in production. ---
DateTimeOffset? ReadCheckpoint()
{
    var p = "checkpoint.txt";
    return File.Exists(p) ? DateTimeOffset.Parse(File.ReadAllText(p)) : null;
}

void WriteCheckpoint(DateTimeOffset value) => File.WriteAllText("checkpoint.txt", value.ToString("o"));
```
## How it works

`Query<T>` with `buffered: false` returns an `IEnumerable<T>` that streams from the open connection one row at a time, so memory stays flat. (Dapper's plain `QueryAsync<T>` overloads have no `buffered` parameter; for asynchronous streaming, Dapper 2.0.138+ provides `QueryUnbufferedAsync<T>`, which returns an `IAsyncEnumerable<T>`.) The `updated_at > @lastSync` predicate makes the connector incremental: each run pulls only rows that changed since the last cursor.

`SetAutoCommitCost(everyNodes: 10_000)` flushes to Curiosity in batches, so the full result set is never held in graph buffers either.

The `[Key]` is the source primary key, stringified — exactly what Idempotency recommends.
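If you prefer fully asynchronous row streaming, the loop can be sketched with `QueryUnbufferedAsync` (available from Dapper 2.0.138 on, as an extension on `DbConnection`; everything else is unchanged):

```csharp
// Async streaming variant: rows are awaited one at a time,
// so memory stays bounded without blocking a thread on database I/O.
await foreach (var row in conn.QueryUnbufferedAsync<CustomerRow>(sql, new { lastSync }))
{
    graph.AddOrUpdate(new Customer
    {
        CustomerId = row.customer_id.ToString(),
        Name = row.name,
        Email = row.email,
        Country = row.country,
        UpdatedAt = new DateTimeOffset(DateTime.SpecifyKind(row.updated_at, DateTimeKind.Utc)),
    });
    ingested++;
}
```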
## Linking related tables

Real-world ingest usually involves multiple tables. Define a `[Node]` per table and edge constants between them:

```csharp
[Node] public class Customer { [Key] public string CustomerId { get; set; } /* ... */ }
[Node] public class Order { [Key] public string OrderId { get; set; } /* ... */ }

public static class Edges
{
    public const string PlacedOrder = nameof(PlacedOrder);
    public const string OrderedBy = nameof(OrderedBy);
}
```

Then in the order-ingest loop:
```csharp
const string sql = @"
    SELECT o.order_id, o.customer_id, o.total, o.placed_at
    FROM Orders o
    WHERE o.updated_at > @lastSync";

foreach (var row in conn.Query<OrderRow>(sql, new { lastSync }, buffered: false))
{
    var order = graph.AddOrUpdate(new Order { /* ... */ });
    graph.Link(
        Node.FromKey(nameof(Customer), row.customer_id.ToString()),
        order,
        Edges.PlacedOrder,
        Edges.OrderedBy);
}
```
`Node.FromKey(...)` avoids a round-trip — the edge becomes active as soon as the `Customer` node is ingested (now or later). See Querying for when you'd fetch the linked node instead.
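The loop above assumes an `OrderRow` DTO mirroring the `SELECT`; a sketch along these lines (the column names are taken from the query — adapt the types to your schema):

```csharp
// DTO matching the Orders SELECT columns, one property per selected column.
class OrderRow
{
    public int order_id { get; set; }
    public int customer_id { get; set; }
    public decimal total { get; set; }
    public DateTime placed_at { get; set; }
}
```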
## Provider differences

| Engine | Connection string | Parameter syntax | Notes |
|---|---|---|---|
| SQL Server | `Server=...;Database=...;User Id=...;Password=...;TrustServerCertificate=true` | `@param` | Use `Microsoft.Data.SqlClient`, not `System.Data.SqlClient`. |
| PostgreSQL | `Host=...;Database=...;Username=...;Password=...` | `@param` | Npgsql 6+ supports `DateTimeOffset` natively. |
| MySQL/MariaDB | `Server=...;Database=...;User=...;Password=...` | `@param` | `MySqlConnector` is the modern fork; prefer it over `MySql.Data`. |
| SQLite | `Data Source=mydb.db` | `@param` | Use parameterized `LIMIT @batch OFFSET @offset` for pagination instead of cursors. |

The rest of the code — the Dapper query calls, the connector loop, the Curiosity.Library calls — is identical across providers.
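For the SQLite row in the table, the paged variant might look like this sketch (assuming the `Microsoft.Data.Sqlite` provider — any SQLite ADO.NET provider works with Dapper):

```csharp
using var conn = new SqliteConnection("Data Source=mydb.db");
conn.Open();

const int batch = 1_000;
var offset = 0;
while (true)
{
    // Page through the table in fixed-size chunks via LIMIT/OFFSET.
    var rows = conn.Query<CustomerRow>(
        @"SELECT customer_id, name, email, country, updated_at
          FROM Customers
          ORDER BY customer_id
          LIMIT @batch OFFSET @offset",
        new { batch, offset }).AsList();
    if (rows.Count == 0) break;

    foreach (var row in rows)
        graph.AddOrUpdate(new Customer { CustomerId = row.customer_id.ToString() /* ... */ });

    offset += rows.Count;
}
```

Ordering by the primary key keeps pages stable between queries; without a deterministic `ORDER BY`, `OFFSET` can skip or repeat rows.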
## Notes & pitfalls

- **Time zones.** A `DateTime` read from a SQL `DATETIME2` has unspecified kind. Force it to UTC (`DateTime.SpecifyKind(..., DateTimeKind.Utc)`) before constructing the `DateTimeOffset`, otherwise local-time offsets sneak in.
- **`buffered: false` is critical.** The default `buffered: true` materializes the whole result set into a list before returning. On a 50M-row table that means OOM. Always pass `buffered: false` for streaming reads.
- **Cursor robustness.** Storing `lastSync` as the wall-clock time of the run is almost correct — a row written during ingest with an earlier `updated_at` than `lastSync` will be missed on the next run. For strictness, store `MAX(updated_at)` over the rows you actually ingested.
- **Idempotent re-runs.** Because `AddOrUpdate` is keyed on the source primary key, replaying a window of `updated_at` is safe — the graph just gets the same writes again.
- **Schema drift.** Adding a column to the source is fine — the DTO ignores unknown columns. Removing one breaks the DTO; either drop the property or mark it nullable.
- **Connection pool.** ADO.NET pools connections per connection string. Keep a single `IGraph` and a single `SqlConnection` for the duration of the run.
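The stricter cursor from the list above — advancing only as far as rows actually ingested — is a small change to the main loop (a sketch; `ReadCheckpoint`/`WriteCheckpoint` are the same helpers as earlier):

```csharp
var lastSync = ReadCheckpoint() ?? DateTimeOffset.MinValue;
var maxSeen = lastSync;

foreach (var row in conn.Query<CustomerRow>(sql, new { lastSync }, buffered: false))
{
    var updatedAt = new DateTimeOffset(DateTime.SpecifyKind(row.updated_at, DateTimeKind.Utc));
    graph.AddOrUpdate(new Customer
    {
        CustomerId = row.customer_id.ToString(),
        UpdatedAt = updatedAt,
        /* ... other fields ... */
    });

    // Track the maximum updated_at we have actually seen and ingested.
    if (updatedAt > maxSeen) maxSeen = updatedAt;
}

await graph.CommitPendingAsync();
WriteCheckpoint(maxSeen); // instead of DateTimeOffset.UtcNow
```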
## See also

- Schemas — `[Node]`, `[Key]`, `[Property]`, `[Timestamp]`.
- Ingestion — `AddOrUpdate` / `Link` / `Node.FromKey`.
- Idempotency — stable keys and cursor patterns.
- Performance — auto-commit thresholds, parallel ingestion across shards.
- Custom connector from scratch — end-to-end walkthrough with checkpointing.