Custom connector from scratch
Build a connector that ingests data from a third-party REST API into Curiosity, end to end. Two parallel walkthroughs — C# (using Curiosity.Library) and Python (using the curiosity PyPI package) — so you can pick the runtime that matches your team.
By the end you'll have:
- A graph schema for `Article` and `Author` nodes plus a `Wrote` edge.
- An idempotent ingestion loop with a cursor and on-disk checkpoint.
- ACL mirroring from the source.
- An auto-commit policy that scales to large backfills.
Estimated time: 45–60 minutes.
Architecture
The same architecture in both languages — only the SDK calls differ.
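A rough sketch of the flow described below (component labels are illustrative):

```
 third-party REST API
        │  GET /api/articles?since=<cursor>&limit=...
        ▼
 connector (schema bootstrap → Map() → auto-commit)
        │                                  │
        ▼                                  ▼
 checkpoint file (cursor)          Curiosity graph
                                   (Article, Author, Wrote/WrittenBy)
```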
Step 1 — provision a connector token
In the workspace UI:
- Settings → API integrations → + New connector.
- Name it `news-connector`, choose Tabular / generic REST, and click Save.
- Copy the token and store it in an env var (`CURIOSITY_TOKEN`); never commit it.
You'll also need the workspace URL (exported as `CURIOSITY_ENDPOINT` in the examples below) — `https://workspace.example.com` here.
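Both walkthroughs read the same two environment variables. A typical shell setup (values are placeholders):

```shell
export CURIOSITY_ENDPOINT="https://workspace.example.com"
export CURIOSITY_TOKEN="<paste the connector token here>"
```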
Step 2 — define the schema
// Schema.cs
using Curiosity.Library;

namespace News.Schema;

[Node]
public class Author
{
    [Key] public string Login { get; set; }
    [Property] public string Name { get; set; }
}

[Node]
public class Article
{
    [Key] public string Id { get; set; }
    [Property] public string Title { get; set; }
    [Property] public string Body { get; set; }
    [Timestamp] public DateTimeOffset Published { get; set; }
}

public static class Edges
{
    public const string Wrote = nameof(Wrote);
    public const string WrittenBy = nameof(WrittenBy);
}
Step 3 — write the pure mapping function
The mapper takes a source record and turns it into graph operations. Keep it free of I/O so you can unit-test it in isolation.
static void Map(SafeGraph graph, SourceArticle src)
{
    var author = graph.AddOrUpdate(new Author { Login = src.AuthorLogin, Name = src.AuthorName });
    var article = graph.AddOrUpdate(new Article
    {
        Id = src.Id,
        Title = src.Title,
        Body = src.Body,
        Published = src.Published,
    });
    graph.Link(author, article, Edges.Wrote, Edges.WrittenBy);
}
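Because the mapper performs no I/O, testing it needs no workspace at all. A language-neutral illustration in Python — the "graph" here is a plain recorder, not the real SDK, so the record shape and method names are assumptions for the sketch:

```python
from dataclasses import dataclass, field

@dataclass
class FakeGraph:
    """Stand-in for the SDK that just records operations."""
    ops: list = field(default_factory=list)

    def add_or_update(self, node_type, key, **props):
        self.ops.append(("upsert", node_type, key, props))
        return (node_type, key)

    def link(self, frm, to, edge, reverse_edge):
        self.ops.append(("link", frm, to, edge, reverse_edge))

def map_article(graph, src: dict) -> None:
    # Mirrors the C# Map(): upsert author, upsert article, link them.
    author = graph.add_or_update("Author", src["author_login"], name=src["author_name"])
    article = graph.add_or_update("Article", src["id"], title=src["title"], body=src["body"])
    graph.link(author, article, "Wrote", "WrittenBy")

# Unit test: stable keys mean the same input always yields the same operations.
g = FakeGraph()
src = {"id": "a1", "author_login": "jdoe", "author_name": "J. Doe",
       "title": "Hello", "body": "..."}
map_article(g, src)
assert g.ops[0] == ("upsert", "Author", "jdoe", {"name": "J. Doe"})
assert g.ops[-1][3:] == ("Wrote", "WrittenBy")
```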
Step 4 — drive the loop with checkpointing
using System.Net.Http.Json;
using System.Text.Json;
using Curiosity.Library;
using News.Schema;

const string CheckpointPath = "./checkpoint.json";

DateTimeOffset lastSync = File.Exists(CheckpointPath)
    ? JsonSerializer.Deserialize<DateTimeOffset>(File.ReadAllText(CheckpointPath))
    : DateTimeOffset.MinValue;

using var graph = Graph.Connect(
    endpoint: Environment.GetEnvironmentVariable("CURIOSITY_ENDPOINT")!,
    token: Environment.GetEnvironmentVariable("CURIOSITY_TOKEN")!,
    connectorName: "news-connector");

// One-time schema bootstrap — safe on every run.
await graph.CreateNodeSchemaAsync<Author>();
await graph.CreateNodeSchemaAsync<Article>();
await graph.CreateEdgeSchemaAsync(typeof(Edges));

graph.SetAutoCommitCost(everyNodes: 5_000);

using var http = new HttpClient { BaseAddress = new Uri("https://news.example.com/") };

while (true)
{
    // Escape the cursor: "o"-format timestamps contain ':' and possibly '+'.
    var page = await http.GetFromJsonAsync<SourcePage>(
        $"/api/articles?since={Uri.EscapeDataString(lastSync.ToString("o"))}&limit=500");
    if (page is null || page.Items.Count == 0) break;

    foreach (var record in page.Items)
        Map(graph, record);

    await graph.CommitPendingAsync();
    lastSync = page.Items.Max(r => r.Published);
    File.WriteAllText(CheckpointPath, JsonSerializer.Serialize(lastSync));
}

Console.WriteLine($"Ingest complete; cursor at {lastSync:o}");
The two things that matter for crash safety:
- Commit before persisting the cursor. If the cursor lands but the commit hasn't, a crash drops data.
- Persist the cursor on disk, not in memory. A restarted pod must pick up where it left off.
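The same two rules fit in a small generic driver. The Python sketch below replaces the SDK with injected callables — `fetch_page` and `commit` are placeholders for the HTTP call and the graph commit, and every name here is illustrative:

```python
import json
import os
import tempfile

def save_checkpoint(path: str, cursor: str) -> None:
    # Atomic write: a crash mid-write must not corrupt the previous checkpoint.
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    with os.fdopen(fd, "w") as f:
        json.dump(cursor, f)
    os.replace(tmp, path)  # atomic on POSIX and Windows

def load_checkpoint(path: str, default: str) -> str:
    if not os.path.exists(path):
        return default
    with open(path) as f:
        return json.load(f)

def run_sync(fetch_page, commit, checkpoint_path, start="1970-01-01T00:00:00Z"):
    cursor = load_checkpoint(checkpoint_path, start)
    while True:
        items = fetch_page(cursor)   # e.g. GET /api/articles?since=<cursor>
        if not items:
            break
        commit(items)                # 1) commit to the graph first...
        cursor = max(i["published"] for i in items)
        save_checkpoint(checkpoint_path, cursor)  # 2) ...then persist the cursor
    return cursor
```

Because the cursor is read back from disk on startup, a restarted run resumes from the last committed page rather than from the beginning.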
Step 5 — mirror source ACLs (optional but recommended)
If the source has per-article visibility, encode it as restrictions while ingesting. This works the same in both languages — different method names, same shape:
var team = await graph.CreateTeamAsync("Newsroom");
graph.RestrictAccessToTeam(articleNode, team);
foreach (var watcherLogin in src.ConfidentialReaders)
{
    var reader = await graph.CreateUserAsync(watcherLogin, /* ... */);
    graph.RestrictAccessToUser(articleNode, reader);
}
See Permission-aware search for the query-time half of this pattern.
Step 6 — package and schedule
Two common deployment shapes:
- One-shot CLI — package as a binary (C#) or container (Python) and trigger it from your scheduler (cron, GitHub Actions, Airflow). Each run picks up the cursor from disk or a shared object store. Recommended when the data source has a clean "since" parameter.
- In-workspace scheduled task — embed the connector as a scheduled task (docs). Runs inside the workspace process; checkpoints can live as a graph node. Recommended when you want the connector logs to flow through the workspace's normal monitoring.
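For the one-shot CLI shape, a plain crontab entry is often all the scheduling you need (binary path, schedule, and log location are placeholders):

```
# Run the incremental sync every 15 minutes; the on-disk checkpoint
# makes restarted or overlapping runs pick up from the last cursor.
*/15 * * * * cd /opt/news-connector && ./news-connector >> /var/log/news-connector.log 2>&1
```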
Validation queries
After the first run, sanity-check the graph from Management → Shell:
return new {
    Authors  = Q().StartAt(nameof(Author)).Count(),
    Articles = Q().StartAt(nameof(Article)).Count(),
    Orphans  = Q().StartAt(nameof(Article))
                  .Where(a => a.In(Edges.Wrote).Count() == 0)
                  .Count(),
};
The `Orphans` count should be zero — every article must have an author edge.
What to take away
- A connector is schema bootstrap + pure mapping + cursor loop with checkpointing, in that order.
- Idempotency rests on stable keys plus `AddOrUpdate`/`TryAdd`.
- ACLs are mirrored at ingest time, not at query time.
- Pick a runtime based on team familiarity — the SDK shape is identical between C# and Python.
Cross-links
- Connector templates — starter projects per source type
- Schema design — modeling guidance
- Ingestion tutorial — a richer, ACL-aware example
- Custom endpoint from scratch
- Data connector reference