Parquet Connector
Stream Apache Parquet files into the graph row-by-row using Parquet.Net. Parquet is columnar and compressed — a single file is often hundreds of millions of rows. Parquet.Net reads one row group at a time, so memory stays bounded regardless of file size.
Packages
- Curiosity.Library on NuGet
- Parquet.Net on NuGet
dotnet add package Curiosity.Library
dotnet add package Parquet.Net
Expected source shape
A Parquet file whose schema includes the columns you want to ingest. For this example, transactions.parquet is assumed to have:
| Column | Parquet type | C# type |
|---|---|---|
| `tx_id` | `BYTE_ARRAY` (UTF-8) | `string` |
| `customer` | `BYTE_ARRAY` (UTF-8) | `string` |
| `amount_cents` | `INT64` | `long` |
| `occurred_at` | `INT64` (`TIMESTAMP_MILLIS`) | `DateTimeOffset` |
Connector code
using Curiosity.Library;
using Parquet;
using Parquet.Serialization;
[Node]
public class Transaction
{
[Key] public string TxId { get; set; }
[Property] public string Customer { get; set; }
[Property] public long AmountCents { get; set; }
[Timestamp] public DateTimeOffset OccurredAt { get; set; }
}
// DTO with property names that match the Parquet column names.
class TransactionRow
{
public string tx_id { get; set; }
public string customer { get; set; }
public long amount_cents { get; set; }
public DateTime occurred_at { get; set; }
}
using var graph = Graph.Connect(
endpoint: Environment.GetEnvironmentVariable("CURIOSITY_ENDPOINT")!,
token: Environment.GetEnvironmentVariable("CURIOSITY_TOKEN")!,
connectorName: "parquet-transactions");
await graph.CreateNodeSchemaAsync<Transaction>();
graph.SetAutoCommitCost(everyNodes: 10_000);
var path = args.Length > 0 ? args[0] : "transactions.parquet";
using var stream = File.OpenRead(path);
var ingested = 0;
// ParquetSerializer.DeserializeAllAsync streams the file row-group by row-group.
await foreach (var row in ParquetSerializer.DeserializeAllAsync<TransactionRow>(stream))
{
graph.AddOrUpdate(new Transaction
{
TxId = row.tx_id,
Customer = row.customer,
AmountCents = row.amount_cents,
OccurredAt = new DateTimeOffset(DateTime.SpecifyKind(row.occurred_at, DateTimeKind.Utc)),
});
ingested++;
}
await graph.CommitPendingAsync();
Console.WriteLine($"Ingested {ingested} transactions from {path}");
How it works
ParquetSerializer.DeserializeAllAsync<T>(stream) opens the file, reads one row group at a time, materializes each row into the DTO, and yields it. Only one row group's worth of bytes is in memory at any moment — typically 64–512 MB depending on writer settings.
The DTO uses lowercase/snake_case property names to match Parquet conventions. The [Node] POCO keeps idiomatic C# names. If your Parquet writer produces PascalCase columns, you can drop the DTO and deserialize directly into the [Node] type.
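To illustrate the PascalCase case, a minimal sketch, assuming a (hypothetical) writer that emitted columns named TxId, Customer, AmountCents, and OccurredAt so the column names line up with the [Node] type's properties (the serializer ignores the Curiosity attributes):

```csharp
// With PascalCase columns, deserialize straight into the [Node] type —
// no intermediate DTO and no field-by-field copy.
await foreach (var tx in ParquetSerializer.DeserializeAllAsync<Transaction>(stream))
{
    graph.AddOrUpdate(tx);
}
```

The timestamp caveat from the notes below still applies: make sure the type the serializer materializes for the timestamp column matches the property type on the [Node] class.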
Reading specific columns only
For wide schemas you usually want only a subset of columns. Parquet's columnar layout means unused columns are never read from disk — you just declare a DTO that contains only the columns you want:
class TransactionSlim
{
public string tx_id { get; set; }
public long amount_cents { get; set; }
}
await foreach (var row in ParquetSerializer.DeserializeAllAsync<TransactionSlim>(stream))
{
// only tx_id and amount_cents are read from disk
}
This is what makes Parquet faster than CSV/JSON on wide rows: the file is laid out in columns, so unread columns cost nothing.
Notes & pitfalls
- **Timestamp types.** Parquet has multiple time encodings (`TIMESTAMP_MILLIS`, `TIMESTAMP_MICROS`, `INT96`). Parquet.Net surfaces them as `DateTime` (UTC). Wrap in `DateTimeOffset` for storage — the workspace's `[Timestamp]` expects `DateTimeOffset`.
- **Nullable columns.** A nullable Parquet column maps to `T?` in C# (`long? amount_cents`). Don't make it non-nullable or you'll get a `NullReferenceException` mid-stream.
- **Logical types.** Decimals, dates, and UUIDs are logical types on top of physical types. Parquet.Net handles `DECIMAL` → `decimal` correctly; check the type mapping table for the rest.
- **Row group size vs. auto-commit.** If row groups are large (say 1M rows each), set `SetAutoCommitCost(everyNodes: 50_000)` or higher to amortize the HTTP round-trip across more rows.
- **Partitioned directories.** A Spark/Athena/Hive export usually produces many `part-*.parquet` files in a directory. Glob them with `Directory.EnumerateFiles(path, "*.parquet")` and run the same loop per file.
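The partitioned-directory case is a small wrapper around the main loop. A sketch, reusing the Transaction and TransactionRow types from above and assuming `graph` is already connected ("export" is a placeholder path):

```csharp
// Ingest every part file a Spark/Athena/Hive export dropped into a directory.
// AllDirectories also descends into Hive-style key=value partition subfolders.
foreach (var file in Directory.EnumerateFiles("export", "*.parquet", SearchOption.AllDirectories))
{
    using var stream = File.OpenRead(file);
    await foreach (var row in ParquetSerializer.DeserializeAllAsync<TransactionRow>(stream))
    {
        graph.AddOrUpdate(new Transaction
        {
            TxId = row.tx_id,
            Customer = row.customer,
            AmountCents = row.amount_cents,
            OccurredAt = new DateTimeOffset(DateTime.SpecifyKind(row.occurred_at, DateTimeKind.Utc)),
        });
    }
}
await graph.CommitPendingAsync();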
Notes on Parquet.Net versions
Parquet.Net 4.x introduced the `ParquetSerializer` shown above. On 3.x the API is lower-level (`ParquetReader` + column arrays). If you're stuck on 3.x, upgrade — the 4.x serializer is much cleaner.
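For comparison, the 3.x-era read is column-oriented rather than row-oriented. A rough sketch of its shape (treat the exact names as indicative, not drop-in code — consult the 3.x docs if you must stay on that line):

```csharp
using Parquet;
using Parquet.Data;

using var reader = new ParquetReader(stream);
DataField[] fields = reader.Schema.GetDataFields();
for (int g = 0; g < reader.RowGroupCount; g++)
{
    using ParquetRowGroupReader rowGroup = reader.OpenRowGroupReader(g);
    // Each ReadColumn call returns the entire column for this row group;
    // you then zip the column arrays back together into rows yourself.
    DataColumn txIds = rowGroup.ReadColumn(fields[0]);
}
```

The serializer in 4.x does this zipping for you, which is why the row-by-row loop above stays so short.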
See also
- Schemas — `[Node]`, `[Key]`, `[Property]`, `[Timestamp]`.
- Performance — auto-commit thresholds, paused indexing, large-file streaming.
- CSV connector — the row-oriented alternative.
- Parquet.Net documentation — full library reference.