High-throughput CSV: nodes + edges
The baseline CSV example streams one node type out of a flat file. This page is the version you reach for when the file is large (millions of rows) and each row should explode into several nodes and the edges between them — the shape most real exports have. It folds every ingestion-performance lever into one worked connector: streaming reads, an in-memory operation buffer, zero-fetch edges, deferred indexing, and idempotent re-runs.
The reference pages cover each lever on its own — Ingestion, Performance, Idempotency, Caching. Here they come together against a concrete source.
What you'll build
A denormalised support-case export — one row per case, with the device, manufacturer, and part repeated inline — turned into a connected graph:
```csv
case_id,opened_at,status,summary,device_model,manufacturer,part_number,part_name
SC-1001,2026-01-05T09:12:00Z,Open,Won't boot after update,MacBook Pro 14,Apple,A2779-BAT,Battery
SC-1002,2026-01-05T11:40:00Z,Closed,Cracked screen,MacBook Pro 14,Apple,A2779-LCD,Display
SC-1003,2026-01-06T08:05:00Z,Open,Fan noise,ThinkPad X1,Lenovo,X1-FAN-09,Cooling fan
```
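Each row produces one mutable SupportCase node plus three reference nodes (Device, Manufacturer, Part) that recur across thousands of rows, wired together with bi-directional edges:
- SupportCase ↔ Device (ForDevice / HasSupportCase)
- Device ↔ Manufacturer (MadeBy / Makes)
- Device ↔ Part (HasPart / PartOf)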
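To make the explosion concrete, here is a minimal, library-free sketch for the three sample rows. It is not Curiosity code: nodes are modelled as (type, key) pairs in a set and edges as (from, edge type, to) triples in a set, an assumption standing in for key-based identity and for Link's unique-by-default behaviour, and the Link helper is hypothetical. Splitting on commas only works because no sample field contains one; real files go through CsvHelper, as the connector below does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// The three sample rows. Splitting on ',' is safe here only because no field contains a comma.
string[] lines =
{
    "SC-1001,2026-01-05T09:12:00Z,Open,Won't boot after update,MacBook Pro 14,Apple,A2779-BAT,Battery",
    "SC-1002,2026-01-05T11:40:00Z,Closed,Cracked screen,MacBook Pro 14,Apple,A2779-LCD,Display",
    "SC-1003,2026-01-06T08:05:00Z,Open,Fan noise,ThinkPad X1,Lenovo,X1-FAN-09,Cooling fan",
};

// Nodes modelled as (type, key): a set collapses repeated keys the way the graph does.
var nodes = new HashSet<(string Type, string Key)>();
// Edges modelled as (from, edge type, to): a set makes re-linking the same pair a no-op.
var edges = new HashSet<((string Type, string Key) From, string Edge, (string Type, string Key) To)>();

// Hypothetical stand-in for graph.Link: records the relationship in both directions.
void Link((string, string) a, (string, string) b, string forward, string reverse)
{
    edges.Add((a, forward, b));
    edges.Add((b, reverse, a));
}

foreach (var line in lines)
{
    var f = line.Split(',');
    var supportCase = ("SupportCase", f[0]);
    var device = ("Device", f[4]);
    var manufacturer = ("Manufacturer", f[5]);
    var part = ("Part", f[6]);
    nodes.UnionWith(new[] { supportCase, device, manufacturer, part });

    Link(supportCase, device, "ForDevice", "HasSupportCase");
    Link(device, manufacturer, "MadeBy", "Makes");
    Link(device, part, "HasPart", "PartOf");
}

// Prints Device: 2, Manufacturer: 2, Part: 3, SupportCase: 3 (one per line).
foreach (var group in nodes.GroupBy(n => n.Type).OrderBy(g => g.Key, StringComparer.Ordinal))
    Console.WriteLine($"{group.Key}: {group.Count()}");
Console.WriteLine($"Edges (both directions): {edges.Count}"); // Edges (both directions): 16
```

Nine Link calls attempt 18 edge writes but leave 16 edges: the second MacBook Pro 14 / Apple pair repeats an existing link in both directions and collapses.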
How the graph engine handles a bulk load
Four properties of the engine decide what "efficient" means here, and motivate every choice below.
- Identity is a hash of the key. A node's identity is a deterministic hash of its type and its [Key]: the same key always maps to the same node, on every run and across every connector. Two rows naming Apple collapse onto one Manufacturer node automatically; there is no separate de-duplication pass to write, and re-running the connector overwrites in place instead of duplicating.
- Writes are buffered, then committed in batches. AddOrUpdate, TryAdd, and Link don't each make a network call; they append to an in-memory operation buffer. A commit ships the whole batch in one request. The connector's job is to control when commits happen, not to commit per record.
- Edges are directed and bind by key. Link(a, b, forward, reverse) records the relationship in both directions so traversal works from either end. The node handle returned by AddOrUpdate/TryAdd and a Node.FromKey(type, key) are both by-key references that cost no round-trip; the edge becomes active once both endpoints are committed. With unique: true (the default), re-linking the same pair is a no-op, so edges are idempotent too.
- Indexing is asynchronous. A committed node isn't indexed synchronously: the engine queues it for full-text (BM25) and vector indexing on a background worker, which is what keeps writes fast. During a backfill you don't need fresh search until the end, so pausing indexing lets the import run without competing for CPU and I/O; the index catches up once on resume.
The connector
```csharp
using System;
using System.Globalization;
using System.IO;
using Curiosity.Library;
using CsvHelper;
using CsvHelper.Configuration;
using CsvHelper.Configuration.Attributes;

using var graph = Graph.Connect(
    endpoint: Environment.GetEnvironmentVariable("CURIOSITY_ENDPOINT")!,
    token: Environment.GetEnvironmentVariable("CURIOSITY_TOKEN")!,
    connectorName: "csv-support-cases");

// Schemas are idempotent: safe to call on every run.
await graph.CreateNodeSchemaAsync<SupportCase>();
await graph.CreateNodeSchemaAsync<Device>();
await graph.CreateNodeSchemaAsync<Manufacturer>();
await graph.CreateNodeSchemaAsync<Part>();
await graph.CreateEdgeSchemaAsync(typeof(Edges));

// Flush the buffer roughly every 20k nodes instead of per-record.
graph.SetAutoCommitCost(everyNodes: 20_000);

var path = args.Length > 0 ? args[0] : "support-cases.csv";

var config = new CsvConfiguration(CultureInfo.InvariantCulture)
{
    HasHeaderRecord = true,
    MissingFieldFound = null, // don't throw when a row is missing a field
    TrimOptions = TrimOptions.Trim,
};

// Defer search indexing until the whole file is in.
await graph.PauseIndexing("csv-backfill");

var ingested = 0;
try
{
    using var reader = new StreamReader(path);
    using var csv = new CsvReader(reader, config);

    foreach (var row in csv.GetRecords<CaseRow>()) // lazy: one row in memory at a time
    {
        MapRow(graph, row);
        if (++ingested % 100_000 == 0)
            Console.WriteLine($"[{DateTimeOffset.UtcNow:HH:mm:ss}] mapped {ingested:N0} rows");
    }

    await graph.CommitPendingAsync(); // flush the tail
}
finally
{
    await graph.ResumeIndexing("csv-backfill"); // index builds once, even on error
}

Console.WriteLine($"Done: {ingested:N0} rows ingested.");

// --- mapping: one flat row -> nodes + edges -------------------------------
static void MapRow(Graph graph, CaseRow row)
{
    // Mutable source record -> AddOrUpdate keeps properties in sync on re-runs.
    var supportCase = graph.AddOrUpdate(new SupportCase
    {
        Id = row.CaseId,
        Summary = row.Summary,
        Status = row.Status,
        OpenedAt = row.OpenedAt,
    });

    // Reference data repeats across thousands of rows -> TryAdd writes once,
    // identical keys merge onto the same node, and properties aren't rewritten.
    var device = graph.TryAdd(new Device { Model = row.DeviceModel });
    var manufacturer = graph.TryAdd(new Manufacturer { Name = row.Manufacturer });
    var part = graph.TryAdd(new Part { Number = row.PartNumber, Name = row.PartName });

    // Link by the handles we already hold: no fetch, no round-trip.
    graph.Link(supportCase, device, Edges.ForDevice, Edges.HasSupportCase);
    graph.Link(device, manufacturer, Edges.MadeBy, Edges.Makes);
    graph.Link(device, part, Edges.HasPart, Edges.PartOf);
}

// --- schema ----------------------------------------------------------------
[Node]
public class SupportCase
{
    [Key] public string Id { get; set; }
    [Property] public string Summary { get; set; }
    [Property] public string Status { get; set; }
    [Timestamp] public DateTimeOffset OpenedAt { get; set; }
}

[Node]
public class Device { [Key] public string Model { get; set; } }

[Node]
public class Manufacturer { [Key] public string Name { get; set; } }

[Node]
public class Part
{
    [Key] public string Number { get; set; }
    [Property] public string Name { get; set; }
}

public static class Edges
{
    public const string ForDevice = nameof(ForDevice);           // SupportCase -> Device
    public const string HasSupportCase = nameof(HasSupportCase); // Device -> SupportCase
    public const string MadeBy = nameof(MadeBy);                 // Device -> Manufacturer
    public const string Makes = nameof(Makes);                   // Manufacturer -> Device
    public const string HasPart = nameof(HasPart);               // Device -> Part
    public const string PartOf = nameof(PartOf);                 // Part -> Device
}

// --- CSV row DTO: names match the header exactly ---------------------------
class CaseRow
{
    [Name("case_id")] public string CaseId { get; set; }
    [Name("opened_at")] public DateTimeOffset OpenedAt { get; set; }
    [Name("status")] public string Status { get; set; }
    [Name("summary")] public string Summary { get; set; }
    [Name("device_model")] public string DeviceModel { get; set; }
    [Name("manufacturer")] public string Manufacturer { get; set; }
    [Name("part_number")] public string PartNumber { get; set; }
    [Name("part_name")] public string PartName { get; set; }
}
```
Why it's fast
Five decisions in that file are what separate a connector that finishes a million rows in minutes from one that takes hours.
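Stream the file, don't load it
csv.GetRecords<CaseRow>() is lazy: each iteration of the foreach reads and converts one row, so only the current row is held in memory whether the file has a thousand rows or ten million. Keep it that way. Calling .ToList() (or anything else that materialises the sequence) before the loop pulls the whole file into memory before the first row is mapped.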
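A stand-alone sketch of the difference between streaming and materialising a lazy sequence. ReadRows is a hypothetical iterator over an in-memory StringReader, used in place of CsvHelper so the example runs without a file: a streaming loop that stops after three rows has read exactly three lines, while ToList() reads all 1,000 before anything can be mapped. GetRecords<T>() in the connector is the same kind of lazily evaluated sequence.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

var linesRead = 0;

// Hypothetical lazy reader: each MoveNext() pulls exactly one line from the source.
IEnumerable<string> ReadRows(TextReader reader)
{
    string? line;
    while ((line = reader.ReadLine()) != null)
    {
        linesRead++;
        yield return line;
    }
}

// 1,000 in-memory rows stand in for a large file.
var text = string.Join("\n", Enumerable.Range(1, 1_000).Select(i => $"SC-{i},Open"));

// Streaming: map rows as they arrive; stopping early shows nothing else was read.
using (var reader = new StringReader(text))
{
    foreach (var row in ReadRows(reader))
    {
        Console.WriteLine($"mapping {row}");
        if (linesRead == 3) break;
    }
}
var readWhileStreaming = linesRead;   // 3

// Materialising: ToList() reads every row before the first one can be mapped.
linesRead = 0;
using (var reader = new StringReader(text))
{
    var all = ReadRows(reader).ToList();
    Console.WriteLine($"buffered {all.Count} rows up front");
}
var readWhenMaterialised = linesRead; // 1000

Console.WriteLine($"{readWhileStreaming} vs {readWhenMaterialised} lines read");
```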
Buffer writes, commit in batches
None of the AddOrUpdate / TryAdd / Link calls touch the network; they fill an in-memory buffer. SetAutoCommitCost(everyNodes: 20_000) flushes that buffer roughly every 20,000 nodes' worth of work, so commit overhead is amortised across the batch instead of paid per record. The threshold weighs a node at roughly 1,000 times an edge, so edges barely count toward it. The default flushes at about 5,000 nodes: raise it for small records, and lower it for large ones (long text bodies, file contents).
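A toy model of that accounting, under stated assumptions: the 1,000:1 node-to-edge weight and the roughly 5,000-node default come from the paragraph above, but Enqueue, Commit, and the cost bookkeeping are hypothetical stand-ins, not Curiosity's implementation. With a deliberately tiny budget both effects are visible: nodes trigger commits in fixed-size batches, while a thousand edges cost about as much as one node.

```csharp
using System;
using System.Collections.Generic;

// Assumed weights from the text above (a node is ~1000x an edge); the real accounting may differ.
const long NodeCost = 1000;
const long EdgeCost = 1;

var pending = new List<string>();  // the in-memory operation buffer
var commits = new List<int>();     // operations shipped per commit request
long bufferedCost = 0;
long threshold = 5_000 * NodeCost; // assumed default: about 5,000 nodes

void SetAutoCommitCost(int everyNodes) => threshold = everyNodes * NodeCost;

void Commit()
{
    if (pending.Count == 0) return;
    commits.Add(pending.Count);    // one request, however many operations it carries
    pending.Clear();
    bufferedCost = 0;
}

void Enqueue(string op, long cost)
{
    pending.Add(op);
    bufferedCost += cost;
    if (bufferedCost >= threshold) Commit(); // auto-commit once the budget is spent
}

void AddNode(string key) => Enqueue($"node {key}", NodeCost);
void AddEdge(string from, string to) => Enqueue($"edge {from}->{to}", EdgeCost);

SetAutoCommitCost(everyNodes: 3);               // tiny budget so the effect is visible
for (var i = 0; i < 10; i++) AddNode($"n{i}");  // auto-commits after n2, n5 and n8
Commit();                                       // flush the tail (n9), like CommitPendingAsync
Console.WriteLine(string.Join(",", commits));   // 3,3,3,1

AddNode("a");
AddNode("b");                                    // 2,000 of the 3,000 budget
for (var i = 0; i < 999; i++) AddEdge("a", "b"); // 999 more: still under budget
Console.WriteLine(pending.Count);                // 1001
AddEdge("a", "b");                               // budget reached: one commit
Console.WriteLine(commits[^1]);                  // 1002
```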
Link by key, never fetch
The handles returned by AddOrUpdate and TryAdd are by-key references that cost nothing to reuse, so graph.Link(supportCase, device, …) adds an edge with no round-trip. When you need to link to a node you didn't create in the same scope — say one another connector owns — use Node.FromKey(nameof(Device), model) for the same zero-fetch effect. The slow anti-pattern is fetching first:
```csharp
// SLOW: a query round-trip per edge just to get a node handle.
var result = await graph.QueryAsync(q =>
    q.StartAt(Node.FromKey(nameof(Device), row.DeviceModel)).Emit("N"));
graph.Link(supportCase, result.GetEmitted("N").Single(), Edges.ForDevice, Edges.HasSupportCase);
```
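That adds a network round-trip for every edge: instead of paying one request per committed batch, the load pays one per row, and each awaited query holds up the loop before the next row can be mapped. Reserve it for the rare case where you must read a property before deciding whether to link.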
Pause indexing for the backfill
Indexing runs asynchronously after each commit. For a one-shot backfill you don't need fresh search until the end, so PauseIndexing lets the import run without the full-text and vector indexers competing for resources; the index rebuilds once on resume.
```csharp
await graph.PauseIndexing("csv-backfill");
try { /* ingest everything */ await graph.CommitPendingAsync(); }
finally { await graph.ResumeIndexing("csv-backfill"); }
```
Always resume in a finally so a thrown exception can't leave search stale.
Pick TryAdd vs AddOrUpdate deliberately
SupportCase mutates between runs, so it's AddOrUpdate. Device, Manufacturer, and Part are reference data that recur on thousands of rows; TryAdd writes them once and skips the property write when they already exist, which is both correct and cheaper than calling AddOrUpdate with the same values a thousand times.
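A toy model of the trade-off, using hypothetical in-memory TryAdd and AddOrUpdate helpers over a dictionary (not the library's implementation). It shows both halves: TryAdd saves 999 of 1,000 writes on repeated reference data, but it keeps the first value it saw, which is exactly wrong for a record like SupportCase whose status changes between drops.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the two write modes: node key -> stored property value.
var store = new Dictionary<string, string>();
var propertyWrites = 0;

void AddOrUpdate(string key, string value)
{
    store[key] = value;                 // always rewrites the properties
    propertyWrites++;
}

void TryAdd(string key, string value)
{
    if (store.ContainsKey(key)) return; // already exists: skip the property write
    store[key] = value;
    propertyWrites++;
}

// Reference data: 1,000 rows all naming the same manufacturer.
for (var i = 0; i < 1_000; i++) AddOrUpdate("Manufacturer/Apple", "Apple");
var writesAddOrUpdate = propertyWrites;   // 1000

store.Clear();
propertyWrites = 0;
for (var i = 0; i < 1_000; i++) TryAdd("Manufacturer/Apple", "Apple");
var writesTryAdd = propertyWrites;        // 1

// Mutable data: SC-1001 is Open in one drop and Closed in the next.
TryAdd("SupportCase/SC-1001", "Open");
TryAdd("SupportCase/SC-1001", "Closed");
var viaTryAdd = store["SupportCase/SC-1001"];      // "Open": the change was dropped

AddOrUpdate("SupportCase/SC-1001", "Closed");
var viaAddOrUpdate = store["SupportCase/SC-1001"]; // "Closed"

Console.WriteLine($"{writesAddOrUpdate} vs {writesTryAdd} writes; TryAdd kept '{viaTryAdd}', AddOrUpdate '{viaAddOrUpdate}'");
```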
Indexing pauses expire on their own
On a server an indexing pause lapses after about 30 minutes as a safety net, so indexing can never get stuck. For a load that runs longer than that, chunk the file (or re-assert PauseIndexing periodically) rather than assuming one call covers a multi-hour run.
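One way to re-assert the pause during a long run, sketched with a simulated clock and hypothetical PauseIndexing/ResumeIndexing stand-ins so it runs instantly. The 20-minute re-assert interval and 10-minute chunk time are assumptions. Because the check only runs between chunks, the re-assert interval plus the longest chunk has to stay under the roughly 30-minute expiry.

```csharp
using System;

// Simulated clock so the sketch runs instantly; a real connector would read a Stopwatch.
var now = TimeSpan.Zero;
var pauseCalls = 0;
var resumed = false;

// Hypothetical stand-ins that only record calls; the real PauseIndexing/ResumeIndexing are awaited.
void PauseIndexing(string reason) => pauseCalls++;
void ResumeIndexing(string reason) => resumed = true;

var reassertEvery = TimeSpan.FromMinutes(20); // assumption: well inside the ~30-minute expiry
var chunkTime = TimeSpan.FromMinutes(10);     // assumption: worst-case time to map + commit a chunk

PauseIndexing("csv-backfill");
var lastPause = now;
try
{
    for (var chunk = 0; chunk < 18; chunk++)  // 18 chunks x 10 minutes = a 3-hour load
    {
        now += chunkTime;
        if (now - lastPause >= reassertEvery)
        {
            PauseIndexing("csv-backfill");    // refresh the pause before it can lapse
            lastPause = now;
        }
    }
}
finally
{
    ResumeIndexing("csv-backfill");           // runs even if a chunk throws
}

Console.WriteLine($"pause calls: {pauseCalls}, resumed: {resumed}"); // pause calls: 10, resumed: True
```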
Re-runs: skip unchanged rows
Stable keys already make re-running safe — every row is an upsert, so a second run against an unchanged file produces an identical graph. To also skip the work of re-mapping rows that haven't changed (a daily CSV drop where only a few rows move), wrap the map in a HashCache:
```csharp
using var cache = HashCache.Initialize("cache/csv-support-cases.db");
foreach (var row in csv.GetRecords<CaseRow>())
    cache.IfNotCached(connectorVersion: 1, data: row, action: r => MapRow(graph, r));
await graph.CommitPendingAsync();
await cache.CommitPendingHashesAsync(); // only after the graph commit succeeds
```
A cache hit is a single lookup and no graph operations at all. See Caching for the commit-ordering rules and when the hash shape matters.
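The skip logic and its commit ordering can be modelled in a few lines. Everything here is a hypothetical stand-in: HashCache's real storage and hash shape are covered on the Caching page, and this sketch simply hashes the connector version plus the raw row text. The failed-commit run shows why hashes are persisted only after the graph commit: otherwise a row that never reached the graph would be skipped on every later run.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

// Hypothetical model of HashCache-style skipping; not the library's storage or hash shape.
var committedHashes = new HashSet<string>(); // durable: written only after a successful graph commit
var pendingHashes = new List<string>();      // this run's hashes, not yet trusted
var mappedRows = 0;

string Hash(int connectorVersion, string row) =>
    Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes($"{connectorVersion}|{row}")));

void IfNotCached(int connectorVersion, string row, Action<string> action)
{
    var hash = Hash(connectorVersion, row);
    if (committedHashes.Contains(hash)) return; // cache hit: no graph operations at all
    action(row);
    pendingHashes.Add(hash);
}

void RunOnce(string[] rows, bool graphCommitSucceeds)
{
    mappedRows = 0;
    foreach (var row in rows)
        IfNotCached(connectorVersion: 1, row: row, action: _ => mappedRows++);
    if (graphCommitSucceeds)                      // the graph commit succeeded, so...
        committedHashes.UnionWith(pendingHashes); // ...persisting the hashes is now safe
    pendingHashes.Clear();                        // on failure they're dropped, so rows are retried
}

var day1 = new[] { "SC-1001,Open", "SC-1002,Closed", "SC-1003,Open" };
var day2 = new[] { "SC-1001,Closed", "SC-1002,Closed", "SC-1003,Open" };   // SC-1001 changed
var day3 = new[] { "SC-1001,Closed", "SC-1002,Closed", "SC-1003,Closed" }; // SC-1003 changed

RunOnce(day1, graphCommitSucceeds: true);  var firstRun = mappedRows;  // 3: everything is new
RunOnce(day2, graphCommitSucceeds: true);  var secondRun = mappedRows; // 1: only SC-1001
RunOnce(day3, graphCommitSucceeds: false); var failedRun = mappedRows; // 1: mapped, hash dropped
RunOnce(day3, graphCommitSucceeds: true);  var retryRun = mappedRows;  // 1: retried, not skipped

Console.WriteLine($"{firstRun} {secondRun} {failedRun} {retryRun}"); // 3 1 1 1
```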
Going parallel
A single thread is usually enough — most connectors are bound by the source read, not by Curiosity. If profiling shows the map/commit side is the bottleneck and the CSV can be split, shard it by a stable key range so each worker owns disjoint keys and they can't race on the same node. See Performance → parallel ingestion for the connection-budget and write-conflict caveats.
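If you do shard, the split has to be stable across runs and machines. A minimal sketch that swaps in hash partitioning for the key ranges mentioned above, using a hypothetical ShardOf helper: it partitions on device_model, so every row for a device (and, in this sample, its Parts) lands on the same worker. Manufacturer nodes can still be written from more than one worker, so the write-conflict caveats still apply.

```csharp
using System;
using System.Buffers.Binary;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper: SHA-256 of the key, first 4 bytes read big-endian, modulo the worker count.
// Avoid string.GetHashCode(): .NET randomises it per process, so shards would move between runs.
int ShardOf(string key, int workers)
{
    var hash = SHA256.HashData(Encoding.UTF8.GetBytes(key));
    return (int)(BinaryPrimitives.ReadUInt32BigEndian(hash) % (uint)workers);
}

const int Workers = 4;
var rows = new[]
{
    (CaseId: "SC-1001", Device: "MacBook Pro 14"),
    (CaseId: "SC-1002", Device: "MacBook Pro 14"),
    (CaseId: "SC-1003", Device: "ThinkPad X1"),
};

// Partition on device_model so each Device's rows go to exactly one worker.
var shards = rows
    .GroupBy(r => ShardOf(r.Device, Workers))
    .ToDictionary(g => g.Key, g => g.Select(r => r.CaseId).ToList());

foreach (var (worker, cases) in shards.OrderBy(kv => kv.Key))
    Console.WriteLine($"worker {worker}: {string.Join(", ", cases)}");
```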
Verify the load
From Manage → Shell:
```csharp
// Node counts by type.
return Q().EmitSummary();

// A device with its cases, manufacturer, and parts.
return Q().StartAt(N.Device.Type, new[] { "MacBook Pro 14" })
          .EmitNeighborsSummary();
```
Run the connector twice on the same file and confirm EmitSummary is unchanged — that's the idempotency check worth wiring into CI (see Idempotency).
Pitfalls
| Symptom | Cause | Fix |
|---|---|---|
| Memory climbs through the run | Buffer never flushed; auto-commit not set | SetAutoCommitCost(everyNodes: …) and CommitPendingAsync() at the end. |
| Reference nodes keep getting rewritten | AddOrUpdate used for Device/Manufacturer/Part | Use TryAdd for data that doesn't change once seeded. |
| Load is slower than expected | A query round-trip per edge to resolve a node | Link by the returned handle or Node.FromKey; no round-trip. |
| Search stale after a long backfill | Pause expired mid-run, or ResumeIndexing skipped | Chunk long loads; always resume in finally. Reindex from Settings → Search Index if needed. |
| Duplicates on re-run | [Key] derived from row number or a GUID | Use a source identifier; hash immutable fields if there's none. See Idempotency. |
| Wrong numbers/dates | CSV culture not pinned | Set CultureInfo explicitly in CsvConfiguration. See the baseline CSV notes. |
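To see why the culture matters for the last row, here is the same field parsed under two number formats. The comma-decimal NumberFormatInfo is built by hand (an illustrative choice, so the sketch doesn't depend on installed culture data); in the connector, CsvHelper converts fields using the culture passed to CsvConfiguration. Under the invariant culture the comma is read as a thousands separator, so "1,5" silently becomes 15 instead of failing.

```csharp
using System;
using System.Globalization;

// The same field, read under two number formats.
var field = "1,5";

// Invariant culture: ',' is the thousands separator, which double.Parse accepts by default.
var invariant = double.Parse(field, CultureInfo.InvariantCulture);

// A hand-built comma-decimal format, like many European locales use.
var commaDecimal = new NumberFormatInfo { NumberDecimalSeparator = ",", NumberGroupSeparator = "." };
var european = double.Parse(field, commaDecimal);

Console.WriteLine(invariant.ToString(CultureInfo.InvariantCulture)); // 15
Console.WriteLine(european.ToString(CultureInfo.InvariantCulture));  // 1.5
```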
See also
- CSV (baseline): the minimal nodes-only starting point.
- Ingestion: TryAdd/AddOrUpdate/Link reference.
- Performance: auto-commit, pause indexing, parallelism in isolation.
- Caching: HashCache commit ordering and hash shape.
- Idempotency: stable keys, hashing immutable fields.
- Schemas: [Node], [Key], [Property], [Timestamp], edge constants.