
Custom connector from scratch

Build a connector that ingests data from a third-party REST API into Curiosity, end to end. Two parallel walkthroughs — C# (using Curiosity.Library) and Python (using the curiosity PyPI package) — so you can pick the runtime that matches your team.

By the end you'll have:

  • A graph schema for Article and Author nodes plus a Wrote edge.
  • An idempotent ingestion loop with a cursor and on-disk checkpoint.
  • ACL mirroring from the source.
  • An auto-commit policy that scales to large backfills.

Estimated time: 45–60 minutes.

Architecture

flowchart LR
    SourceAPI[(Source API)] -->|paged GET| Loop["Cursor loop<br/>(your code)"]
    Loop --> Map["Map record<br/>→ nodes + edges + ACL"]
    Map -->|TryAdd / AddOrUpdate / Link| Graph[(Curiosity graph)]
    Map -->|RestrictAccessToTeam| Graph
    Loop --> Cp[(Checkpoint file)]
    Graph -.->|max(UpdatedAt)| Cp

The same architecture in both languages — only the SDK calls differ.

Step 1 — provision a connector token

In the workspace UI:

  1. Settings → API integrations → + New connector.
  2. Name it news-connector, choose Tabular / generic REST, click Save.
  3. Copy the token. Store it in an env var (CURIOSITY_TOKEN); never commit it.

You'll also need the workspace URL. The examples below read it from a CURIOSITY_ENDPOINT env var and use https://workspace.example.com as a placeholder.
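The Step 4 loop reads `CURIOSITY_ENDPOINT` and `CURIOSITY_TOKEN` with `Environment.GetEnvironmentVariable(...)!`, which passes `null` straight into `Graph.Connect` when a variable is missing. A small fail-fast check makes that misconfiguration obvious at startup. This is a minimal sketch; `ConnectorEnv.Require` is a hypothetical helper, not part of Curiosity.Library.

```csharp
using System;

// Hypothetical helper (not part of Curiosity.Library): reads a required
// environment variable and fails fast with a readable message instead of
// letting a null value reach Graph.Connect.
public static class ConnectorEnv
{
    public static string Require(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);

        // Treat unset, empty and whitespace-only values as missing.
        if (string.IsNullOrWhiteSpace(value))
            throw new InvalidOperationException(
                $"Environment variable {name} is not set; see Step 1.");

        return value;
    }
}
```

For example, `endpoint: ConnectorEnv.Require("CURIOSITY_ENDPOINT")` in place of the `!`-suppressed lookup.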

Step 2 — define the schema

// Schema.cs
using System;
using Curiosity.Library;

namespace News.Schema;

[Node]
public class Author
{
    [Key]      public string Login { get; set; }
    [Property] public string Name  { get; set; }
}

[Node]
public class Article
{
    [Key]       public string         Id        { get; set; }
    [Property]  public string         Title     { get; set; }
    [Property]  public string         Body      { get; set; }
    [Timestamp] public DateTimeOffset Published { get; set; }
}

// Edge names passed to graph.Link: Wrote (author to article) and its inverse WrittenBy.
public static class Edges
{
    public const string Wrote     = nameof(Wrote);
    public const string WrittenBy = nameof(WrittenBy);
}

Step 3 — write the pure mapping function

The mapper takes a source record and turns it into graph operations. Keep it free of I/O so you can unit-test it in isolation.

static void Map(SafeGraph graph, SourceArticle src)
{
    // AddOrUpdate matches on the [Key] property, so replaying a record updates it in place.
    var author = graph.AddOrUpdate(new Author { Login = src.AuthorLogin, Name = src.AuthorName });

    var article = graph.AddOrUpdate(new Article
    {
        Id        = src.Id,
        Title     = src.Title,
        Body      = src.Body,
        Published = src.Published,
    });

    // Link author and article with Wrote and its inverse WrittenBy.
    graph.Link(author, article, Edges.Wrote, Edges.WrittenBy);
}
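One way to take "free of I/O" further is to split planning from applying: a pure function turns a source record into plain values, and `Map` only forwards them to the graph. The sketch below assumes a `SourceArticle` shape with the fields `Map` reads; `AuthorSpec`, `ArticleSpec`, `EdgeSpec`, `MappedRecord` and `ArticleMapper` are hypothetical names, and none of this is Curiosity.Library API. Because the output is a record, a unit test can compare whole results by value.

```csharp
using System;

// Hypothetical DTO for one record from the source API (field names assumed).
public sealed record SourceArticle(
    string Id, string Title, string Body, DateTimeOffset Published,
    string AuthorLogin, string AuthorName);

// Plain description of the graph operations for one record. No SDK types,
// so it can be built and asserted without a workspace connection.
public sealed record AuthorSpec(string Login, string Name);
public sealed record ArticleSpec(string Id, string Title, string Body, DateTimeOffset Published);
public sealed record EdgeSpec(string FromKey, string ToKey, string Edge, string InverseEdge);
public sealed record MappedRecord(AuthorSpec Author, ArticleSpec Article, EdgeSpec Link);

public static class ArticleMapper
{
    // Pure: same input, same output, no network or disk access.
    public static MappedRecord Plan(SourceArticle src)
    {
        // Keys must be stable and non-empty, or re-runs would create duplicates.
        if (string.IsNullOrEmpty(src.Id))
            throw new ArgumentException("Article has no Id", nameof(src));
        if (string.IsNullOrEmpty(src.AuthorLogin))
            throw new ArgumentException("Article has no author login", nameof(src));

        var author  = new AuthorSpec(src.AuthorLogin, src.AuthorName);
        var article = new ArticleSpec(src.Id, src.Title, src.Body, src.Published);

        // Same edge names as the Edges class in Schema.cs.
        var link = new EdgeSpec(author.Login, article.Id, "Wrote", "WrittenBy");

        return new MappedRecord(author, article, link);
    }
}
```

`Map` then becomes a thin adapter: build the `Author` and `Article` objects from `plan.Author` and `plan.Article`, pass them to `AddOrUpdate`, and call `Link` with `plan.Link.Edge` and `plan.Link.InverseEdge`.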

Step 4 — drive the loop with checkpointing

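using System.Net.Http.Json;   // GetFromJsonAsync
using System.Text.Json;
using Curiosity.Library;
using News.Schema;

const string CheckpointPath = "./checkpoint.json";

// Resume from the last persisted cursor, or start from the beginning.
DateTimeOffset lastSync = File.Exists(CheckpointPath)
    ? JsonSerializer.Deserialize<DateTimeOffset>(File.ReadAllText(CheckpointPath))
    : DateTimeOffset.MinValue;

using var graph = Graph.Connect(
    endpoint:      Environment.GetEnvironmentVariable("CURIOSITY_ENDPOINT")!,
    token:         Environment.GetEnvironmentVariable("CURIOSITY_TOKEN")!,
    connectorName: "news-connector");

// Schema bootstrap: idempotent, so it is safe to run on every start.
await graph.CreateNodeSchemaAsync<Author>();
await graph.CreateNodeSchemaAsync<Article>();
await graph.CreateEdgeSchemaAsync(typeof(Edges));

// Auto-commit roughly every 5,000 nodes so large backfills don't buffer everything.
graph.SetAutoCommitCost(everyNodes: 5_000);

using var http = new HttpClient { BaseAddress = new Uri("https://news.example.com/") };

while (true)
{
    // Escape the cursor: the "o" format contains '+', which the server would
    // otherwise decode as a space. Assumes the API returns items in Published order.
    var since = Uri.EscapeDataString(lastSync.ToString("o"));
    var page  = await http.GetFromJsonAsync<SourcePage>($"/api/articles?since={since}&limit=500");

    if (page is null || page.Items.Count == 0) break;

    foreach (var record in page.Items)
        Map(graph, record);

    // Commit first, then persist the cursor.
    await graph.CommitPendingAsync();

    lastSync = page.Items.Max(r => r.Published);
    File.WriteAllText(CheckpointPath, JsonSerializer.Serialize(lastSync));
}

Console.WriteLine($"Ingest complete; cursor at {lastSync:o}");

// Hypothetical shape of the source API response; adjust to your API.
record SourcePage(List<SourceArticle> Items);
record SourceArticle(string Id, string Title, string Body, DateTimeOffset Published,
                     string AuthorLogin, string AuthorName);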
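Two details of the loop are easy to get wrong. The round-trip (`o`) timestamp contains `+`, which most servers decode as a space unless it is percent-encoded. And if the API treats `since` as inclusive, the last record comes back on every request, so the page is never empty and the loop never ends. The helpers below are a sketch under those assumptions; `SyncCursor` and the `since`/`limit` parameter names of the example API are illustrative, not part of any SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helpers for the cursor loop.
public static class SyncCursor
{
    // Percent-encode the cursor so '+' and ':' survive the query string.
    public static string PageUrl(DateTimeOffset since, int limit) =>
        $"/api/articles?since={Uri.EscapeDataString(since.ToString("o"))}&limit={limit}";

    // Returns the next cursor, or null when the page did not move it forward
    // (an empty page, or an inclusive "since" that only returned records at
    // the current cursor). The caller stops on null instead of looping forever.
    public static DateTimeOffset? Advance(DateTimeOffset current, IEnumerable<DateTimeOffset> published)
    {
        var max = published.DefaultIfEmpty(current).Max();
        return max > current ? max : (DateTimeOffset?)null;
    }
}
```

Inside the loop, request `SyncCursor.PageUrl(lastSync, 500)` and stop when `SyncCursor.Advance(lastSync, page.Items.Select(r => r.Published))` returns `null`; otherwise commit, then persist the returned value. Stopping on `null` is a deliberate trade-off: if more than `limit` records share one timestamp, a timestamp-only cursor cannot page past them, and the source needs a tie-breaker such as an ID.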

The two things that matter for crash safety:

  • Commit before persisting the cursor. If the cursor is saved but the commit never completes, the next run resumes past records that were never written, and they are silently lost.
  • Persist the cursor on disk, not in memory. A restarted pod must pick up where it left off.
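`File.WriteAllText` truncates the checkpoint before writing it, so a crash at the wrong moment can leave an empty or partial `checkpoint.json`, and the next run then fails to deserialize it. A common remedy, sketched here as an assumption rather than something the SDK provides, is to write a temporary file and rename it over the old one. `Checkpoint` is a hypothetical helper; `File.Move` with `overwrite` requires .NET Core 3.0 or later.

```csharp
using System;
using System.IO;
using System.Text.Json;

// Hypothetical helper: loads and saves the sync cursor without ever leaving
// a truncated checkpoint file behind.
public static class Checkpoint
{
    // No checkpoint yet means "start from the beginning".
    public static DateTimeOffset Load(string path) =>
        File.Exists(path)
            ? JsonSerializer.Deserialize<DateTimeOffset>(File.ReadAllText(path))
            : DateTimeOffset.MinValue;

    public static void Save(string path, DateTimeOffset cursor)
    {
        // Write the new value next to the real file first...
        var tmp = path + ".tmp";
        File.WriteAllText(tmp, JsonSerializer.Serialize(cursor));

        // ...then replace the old checkpoint with a single rename.
        File.Move(tmp, path, overwrite: true);
    }
}
```

Call `Checkpoint.Save` only after `CommitPendingAsync` returns, which keeps the commit-before-cursor order from the first rule. The rename is typically atomic on a single local volume; the sketch does not flush to disk, so it guards against process crashes rather than power loss.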

Step 5 — mirror source ACLs

If the source has per-article visibility, encode it as access restrictions while ingesting. The pattern is the same in both languages; only the method names differ:

// Inside Map, which must return Task (async) to await these calls.
var team = await graph.CreateTeamAsync("Newsroom");

// articleNode is the Article node returned by AddOrUpdate in Map.
graph.RestrictAccessToTeam(articleNode, team);

foreach (var readerLogin in src.ConfidentialReaders)
{
    var reader = await graph.CreateUserAsync(readerLogin, /* ... */);
    graph.RestrictAccessToUser(articleNode, reader);
}

See Permission-aware search for the query-time half of this pattern.
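As written, the ACL snippet runs inside the per-record mapping, so `CreateTeamAsync("Newsroom")` is called once per article. If that call is expensive or not idempotent in your workspace, resolving each team and user once per run avoids the repeated calls. This is a minimal sketch; `AsyncOnceCache` is a hypothetical helper, and its factory is where your real `CreateTeamAsync` or `CreateUserAsync` call would go.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: memoizes an async "get or create" call per key, so each
// team or user is created at most once per run no matter how many records
// reference it. TNode stands in for whatever the factory returns.
public sealed class AsyncOnceCache<TNode>
{
    // Lazy ensures the factory runs once per key even if two callers race:
    // GetOrAdd may build two Lazy wrappers, but only the stored one is evaluated.
    private readonly ConcurrentDictionary<string, Lazy<Task<TNode>>> _entries = new();
    private readonly Func<string, Task<TNode>> _factory;

    public AsyncOnceCache(Func<string, Task<TNode>> factory) => _factory = factory;

    public Task<TNode> GetAsync(string key) =>
        _entries.GetOrAdd(key, k => new Lazy<Task<TNode>>(() => _factory(k))).Value;
}
```

Usage would look like `var teams = new AsyncOnceCache<T>(name => graph.CreateTeamAsync(name));` followed by `var team = await teams.GetAsync("Newsroom");`, where `T` is whatever `CreateTeamAsync` returns. A failed creation stays cached for the rest of the run, so retrying means restarting the connector.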

Step 6 — package and schedule

Two common deployment shapes:

  1. One-shot CLI — package as a binary (C#) or container (Python) and trigger it from your scheduler (cron, GitHub Actions, Airflow). Each run picks up the cursor from disk or a shared object store. Recommended when the data source has a clean "since" parameter.
  2. In-workspace scheduled task — embed the connector as a scheduled task. Runs inside the workspace process; checkpoints can live as a graph node. Recommended when you want the connector logs to flow through the workspace's normal monitoring.

Validation queries

After the first run, sanity-check the graph from Management → Shell:

return new {
    Authors  = Q().StartAt(nameof(Author )).Count(),
    Articles = Q().StartAt(nameof(Article)).Count(),
    Orphans  = Q().StartAt(nameof(Article))
                  .Where(a => a.In(Edges.Wrote).Count() == 0)
                  .Count(),
};

The Orphans count should be zero — every article must have an author edge.

What to take away

  • A connector is schema bootstrap + pure mapping + cursor loop with checkpointing, in that order.
  • Idempotency rests on stable keys plus AddOrUpdate / TryAdd.
  • ACLs are mirrored at ingest time, not at query time.
  • Pick a runtime based on team familiarity — the SDK shape is identical between C# and Python.
© 2026 Curiosity. All rights reserved.