JSON Connector
Stream a JSON file of records into the graph with constant memory, using System.Text.Json from the BCL. No extra NuGet package needed beyond Curiosity.Library itself — System.Text.Json ships with .NET 6 and later.
Packages
dotnet add package Curiosity.Library
That's the only required package. System.Text.Json is part of the .NET BCL since net6.0. If you're targeting an older runtime, add it explicitly:
dotnet add package System.Text.Json
Expected source shape
The example assumes a JSON array of objects:
[
  {"id": "INV-001", "customer": "Acme", "total": 1290.00, "createdAt": "2025-11-03T08:11:00Z"},
  {"id": "INV-002", "customer": "Globex", "total": 430.50, "createdAt": "2025-11-04T14:32:00Z"}
]
For newline-delimited JSON (NDJSON / JSONL — one object per line), see the NDJSON variant at the bottom of the page.
Connector code
using System.Text.Json;
using System.Text.Json.Serialization;
using Curiosity.Library;

// Top-level statements must precede type declarations in C#,
// so the connector logic sits at the top and the classes at the bottom of the file.
using var graph = Graph.Connect(
    endpoint: Environment.GetEnvironmentVariable("CURIOSITY_ENDPOINT")!,
    token: Environment.GetEnvironmentVariable("CURIOSITY_TOKEN")!,
    connectorName: "json-invoices");

await graph.CreateNodeSchemaAsync<Invoice>();
graph.SetAutoCommitCost(everyNodes: 10_000);

var path = args.Length > 0 ? args[0] : "invoices.json";
using var stream = File.OpenRead(path);

var options = new JsonSerializerOptions
{
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
};

var ingested = 0;
await foreach (var dto in JsonSerializer.DeserializeAsyncEnumerable<InvoiceDto>(stream, options))
{
    if (dto is null) continue;

    graph.AddOrUpdate(new Invoice
    {
        Id        = dto.Id,
        Customer  = dto.Customer,
        Total     = dto.Total,
        CreatedAt = dto.CreatedAt,
    });
    ingested++;
}

await graph.CommitPendingAsync();
Console.WriteLine($"Ingested {ingested} invoices from {path}");

// Graph node as stored in the workspace.
[Node]
public class Invoice
{
    [Key] public string Id { get; set; }
    [Property] public string Customer { get; set; }
    [Property] public double Total { get; set; }
    [Timestamp] public DateTimeOffset CreatedAt { get; set; }
}

// JSON DTO that matches the source field names (camelCase).
class InvoiceDto
{
    [JsonPropertyName("id")] public string Id { get; set; }
    [JsonPropertyName("customer")] public string Customer { get; set; }
    [JsonPropertyName("total")] public double Total { get; set; }
    [JsonPropertyName("createdAt")] public DateTimeOffset CreatedAt { get; set; }
}
How it works
JsonSerializer.DeserializeAsyncEnumerable<T>(stream) reads the source one element at a time and yields each parsed object. Peak memory stays flat regardless of file size — the stream advances element-by-element. Combined with SetAutoCommitCost(everyNodes: 10_000), the connector flushes 10k nodes at a time and never holds the full source in memory.
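The streaming behavior is easy to see in isolation. This minimal sketch (no Curiosity calls, just the BCL, with a small in-memory array standing in for the file) shows the enumerable yielding one parsed element at a time:

```csharp
using System;
using System.IO;
using System.Text;
using System.Text.Json;

// A tiny JSON array standing in for a multi-gigabyte file on disk.
var json = "[{\"id\":\"A\"},{\"id\":\"B\"},{\"id\":\"C\"}]";
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));

// Elements are parsed and yielded one at a time; the full array
// is never materialized in memory.
var count = 0;
await foreach (var element in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(stream))
{
    Console.WriteLine(element.GetProperty("id").GetString());
    count++;
}
Console.WriteLine(count); // 3
```

Swap the `MemoryStream` for `File.OpenRead(path)` and `JsonElement` for a DTO and you have the connector's read loop.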
AddOrUpdate makes the run idempotent: re-running with the same Id updates the existing node instead of duplicating it. See Idempotency for the rules around stable keys.
The DTO + POCO split is deliberate. The [Node] class is what the workspace stores; the DTO is what the source emits. Keeping them separate lets the source rename a field (createdAt → created_at) without touching the graph schema.
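As a sketch of that rename scenario (the `RenamedInvoiceDto` name and the `created_at` payload below are hypothetical, not part of the connector above), only the DTO attribute changes while the [Node] class and graph schema stay untouched:

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

var dto = JsonSerializer.Deserialize<RenamedInvoiceDto>(
    "{\"id\":\"INV-003\",\"created_at\":\"2025-11-05T09:00:00Z\"}");

Console.WriteLine(dto!.Id);                       // INV-003
Console.WriteLine(dto.CreatedAt.UtcDateTime.Hour); // 9

// Only this attribute tracks the source rename (createdAt -> created_at);
// nothing on the [Node] side needs to change.
class RenamedInvoiceDto
{
    [JsonPropertyName("id")] public string Id { get; set; } = "";
    [JsonPropertyName("created_at")] public DateTimeOffset CreatedAt { get; set; }
}
```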
NDJSON variant
For one-object-per-line files (common for logs and data lake exports):
using var reader = new StreamReader(path);
string? line;
while ((line = await reader.ReadLineAsync()) is not null)
{
    if (string.IsNullOrWhiteSpace(line)) continue;

    var dto = JsonSerializer.Deserialize<InvoiceDto>(line, options);
    if (dto is null) continue;

    graph.AddOrUpdate(new Invoice { /* ... */ });
}
await graph.CommitPendingAsync();
Same memory profile, slightly simpler — one parse per line.
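The per-line parse can be exercised on its own with the BCL. A small sketch (two hard-coded lines standing in for the file, values reused from the sample payload above):

```csharp
using System;
using System.Text.Json;

// Two NDJSON records, standing in for reader.ReadLineAsync() results.
var lines = new[]
{
    "{\"id\":\"INV-001\",\"total\":1290.0}",
    "{\"id\":\"INV-002\",\"total\":430.5}",
};

// Each line is an independent JSON document: parse, use, discard.
var sum = 0.0;
foreach (var line in lines)
{
    using var doc = JsonDocument.Parse(line);
    sum += doc.RootElement.GetProperty("total").GetDouble();
}
Console.WriteLine(sum); // 1720.5
```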
Notes & pitfalls
- Source field names. Use [JsonPropertyName] on the DTO if the JSON keys don't match your C# property names, or set PropertyNamingPolicy = JsonNamingPolicy.CamelCase once and let the convention handle it.
- Nullable / missing fields. System.Text.Json defaults to null for missing properties on reference types. For value types, mark them Nullable<T> (double? Total) if the field is genuinely optional.
- Dates. ISO-8601 ("2025-11-03T08:11:00Z") parses to DateTimeOffset out of the box. Custom formats need a JsonConverter<DateTimeOffset>.
- Huge single objects. DeserializeAsyncEnumerable works on arrays. If the file is one giant object with many nested arrays, use Utf8JsonReader for token-level streaming.
- Pause indexing for backfills. For the initial load of millions of records, wrap the loop in graph.PauseIndexing("backfill") / graph.ResumeIndexing("backfill") — see Performance.
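For the giant-object case, a minimal Utf8JsonReader sketch (the payload shape with a top-level "invoices" array is an assumption for illustration; a production version would also feed the reader incrementally from the stream rather than reading the whole buffer):

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// One big object wrapping a nested array — DeserializeAsyncEnumerable can't
// stream this shape, but token-level reading can.
var json = "{\"meta\":{\"source\":\"erp\"},\"invoices\":[{\"id\":\"INV-001\"},{\"id\":\"INV-002\"}]}";
var reader = new Utf8JsonReader(Encoding.UTF8.GetBytes(json));

// Walk the token stream and collect every string that follows an "id" property name.
var ids = new List<string>();
string? lastProperty = null;
while (reader.Read())
{
    if (reader.TokenType == JsonTokenType.PropertyName)
        lastProperty = reader.GetString();
    else if (reader.TokenType == JsonTokenType.String && lastProperty == "id")
        ids.Add(reader.GetString()!);
}
Console.WriteLine(string.Join(",", ids)); // INV-001,INV-002
```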
See also
- Schemas — [Node], [Key], [Property], [Timestamp].
- Ingestion — AddOrUpdate / TryAdd / Link reference.
- Performance — auto-commit thresholds, paused indexing for backfills.
- CSV connector — when the source is comma-separated instead.