Data Storage & Management Flow
Overview
The pipeline fetches raw data over HTTP, parses it into structured packets, normalizes schemas, buffers rows for chunked flushes, and persists to a user-connected database (DuckDB by default).
Failures during chunked writes are handled via a Dead Letter Queue (DLQ) with Arrow IPC artifacts and a per-packet rescue path to maximize successful persistence.
Observability includes structured logs, counters, histograms, and a summary; data-loss prevention policies ensure that failed data is retained and recoverable.
Request → Parse
Jobs are generated by the Router and scheduled in a priority queue.
Each job goes through rate limiting and retry (with exponential backoff) before HTTP execution.
The response bytes are parsed into FramePackets by the provider-specific Router.
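The retry-with-exponential-backoff step before HTTP execution can be sketched as follows (a minimal illustration; fetch_with_retry and its parameters are hypothetical names, not the pipeline's actual API):

```python
import time

def fetch_with_retry(fetch, max_attempts=4, base_delay=0.5, sleep=time.sleep):
    """Call `fetch` and retry on failure with exponential backoff (sketch)."""
    for attempt in range(max_attempts):
        try:
            return fetch()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
```

The `sleep` parameter is injected so backoff can be disabled in tests.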
Normalize
The Mapper enforces target schema: column presence, types, and provider-specific transformations.
Normalized FramePackets advance to the writer queue.
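The Mapper's schema enforcement (column presence plus type coercion) can be illustrated with a row-level sketch; the TARGET_SCHEMA columns and normalize_row helper are purely hypothetical:

```python
# Illustrative target schema: column name -> required Python type.
TARGET_SCHEMA = {"ts": int, "symbol": str, "price": float}

def normalize_row(row: dict) -> dict:
    """Enforce column presence and types: missing columns become None,
    present values are cast to the target type (sketch)."""
    out = {}
    for col, typ in TARGET_SCHEMA.items():
        val = row.get(col)
        out[col] = typ(val) if val is not None else None
    return out
```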
Buffer & Flush
The writer worker maintains per-table buffers for adaptive batching.
When a buffer's row count reaches the flush threshold, its frames are merged:
Vertical concat with rechunk; diagonal concat fallback for flexible schemas.
Primary key enforcement: missing or null PK columns raise errors before writing.
Persist to DB
DuckDBWriter performs transactional upserts, returning WriteResult per packet/bulk.
Writes are stateless per packet; a bulk write returns one WriteResult per input packet.
Failure Handling: DLQ + Rescue
If a flush fails, the pipeline:
Attempts per-packet writes to salvage healthy packets.
Spools only failed packets to DLQ as Arrow IPC.
Records errors with context and structured logs.
DLQ IPC is written atomically:
Write to a temp file in the same directory, fsync, then os.replace to the final path.
File permissions are restricted (chmod 600) when possible.
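The atomic write can be sketched with the standard library (the atomic_write helper is illustrative; the real spooler serializes a frame as Arrow IPC, whereas this sketch takes raw bytes):

```python
import os
import tempfile

def atomic_write(path: str, data: bytes) -> None:
    """Write DLQ bytes atomically: temp file in the target directory,
    flush + fsync, restricted permissions, then os.replace (sketch)."""
    dirname = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=dirname, suffix=".ipc.tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())      # ensure bytes hit disk before rename
        os.chmod(tmp, 0o600)          # restrict permissions where possible
        os.replace(tmp, path)         # atomic within one filesystem
    except BaseException:
        os.unlink(tmp)                # on-error cleanup of the temp file
        raise
```

Writing the temp file in the same directory matters: os.replace is only atomic when source and destination live on the same filesystem.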
Consecutive failure threshold:
Stops rescue after N consecutive failures and records a summary for remaining packets.
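The per-packet rescue loop with a consecutive-failure cutoff can be sketched as follows (rescue_packets and its callables are hypothetical names; the real pipeline records a structured summary where this sketch simply spools the remainder):

```python
def rescue_packets(packets, write_one, spool_to_dlq, max_consecutive_failures=3):
    """Write each packet individually to salvage healthy ones; spool failures
    to the DLQ; stop after N consecutive failures and spool the rest (sketch)."""
    consecutive = 0
    rescued = []
    for i, packet in enumerate(packets):
        try:
            write_one(packet)
            rescued.append(packet)
            consecutive = 0  # any success resets the failure streak
        except Exception:
            spool_to_dlq(packet)
            consecutive += 1
            if consecutive >= max_consecutive_failures:
                # Give up on rescue; spool everything not yet attempted.
                for remaining in packets[i + 1:]:
                    spool_to_dlq(remaining)
                break
    return rescued
```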
DLQ Layout
Base: $VERTEXFORAGER_ROOT/cache/dlq/{table}/
Artifact: batch_{time_ns}.ipc
Contents: Polars DataFrame representing failed packets merged for that table.
Read with pl.read_ipc(path) to reinject or analyze.
Observability
Structured logs with key-value formatting and stages:
http_start, http_retry_reason:*, http_end
write_flush, dlq_spooled, dlq_rescued_{n}, dlq_remaining_{n}
Counters and histograms tracked for durations and rows:
fetch_duration_s, parse_duration_s, http_duration_s, writer_flush_duration_s
rows_written_total, errors_total
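A minimal counter/histogram registry in the spirit of the metrics above might look like this (the Metrics class is illustrative; only the metric names mirror the doc):

```python
import time
from collections import defaultdict

class Metrics:
    """Track monotonically increasing counters and duration samples (sketch)."""

    def __init__(self):
        self.counters = defaultdict(int)       # e.g. rows_written_total
        self.histograms = defaultdict(list)    # e.g. writer_flush_duration_s

    def inc(self, name: str, n: int = 1) -> None:
        self.counters[name] += n

    def observe(self, name: str, seconds: float) -> None:
        self.histograms[name].append(seconds)

    def timer(self, name: str):
        """Return a stop() callback that records elapsed seconds."""
        start = time.perf_counter()
        return lambda: self.observe(name, time.perf_counter() - start)
```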
Cleanup & Retention
Temporary .ipc.tmp files are removed in two ways:
On-error cleanup: when DLQ spool fails after writing temp, the temp is deleted before recording DLQSpoolError.
Periodic cleanup: at run start, stale temp files older than retention are removed.
Retention is handled by internal defaults at run start.
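The periodic cleanup pass at run start can be sketched as follows (cleanup_stale_temps and the 24-hour retention default are illustrative, not the actual internal defaults):

```python
import os
import time

def cleanup_stale_temps(dlq_root: str, retention_s: float = 24 * 3600) -> int:
    """Walk the DLQ tree and delete .ipc.tmp files older than retention (sketch)."""
    removed = 0
    cutoff = time.time() - retention_s
    for dirpath, _dirs, files in os.walk(dlq_root):
        for name in files:
            if name.endswith(".ipc.tmp"):
                path = os.path.join(dirpath, name)
                if os.path.getmtime(path) < cutoff:
                    os.unlink(path)
                    removed += 1
    return removed
```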
Recovery Workflow
Identify DLQ artifacts under $VERTEXFORAGER_ROOT/cache/dlq/{table}/batch_*.ipc.
For each artifact:
Open with pl.read_ipc(path) and re-map if schema evolved.
Write via the writer’s standard write() or write_bulk() to reinject.