API Reference¶
Factories¶
Start here when you want the default entry points for creating clients and routers without importing provider-specific classes directly.
Create and configure a client instance for the specified provider.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `provider` | `str` | The provider identifier (e.g., `"sharadar"`). | required |
| `api_key` | `object` | Provider API key where supported. | `_UNSET` |
| `rate_limit` | `object` | Provider rate limit in requests per minute where supported. | `_UNSET` |
| `schedule` | `SchedulerConfig \| dict[str, Any] \| None` | Grouped scheduler configuration for always-on DRR fairness. | `None` |
| `retry` | `RetryConfig \| dict[str, Any] \| None` | Grouped retry policy configuration. | `None` |
| `throttle` | `AdaptiveThrottleConfig \| dict[str, Any] \| None` | Grouped adaptive throttle policy configuration. | `None` |
| `quality_check` | `Literal['warn', 'error']` | Data quality violation handling mode. | `'warn'` |
| `pickle_compat_datasets` | `list[str] \| None` | YFinance-only pickle compatibility dataset allowlist. | `None` |
| `concurrency` | `int \| None` | Explicit fetch concurrency limit. | `None` |
| `storage` | `StorageConfig \| dict[str, Any] \| None` | Grouped data-lifecycle and write-path tuning settings. | `None` |
| `limits` | `HTTPConfig \| dict[str, Any] \| None` | Grouped HTTP connection-pool configuration. | `None` |

Returns:

| Type | Description |
|---|---|
| `BaseClient` | Configured client instance inheriting from `BaseClient`. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If the API key is missing. |
| `KeyError` | If the provider is unknown. |
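The grouped parameters (`schedule`, `retry`, `throttle`, `storage`, `limits`) each accept either a typed config model or a plain dict. A minimal sketch of how such a dual-input parameter can be normalized — the `RetryConfig` dataclass and `coerce_retry` helper below are illustrative stand-ins, not the library's actual implementation:

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class RetryConfig:
    # Illustrative stand-in for the library's RetryConfig model.
    max_attempts: int = 3
    base_backoff_s: float = 1.0
    max_backoff_s: float = 30.0

def coerce_retry(retry: "RetryConfig | dict[str, Any] | None") -> RetryConfig:
    """Normalize a grouped config input: typed model, dict of overrides, or None."""
    if retry is None:
        return RetryConfig()          # fall back to defaults
    if isinstance(retry, RetryConfig):
        return retry                  # already a typed model
    return RetryConfig(**retry)       # dict of field overrides

# Both spellings resolve to the same configuration:
a = coerce_retry({"max_attempts": 5})
b = coerce_retry(RetryConfig(max_attempts=5))
```

This is why call sites can pass `retry={"max_attempts": 5}` without importing the config model directly.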
Create and configure a router instance for the specified provider.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `provider` | `str` | The provider identifier (e.g., `"sharadar"`). | required |
| `api_key` | `str \| None` | API key. | required |
| `rate_limit` | `int` | Effective requests-per-minute setting for the router. | required |
| `start_date` | `str \| None` | Optional start date filter. | `None` |
| `end_date` | `str \| None` | Optional end date filter. | `None` |
| `**kwargs` | `Any` | Additional provider-specific configuration. | `{}` |

Returns:

| Type | Description |
|---|---|
| `BaseRouter` | Configured router instance inheriting from `BaseRouter`. |

Raises:

| Type | Description |
|---|---|
| `KeyError` | If the provider is unknown. |
Client & Router¶
These abstractions define the public provider-facing contract that concrete clients and routers implement.
Bases: ABC, Generic[T]
Vendor-agnostic base client abstraction for the Vertex Forager pipeline.
This class serves as the foundation for all provider-specific clients (e.g., SharadarClient). It encapsulates the core infrastructure required to execute data collection tasks independently of the underlying data source.
Key Responsibilities:
1. Session Management: Manages the lifecycle of the underlying HTTP client (httpx.AsyncClient)
via async context managers, ensuring efficient connection pooling and resource cleanup.
2. Pipeline Orchestration: Instantiates and executes the VertexForager pipeline, wiring together
components like Routers, Writers, and Mappers.
3. Flow Control: Initializes the FlowController to enforce global rate limits and concurrency
policies across all pipeline operations.
4. Configuration: Centralizes grouped runtime configuration handling.
Design Principles:
- Provider-Agnostic: Contains NO vendor-specific logic. All vendor details must be injected via Routers and Mappers.
- Composition over Inheritance: While this is a base class, it primarily delegates work to composed components (FlowController, VertexForager) rather than relying on deep inheritance chains.
Standardized Provider Implementation Pattern: All provider clients should follow this consistent structure for extensibility:
- execute_collection() - Unified data collection pipeline with:
    - Router creation with provider-specific configuration
    - Writer lifecycle management (DB storage or in-memory)
    - Progress tracking and result collection
    - Memory safety validation
- Provider-specific characteristics documented in docstrings:
    - API rate limits and batching strategies
    - Data source characteristics (coverage, update frequency)
    - Special handling requirements
    - Performance optimization techniques
- Memory management via common utilities:
    - validate_memory_usage() for safety checks
    - Provider-specific memory parameters
- Error handling patterns:
    - Rate limit handling via FlowController
    - Network retry logic via HttpExecutor
    - Graceful degradation for missing data
Usage
Subclasses must implement specific methods (e.g., get_price_data) that define what to fetch,
delegating the how to self.run_pipeline(). Follow the standardized patterns above for
consistency across all providers.
Initialize the base client infrastructure.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `api_key` | `str \| None` | API key for the provider (optional, depends on provider). | `None` |
| `rate_limit` | `int` | Maximum requests per minute (RPM) allowed for this client. | required |
| `schedule` | `SchedulerConfig \| dict[str, Any] \| None` | Grouped scheduler configuration for always-on DRR fairness. | `None` |
| `retry` | `RetryConfig \| dict[str, Any] \| None` | Grouped retry policy configuration. | `None` |
| `throttle` | `AdaptiveThrottleConfig \| dict[str, Any] \| None` | Grouped adaptive throttle policy configuration. | `None` |
| `quality_check` | `Literal['warn', 'error']` | Data quality violation handling mode. | `'warn'` |
| `concurrency` | `int \| None` | Explicit fetch concurrency limit. | `None` |
| `storage` | `StorageConfig \| dict[str, Any] \| None` | Grouped data-lifecycle and write-path tuning settings. | `None` |
| `limits` | `HTTPConfig \| dict[str, Any] \| None` | Grouped HTTP connection-pool configuration. | `None` |
config
property
¶
Resolved internal configuration snapshot.
Returns:

| Name | Type | Description |
|---|---|---|
| `ResolvedClientConfig` | `ResolvedClientConfig` | The configuration object governing rate limits, concurrency, queue sizes, and thresholds used by this client. |
Notes
Read-only accessor. Callers should not mutate the returned object
in place. Public callers should prefer create_client(...) and
grouped config inputs rather than constructing this model directly.
http_client
property
¶
Get the active HTTP client.
Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If the client has not been initialized (not in context). |
__aenter__()
async
¶
Async context manager entry.
Initializes the shared HTTP client.
__aexit__(exc_type, exc, tb)
async
¶
Async context manager exit.
Ensures the HTTP client is closed.
aclose()
async
¶
Asynchronously close the underlying HTTP client to release resources.
collect_results(writer, table_name, connect_db, *, sort_by_unique_key=True)
async
¶
Collect and return results from writer.
Common result collection logic that handles both database and in-memory scenarios.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `writer` | `BaseWriter` | Writer instance to collect from. | required |
| `table_name` | `str` | Name of the table to collect. | required |
| `connect_db` | `str \| Path \| None` | Database connection (determines collection mode). | required |
| `sort_by_unique_key` | `bool` | Whether to sort by the schema's unique key if available. | `True` |

Returns:

| Type | Description |
|---|---|
| `RunResult` | `RunResult` for both in-memory and database modes. |
managed_writer(connect_db, *, show_progress=True)
async
¶
Manage writer lifecycle with proper resource cleanup.
This is a common infrastructure component that all providers can use to ensure consistent writer lifecycle management.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `connect_db` | `str \| Path \| None` | Database connection string/path, or None for in-memory. | required |
| `show_progress` | `bool` | Whether to show progress indicators (default: True). | `True` |

Yields:

| Name | Type | Description |
|---|---|---|
| `BaseWriter` | `AsyncGenerator[BaseWriter, None]` | Properly initialized writer instance. |

Raises:

| Type | Description |
|---|---|
| `Error` | If a DuckDB connection cannot be established. |
| `ValidationError` | If writer initialization fails due to schema issues. |
| `Exception` | Any unexpected errors during writer setup are propagated. |
Example
    async with self.managed_writer(connect_db, show_progress=True) as writer:
        result = await self.run_pipeline(..., writer=writer)
run_async(method, url, **kwargs)
async
¶
Execute a standard async HTTP request using the underlying client.
Delegates directly to client.request().
run_pipeline(*, router, dataset, symbols, writer, mapper, on_progress=None, progress=False, **kwargs)
async
¶
Run the VertexForager pipeline for the given router, dataset, and symbols.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `router` | `IRouter` | Data router to fetch data from. | required |
| `dataset` | `T` | Dataset name (e.g., `"price"`). | required |
| `symbols` | `list[str] \| None` | List of symbols to fetch data for. If None, fetch all symbols. | required |
| `writer` | `IWriter` | Data writer to persist the processed data. | required |
| `mapper` | `IMapper` | Schema mapper to transform connector data to the sink schema. | required |
| `on_progress` | `Callable[[ProgressSnapshot], None] \| None` | Optional callback receiving a `ProgressSnapshot` per completed request. | `None` |
| `progress` | `bool` | Whether to show built-in progress output and a final summary. | `False` |
| `**kwargs` | `JSONValue` | Additional arguments passed to the pipeline run method. | `{}` |

Returns:

| Name | Type | Description |
|---|---|---|
| `RunResult` | `RunResult` | Summary of the pipeline run, including success/failure status. |

Raises:

| Type | Description |
|---|---|
| `RequestError` | If a network error occurs during fetching. |
| `HTTPStatusError` | If an HTTP response returns non-2xx. |
| `ValidationError` | If schema validation fails during writing. |
| `PrimaryKeyMissingError` | When required PK columns are absent. |
| `PrimaryKeyNullError` | When PK columns contain nulls. |
run_sync(func, *args, **kwargs)
async
¶
Execute a blocking (synchronous) function in a separate thread.
This wrapper ensures that blocking library calls (like yfinance, pandas I/O) do not freeze the main asyncio event loop.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[..., Any]` | Callable to execute in a worker thread. | required |
| `*args` | `Any` | Positional arguments for the callable. | `()` |
| `**kwargs` | `Any` | Keyword arguments for the callable. | `{}` |

Returns:

| Name | Type | Description |
|---|---|---|
| `Any` | `Any` | The return value of the callable. |

Raises:

| Type | Description |
|---|---|
| `Exception` | Any exception raised by the callable is propagated. |
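The behavior described above matches the stdlib `asyncio.to_thread` pattern; a minimal self-contained sketch (not necessarily the library's exact implementation):

```python
import asyncio
from typing import Any, Callable

async def run_sync(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    # Offload the blocking call to a worker thread so the event loop stays responsive.
    return await asyncio.to_thread(func, *args, **kwargs)

def blocking_io(n: int) -> int:
    # Stand-in for a blocking library call (e.g., yfinance or pandas I/O).
    return n * 2

result = asyncio.run(run_sync(blocking_io, 21))
```

Because the callable runs in a worker thread, it must be thread-safe; exceptions it raises propagate back to the awaiting coroutine unchanged.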
Bases: ABC, Generic[T]
Vendor-agnostic base router abstraction for the Vertex Forager pipeline.
The Router acts as the protocol adapter between the generic pipeline engine and specific vendor APIs. It encapsulates all knowledge about URL construction, request parameters, and response parsing logic.
Attributes:

| Name | Type | Description |
|---|---|---|
| `flexible_schema` | `bool` | When True, downstream normalization may allow diagonal concatenation for schema evolution (extra/missing columns). This enables tolerant merging when provider payloads vary across requests. |
Key Responsibilities:
1. Job Generation (generate_jobs): Translates high-level data requests (dataset, symbols)
into concrete FetchJob objects containing fully formed HTTP request specifications.
- Handles pagination logic (generating multiple jobs if needed).
- Applies provider-specific query parameters.
2. Response Parsing (parse): Converts raw HTTP response bytes into structured
FramePackets (Polars DataFrames) normalized for the pipeline.
- Handles CSV/JSON parsing.
- Validates response schemas.
- Maps vendor-specific field names to pipeline standards.
Design Principles:
- Statelessness: Routers should be primarily stateless, processing inputs to outputs
without maintaining complex internal state about the pipeline progress.
- Isolation: Each Router implementation (e.g., SharadarRouter) contains all
vendor-specific logic, keeping the core pipeline engine clean and generic.
provider
abstractmethod
property
¶
Unique identifier for the data provider.
Returns:

| Name | Type | Description |
|---|---|---|
| `str` | `str` | Provider identifier (e.g., `'sharadar'`, `'yfinance'`). |
generate_jobs(*, dataset, symbols, **kwargs)
abstractmethod
¶
Generate provider-specific HTTP fetch jobs.
Converts high-level requests (dataset, symbols, date filters) into concrete HTTP request specifications. Provider implementations own batching, URL construction, auth, and pagination context.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `dataset` | `T` | Dataset type (e.g., `'price'`, `'financials'`, `'actions'`). | required |
| `symbols` | `Sequence[str] \| None` | List of ticker symbols, or None for market-wide data. | required |
| `**kwargs` | `object` | Provider-specific options (e.g., `dimension`, `bulk_size`). | `{}` |

Yields:

| Name | Type | Description |
|---|---|---|
| `FetchJob` | `AsyncIterator[FetchJob]` | HTTP request spec and context for the executor. |

Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | Must be implemented by provider routers. |
| `FetchError` | Network/API failures from provider execution. |
| `TransformError` | Payload parsing/normalization failures. |
Responsibilities
- Symbol handling: single/multiple/None depending on dataset
- Batching: choose batch size under API/URL limits
- URL/params: build endpoints and query parameters
- Date filters: apply start/end via _parse_date_range when needed
- Pagination: include cursor/keys in context if supported
- Auth: attach provider tokens/headers
Examples:

Yielding a pagination job:

- `dataset="tickers"`, `symbols=None`
- context includes `{"pagination": {"cursor_param": "qopts.cursor_id", "meta_key": "next_cursor_id"}}`
- params use `{"qopts.per_page": "10000"}`
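A hypothetical provider might implement the batching side of `generate_jobs` roughly as follows. Plain dicts stand in for `FetchJob`, and the batch size constant is illustrative, not any provider's real limit:

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator, Sequence

BATCH_SIZE = 3  # illustrative batch limit under a provider's URL-length cap

async def generate_jobs(
    dataset: str, symbols: Sequence[str] | None
) -> AsyncIterator[dict]:
    """Yield one job dict per symbol batch (dicts stand in for FetchJob)."""
    if symbols is None:
        # Market-wide request: a single job; pagination handled via context.
        yield {"dataset": dataset, "params": {}, "context": {"pagination": {}}}
        return
    for i in range(0, len(symbols), BATCH_SIZE):
        batch = symbols[i : i + BATCH_SIZE]
        yield {
            "dataset": dataset,
            "params": {"tickers": ",".join(batch)},
            "context": {},
        }

async def _collect() -> list[dict]:
    return [job async for job in generate_jobs("price", ["AAPL", "MSFT", "NVDA", "AMZN"])]

jobs = asyncio.run(_collect())  # one full batch of 3 plus a remainder batch of 1
```

Real routers also attach auth, date filters, and pagination cursors per job, as listed in the Responsibilities above.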
parse(*, job, payload)
abstractmethod
¶
Normalize provider response into FramePacket(s).
Decodes raw bytes, converts to Polars, performs provider-specific structural normalization in this method (no separate normalize_frame), injects provider metadata, and prepares follow-up pagination jobs when applicable.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `job` | `FetchJob` | The fetch job that produced this payload. | required |
| `payload` | `bytes` | Raw HTTP response bytes. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `ParseResult` | `ParseResult` | Normalized packets and optional next jobs. |

Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | Must be implemented by provider routers. |
| `FetchError` | Provider-reported API errors mapped to standard exceptions. |
| `TransformError` | Data conversion/structural normalization failures. |
Responsibilities
- Format handling: JSON/CSV/binary/pickle
- Error handling: detect API errors and malformed payloads
- Data extraction: parse nested provider-specific structures
- DataFrame creation: build Polars frames
- Column normalization: use _normalize_columns where applicable
- Metadata injection: use _add_provider_metadata
- Empty handling: use _check_empty_response
- Pagination: derive next jobs from response metadata/context
Notes
- Structure transformation happens here; strict type casting is delegated to SchemaMapper.
- Use _parse_date_range for date derivations when needed.
Examples:

Creating a follow-up pagination job:

- Read `"next_cursor_id"` from the response meta
- Set `job.spec.params["qopts.cursor_id"]` to the next value
- Append to `ParseResult.next_jobs`
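A stripped-down decode step for a CSV payload might look like this. The stdlib `csv` module stands in for Polars so the sketch stays self-contained, and the empty-response check mirrors the `_check_empty_response` responsibility above:

```python
import csv
import io

def parse_payload(payload: bytes) -> list[dict[str, str]]:
    """Decode CSV response bytes into row dicts (stdlib csv stands in for Polars)."""
    text = payload.decode("utf-8")
    if not text.strip():
        return []  # empty-response handling: no packets to emit
    return list(csv.DictReader(io.StringIO(text)))

rows = parse_payload(b"ticker,close\nAAPL,190.5\nMSFT,410.2\n")
```

A real `parse` would additionally normalize column names, inject provider metadata, and derive follow-up pagination jobs; strict type casting is left to the SchemaMapper as noted above.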
Pipeline Engine¶
The pipeline engine coordinates fetch, parse, normalize, and write stages for a run.
High-performance asynchronous data pipeline engine.
This class orchestrates the end-to-end data collection process using a Producer-Consumer architecture with asyncio. It manages three main stages: Job Generation (Producer), Data Fetching (Fetch Workers), and Data Writing (Writer Workers).
Attributes:

| Name | Type | Description |
|---|---|---|
| `_router` | `IRouter` | Router/queue manager for fetch jobs. |
| `_http` | `HttpExecutor` | Handles HTTP requests. |
| `_writer` | `IWriter` | Writer task manager. |
| `_mapper` | `IMapper` | Normalizes data schemas. |
| `_config` | `ResolvedClientConfig` | Internal resolved configuration object. |
| `controller` | `FlowController` | Rate limiter and concurrency controller. |
| `_flush_threshold` | `int` | Row count threshold for flushing buffers. |
Public Methods

- run(dataset: str, symbols: Symbols | None, ...) -> RunResult: Execute the full collection pipeline.
- stop() -> None: Gracefully stop the pipeline.
run(*, dataset, symbols, on_progress=None, progress=False, resume=False, **kwargs)
async
¶
Execute the pipeline.
This method sets up the asyncio queues and worker tasks, runs the pipeline until completion, and returns the execution result.
Process:
1. Create req_q (for jobs) and pkt_q (for data packets).
2. Spawn _writer_worker tasks (consumers of processed data).
3. Spawn _fetch_worker tasks (consumers of jobs, producers of data).
4. Spawn _producer task (generator of jobs).
5. Wait for all tasks to complete in order:
- Producer finishes -> Queue join.
- Fetchers finish -> Queue join.
- Writer workers finish.
6. Aggregate results and handle cleanup.
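The queue wiring described in steps 1-6 follows the standard asyncio producer-consumer pattern. A compressed, self-contained sketch — job and packet types are simplified to ints and strings, and fetch/write are stubbed out:

```python
import asyncio

async def pipeline(jobs: list[int]) -> list[str]:
    req_q: asyncio.Queue[int] = asyncio.Queue()   # step 1: job queue
    pkt_q: asyncio.Queue[str] = asyncio.Queue()   # step 1: packet queue
    written: list[str] = []

    async def producer() -> None:                 # step 4: generate jobs
        for job in jobs:
            await req_q.put(job)

    async def fetch_worker() -> None:             # step 3: jobs -> packets
        while True:
            job = await req_q.get()
            await pkt_q.put(f"packet-{job}")      # stand-in for fetch + parse
            req_q.task_done()

    async def writer_worker() -> None:            # step 2: persist packets
        while True:
            pkt = await pkt_q.get()
            written.append(pkt)                   # stand-in for a real write
            pkt_q.task_done()

    fetchers = [asyncio.create_task(fetch_worker()) for _ in range(2)]
    writers = [asyncio.create_task(writer_worker())]
    await producer()
    await req_q.join()                            # step 5: all jobs fetched
    await pkt_q.join()                            # step 5: all packets written
    for t in fetchers + writers:
        t.cancel()                                # step 6: cleanup
    return written

result = asyncio.run(pipeline([1, 2, 3]))
```

The real engine layers rate limiting, retries, checkpointing, and error aggregation on top of this skeleton.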
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `dataset` | `str` | Name of the dataset to fetch (e.g., `"sep"`). | required |
| `symbols` | `Symbols \| None` | List of symbols to fetch, or None for all. | required |
| `on_progress` | `Callable[[ProgressSnapshot], Any] \| None` | Optional callback receiving a `ProgressSnapshot` on each job completion. | `None` |
| `progress` | `bool` | Whether to show the built-in progress bar and final summary. | `False` |
| `resume` | `bool` | Whether to resume from an existing checkpoint (default: False). | `False` |
| `**kwargs` | `object` | Additional arguments passed to the router's `generate_jobs` method. | `{}` |

Returns:

| Name | Type | Description |
|---|---|---|
| `RunResult` | `RunResult` | Summary of the run including metrics and errors. |

Raises:

| Type | Description |
|---|---|
| `RuntimeError` | Orchestration-level failures (e.g., early writer shutdown). |
Notes

- Exceptions raised during fetch/parse/write (e.g., httpx.RequestError, httpx.HTTPStatusError, ValidationError, PrimaryKeyMissingError, PrimaryKeyNullError) are captured and appended to `RunResult.errors` and are not re-raised by default.
- Callers should inspect `RunResult.errors` for per-task failures and only expect orchestration-level issues to raise.
- When `resume=True`, symbols already completed in previous runs are skipped based on checkpoint rows stored in `~/.cache/vertex-forager/state.db`.
Pipeline Results¶
These result models summarize parsed packets, table counts, and run-level outcomes.
Bases: BaseModel
Result summary for a pipeline run.
Attributes:

| Name | Type | Description |
|---|---|---|
| `provider` | `str` | Data provider name. |
| `run_id` | `str \| None` | Unique identifier for the run (default: None). |
| `dataset` | `str \| None` | Dataset name for the run (default: None). |
| `started_at` | `float \| None` | Timestamp when the run started (default: None). |
| `finished_at` | `float \| None` | Timestamp when the run finished (default: None). |
| `duration_s` | `float \| None` | Duration of the run in seconds (default: None). |
| `coverage_pct` | `float \| None` | Coverage percentage for the run (default: None). |
| `data` | `DataFrame \| None` | In-memory collected payload for non-persisted runs. |
| `tables` | `dict[str, int]` | Mapping of table names to row counts (default: empty dict). |
| `errors` | `list[RunError]` | List of structured error information (default: empty list). |
| `dlq_pending` | `dict[str, list[FramePacket]]` | Packets preserved for post-mortem/dead-letter processing when DLQ spool/dispatch fails. Items are appended by writer/rescue logic upon spool errors and can be consumed by operator recovery flows. |
| `dlq_counts` | `dict[str, dict[str, int]]` | Per-table counts for rescued and remaining packets when DLQ is disabled or spooling occurs. Always populated regardless of metrics settings. |
| `quality_violations` | `dict[str, int]` | Mapping of table names to quality violation counts (default: empty dict). |
Result of parsing a response.
Attributes:

| Name | Type | Description |
|---|---|---|
| `packets` | `list[FramePacket]` | List of extracted FramePackets containing data. |
| `next_jobs` | `list[FetchJob]` | List of subsequent FetchJobs to be executed. |
Flow Control¶
Use these components when tuning throughput, concurrency, and request pacing.
Unified Flow Controller managing both Concurrency and Rate Limiting.
Combines:

1. GradientConcurrencyLimiter: handles backpressure and optimizes throughput based on latency.
2. GCRARateLimiter: strictly enforces API rate limits (RPM).
Initialize the unified Flow Controller.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `requests_per_minute` | `int` | Allowed requests per minute (RPM). | required |
| `concurrency_limit` | `int \| None` | Optional max concurrent requests; if None, auto-computed. | `None` |
| `adaptive_throttle_window_s` | `int` | Sliding window (seconds) for error-rate calculation. | `60` |
| `error_rate_threshold` | `float` | Error/retry ratio threshold that triggers a throttle decrease. | `0.2` |
| `rpm_floor_ratio` | `float` | Minimum RPM as a ratio of the ceiling (0.0-1.0) during throttle. | `0.1` |
| `recovery_factor` | `float` | RPM increment as a fraction of the ceiling (0.0-1.0) during recovery. | `0.05` |
| `healthy_window_s` | `int` | Duration (seconds) of healthy operation required to upshift. | `60` |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If `requests_per_minute <= 0`. |
concurrency_limit
property
¶
Get the configured maximum concurrency limit.
throttle()
async
¶
Context manager to acquire both rate limit and concurrency slot.
Behavior
- Acquires GCRA rate limiter permission (may sleep).
- Acquires GradientConcurrencyLimiter slot (may wait).
- Releases slot with RTT feedback on exit to adapt limits.
Raises:

| Type | Description |
|---|---|
| `CancelledError` | If the enclosing task is cancelled while waiting. |
Adaptive Concurrency Controller based on Netflix's Gradient Algorithm.
[What it limits]
Limits the number of Concurrent In-flight Requests (Concurrency).
It dynamically adjusts the limit (max concurrent requests) to find the
system's optimal throughput without overloading it.
[Theoretical Background]
Based on Little's Law (L = λW) and Netflix's Gradient Algorithm.
It calculates a gradient using the ratio of min_rtt (ideal no-load latency) to observed_rtt (current latency).
- Gradient < 1 (RTT increased): System is overloaded -> Decrease limit.
- Gradient ~ 1 (RTT stable): System is healthy -> Increase limit slightly (explore capacity).
Formula: new_limit = old_limit * (min_rtt / current_rtt) + queue_size
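The update rule can be exercised directly. A toy illustration of the formula above — the RTT values and queue headroom are illustrative, not the controller's real tuning:

```python
def gradient_update(old_limit: float, min_rtt: float, current_rtt: float,
                    queue_size: float) -> float:
    # new_limit = old_limit * (min_rtt / current_rtt) + queue_size
    gradient = min_rtt / current_rtt
    return old_limit * gradient + queue_size

# RTT doubled under load: the limit shrinks toward half (plus queue headroom).
overloaded = gradient_update(old_limit=100, min_rtt=0.05, current_rtt=0.10, queue_size=4)

# RTT at the no-load baseline: the limit holds steady and explores slightly upward.
healthy = gradient_update(old_limit=100, min_rtt=0.05, current_rtt=0.05, queue_size=4)
```

The `queue_size` term is what lets a healthy system probe for extra capacity rather than freezing at its current limit.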
Async Rate Limiter implementing GCRA (Generic Cell Rate Algorithm).
[What it limits]
Limits the rate of requests over time (e.g., requests per minute). It ensures the application complies with the API provider's usage quotas (rate limits).

[Theoretical Background]
GCRA (Generic Cell Rate Algorithm) is a sophisticated Leaky Bucket variant. Instead of using a counter reset window (which causes a "thundering herd" at window boundaries), it tracks the Theoretical Arrival Time (TAT) for the next request.

- If a request arrives before TAT, it waits (or is rejected).
- Allows short bursts up to a defined limit, but strictly enforces the long-term average rate.
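The TAT bookkeeping can be sketched in a few lines. This is a simplified illustration of the algorithm, not the library's implementation; `emission_interval` and `burst_tolerance` are assumed parameter names:

```python
def gcra_check(tat: float, now: float, emission_interval: float,
               burst_tolerance: float) -> tuple[bool, float]:
    """One GCRA decision: returns (allowed, new_tat).

    tat: theoretical arrival time of the next conforming request.
    emission_interval: seconds per request at the average rate (60 / RPM).
    burst_tolerance: extra allowance permitting short bursts.
    """
    tat = max(tat, now)
    if tat - now > burst_tolerance:
        return False, tat                    # arrived too early: non-conforming
    return True, tat + emission_interval     # conforming: advance the TAT

# 60 RPM -> one request per second, with a burst allowance of two intervals.
interval, burst = 1.0, 2.0
tat, allowed = 0.0, []
for now in [0.0, 0.0, 0.0, 0.0, 0.0]:       # five requests in the same instant
    ok, tat = gcra_check(tat, now, interval, burst)
    allowed.append(ok)
```

Three requests pass immediately (the burst), then the TAT has advanced far enough ahead of `now` that further requests must wait, which is exactly the smooth long-term average GCRA promises.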
Configuration¶
These types describe the request, retry, writer, and execution settings used throughout the pipeline.
Bases: BaseModel
Retry configuration for HTTP requests.
Attributes:

| Name | Type | Description |
|---|---|---|
| `max_attempts` | `int` | Maximum number of retry attempts (default: 3). |
| `base_backoff_s` | `float` | Initial backoff duration in seconds (default: 1.0). |
| `max_backoff_s` | `float` | Maximum backoff duration in seconds (default: 30.0). |
| `backoff_mode` | `Literal['full_jitter', 'equal']` | Backoff distribution strategy (default: "full_jitter"). |
| `retry_status_codes` | `tuple[int, ...]` | HTTP status codes that trigger retries (default: (429, 503)). |
Notes
- Two backoff modes are supported:
- "full_jitter" (default): sleep is drawn uniformly from [0, min(max_backoff_s, base_backoff_s * 2^(attempt-1))].
- "equal": sleep is drawn from [cap/2, cap] where cap = min(max_backoff_s, base_backoff_s * 2^(attempt-1)).
- Retry-After response headers (integer seconds) take priority over backoff when present.
- Defaults are conservative: retries on 429 (Too Many Requests) and 503 (Service Unavailable).
- Opt-in to broader server errors (e.g., 500, 502, 504) ONLY when requests are idempotent. Non-idempotent operations (e.g., POST/PUT without idempotency keys) may cause duplicate side effects. Use idempotency keys or upstream idempotent semantics before enabling broader codes.
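The two modes' sleep computation can be illustrated directly; a sketch of the documented formulas, not the library's code:

```python
import random

def backoff_sleep(attempt: int, base_backoff_s: float = 1.0,
                  max_backoff_s: float = 30.0, mode: str = "full_jitter") -> float:
    """Compute one retry sleep, mirroring the two documented backoff modes."""
    cap = min(max_backoff_s, base_backoff_s * 2 ** (attempt - 1))
    if mode == "full_jitter":
        return random.uniform(0.0, cap)      # uniform over [0, cap]
    return random.uniform(cap / 2, cap)      # "equal": uniform over [cap/2, cap]

# The exponential cap per attempt: 1, 2, 4, 8, ... bounded by max_backoff_s.
caps = [min(30.0, 1.0 * 2 ** (a - 1)) for a in range(1, 8)]
```

Full jitter spreads retries across the whole interval and so decorrelates competing clients best; "equal" guarantees at least half the cap is waited, at the cost of less spread.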
Bases: BaseModel
Adaptive throttle policy for dynamic RPM adjustment based on error rate.
Attributes:

| Name | Type | Description |
|---|---|---|
| `window_s` | `int` | Sliding window in seconds used to evaluate the recent error rate. |
| `error_rate_threshold` | `float` | Error ratio that triggers a throttle decrease. |
| `rpm_floor_ratio` | `float` | Minimum RPM as a ratio of the ceiling (0.0-1.0) maintained while throttled. |
| `recovery_factor` | `float` | RPM added as a fraction of the ceiling (0.0-1.0) during healthy recovery. |
| `healthy_window_s` | `int` | Healthy period required before recovering RPM upward. |
Notes
- Uses AIMD (Additive Increase/Multiplicative Decrease) pattern.
- Multiplicative decrease: new_rpm = effective_rpm * 0.8 when error threshold exceeded.
- Additive increase: new_rpm = effective_rpm + max(1, ceiling * recovery_factor) when healthy.
- rpm_floor is resolved to an absolute value at init: floor = max(1, ceiling * rpm_floor_ratio).
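One AIMD step from these notes can be written out as follows. Clamping the increase at the ceiling is an assumption here (the notes only give the increase/decrease formulas and the floor), and the function is a sketch, not the library's code:

```python
def aimd_step(effective_rpm: float, ceiling: float, floor_ratio: float,
              recovery_factor: float, *, error_rate: float,
              threshold: float) -> float:
    """One AIMD adjustment of the effective RPM."""
    floor = max(1.0, ceiling * floor_ratio)   # floor resolved from the ceiling
    if error_rate > threshold:
        # Multiplicative decrease, never below the resolved floor.
        return max(floor, effective_rpm * 0.8)
    # Additive increase during a healthy window (ceiling clamp assumed).
    return min(ceiling, effective_rpm + max(1.0, ceiling * recovery_factor))

# Error rate above threshold: 300 RPM -> 240 RPM (multiplicative decrease).
rpm = aimd_step(300, ceiling=300, floor_ratio=0.1, recovery_factor=0.05,
                error_rate=0.3, threshold=0.2)
# Healthy window: 240 RPM -> 255 RPM (additive increase of ceiling * 0.05).
rpm = aimd_step(rpm, ceiling=300, floor_ratio=0.1, recovery_factor=0.05,
                error_rate=0.0, threshold=0.2)
```

The asymmetry (drop fast, recover slowly) is what makes AIMD converge to a stable rate under shared contention.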
Bases: BaseModel
HTTP connection-pool settings exposed as a grouped public config.
Attributes:

| Name | Type | Description |
|---|---|---|
| `max_connections` | `int` | Maximum connection pool size. |
| `max_keepalive_connections` | `int` | Maximum keep-alive pool size. |
| `timeout_s` | `float` | HTTP request timeout in seconds. |
Bases: BaseModel
HTTP request specification for a fetch job.
Attributes:

| Name | Type | Description |
|---|---|---|
| `method` | `HttpMethod` | HTTP method to use. |
| `url` | `str` | Target URL for the request. |
| `params` | `dict[str, JSONValue]` | Query parameters as key-value pairs (default: empty dict). |
| `headers` | `dict[str, str]` | HTTP headers as key-value pairs (default: empty dict). |
| `json_body` | `dict[str, JSONValue] \| None` | JSON payload for POST/PUT requests (default: None). |
| `data` | `bytes \| None` | Raw bytes payload for requests (default: None). |
| `auth` | `RequestAuth` | Authentication strategy to apply. |
| `idempotent` | `bool` | Whether the request is safe to retry without side effects. Defaults to True; set to False to disable automatic retries for non-idempotent operations. |
Bases: BaseModel
Unit of work for the fetch pipeline.
Attributes:

| Name | Type | Description |
|---|---|---|
| `provider` | `str` | Data provider name. |
| `dataset` | `str` | Dataset identifier. |
| `symbol` | `str \| None` | Target symbol or ticker if applicable (default: None). |
| `spec` | `RequestSpec` | HTTP request specification details. |
| `context` | `Mapping[str, JSONValue]` | Additional context for job execution and tracing (default: empty dict). |
Bases: BaseModel
Polars frame packet passed from provider to sink.
Attributes:

| Name | Type | Description |
|---|---|---|
| `provider` | `str` | Data provider name. |
| `table` | `str` | Target table name for storage. |
| `frame` | `DataFrame` | Polars DataFrame containing the data. |
| `observed_at` | `datetime` | Timestamp when the data was observed/fetched. |
| `partition_date` | `date \| None` | Optional date for partitioning logic (default: None). |
| `context` | `Mapping[str, JSONValue]` | Metadata context passed along with the data (default: empty dict). |
Bases: str, Enum
HTTP method for request execution.
Values

- GET: HTTP GET method.
- POST: HTTP POST method.
Bases: BaseModel
Authentication strategy attached to a request spec.
Attributes:

| Name | Type | Description |
|---|---|---|
| `kind` | `str` | Authentication type. |
| `token` | `str \| None` | Authentication token string if applicable (default: None). |
| `header_name` | `str \| None` | Name of the header to inject the token into (default: None). |
| `query_param` | `str \| None` | Name of the query parameter to inject the token into (default: None). |
HTTP¶
The HTTP executor handles transport concerns for provider requests and library fetch dispatch.
Async HTTP Request Executor using httpx.
This class abstracts the low-level HTTP client details and maps RequestSpec
objects to actual network requests. It handles authentication header injection
and response status checking.
It also supports special schemes like yfinance:// to bypass HTTP and use internal libraries.
Notes
- Error logs redact URLs using a sanitizer to avoid leaking sensitive query parameters (e.g., API keys) in messages.
- Example: "https://api.example.com?token=SECRET&cursor=123" -> "[redacted]"
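A sanitizer with the effect shown in the example can be built from the stdlib. This is an illustrative sketch, not the library's actual sanitizer; it redacts any URL carrying a query string, since that is where API keys typically travel:

```python
from urllib.parse import urlsplit

def sanitize_url(url: str) -> str:
    """Redact any URL that carries query parameters (which may embed API keys)."""
    parts = urlsplit(url)
    return "[redacted]" if parts.query else url

sanitized = sanitize_url("https://api.example.com?token=SECRET&cursor=123")
```

A finer-grained variant could redact only the parameter values and keep the path for debugging; the all-or-nothing form shown here errs on the safe side.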
Initialize with an existing client.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `client` | `HttpClientProtocol` | Existing HTTP client instance to delegate requests to. | required |
fetch(spec)
async
¶
Execute a request and return response bytes.
Dispatches to specific fetch implementation based on URL scheme.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `spec` | `RequestSpec` | Fully defined request specification. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `bytes` | `bytes` | The raw response body. |

Raises:

| Type | Description |
|---|---|
| `HTTPStatusError` | If the server returns a 4xx/5xx status code. |
| `RequestError` | If a network error occurs. |
| `ValueError` | If the URL scheme is invalid. |
| `TypeError` | If library request parameters are invalid. |
Retry¶
Retry helpers centralize backoff policy and retry execution behavior.
Create a tenacity AsyncRetrying controller from configuration.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `RetryConfig` | Retry configuration. | required |
| `log_level` | `int` | Logging level for retry attempts. | `WARNING` |
| `retry_on` | `tuple[type[Exception], ...]` | Exception types to retry on. Defaults to `(httpx.TransportError,)`. | `(TransportError,)` |

Returns:

| Name | Type | Description |
|---|---|---|
| `AsyncRetrying` | `AsyncRetrying` | Configured retry controller. |
Writers¶
Writer APIs control how normalized frames are persisted or collected in memory.
Factory function to instantiate the appropriate Writer.
Selection Logic:
- None: Returns InMemoryBufferWriter (in-memory).
- String URI (duckdb://): Returns DuckDBWriter.
- String Path / Path object: Returns DuckDBWriter (assumed file path).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `connect_db` | `str \| Path \| None` | Connection string, Path object, or None. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `BaseWriter` | `BaseWriter` | An initialized writer instance. |

Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | If a URI scheme is unknown. |
Bases: ABC
Abstract Base Class for Data Writers.
The Writer component acts as the final destination in the data pipeline, responsible
for persisting normalized FramePackets (Polars DataFrames) to a durable storage backend.
Key Responsibilities:
1. Persistence: Efficiently writing data to disk (DuckDB, Parquet) or memory.
2. Concurrency Safety: Managing thread/async safety for storage engines that require
single-writer access (e.g., DuckDB).
3. Bulk Processing: Implementing write_bulk to optimize throughput by reducing
transaction overhead.
4. Resource Management: Handling connections, file handles, and proper cleanup via
async context managers (__aenter__, __aexit__).
Design Principles:
- Schema-Agnostic: Writers receive already-normalized data. They trust the upstream
SchemaMapper and do not perform schema validation.
- Fail-Fast: Errors during write operations should propagate immediately to stop
the pipeline or trigger retry logic.
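A toy writer following this contract — async context management, `write`, and the default iterate-over-`write` bulk strategy — might look like the following. `Packet` is a simplified stand-in for `FramePacket`, and none of this is the library's actual code:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class Packet:
    # Simplified stand-in for FramePacket: a table name plus row dicts.
    table: str
    rows: list[dict] = field(default_factory=list)

class InMemoryWriter:
    """Toy writer honoring the contract above: write, write_bulk, cleanup."""

    def __init__(self) -> None:
        self.tables: dict[str, list[dict]] = {}

    async def __aenter__(self) -> "InMemoryWriter":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()  # resources released even if an error occurred

    async def write(self, packet: Packet) -> int:
        self.tables.setdefault(packet.table, []).extend(packet.rows)
        return len(packet.rows)

    async def write_bulk(self, packets: list[Packet]) -> list[int]:
        # Default strategy: iterate write(); real writers batch transactionally.
        return [await self.write(p) for p in packets]

    async def close(self) -> None:
        pass  # nothing to release for the in-memory case

async def _demo() -> dict[str, list[dict]]:
    async with InMemoryWriter() as w:
        await w.write_bulk([Packet("price", [{"t": "AAPL"}]),
                            Packet("price", [{"t": "MSFT"}])])
        return w.tables

tables = asyncio.run(_demo())
```

Note the fail-fast principle: `write` does no validation and raises nothing of its own, trusting upstream normalization exactly as the design principles above prescribe.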
__aenter__()
async
¶
Async context manager entry.
__aexit__(exc_type, exc_val, exc_tb)
async
¶
Async context manager exit.
Ensures resources are closed even if an error occurs.
close()
async
¶
Close any open resources (connections, files).
Default implementation does nothing. Override if resources need cleanup.
collect_table(table_name, sort_cols=None)
¶
Collect table data in memory for in-memory writers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | The name of the table to collect. | required |
| sort_cols | list[str] \| None | Optional list of columns to sort by. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | The collected DataFrame. |
Raises:
| Type | Description |
|---|---|
| NotImplementedError | If the writer does not support in-memory collection. |
flush()
async
¶
Flush any buffered data to storage.
Default implementation does nothing. Override if buffering is used.
write(packet)
abstractmethod
async
¶
Persist a single data packet.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| packet | FramePacket | The normalized data packet containing a Polars DataFrame. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| WriteResult | WriteResult | Summary including table name and written row count. |
Raises:
| Type | Description |
|---|---|
| ComputeError | On data processing errors. |
| ValidationError | On schema validation errors. |
| Error | On database-specific errors (if applicable). |
write_bulk(packets)
async
¶
Persist a bulk of data packets.
Default implementation iterates write().
Subclasses should override this for transactional/bulk optimization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| packets | list[FramePacket] | List of data packets. | required |
Returns:
| Type | Description |
|---|---|
| list[WriteResult] | List of results for each packet. |
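The default bulk behavior described above can be illustrated with a toy writer. This is a sketch, not the library's implementation: packets are plain dicts here rather than real FramePackets, and the class name is invented.

```python
import asyncio

class SketchWriter:
    # Minimal stand-in writer; a "packet" is just a dict with table/rows fields.
    def __init__(self):
        self.written = []

    async def write(self, packet):
        self.written.append(packet["table"])
        return {"table": packet["table"], "rows": packet["rows"]}

    async def write_bulk(self, packets):
        # Default behavior described above: iterate write(), one packet at a time.
        # Subclasses would override this to batch into a single transaction.
        return [await self.write(p) for p in packets]

w = SketchWriter()
results = asyncio.run(w.write_bulk([
    {"table": "price", "rows": 2},
    {"table": "actions", "rows": 1},
]))
```

Overriding `write_bulk` matters for backends like DuckDB, where wrapping many upserts in one transaction avoids per-packet commit overhead.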
Bases: BaseWriter
DuckDB writer implementation.
Manages a local DuckDB database for storing collected data using Polars integration.
Attributes:
| Name | Type | Description |
|---|---|---|
| lock | Lock | Lock to ensure single-writer access to the database. |
| connection | DuckDBPyConnection | Active DuckDB connection. |
| db_path | str | Path to the DuckDB database file. |
Features
- Automatic table creation from Polars schema.
- Upsert semantics based on primary keys.
- Zero-copy data transfer between Polars and DuckDB.
- Thread-safe execution using a single-threaded executor.
Initialize DuckDB writer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| db_path | str \| Path | Path to the DuckDB database file. | required |
| strict | bool | If True, unsupported Polars types raise SchemaMapError; if False, they fall back to VARCHAR. | False |
close()
async
¶
Close the database connection.
Acquires the lock to ensure no write operations are in progress before closing the connection.
Raises:
| Type | Description |
|---|---|
| Error | If the connection close operation fails. |
compact()
async
¶
Optimize database storage.
Runs VACUUM and CHECKPOINT to reclaim space and enforce compression.
Raises:
| Type | Description |
|---|---|
| Error | If VACUUM or CHECKPOINT commands fail. |
flush()
async
¶
Flush is a no-op because writes are applied immediately.
write(packet)
async
¶
Write a single packet to DuckDB immediately. Stateless implementation (no buffering).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| packet | FramePacket | FramePacket containing the table name and DataFrame to write. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| WriteResult | WriteResult | Result object containing table name and number of rows written. |
Raises:
| Type | Description |
|---|---|
| Error | If DuckDB execution fails during upsert. |
| ValidationError | If schema validation fails during write. |
| PrimaryKeyMissingError | If required primary key columns are missing from the packet. |
| PrimaryKeyNullError | If primary key columns contain null values. |
write_bulk(packets)
async
¶
Write a bulk of packets to DuckDB immediately. Stateless implementation (no buffering).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| packets | list[FramePacket] | List of FramePackets to write. | required |
Returns:
| Type | Description |
|---|---|
| list[WriteResult] | List of result objects for each written packet. |
Raises:
| Type | Description |
|---|---|
| Error | If DuckDB execution fails during batch upsert. |
| ValidationError | If schema validation fails during write. |
| PrimaryKeyMissingError | If required primary key columns are missing from any packet. |
| PrimaryKeyNullError | If primary key columns contain null values in any packet. |
Bases: BaseWriter
In-memory writer for buffering results.
Used when the user wants to get a DataFrame back directly without writing to disk. Accumulates all incoming packets in a dictionary of lists.
Notes
- Not suitable for massive datasets that exceed memory.
- Best used for small workloads or unit testing scenarios.
Example
Collect and sort buffered frames for a table:

    writer = InMemoryBufferWriter()
    await writer.write(packet)
    df = writer.collect_table("price", sort_cols=["ticker", "date"])
collect_table(table, sort_cols=None)
¶
Concatenate all buffered parts for a table into a single DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | str | Table name (e.g., 'price_bars'). | required |
| sort_cols | list[str] \| None | Optional list of columns to sort by (e.g., from schema unique_key). | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | Combined data. |
Example

    df = writer.collect_table("price", sort_cols=["ticker", "date"])
    # df contains all buffered parts for 'price', optionally sorted
write(packet)
async
¶
Append packet to the in-memory buffer.
Thread-safe via threading lock. Applies deduplication if upsert_keys is configured, matching DuckDB semantics.
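The buffer-and-collect behavior can be sketched with a toy analogue. This is illustrative only: rows are plain dicts rather than Polars DataFrames, and the class and method shapes are assumptions based on the descriptions above.

```python
import threading

class BufferSketch:
    """Toy analogue of an in-memory buffer writer: accumulate parts per table,
    then concatenate (and optionally sort) on collect."""

    def __init__(self):
        self._lock = threading.Lock()
        self._tables = {}   # table name -> list of row batches

    def write(self, table, rows):
        with self._lock:    # thread-safe append, as described above
            self._tables.setdefault(table, []).append(rows)

    def collect_table(self, table, sort_cols=None):
        parts = self._tables.get(table, [])
        combined = [row for part in parts for row in part]
        if sort_cols:
            combined.sort(key=lambda r: tuple(r[c] for c in sort_cols))
        return combined

buf = BufferSketch()
buf.write("price", [{"ticker": "MSFT", "close": 1.0}])
buf.write("price", [{"ticker": "AAPL", "close": 2.0}])
rows = buf.collect_table("price", sort_cols=["ticker"])
```

As the Notes warn, everything lives in process memory, which is why this pattern suits unit tests and small workloads rather than large ingests.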
Bases: BaseModel
Writer write result.
Attributes:
| Name | Type | Description |
|---|---|---|
| table | str | The target table name written to. |
| rows | int | Number of rows written. |
| partitions | Mapping[str, str] | Partition key/value pairs created or updated (defaults to an empty dict). |
Lifecycle¶
Lifecycle helpers create and finalize the shared state for a pipeline run.
Exceptions¶
These are the main public exception types you should catch around client usage and integration code.
Bases: Exception
Base exception for all Vertex Forager errors.
Raised when an operation within the Vertex Forager system encounters an error that does not fit a more specific category.
Bases: VertexForagerError
Invalid user or configuration input.
Raised when parameters, identifiers, or configuration values are invalid.
Bases: VertexForagerError
Network or external provider fetch failures.
Raised when HTTP/library calls return errors or unreachable resources.
Bases: VertexForagerError
Data transformation/normalization failures.
Raised during parsing or schema normalization when data cannot be shaped.
Bases: VertexForagerError
Persistence/write failures.
Raised by writers when storage operations fail.
Bases: VertexForagerError
Computation failures during data processing.
Bases: VertexForagerError
Schema or data validation failures.
Bases: ValidationError
Primary key column missing from data.
Indicates that required primary key columns are absent in the dataset being processed or written.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | str | Target table name. | required |
| column | str | Missing primary key column name(s). | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| table | str | Target table name. |
| column | str | Missing primary key column name(s). |
Bases: ValidationError
Primary key contains null values.
Indicates that the primary key column includes one or more nulls which would violate uniqueness constraints or upsert logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | str | Target table name. | required |
| column | str | Primary key column name. | required |
| null_count | int | Number of nulls detected. | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| table | str | Target table name. |
| column | str | Primary key column name. |
| null_count | int | Number of nulls detected. |
Bases: VertexForagerError
Dead Letter Queue (DLQ) spooling failure.
Raised when persisting failed packets to the DLQ fails. Carries counts that summarize partial rescue progress and remaining items.
Attributes:
| Name | Type | Description |
|---|---|---|
| rescued | | Number of items successfully rescued (written) before the spool attempt. |
| remaining | | Number of items left to persist in the DLQ when the failure occurred. |
| original | | Optional underlying exception that triggered the spool failure. |
Example

    raise DLQSpoolError(rescued=1, remaining=3, original=exc)
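The practical payoff of the single-root hierarchy is that one `except VertexForagerError` catches every library error. The sketch below reconstructs part of the hierarchy from the Bases lines above; the `PrimaryKeyNullError` constructor follows its parameter table, but the message format is an assumption.

```python
# Hedged reconstruction of part of the exception hierarchy.
class VertexForagerError(Exception):
    """Base for all Vertex Forager errors."""

class ValidationError(VertexForagerError):
    """Schema or data validation failures."""

class PrimaryKeyNullError(ValidationError):
    def __init__(self, table, column, null_count):
        self.table, self.column, self.null_count = table, column, null_count
        super().__init__(f"{table}.{column}: {null_count} null primary key value(s)")

try:
    raise PrimaryKeyNullError("price", "ticker", 3)
except VertexForagerError as exc:  # base class catches every library error
    caught = exc
```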
Core Errors¶
Core errors cover pipeline-level failures that sit below the top-level public exceptions.
Structured error information for pipeline runs.
Attributes:
| Name | Type | Description |
|---|---|---|
| provider | str | Data provider name (e.g., "sharadar", "yfinance"). |
| dataset | str | Dataset name (e.g., "sep", "actions"). |
| symbol | str | Symbol that failed (e.g., "AAPL", "MSFT"). |
| exc_type | str | Exception type as a string (e.g., "httpx.HTTPStatusError"). |
| message | str | Error message. |
| retryable | bool | Whether this error is retryable (True for 429/503/network errors). |
from_exception(exc, provider, dataset, symbol)
classmethod
¶
Create a RunError from an exception.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| exc | Exception | The exception that occurred. | required |
| provider | str | Data provider name. | required |
| dataset | str | Dataset name. | required |
| symbol | str \| None | Symbol that failed. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| RunError | RunError | Structured error information. |
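A minimal sketch of `from_exception`, assuming the field layout from the attribute table above. The `retryable` heuristic shown is a placeholder: the real check targets 429/503 responses and network errors, which cannot be reproduced faithfully without the library's HTTP types.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class RunError:
    provider: str
    dataset: str
    symbol: Optional[str]
    exc_type: str
    message: str
    retryable: bool

    @classmethod
    def from_exception(cls, exc, provider, dataset, symbol):
        # Render the exception type as a dotted string, e.g. "builtins.ValueError".
        exc_type = f"{type(exc).__module__}.{type(exc).__qualname__}"
        # Placeholder heuristic standing in for the documented 429/503/network check.
        retryable = isinstance(exc, (ConnectionError, TimeoutError))
        return cls(provider, dataset, symbol, exc_type, str(exc), retryable)

err = RunError.from_exception(ValueError("bad row"), "sharadar", "sep", "AAPL")
```

Capturing the exception as strings keeps the record serializable, so failed symbols can be reported or spooled to the DLQ without holding live exception objects.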
Utilities¶
Utility helpers provide small convenience functions for env parsing, ticker validation, and progress updates.
Convert a pipeline result object to a serializable dictionary.
Handles cases where the object is None or has different metric structures.
Validate a list of ticker symbols.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| symbols | list[str] \| tuple[str, ...] | Ticker symbols container (list or tuple of strings). | required |
Raises:
| Type | Description |
|---|---|
| InputError | If symbols is not a list/tuple, is empty, contains non-string items, or any item is empty or has leading/trailing whitespace. |
Read a boolean environment variable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Environment variable name. | required |
| default | bool | Default value if not set. | False |
Returns:
| Type | Description |
|---|---|
| bool | True if value is "1", "true", "yes", or "on" (case-insensitive). |
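The documented truthy set is easy to sketch. The function name `env_bool` is a stand-in (the reference does not show the real identifier); the parsing semantics follow the description above.

```python
import os

def env_bool(name, default=False):
    # Unset -> default; otherwise compare against the documented truthy set.
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}

os.environ["DEMO_FLAG"] = "YES"
```

Note that any other value ("0", "off", "maybe") parses as False rather than falling back to the default.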
Read an integer environment variable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Environment variable name. | required |
| default | int \| None | Default value if not set or invalid. | None |
Returns:
| Type | Description |
|---|---|
| int \| None | Integer value, or default if parsing fails. |
Read a float environment variable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Environment variable name. | required |
| default | float \| None | Default value if not set or invalid. | None |
|
Returns:
| Type | Description |
|---|---|
| float \| None | Float value, or default if parsing fails. |
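The numeric readers share one shape: unset or unparsable values fall back to the default rather than raising. A sketch under assumed names (`env_int`, `env_float` are illustrative, not the library's identifiers):

```python
import os

def env_int(name, default=None):
    # Returns default when the variable is unset or not a valid integer.
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        return default

def env_float(name, default=None):
    # Same fallback contract, parsing as float.
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        return float(raw)
    except ValueError:
        return default

os.environ["DEMO_LIMIT"] = "12"
os.environ["DEMO_RATE"] = "not-a-number"
```

Swallowing the parse error keeps configuration loading resilient: a typo in an optional tuning variable degrades to the default instead of crashing startup.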
Create a progress bar update callback.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pbar | tqdm | The tqdm progress bar instance. | required |
Returns:
| Type | Description |
|---|---|
| Callable[[ProgressSnapshot], None] | Callable to update the progress bar. |
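One plausible shape for such a factory is a closure that remembers the last reported count and advances the bar by the delta. Everything here is an assumption: the factory name, the snapshot field, and the `FakeBar` stand-in (used so the sketch runs without tqdm installed; a real tqdm bar exposes the same `update(n)` method).

```python
class FakeBar:
    # Stand-in for a tqdm bar: tracks the cumulative count via update(n).
    def __init__(self):
        self.n = 0
    def update(self, k):
        self.n += k

def make_progress_callback(pbar):
    state = {"last": 0}
    def on_progress(snapshot):
        done = snapshot["completed"]       # assumed ProgressSnapshot field
        pbar.update(done - state["last"])  # advance by the delta, not the total
        state["last"] = done
    return on_progress

bar = FakeBar()
cb = make_progress_callback(bar)
cb({"completed": 3})
cb({"completed": 7})
```

Passing deltas rather than totals matches tqdm's `update` contract, which increments the bar instead of setting an absolute position.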