Data Storage & Management Flow

Overview

  • The pipeline fetches raw data over HTTP, parses it into structured packets, normalizes schemas, buffers rows for chunked flushes, and persists to a user-connected database (DuckDB by default).
  • Failures during chunked writes are handled via a Dead Letter Queue (DLQ) with Arrow IPC artifacts and a per-packet rescue path to maximize successful persistence.
  • Observability includes structured logs, counters, histograms, and a summary; data-loss prevention policies ensure that failed data is retained and recoverable.

Request → Parse

  • Jobs are generated by the Router and scheduled in a priority queue.
  • Each job goes through rate limiting and retry (with exponential backoff) before HTTP execution.
  • The response bytes are parsed into FramePackets by the provider-specific Router.
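The rate-limit/retry step can be sketched with the standard library. A minimal illustration, assuming generic exponential backoff with jitter; `fetch_with_retry` and its parameter values are hypothetical, not the pipeline's actual API or defaults:

```python
import random
import time


def fetch_with_retry(do_request, max_attempts=4, base_delay=0.5):
    """Retry a request with exponential backoff plus jitter (illustrative sketch)."""
    for attempt in range(max_attempts):
        try:
            return do_request()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # attempts exhausted: surface the last error
            # Exponential backoff: base, 2*base, 4*base, ... plus a little jitter
            # to avoid synchronized retries across jobs.
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.1))
```

The jitter term keeps many concurrently scheduled jobs from retrying in lockstep against the same provider.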

Normalize

  • The Mapper enforces target schema: column presence, types, and provider-specific transformations.
  • Normalized FramePackets advance to the writer queue.

Buffer & Flush

  • The writer worker maintains per-table buffers for adaptive batching.
  • When a buffer's row count reaches the flush threshold, frames are merged:
      • Vertical concat with rechunk; diagonal concat as a fallback for flexible schemas.
      • Primary-key enforcement: missing or null PK columns raise errors before writing.

Persist to DB

  • DuckDBWriter performs transactional upserts and returns a WriteResult.
  • Writes are stateless per packet; a bulk write returns one WriteResult per input packet.

Failure Handling: DLQ + Rescue

  • If a flush fails, the pipeline:
      • Attempts per-packet writes to salvage healthy packets.
      • Spools only the failed packets to the DLQ as Arrow IPC.
      • Records errors with context in structured logs.
  • DLQ IPC is written atomically:
      • Write to a temp file in the same directory, fsync, then os.replace to the final path.
      • File permissions are restricted (chmod 600) when possible.
  • Consecutive-failure threshold:
      • Rescue stops after N consecutive failures, and a summary is recorded for the remaining packets.
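The atomic spool path above (temp file, fsync, os.replace, restricted permissions, on-error cleanup) can be sketched with the standard library; `spool_atomically` is a hypothetical name:

```python
import os
import tempfile


def spool_atomically(data: bytes, final_path: str) -> None:
    """Write DLQ bytes atomically: temp file in the same dir, fsync, os.replace."""
    dirname = os.path.dirname(final_path) or "."
    # Temp file must live in the same directory so os.replace stays atomic
    # (rename across filesystems is not atomic).
    fd, tmp_path = tempfile.mkstemp(dir=dirname, suffix=".ipc.tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # force bytes to disk before publishing
        try:
            os.chmod(tmp_path, 0o600)  # restrict permissions when possible
        except OSError:
            pass
        os.replace(tmp_path, final_path)  # atomic publish on POSIX
    except Exception:
        # On-error cleanup: never leave a half-written temp file behind.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```

Readers therefore only ever observe either no artifact or a complete one, never a partially written IPC file.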

DLQ Layout

  • Base: $VERTEXFORAGER_ROOT/cache/dlq/{table}/
  • Artifact: batch_{time_ns}.ipc
  • Contents: Polars DataFrame representing failed packets merged for that table.
  • Read with pl.read_ipc(path) to reinject or analyze.

Observability

  • Structured logs with key-value formatting and stage markers:
      • http_start, http_retry_reason:*, http_end
      • write_flush, dlq_spooled, dlq_rescued_{n}, dlq_remaining_{n}
  • Counters and histograms track durations and row counts:
      • fetch_duration_s, parse_duration_s, http_duration_s, writer_flush_duration_s
      • rows_written_total, errors_total
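A minimal illustration of key-value structured logging with a per-stage counter; `log_event` and the counter naming are hypothetical, and the pipeline's actual logging/metrics API may differ:

```python
from collections import defaultdict

counters = defaultdict(int)  # e.g. rows_written_total, errors_total


def log_event(stage, **fields):
    """Render a key=value structured log line and bump a per-stage counter."""
    counters[f"{stage}_total"] += 1
    return " ".join([f"stage={stage}"] + [f"{k}={v}" for k, v in fields.items()])
```

Key-value lines like `stage=write_flush table=prices rows=128` stay grep-friendly while remaining trivially parseable.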

Cleanup & Retention

  • Temporary files (*.ipc.tmp) are removed in two ways:
      • On-error cleanup: if the DLQ spool fails after the temp file has been written, the temp file is deleted before DLQSpoolError is recorded.
      • Periodic cleanup: at run start, stale temp files older than the retention window are removed.
  • Retention is governed by internal defaults applied at run start.
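The periodic cleanup could look like the sketch below; `cleanup_stale_temps` is a hypothetical name, and the 24-hour retention default here is an assumption, not the pipeline's actual internal default:

```python
import os
import tempfile
import time
from pathlib import Path


def cleanup_stale_temps(dlq_root, max_age_s=24 * 3600):
    """Remove *.ipc.tmp spool leftovers older than the retention window."""
    removed = 0
    now = time.time()
    for tmp in Path(dlq_root).rglob("*.ipc.tmp"):
        if now - tmp.stat().st_mtime > max_age_s:
            tmp.unlink()  # finished artifacts (*.ipc) are never touched
            removed += 1
    return removed


demo_root = tempfile.mkdtemp()  # stand-in for $VERTEXFORAGER_ROOT/cache/dlq
```

Matching only the `.ipc.tmp` suffix guarantees the sweep can never delete a completed DLQ artifact.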

Recovery Workflow

  • Identify DLQ artifacts under $VERTEXFORAGER_ROOT/cache/dlq/{table}/batch_*.ipc.
  • For each artifact:
      • Open it with pl.read_ipc(path) and re-map if the schema has evolved.
      • Reinject via the writer's standard write() or write_bulk().