High-throughput CSV: nodes + edges

The baseline CSV example streams one node type out of a flat file. This page is the version you reach for when the file is large (millions of rows) and each row should explode into several nodes and the edges between them — the shape most real exports have. It folds every ingestion-performance lever into one worked connector: streaming reads, an in-memory operation buffer, zero-fetch edges, deferred indexing, and idempotent re-runs.

The reference pages cover each lever on its own — Ingestion, Performance, Idempotency, Caching. Here they come together against a concrete source.

What you'll build

A denormalised support-case export — one row per case, with the device, manufacturer, and part repeated inline — turned into a connected graph:

support-cases.csv
case_id,opened_at,status,summary,device_model,manufacturer,part_number,part_name
SC-1001,2026-01-05T09:12:00Z,Open,Won't boot after update,MacBook Pro 14,Apple,A2779-BAT,Battery
SC-1002,2026-01-05T11:40:00Z,Closed,Cracked screen,MacBook Pro 14,Apple,A2779-LCD,Display
SC-1003,2026-01-06T08:05:00Z,Open,Fan noise,ThinkPad X1,Lenovo,X1-FAN-09,Cooling fan

Each row produces one mutable SupportCase node plus three reference nodes (Device, Manufacturer, Part) that recur across thousands of rows, wired together with bi-directional edges:

flowchart LR
    CSV[(CSV file)] -->|stream row-by-row| Row[CaseRow]
    Row --> Map{Map row}
    Map -->|AddOrUpdate| SC[SupportCase]
    Map -->|TryAdd| Ref[Device · Manufacturer · Part]
    Map -->|Link by key| E[Edges]
    SC --> Buf[(Pending buffer)]
    Ref --> Buf
    E --> Buf
    Buf -->|auto-commit every N nodes| Graph[(Graph)]
    Graph -. indexing paused .-> Idx[Search index]
    Idx -. builds once on resume .-> Graph

How the graph engine handles a bulk load

Four properties of the engine decide what "efficient" means here, and motivate every choice below.

  • Identity is a hash of the key. A node's identity is a deterministic hash of its type and its [Key] — the same key always maps to the same node, on every run and across every connector. Two rows naming Apple collapse onto one Manufacturer node automatically; there is no separate de-duplication pass to write, and re-running the connector overwrites in place instead of duplicating.
  • Writes are buffered, then committed in batches. AddOrUpdate, TryAdd, and Link don't each make a network call — they append to an in-memory operation buffer. A commit ships the whole batch in one request. The connector's job is to control when commits happen, not to commit per record.
  • Edges are directed and bind by key. Link(a, b, forward, reverse) records the relationship in both directions so traversal works from either end. The node handle returned by AddOrUpdate/TryAdd and a Node.FromKey(type, key) are both by-key references that cost no round-trip; the edge becomes active once both endpoints are committed. With unique: true (the default) re-linking the same pair is a no-op, so edges are idempotent too.
  • Indexing is asynchronous. A committed node isn't indexed synchronously — the engine queues it for full-text (BM25) and vector indexing on a background worker, which is what keeps writes fast. During a backfill you don't need fresh search until the end, so pausing indexing lets the import run without competing for CPU and I/O; the index catches up once on resume.
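
The first property can be pictured with a minimal sketch. The engine's actual hash function is not described on this page, so the SHA-256 digest over type and key below is an assumption made purely for illustration, and NodeIdentitySketch and IdFor are hypothetical names, not part of Curiosity.Library.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class NodeIdentitySketch
{
    // Hypothetical helper: derives a stable id from (type, key).
    // SHA-256 is an assumption; the engine's real hash is not specified here.
    public static string IdFor(string type, string key)
    {
        // The NUL separator keeps ("ab", "c") distinct from ("a", "bc").
        var bytes = Encoding.UTF8.GetBytes(type + "\0" + key);
        return Convert.ToHexString(SHA256.HashData(bytes));
    }
}
```

Under this model, IdFor("Manufacturer", "Apple") produces the same id on every run, while IdFor("Device", "Apple") produces a different one. That is the behaviour that lets repeated rows merge onto one node without a de-duplication pass.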

The connector

Program.cs
using System.Globalization;
using Curiosity.Library;
using CsvHelper;
using CsvHelper.Configuration;
using CsvHelper.Configuration.Attributes;

using var graph = Graph.Connect(
    endpoint:      Environment.GetEnvironmentVariable("CURIOSITY_ENDPOINT")!,
    token:         Environment.GetEnvironmentVariable("CURIOSITY_TOKEN")!,
    connectorName: "csv-support-cases");

// Schemas are idempotent — safe to call on every run.
await graph.CreateNodeSchemaAsync<SupportCase>();
await graph.CreateNodeSchemaAsync<Device>();
await graph.CreateNodeSchemaAsync<Manufacturer>();
await graph.CreateNodeSchemaAsync<Part>();
await graph.CreateEdgeSchemaAsync(typeof(Edges));

// Flush the buffer roughly every 20k nodes instead of per-record.
graph.SetAutoCommitCost(everyNodes: 20_000);

var path = args.Length > 0 ? args[0] : "support-cases.csv";

var config = new CsvConfiguration(CultureInfo.InvariantCulture)
{
    HasHeaderRecord   = true,
    MissingFieldFound = null,            // tolerate rows with missing fields
    TrimOptions       = TrimOptions.Trim,
};

// Defer search indexing until the whole file is in.
await graph.PauseIndexing("csv-backfill");

var ingested = 0;
try
{
    using var reader = new StreamReader(path);
    using var csv    = new CsvReader(reader, config);

    foreach (var row in csv.GetRecords<CaseRow>())   // lazy: one row in memory at a time
    {
        MapRow(graph, row);

        if (++ingested % 100_000 == 0)
            Console.WriteLine($"[{DateTimeOffset.UtcNow:HH:mm:ss}] mapped {ingested:N0} rows");
    }

    await graph.CommitPendingAsync();    // flush the tail
}
finally
{
    await graph.ResumeIndexing("csv-backfill");   // index builds once, even on error
}

Console.WriteLine($"Done: {ingested:N0} rows ingested.");

// --- mapping: one flat row -> nodes + edges -------------------------------

static void MapRow(Graph graph, CaseRow row)
{
    // Mutable source record -> AddOrUpdate keeps properties in sync on re-runs.
    var supportCase = graph.AddOrUpdate(new SupportCase
    {
        Id       = row.CaseId,
        Summary  = row.Summary,
        Status   = row.Status,
        OpenedAt = row.OpenedAt,
    });

    // Reference data repeats across thousands of rows -> TryAdd writes once,
    // identical keys merge onto the same node, and properties aren't rewritten.
    var device       = graph.TryAdd(new Device       { Model  = row.DeviceModel  });
    var manufacturer = graph.TryAdd(new Manufacturer { Name   = row.Manufacturer });
    var part         = graph.TryAdd(new Part         { Number = row.PartNumber, Name = row.PartName });

    // Link by the handles we already hold — no fetch, no round-trip.
    graph.Link(supportCase, device,       Edges.ForDevice, Edges.HasSupportCase);
    graph.Link(device,      manufacturer, Edges.MadeBy,    Edges.Makes);
    graph.Link(device,      part,         Edges.HasPart,   Edges.PartOf);
}

// --- schema ----------------------------------------------------------------

[Node]
public class SupportCase
{
    [Key]       public string         Id       { get; set; }
    [Property]  public string         Summary  { get; set; }
    [Property]  public string         Status   { get; set; }
    [Timestamp] public DateTimeOffset OpenedAt { get; set; }
}

[Node]
public class Device       { [Key] public string Model  { get; set; } }

[Node]
public class Manufacturer { [Key] public string Name   { get; set; } }

[Node]
public class Part
{
    [Key]      public string Number { get; set; }
    [Property] public string Name   { get; set; }
}

public static class Edges
{
    public const string ForDevice      = nameof(ForDevice);       // SupportCase -> Device
    public const string HasSupportCase = nameof(HasSupportCase);  // Device      -> SupportCase
    public const string MadeBy         = nameof(MadeBy);          // Device      -> Manufacturer
    public const string Makes          = nameof(Makes);           // Manufacturer -> Device
    public const string HasPart        = nameof(HasPart);         // Device      -> Part
    public const string PartOf         = nameof(PartOf);          // Part        -> Device
}

// --- CSV row DTO: names match the header exactly ---------------------------

class CaseRow
{
    [Name("case_id")]      public string         CaseId       { get; set; }
    [Name("opened_at")]    public DateTimeOffset OpenedAt     { get; set; }
    [Name("status")]       public string         Status       { get; set; }
    [Name("summary")]      public string         Summary      { get; set; }
    [Name("device_model")] public string         DeviceModel  { get; set; }
    [Name("manufacturer")] public string         Manufacturer { get; set; }
    [Name("part_number")]  public string         PartNumber   { get; set; }
    [Name("part_name")]    public string         PartName     { get; set; }
}

Why it's fast

Five decisions in that file are what separate a connector that finishes a million rows in minutes from one that takes hours.

1. Stream the file, don't load it

csv.GetRecords<CaseRow>() returns a lazy IEnumerable: CsvHelper reads one row, materialises the DTO, yields it, and discards it before the next. Peak memory stays flat regardless of file size. (For JSON or Parquet sources, the equivalent streaming readers are covered on the JSON and Parquet pages.)
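
The lazy pattern described above can be sketched with plain lines instead of CSV records. This is not CsvHelper's implementation; LazyLinesSketch is a hypothetical name, and the point is only that an iterator reads nothing until the caller asks for the next item.

```csharp
using System.Collections.Generic;
using System.IO;

public static class LazyLinesSketch
{
    // Yields one line at a time; the reader advances only when
    // the caller requests the next element.
    public static IEnumerable<string> ReadLines(TextReader reader)
    {
        for (var line = reader.ReadLine(); line != null; line = reader.ReadLine())
            yield return line;
    }
}
```

A foreach over ReadLines holds one line at a time, which is why memory does not grow with the length of the input. Calling ToList() on the result would give that property up.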

2. Buffer writes, commit in batches

None of the AddOrUpdate / TryAdd / Link calls touch the network — they fill an in-memory buffer. SetAutoCommitCost(everyNodes: 20_000) flushes that buffer roughly every 20,000 nodes' worth of work, so commit overhead is amortised across the batch instead of paid per record. The threshold counts a node as far heavier than an edge (a node is ~1000× an edge's weight), and the default flushes at about 5,000 nodes — raise it for small records, lower it for large ones (long bodies, file contents).
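
To make the cost-weighted threshold concrete, here is a minimal model of such a buffer. It is a sketch, not the library's implementation: CostBufferSketch is a hypothetical class, and the weights of 1000 per node and 1 per edge are assumptions taken from the "~1000×" figure above.

```csharp
using System;
using System.Collections.Generic;

public sealed class CostBufferSketch
{
    private const int NodeCost = 1000; // assumption: a node weighs ~1000 edges
    private const int EdgeCost = 1;

    private readonly long _threshold;
    private readonly Action<IReadOnlyList<string>> _commit;
    private readonly List<string> _pending = new();
    private long _cost;

    public int Commits { get; private set; }

    public CostBufferSketch(int everyNodes, Action<IReadOnlyList<string>> commit)
    {
        _threshold = (long)everyNodes * NodeCost;
        _commit = commit;
    }

    public void AddNode(string op) => Enqueue(op, NodeCost);
    public void AddEdge(string op) => Enqueue(op, EdgeCost);

    // Appends to the in-memory buffer and flushes once the accumulated
    // cost reaches the threshold.
    private void Enqueue(string op, int cost)
    {
        _pending.Add(op);
        _cost += cost;
        if (_cost >= _threshold) Flush();
    }

    // Ships the whole batch in one call; a no-op when nothing is pending.
    public void Flush()
    {
        if (_pending.Count == 0) return;
        _commit(_pending.ToArray());
        _pending.Clear();
        _cost = 0;
        Commits++;
    }
}
```

In this model the number of commit calls grows with total cost divided by the threshold rather than with the number of records, and the explicit Flush() at the end plays the role of CommitPendingAsync() for the tail.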

3. Link by key, never fetch

The handles returned by AddOrUpdate and TryAdd are by-key references that cost nothing to reuse, so graph.Link(supportCase, device, …) adds an edge with no round-trip. When you need to link to a node you didn't create in the same scope — say one another connector owns — use Node.FromKey(nameof(Device), model) for the same zero-fetch effect. The slow anti-pattern is fetching first:

// SLOW — a query round-trip per edge just to get a node handle.
var result = await graph.QueryAsync(q =>
    q.StartAt(Node.FromKey(nameof(Device), row.DeviceModel)).Emit("N"));
graph.Link(supportCase, result.GetEmitted("N").Single(), Edges.ForDevice, Edges.HasSupportCase);

With buffered writes, the number of requests scales with the number of batches; fetching first adds a query round-trip for every row on top of that. Reserve it for the rare case where you must read a property before deciding whether to link.

4. Pause indexing for the backfill

Indexing runs asynchronously after each commit. For a one-shot backfill you don't need fresh search until the end, so PauseIndexing lets the import run without the full-text and vector indexers competing for resources; the index rebuilds once on resume.

await graph.PauseIndexing("csv-backfill");
try     { /* ingest everything */  await graph.CommitPendingAsync(); }
finally { await graph.ResumeIndexing("csv-backfill"); }

Always resume in a finally so a thrown exception can't leave search stale.

5. Pick TryAdd vs AddOrUpdate deliberately

SupportCase mutates between runs, so it's AddOrUpdate. Device, Manufacturer, and Part are reference data that recur on thousands of rows. TryAdd writes them once and skips the property write when they already exist, which is both correct and cheaper than calling AddOrUpdate with the same value a thousand times.

Indexing pauses expire on their own

On a server an indexing pause lapses after about 30 minutes as a safety net, so indexing can never get stuck. For a load that runs longer than that, chunk the file (or re-assert PauseIndexing periodically) rather than assuming one call covers a multi-hour run.
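
One way to re-assert the pause periodically is a small timer-style helper called from the row loop. The sketch below is an assumption-laden illustration: PauseRenewerSketch is a hypothetical class, and it is written against a plain Func<Task> so it makes no claim about the library's signatures.

```csharp
using System;
using System.Threading.Tasks;

public sealed class PauseRenewerSketch
{
    private readonly Func<Task> _pause;
    private readonly TimeSpan _interval;
    private DateTimeOffset _lastAsserted;

    public PauseRenewerSketch(Func<Task> pause, TimeSpan interval, DateTimeOffset startedAt)
    {
        _pause = pause;
        _interval = interval;
        _lastAsserted = startedAt;
    }

    // Invokes the pause delegate again once the interval has elapsed.
    // Returns true when a renewal actually happened.
    public async Task<bool> RenewIfDueAsync(DateTimeOffset now)
    {
        if (now - _lastAsserted < _interval) return false;
        await _pause();
        _lastAsserted = now;
        return true;
    }
}
```

In the connector this might be constructed with () => graph.PauseIndexing("csv-backfill") and an interval comfortably below the roughly 30-minute expiry, then called next to the progress print with DateTimeOffset.UtcNow. That usage assumes PauseIndexing returns a Task and that a repeated call extends the existing pause, neither of which this page confirms.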

Re-runs: skip unchanged rows

Stable keys already make re-running safe — every row is an upsert, so a second run against an unchanged file produces an identical graph. To also skip the work of re-mapping rows that haven't changed (a daily CSV drop where only a few rows move), wrap the map in a HashCache:

using var cache = HashCache.Initialize("cache/csv-support-cases.db");

foreach (var row in csv.GetRecords<CaseRow>())
    cache.IfNotCached(connectorVersion: 1, data: row, action: r => MapRow(graph, r));

await graph.CommitPendingAsync();
await cache.CommitPendingHashesAsync();   // only after the graph commit succeeds

A cache hit is a single lookup and no graph operations at all. See Caching for the commit-ordering rules and when the hash shape matters.
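
The idea behind skipping unchanged rows, and the reason hashes are committed only after the graph commit, can be modelled in a few lines. This is not how HashCache is implemented: ChangeSkipSketch is a hypothetical in-memory stand-in, keyed explicitly by a row key, with SHA-256 chosen as an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

public sealed class ChangeSkipSketch
{
    private readonly Dictionary<string, string> _committed = new();
    private readonly Dictionary<string, string> _pending = new();

    // Runs the action only when (version, payload) differs from the
    // last committed hash for this key. Returns true if the action ran.
    public bool IfChanged(string key, int version, string payload, Action action)
    {
        var bytes = Encoding.UTF8.GetBytes(version + "\n" + payload);
        var hash = Convert.ToHexString(SHA256.HashData(bytes));
        if (_committed.TryGetValue(key, out var known) && known == hash) return false;
        action();
        _pending[key] = hash; // not trusted until CommitPending is called
        return true;
    }

    // Call only after the graph commit has succeeded, so a failed
    // commit leaves the rows eligible for re-mapping on the next run.
    public void CommitPending()
    {
        foreach (var (key, hash) in _pending) _committed[key] = hash;
        _pending.Clear();
    }
}
```

Including the version in the hash means bumping it forces every row to be re-mapped once, which is the usual way to roll out a changed mapping.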

Going parallel

A single thread is usually enough — most connectors are bound by the source read, not by Curiosity. If profiling shows the map/commit side is the bottleneck and the CSV can be split, shard it by a stable key range so each worker owns disjoint keys and they can't race on the same node. See Performance → parallel ingestion for the connection-budget and write-conflict caveats.
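
If the file is split in code instead of by pre-sorted key ranges, a stable hash can assign each key to a worker. Note the swap: the sketch below uses hash partitioning, not a key range, though it has the same property that a given key always lands on the same worker. ShardSketch is a hypothetical name.

```csharp
using System.Text;

public static class ShardSketch
{
    // FNV-1a (32-bit): a simple hash that is stable across processes,
    // unlike string.GetHashCode(), which .NET randomizes per process.
    public static int ShardFor(string key, int workers)
    {
        uint hash = 2166136261;
        foreach (var b in Encoding.UTF8.GetBytes(key))
        {
            hash ^= b;
            unchecked { hash *= 16777619; }
        }
        return (int)(hash % (uint)workers);
    }
}
```

Which column to shard on is a judgment call. Sharding on case_id keeps each SupportCase with one worker, but rows in different shards can still name the same Device or Manufacturer, which is presumably among the write-conflict caveats the Performance page covers.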

Verify the load

From Manage → Shell:

// Node counts by type.
return Q().EmitSummary();

// A device with its cases, manufacturer, and parts.
return Q().StartAt(N.Device.Type, new[] { "MacBook Pro 14" })
          .EmitNeighborsSummary();

Run the connector twice on the same file and confirm EmitSummary is unchanged — that's the idempotency check worth wiring into CI (see Idempotency).

Pitfalls

| Symptom | Cause | Fix |
| --- | --- | --- |
| Memory climbs through the run | Buffer never flushed; auto-commit not set | SetAutoCommitCost(everyNodes: …) and CommitPendingAsync() at the end. |
| Reference nodes keep getting rewritten | AddOrUpdate used for Device/Manufacturer/Part | Use TryAdd for data that doesn't change once seeded. |
| Load is slower than expected | A query round-trip per edge to resolve a node | Link by the returned handle or Node.FromKey; no round-trip. |
| Search stale after a long backfill | Pause expired mid-run, or ResumeIndexing skipped | Chunk long loads; always resume in finally. Reindex from Settings → Search Index if needed. |
| Duplicates on re-run | [Key] derived from row number or a GUID | Use a source identifier; hash immutable fields if there's none. See Idempotency. |
| Wrong numbers/dates | CSV culture not pinned | Set CultureInfo explicitly in CsvConfiguration. See the baseline CSV notes. |

See also

  • CSV (baseline): the minimal nodes-only starting point.
  • Ingestion: TryAdd / AddOrUpdate / Link reference.
  • Performance: auto-commit, pause indexing, parallelism in isolation.
  • Caching: HashCache commit ordering and hash shape.
  • Idempotency: stable keys, hashing immutable fields.
  • Schemas: [Node], [Key], [Property], [Timestamp], edge constants.
© 2026 Curiosity. All rights reserved.