Data integrity controls¶
Enable stricter validation and lightweight dedup/upsert without changing defaults.
Strict schema validation¶
- Default: lenient — missing columns are added with `null`; casts use `strict=False`.
- Strict: set `SchemaMapper(strict_validation=True)` to fail fast on invalid inputs.
```python
from vertex_forager.schema.mapper import SchemaMapper

mapper = SchemaMapper(strict_validation=False)  # default
strict_mapper = SchemaMapper(strict_validation=True)  # raises on missing columns / strict casts
```
Counters:

- `schema_missing_cols_filled`: cells filled as `null` in non-strict mode
- `schema_extra_cols_preserved_count`: extra input columns preserved
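The lenient fill behavior can be sketched in plain Python. This is illustrative only: the real mapper operates on Polars frames, and the helper below is hypothetical, but it mirrors the documented semantics (missing schema columns become `null`, extra columns are preserved, and filled cells are counted):

```python
def fill_missing_columns(rows, schema_columns):
    """Lenient-mode sketch: add any missing schema column as None (null),
    preserve extra input columns, and count filled cells."""
    filled_cells = 0
    out = []
    for row in rows:
        new_row = dict(row)  # extra input columns are preserved as-is
        for col in schema_columns:
            if col not in new_row:
                new_row[col] = None  # stands in for a null cell
                filled_cells += 1
        out.append(new_row)
    # counter name mirrors the docs above
    return out, {"schema_missing_cols_filled": filled_cells}

rows = [{"ticker": "AAPL", "close": 1.0}, {"ticker": "MSFT"}]
filled, counters = fill_missing_columns(rows, ["ticker", "date", "close"])
```

In strict mode the same situation would instead raise immediately rather than fill and count.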
In‑memory dedup/upsert by unique key¶
Use an optional unique key to deduplicate buffered rows when collecting results in memory.
Behavior:
- Frames are concatenated, then deduplicated by `unique_key` with `keep="last"`, `maintain_order=True`.
- Sorting (if requested) is applied after dedup/upsert.
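The `keep="last"` semantics above can be sketched without Polars. The function below is a hypothetical stand-in for the writer's internal `unique` call: for each unique-key tuple it keeps only the last occurrence, retaining kept rows in their original order, and reports the dropped-row count:

```python
def dedup_keep_last(rows, unique_key):
    """Keep the last row per unique-key tuple; kept rows stay in frame order.
    Returns (deduped_rows, dropped_count)."""
    last_index = {}
    for i, row in enumerate(rows):
        key = tuple(row[k] for k in unique_key)
        last_index[key] = i  # later occurrences overwrite earlier ones
    keep = set(last_index.values())
    deduped = [row for i, row in enumerate(rows) if i in keep]
    return deduped, len(rows) - len(deduped)

rows = [
    {"provider": "yf", "ticker": "AAPL", "date": "2024-01-02", "close": 1.0},
    {"provider": "yf", "ticker": "MSFT", "date": "2024-01-02", "close": 2.0},
    {"provider": "yf", "ticker": "AAPL", "date": "2024-01-02", "close": 1.5},
]
deduped, dropped = dedup_keep_last(rows, ["provider", "ticker", "date"])
```

Here the later AAPL row replaces the earlier one, and `dropped` feeds the `inmem_dedup_dropped_rows` counter described below.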
```python
from vertex_forager.writers.memory import InMemoryBufferWriter

writer = InMemoryBufferWriter()
writer.set_unique_key(["provider", "ticker", "date"])
df = writer.collect_table("yfinance_price")
```
If collecting via a provider client, the schema’s unique_key is automatically passed to the in‑memory writer when available.
Counter:
- `inmem_dedup_dropped_rows`: number of rows removed by dedup/upsert
Notes¶
- Metrics counters are always merged into `RunResult.metrics_counters`.
- Defaults are unchanged unless you opt in to these features.
Data Quality Rules¶
`TableSchema` supports a `quality_rules` tuple so you can attach lightweight validation to a table without changing the main fetch and write flow.
Available rules¶
- `DataQualityRule`: Protocol for custom validators. Implement `validate(df) -> list[str]` and return violation messages.
- `NoDuplicateRows`: Detects duplicate rows either across the full frame or within a subset of columns such as `["ticker", "date"]`.
- `NoFutureDates`: Flags rows whose date or datetime columns are later than the current time.
- `NoNegativePrices`: Flags negative numeric values in common price columns such as `open`, `high`, `low`, and `close`.
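A custom validator only needs to satisfy the `validate(df) -> list[str]` protocol. The rule below is hypothetical (not part of the library), and it operates on a list of dicts as a self-contained stand-in for the Polars DataFrame the real protocol receives:

```python
class NoNullKeys:
    """Hypothetical custom rule following the DataQualityRule protocol:
    validate(df) returns one human-readable message per violation."""

    def __init__(self, key_columns):
        self.key_columns = key_columns

    def validate(self, df):
        # `df` is a list of dicts here, standing in for a Polars DataFrame.
        violations = []
        for i, row in enumerate(df):
            for col in self.key_columns:
                if row.get(col) is None:
                    violations.append(f"row {i}: null value in key column {col!r}")
        return violations

rule = NoNullKeys(["ticker", "date"])
msgs = rule.validate([{"ticker": "AAPL", "date": None}])
```

Returning an empty list signals that the frame passed the rule.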
Pass rules through a schema¶
```python
import polars as pl

from vertex_forager import NoDuplicateRows, NoFutureDates, NoNegativePrices
from vertex_forager.schema.config import TableSchema

price_schema = TableSchema(
    table="custom_price",
    schema={"ticker": pl.String, "date": pl.Date, "close": pl.Float64},
    unique_key=("ticker", "date"),
    quality_rules=(
        NoDuplicateRows(subset=["ticker", "date"]),
        NoFutureDates(date_columns=["date"]),
        NoNegativePrices(price_columns=["close"]),
    ),
)
```
When to use each rule¶
- Use `NoDuplicateRows` when a table has a natural key and duplicate packets should be surfaced before downstream analysis.
- Use `NoFutureDates` when provider timestamps must never exceed the observation time for the run.
- Use `NoNegativePrices` when market data should always remain non-negative and you want quick anomaly detection.
Violations are logged and aggregated into `RunResult.quality_violations` by default.
If you want the run to stop on the first quality violation:
```python
from vertex_forager import create_client

client = create_client(provider="yfinance", quality_check="error")
```
In `quality_check="error"` mode, the pipeline raises `DataQualityError(table=..., rule=..., violations=[...])` instead of returning a `RunResult`.
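The two modes can be illustrated with a self-contained sketch. Both the `DataQualityError` stand-in and the `run_rules` dispatcher below are hypothetical approximations of the pipeline's behavior, not the library's actual implementation:

```python
class DataQualityError(Exception):
    """Local stand-in for the library's DataQualityError."""

    def __init__(self, table, rule, violations):
        super().__init__(f"{rule} failed for {table}: {violations}")
        self.table = table
        self.rule = rule
        self.violations = violations


def run_rules(table, rules, df, quality_check="warn"):
    """Hypothetical dispatcher: aggregate violations by default, or raise
    on the first violating rule when quality_check == 'error'."""
    aggregated = []
    for rule in rules:
        violations = rule.validate(df)
        if violations and quality_check == "error":
            raise DataQualityError(table, type(rule).__name__, violations)
        aggregated.extend(violations)
    # in the real pipeline these land in RunResult.quality_violations
    return aggregated
```

The default ("warn"-style) path surfaces every violation at once, which is usually preferable for batch ingestion; "error" mode trades completeness for an immediate stop.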