API Reference

Factories

Start here when you want the default entry points for creating clients and routers without importing provider-specific classes directly.

Create and configure a client instance for the specified provider.

Parameters:

Name Type Description Default
provider str

The provider identifier (e.g., "sharadar").

required
api_key object

Provider API key where supported.

_UNSET
rate_limit object

Provider rate limit in requests per minute where supported.

_UNSET
schedule SchedulerConfig | dict[str, Any] | None

Grouped scheduler configuration for always-on DRR fairness.

None
retry RetryConfig | dict[str, Any] | None

Grouped retry policy configuration.

None
throttle AdaptiveThrottleConfig | dict[str, Any] | None

Grouped adaptive throttle policy configuration.

None
quality_check Literal['warn', 'error']

Data quality violation handling mode.

'warn'
pickle_compat_datasets list[str] | None

YFinance-only pickle compatibility dataset allowlist.

None
concurrency int | None

Explicit fetch concurrency limit.

None
storage StorageConfig | dict[str, Any] | None

Grouped data-lifecycle and write-path tuning settings.

None
limits HTTPConfig | dict[str, Any] | None

Grouped HTTP connection-pool configuration.

None

Returns:

Type Description
BaseClient

Configured client instance inheriting from BaseClient.

Raises:

Type Description
ValueError

If API key is missing.

KeyError

If provider is unknown.
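As a usage sketch, the grouped options can be passed as plain dicts. The import path, the key values, and the call shown here are assumptions drawn from the parameter tables above, not a confirmed public API:

```python
# Hypothetical import path; adjust to the actual package layout.
# from vertex_forager import create_client

# Grouped options as plain dicts; field names follow the RetryConfig and
# AdaptiveThrottleConfig models documented further down this page.
retry = {"max_attempts": 3, "base_backoff_s": 1.0, "retry_status_codes": (429, 503)}
throttle = {"window_s": 60, "error_rate_threshold": 0.2}

# client = create_client(
#     "sharadar",
#     api_key="YOUR_KEY",
#     rate_limit=300,          # requests per minute, where supported
#     retry=retry,
#     throttle=throttle,
#     quality_check="warn",
# )
```

Passing dicts instead of the config models keeps call sites free of extra imports; the factory resolves them into the grouped config objects.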

Create and configure a router instance for the specified provider.

Parameters:

Name Type Description Default
provider str

The provider identifier (e.g., "sharadar").

required
api_key str | None

API key.

required
rate_limit int

Effective requests-per-minute setting for the router.

required
start_date str | None

Optional start date filter.

None
end_date str | None

Optional end date filter.

None
**kwargs Any

Additional provider-specific configuration.

{}

Returns:

Type Description
BaseRouter

Configured router instance inheriting from BaseRouter.

Raises:

Type Description
KeyError

If provider is unknown.

Client & Router

These abstractions define the public provider-facing contract that concrete clients and routers implement.

Bases: ABC, Generic[T]

Vendor-agnostic base client abstraction for the Vertex Forager pipeline.

This class serves as the foundation for all provider-specific clients (e.g., SharadarClient). It encapsulates the core infrastructure required to execute data collection tasks independently of the underlying data source.

Key Responsibilities:
  1. Session Management: Manages the lifecycle of the underlying HTTP client (httpx.AsyncClient) via async context managers, ensuring efficient connection pooling and resource cleanup.
  2. Pipeline Orchestration: Instantiates and executes the VertexForager pipeline, wiring together components like Routers, Writers, and Mappers.
  3. Flow Control: Initializes the FlowController to enforce global rate limits and concurrency policies across all pipeline operations.
  4. Configuration: Centralizes grouped runtime configuration handling.

Design Principles:
  • Provider-Agnostic: Contains NO vendor-specific logic. All vendor details must be injected via Routers and Mappers.
  • Composition over Inheritance: While this is a base class, it primarily delegates work to composed components (FlowController, VertexForager) rather than relying on deep inheritance chains.

Standardized Provider Implementation Pattern: All provider clients should follow this consistent structure for extensibility:

  1. execute_collection() - Unified data collection pipeline with:
     - Router creation with provider-specific configuration
     - Writer lifecycle management (DB storage or in-memory)
     - Progress tracking and result collection
     - Memory safety validation
  2. Provider-specific characteristics documented in docstrings:
     - API rate limits and batching strategies
     - Data source characteristics (coverage, update frequency)
     - Special handling requirements
     - Performance optimization techniques
  3. Memory management via common utilities:
     - validate_memory_usage() for safety checks
     - Provider-specific memory parameters
  4. Error handling patterns:
     - Rate limit handling via FlowController
     - Network retry logic via HttpExecutor
     - Graceful degradation for missing data
Usage

Subclasses must implement specific methods (e.g., get_price_data) that define what to fetch, delegating the how to self.run_pipeline(). Follow the standardized patterns above for consistency across all providers.
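The delegation pattern reads clearly as code. The sketch below stubs BaseClient and run_pipeline with stand-ins (all names and signatures here are illustrative, not the real base class), to show a subclass defining the "what" while the base owns the "how":

```python
import asyncio


class StubBaseClient:
    """Stand-in for BaseClient: the base class owns the 'how'."""

    async def run_pipeline(self, *, router, dataset, symbols, writer, mapper, **kwargs):
        # The real method orchestrates fetch/parse/write; stubbed for illustration.
        return {"dataset": dataset, "symbols": symbols, "status": "success"}


class MyProviderClient(StubBaseClient):
    """A provider subclass defines only the 'what'."""

    async def get_price_data(self, symbols):
        # Choose the dataset and inputs, then delegate execution to the pipeline.
        return await self.run_pipeline(
            router=None, dataset="price", symbols=symbols, writer=None, mapper=None
        )


price_result = asyncio.run(MyProviderClient().get_price_data(["AAPL", "MSFT"]))
```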

Initialize the base client infrastructure.

Parameters:

Name Type Description Default
api_key str | None

API key for the provider (optional, depends on provider).

None
rate_limit int

Maximum requests per minute (RPM) allowed for this client.

required
schedule SchedulerConfig | dict[str, Any] | None

Grouped scheduler configuration for always-on DRR fairness.

None
retry RetryConfig | dict[str, Any] | None

Grouped retry policy configuration.

None
throttle AdaptiveThrottleConfig | dict[str, Any] | None

Grouped adaptive throttle policy configuration.

None
quality_check Literal['warn', 'error']

Data quality violation handling mode.

'warn'
concurrency int | None

Explicit fetch concurrency limit.

None
storage StorageConfig | dict[str, Any] | None

Grouped data-lifecycle and write-path tuning settings.

None
limits HTTPConfig | dict[str, Any] | None

Grouped HTTP connection-pool configuration.

None

config property

Resolved internal configuration snapshot.

Returns:

Name Type Description
ResolvedClientConfig ResolvedClientConfig

The configuration object governing rate limits, concurrency, queue sizes, and thresholds used by this client.

Notes

Read-only accessor. Callers should not mutate the returned object in place. Public callers should prefer create_client(...) and grouped config inputs rather than constructing this model directly.

http_client property

Get the active HTTP client.

Raises:

Type Description
RuntimeError

If the client has not been initialized (not in context).

__aenter__() async

Async context manager entry.

Initializes the shared HTTP client.

__aexit__(exc_type, exc, tb) async

Async context manager exit.

Ensures the HTTP client is closed.

aclose() async

Asynchronously close the underlying HTTP client to release resources.

collect_results(writer, table_name, connect_db, *, sort_by_unique_key=True) async

Collect and return results from writer.

Common result collection logic that handles both database and in-memory scenarios.

Parameters:

Name Type Description Default
writer BaseWriter

Writer instance to collect from

required
table_name str

Name of the table to collect

required
connect_db str | Path | None

Database connection (determines collection mode)

required
sort_by_unique_key bool

Whether to sort by schema's unique key if available

True

Returns:

Type Description
RunResult

RunResult for both in-memory and database modes

managed_writer(connect_db, *, show_progress=True) async

Manage writer lifecycle with proper resource cleanup.

This is a common infrastructure component that all providers can use to ensure consistent writer lifecycle management.

Parameters:

Name Type Description Default
connect_db str | Path | None

Database connection string/path, or None for in-memory.

required
show_progress bool

Whether to show progress indicators (default: True).

True

Yields:

Name Type Description
BaseWriter AsyncGenerator[BaseWriter, None]

Properly initialized writer instance.

Raises:

Type Description
Error

If a DuckDB connection cannot be established.

ValidationError

If writer initialization fails due to schema issues.

Exception

Any unexpected errors during writer setup are propagated.

Example

  async with self.managed_writer(connect_db, show_progress=True) as writer:
      result = await self.run_pipeline(..., writer=writer)

run_async(method, url, **kwargs) async

Execute a standard async HTTP request using the underlying client.

Delegates directly to client.request().

run_pipeline(*, router, dataset, symbols, writer, mapper, on_progress=None, progress=False, **kwargs) async

Run the VertexForager pipeline for the given router, dataset, and symbols.

Parameters:

Name Type Description Default
router IRouter

Data router to fetch data from.

required
dataset T

Dataset name (e.g., "price").

required
symbols list[str] | None

List of symbols to fetch data for. If None, fetch all symbols.

required
writer IWriter

Data writer to persist the processed data.

required
mapper IMapper

Schema mapper to transform connector data to sink schema.

required
on_progress Callable[[ProgressSnapshot], None] | None

Optional callback receiving ProgressSnapshot per completed request.

None
progress bool

Whether to show built-in progress output and final summary.

False
**kwargs JSONValue

Additional arguments passed to the pipeline run method.

{}

Returns:

Name Type Description
RunResult RunResult

Summary of the pipeline run, including success/failure status.

Raises:

Type Description
RequestError

If a network error occurs during fetching.

HTTPStatusError

If an HTTP response returns non-2xx.

ValidationError

If schema validation fails during writing.

PrimaryKeyMissingError

When required PK columns are absent.

PrimaryKeyNullError

When PK columns contain nulls.

run_sync(func, *args, **kwargs) async

Execute a blocking (synchronous) function in a separate thread.

This wrapper ensures that blocking library calls (like yfinance, pandas I/O) do not freeze the main asyncio event loop.

Parameters:

Name Type Description Default
func Callable[..., Any]

Callable to execute in a worker thread.

required
*args Any

Positional arguments for the callable.

()
**kwargs Any

Keyword arguments for the callable.

{}

Returns:

Name Type Description
Any Any

The return value of the callable.

Raises:

Type Description
Exception

Any exception raised by the callable is propagated.
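A minimal sketch of this wrapper, assuming it is a thin layer over asyncio.to_thread (the blocking stand-in function below is hypothetical):

```python
import asyncio
import time


async def run_sync(func, *args, **kwargs):
    """Sketch: hand a blocking callable to a worker thread."""
    return await asyncio.to_thread(func, *args, **kwargs)


def blocking_fetch(symbol: str) -> str:
    # Hypothetical stand-in for a blocking library call (yfinance, pandas I/O).
    time.sleep(0.01)
    return f"{symbol}:ok"


fetched = asyncio.run(run_sync(blocking_fetch, "AAPL"))
```

Because the callable runs in a worker thread, the event loop stays responsive while it blocks, and any exception it raises propagates back to the awaiting task.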

Bases: ABC, Generic[T]

Vendor-agnostic base router abstraction for the Vertex Forager pipeline.

The Router acts as the protocol adapter between the generic pipeline engine and specific vendor APIs. It encapsulates all knowledge about URL construction, request parameters, and response parsing logic.

Attributes:

Name Type Description
flexible_schema bool

When True, downstream normalization may allow diagonal concatenation for schema evolution (extra/missing columns). This enables tolerant merging when provider payloads vary across requests.

Key Responsibilities:
  1. Job Generation (generate_jobs): Translates high-level data requests (dataset, symbols) into concrete FetchJob objects containing fully formed HTTP request specifications.
     - Handles pagination logic (generating multiple jobs if needed).
     - Applies provider-specific query parameters.
  2. Response Parsing (parse): Converts raw HTTP response bytes into structured FramePackets (Polars DataFrames) normalized for the pipeline.
     - Handles CSV/JSON parsing.
     - Validates response schemas.
     - Maps vendor-specific field names to pipeline standards.

Design Principles:
  • Statelessness: Routers should be primarily stateless, processing inputs to outputs without maintaining complex internal state about pipeline progress.
  • Isolation: Each Router implementation (e.g., SharadarRouter) contains all vendor-specific logic, keeping the core pipeline engine clean and generic.

provider abstractmethod property

Unique identifier for the data provider.

Returns:

Name Type Description
str str

Provider identifier (e.g., 'sharadar', 'yfinance').

generate_jobs(*, dataset, symbols, **kwargs) abstractmethod

Generate provider-specific HTTP fetch jobs.

Converts high-level requests (dataset, symbols, date filters) into concrete HTTP request specifications. Provider implementations own batching, URL construction, auth, and pagination context.

Parameters:

Name Type Description Default
dataset T

Dataset type (e.g., 'price', 'financials', 'actions').

required
symbols Sequence[str] | None

List of ticker symbols, or None for market-wide data.

required
**kwargs object

Provider-specific options (e.g., dimension, bulk_size).

{}

Yields:

Name Type Description
FetchJob AsyncIterator[FetchJob]

HTTP request spec and context for the executor.

Raises:

Type Description
NotImplementedError

Must be implemented by provider routers.

FetchError

Network/API failures from provider execution.

TransformError

Payload parsing/normalization failures.

Responsibilities
  • Symbol handling: single/multiple/None depending on dataset
  • Batching: choose batch size under API/URL limits
  • URL/params: build endpoints and query parameters
  • Date filters: apply start/end via _parse_date_range when needed
  • Pagination: include cursor/keys in context if supported
  • Auth: attach provider tokens/headers

Examples:

Yielding a pagination job:
  • dataset="tickers", symbols=None
  • context includes {"pagination": {"cursor_param": "qopts.cursor_id", "meta_key": "next_cursor_id"}}
  • params use {"qopts.per_page": "10000"}
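The responsibilities above can be sketched as a stand-alone async generator. FetchJob is stubbed as a plain dict, and the endpoint URL, batch size, and parameter names are assumptions that mirror the pagination example rather than a confirmed provider API:

```python
import asyncio


async def generate_jobs(*, dataset, symbols, **kwargs):
    """Illustrative only: jobs are dicts; URL/params mirror the example above."""
    if dataset == "tickers" and symbols is None:
        # Market-wide dataset: one paginated seed job; parse() yields follow-ups.
        yield {
            "url": "https://api.example.com/datatables/TICKERS",
            "params": {"qopts.per_page": "10000"},
            "context": {"pagination": {"cursor_param": "qopts.cursor_id",
                                       "meta_key": "next_cursor_id"}},
        }
        return
    # Symbol-keyed datasets: batch symbols under URL/API limits (size assumed).
    for i in range(0, len(symbols), 100):
        yield {
            "url": "https://api.example.com/datatables/SEP",
            "params": {"ticker": ",".join(symbols[i:i + 100])},
            "context": {},
        }


async def _collect(**kwargs):
    return [job async for job in generate_jobs(**kwargs)]


tickers_jobs = asyncio.run(_collect(dataset="tickers", symbols=None))
```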

parse(*, job, payload) abstractmethod

Normalize provider response into FramePacket(s).

Decodes raw bytes, converts to Polars, performs provider-specific structural normalization in this method (no separate normalize_frame), injects provider metadata, and prepares follow-up pagination jobs when applicable.

Parameters:

Name Type Description Default
job FetchJob

The fetch job that produced this payload.

required
payload bytes

Raw HTTP response bytes.

required

Returns:

Name Type Description
ParseResult ParseResult

Normalized packets and optional next jobs.

Raises:

Type Description
NotImplementedError

Must be implemented by provider routers.

FetchError

Provider-reported API errors mapped to standard exceptions.

TransformError

Data conversion/structural normalization failures.

Responsibilities
  • Format handling: JSON/CSV/binary/pickle
  • Error handling: detect API errors and malformed payloads
  • Data extraction: parse nested provider-specific structures
  • DataFrame creation: build Polars frames
  • Column normalization: use _normalize_columns where applicable
  • Metadata injection: use _add_provider_metadata
  • Empty handling: use _check_empty_response
  • Pagination: derive next jobs from response metadata/context
Notes
  • Structure transformation happens here; strict type casting is delegated to SchemaMapper.
  • Use _parse_date_range for date derivations when needed.

Examples:

Creating a follow-up pagination job:
  • Read "next_cursor_id" from the response meta.
  • Set job.spec.params["qopts.cursor_id"] to the next value.
  • Append the new job to ParseResult.next_jobs.
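The pagination step of the example above can be sketched in isolation. Jobs are stubbed as plain dicts, and the meta key names follow the example, not a confirmed provider schema:

```python
import json


def derive_next_jobs(job: dict, payload: bytes) -> list[dict]:
    """Sketch of the pagination step inside parse(): pull the cursor out of
    the response meta and emit one follow-up job."""
    meta = json.loads(payload).get("meta", {})
    cursor = meta.get("next_cursor_id")
    if cursor is None:
        return []  # final page: nothing to append to ParseResult.next_jobs
    # Copy the job, overriding only the cursor query parameter.
    next_job = {**job, "params": {**job["params"], "qopts.cursor_id": cursor}}
    return [next_job]


more = derive_next_jobs({"url": "u", "params": {}},
                        b'{"meta": {"next_cursor_id": "abc123"}}')
done = derive_next_jobs({"url": "u", "params": {}}, b'{"meta": {}}')
```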

Pipeline Engine

The pipeline engine coordinates fetch, parse, normalize, and write stages for a run.

High-performance asynchronous data pipeline engine.

This class orchestrates the end-to-end data collection process using a Producer-Consumer architecture with asyncio. It manages three main stages: Job Generation (Producer), Data Fetching (Fetch Workers), and Data Writing (Writer Workers).

Attributes:

Name Type Description
_router IRouter

Router/Queue manager for fetch jobs.

_http HttpExecutor

Handles HTTP requests.

_writer IWriter

Writer task manager.

_mapper IMapper

Normalizes data schemas.

_config ResolvedClientConfig

Internal resolved configuration object.

controller FlowController

Rate limiter and concurrency controller.

_flush_threshold int

Row count threshold for flushing buffers.

Public Methods

run(dataset: str, symbols: Symbols | None, ...) -> RunResult
    Execute the full collection pipeline.
stop() -> None
    Gracefully stop the pipeline.

run(*, dataset, symbols, on_progress=None, progress=False, resume=False, **kwargs) async

Execute the pipeline.

This method sets up the asyncio queues and worker tasks, runs the pipeline until completion, and returns the execution result.

Process:
  1. Create req_q (for jobs) and pkt_q (for data packets).
  2. Spawn _writer_worker tasks (consumers of processed data).
  3. Spawn _fetch_worker tasks (consumers of jobs, producers of data).
  4. Spawn the _producer task (generator of jobs).
  5. Wait for all tasks to complete in order:
     - Producer finishes -> queue join.
     - Fetchers finish -> queue join.
     - Writer workers finish.
  6. Aggregate results and handle cleanup.

Parameters:

Name Type Description Default
dataset str

Name of the dataset to fetch (e.g., "sep").

required
symbols Symbols | None

List of symbols to fetch, or None for all.

required
on_progress Callable[[ProgressSnapshot], Any] | None

Optional callback receiving ProgressSnapshot on each job completion.

None
progress bool

Whether to show the built-in progress bar and final summary.

False
resume bool

Whether to resume from existing checkpoint (default: False).

False
**kwargs object

Additional arguments passed to the router's generate_jobs method.

{}

Returns:

Name Type Description
RunResult RunResult

Summary of the run including metrics and errors.

Raises:

Type Description
RuntimeError

Orchestration-level failures (e.g., early writer shutdown) or if run() is called while another run is already in progress.

Notes
  • Exceptions raised during fetch/parse/write (e.g., httpx.RequestError, httpx.HTTPStatusError, ValidationError, PrimaryKeyMissingError, PrimaryKeyNullError) are captured and appended to RunResult.errors and are not re-raised by default.
  • Callers should inspect RunResult.errors for per-task failures and only expect orchestration-level issues to raise.
  • When resume=True, symbols already completed in previous runs will be skipped based on checkpoint rows stored in ~/.cache/vertex-forager/state.db.

Pipeline Results

These result models summarize parsed packets, table counts, and run-level outcomes.

Bases: BaseModel

Result summary for a pipeline run.

Attributes:

Name Type Description
provider str

Data provider name.

run_id str | None

Unique identifier for the run (default: None).

dataset str | None

Dataset name for the run (default: None).

started_at float | None

Timestamp when the run started (default: None).

finished_at float | None

Timestamp when the run finished (default: None).

duration_s float | None

Duration of the run in seconds (default: None).

coverage_pct float | None

Coverage percentage for the run (default: None).

data DataFrame | None

In-memory collected payload for non-persisted runs.

tables dict[str, int]

Dictionary mapping table names to row counts (default: empty dict).

errors list[RunError]

List of structured error information (default: empty list).

dlq_pending dict[str, list[FramePacket]]

Packets preserved for post-mortem/dead-letter processing when DLQ spool/dispatch fails. Items are appended by writer/rescue logic upon spool errors and can be consumed by operator recovery flows.

dlq_counts dict[str, dict[str, int]]

Per-table counts for rescued and remaining packets when DLQ is disabled or spooling occurs. Always populated regardless of metrics settings.

quality_violations dict[str, int]

Dictionary mapping table names to quality violation counts (default: empty dict).

Bases: BaseModel

Result of parsing a response.

Attributes:

Name Type Description
packets list[FramePacket]

List of extracted FramePackets containing data.

next_jobs list[FetchJob]

List of subsequent FetchJobs to be executed.

Flow Control

Use these components when tuning throughput, concurrency, and request pacing.

Unified Flow Controller managing both Concurrency and Rate Limiting.

Combines:
  1. GradientConcurrencyLimiter: handles backpressure and optimizes throughput based on observed latency.
  2. GCRARateLimiter: strictly enforces API rate limits (RPM).

Initialize the unified Flow Controller.

Parameters:

Name Type Description Default
requests_per_minute int

Allowed requests per minute (RPM).

required
concurrency_limit int | None

Optional maximum concurrent requests; if None, auto-computed.

None
adaptive_throttle_window_s int

Sliding window (seconds) for error-rate calculation.

60
error_rate_threshold float

Error/retry ratio threshold that triggers a throttle decrease.

0.2
rpm_floor_ratio float

Minimum RPM as a ratio of the ceiling (0.0-1.0) during throttle.

0.1
recovery_factor float

RPM increment as a fraction of the ceiling during recovery (0.0-1.0).

0.05
healthy_window_s int

Duration (seconds) of healthy operation required to upshift.

60

Raises:

Type Description
ValueError

If requests_per_minute <= 0.

concurrency_limit property

Get the configured maximum concurrency limit.

throttle() async

Context manager to acquire both rate limit and concurrency slot.

Behavior
  • Acquires GCRA rate limiter permission (may sleep).
  • Acquires GradientConcurrencyLimiter slot (may wait).
  • Releases slot with RTT feedback on exit to adapt limits.

Raises:

Type Description
CancelledError

If the enclosing task is cancelled while waiting.

Adaptive Concurrency Controller based on Netflix's Gradient Algorithm.

[What it limits]
Limits the number of concurrent in-flight requests. It dynamically adjusts the limit (max concurrent requests) to find the system's optimal throughput without overloading it.

[Theoretical Background]
Based on Little's Law (L = λW) and Netflix's Gradient Algorithm. It calculates a gradient using the ratio of min_rtt (ideal no-load latency) to observed_rtt (current latency).
  • Gradient < 1 (RTT increased): system is overloaded -> decrease limit.
  • Gradient ~ 1 (RTT stable): system is healthy -> increase limit slightly (explore capacity).

Formula: new_limit = old_limit * (min_rtt / current_rtt) + queue_size
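The update rule is small enough to state directly; this sketch applies the formula verbatim (the real limiter additionally smooths RTT samples and clamps the result, which is omitted here):

```python
def next_concurrency_limit(old_limit: float, min_rtt: float,
                           current_rtt: float, queue_size: float) -> float:
    """Apply the gradient formula above verbatim."""
    gradient = min_rtt / current_rtt  # < 1 when latency inflates under load
    return old_limit * gradient + queue_size
```

With old_limit=10 and queue_size=4, a doubled RTT (0.1s -> 0.2s) shrinks the limit to 9, while a stable RTT grows it to 14, probing for spare capacity.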

acquire() async

Acquire a concurrency slot.

release(rtt) async

Release a slot and update the concurrency limit based on RTT.

Async Rate Limiter implementing GCRA (Generic Cell Rate Algorithm).

[What it limits]
Limits the rate of requests over time (e.g., requests per minute). It ensures the application complies with the API provider's usage quotas (rate limits).

[Theoretical Background]
GCRA (Generic Cell Rate Algorithm) is a sophisticated Leaky Bucket variant. Instead of using a counter reset window (which causes a "thundering herd" at window boundaries), it tracks the Theoretical Arrival Time (TAT) for the next request.
  • If a request arrives before TAT, it waits (or is rejected).
  • Short bursts are allowed up to a defined limit, but the long-term average rate is strictly enforced.
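A minimal sketch of one GCRA decision step, assuming the common virtual-scheduling formulation (the real limiter's internals, locking, and state layout may differ):

```python
def gcra_decide(tat: float, now: float, period: float, burst: int = 1):
    """One GCRA decision: return (sleep_seconds, new_tat).

    period = emission interval (60 / rpm); tat = stored Theoretical Arrival
    Time; burst = how many requests may pass back-to-back.
    """
    tau = period * (burst - 1)          # burst tolerance
    tat = max(tat, now)                 # never schedule in the past
    sleep = max(0.0, tat - tau - now)   # delay until the request conforms
    return sleep, tat + period          # advance TAT by one period
```

At 60 RPM (period = 1.0s), a request arriving 0.5s early sleeps 0.5s; with burst=2 the same early request passes immediately, yet the average rate still converges to one request per period.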

acquire() async

Acquire permission to proceed. Waits if necessary.

set_rpm_async(rpm) async

Safely update RPM under the same lock used by acquire().

Configuration

These types describe the request, retry, writer, and execution settings used throughout the pipeline.

Bases: BaseModel

Retry configuration for HTTP requests.

Attributes:

Name Type Description
max_attempts int

Maximum number of retry attempts (default: 3).

base_backoff_s float

Initial backoff duration in seconds (default: 1.0).

max_backoff_s float

Maximum backoff duration in seconds (default: 30.0).

backoff_mode Literal['full_jitter', 'equal']

Backoff distribution strategy (default: "full_jitter").

retry_status_codes tuple[int, ...]

Tuple of HTTP status codes to trigger retries (default: (429, 503)).

Notes
  • Two backoff modes are supported:
    - "full_jitter" (default): sleep is drawn uniformly from [0, min(max_backoff_s, base_backoff_s * 2^(attempt-1))].
    - "equal": sleep is drawn from [cap/2, cap] where cap = min(max_backoff_s, base_backoff_s * 2^(attempt-1)).
  • Retry-After response headers (integer seconds) take priority over backoff when present.
  • Defaults are conservative: retries on 429 (Too Many Requests) and 503 (Service Unavailable).
  • Opt-in to broader server errors (e.g., 500, 502, 504) ONLY when requests are idempotent. Non-idempotent operations (e.g., POST/PUT without idempotency keys) may cause duplicate side effects. Use idempotency keys or upstream idempotent semantics before enabling broader codes.
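The two modes can be sketched as a single function; this follows the formulas in the notes above, with random.uniform standing in for whatever jitter source the implementation actually uses:

```python
import random


def backoff_seconds(attempt: int, base: float = 1.0, cap: float = 30.0,
                    mode: str = "full_jitter") -> float:
    """Sleep duration for retry `attempt` (1-based), per the two modes above."""
    raw = min(cap, base * 2 ** (attempt - 1))   # capped exponential
    if mode == "full_jitter":
        return random.uniform(0.0, raw)          # uniform over [0, raw]
    return random.uniform(raw / 2, raw)          # "equal": [raw/2, raw]
```

For attempt=3 with the defaults, raw = min(30, 4) = 4, so full jitter sleeps somewhere in [0, 4] and equal jitter in [2, 4]; by attempt 6 the cap of 30s dominates.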

Bases: BaseModel

Adaptive throttle policy for dynamic RPM adjustment based on error rate.

Attributes:

Name Type Description
window_s int

Sliding window in seconds used to evaluate recent error rate.

error_rate_threshold float

Error ratio that triggers throttle decrease.

rpm_floor_ratio float

Minimum RPM as a ratio of ceiling (0.0-1.0) maintained while throttled.

recovery_factor float

RPM added as a fraction of ceiling during healthy recovery (0.0-1.0).

healthy_window_s int

Healthy period required before recovering RPM upward.

Notes
  • Uses AIMD (Additive Increase/Multiplicative Decrease) pattern.
  • Multiplicative decrease: new_rpm = effective_rpm * 0.8 when error threshold exceeded.
  • Additive increase: new_rpm = effective_rpm + max(1, ceiling * recovery_factor) when healthy.
  • rpm_floor is resolved to an absolute value at init: floor = max(1, ceiling * rpm_floor_ratio).
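One AIMD step can be written out from the update rules above (integer truncation here is an assumption; the implementation may round differently):

```python
def next_rpm(effective_rpm: int, ceiling: int, *, error_rate: float,
             threshold: float = 0.2, rpm_floor_ratio: float = 0.1,
             recovery_factor: float = 0.05, healthy: bool = False) -> int:
    """Sketch of one AIMD throttle step, per the notes above."""
    floor = max(1, int(ceiling * rpm_floor_ratio))
    if error_rate > threshold:
        return max(floor, int(effective_rpm * 0.8))    # multiplicative decrease
    if healthy:
        step = max(1, int(ceiling * recovery_factor))  # additive increase
        return min(ceiling, effective_rpm + step)
    return effective_rpm
```

With a 300 RPM ceiling, one error spike cuts 300 -> 240; a sustained healthy window then recovers in +15 RPM steps, and the floor (30 RPM here) bounds how far repeated decreases can push the rate down.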

Bases: BaseModel

HTTP connection-pool settings exposed as a grouped public config.

Attributes:

Name Type Description
max_connections int

Maximum connection pool size.

max_keepalive_connections int

Maximum keep-alive pool size.

timeout_s float

HTTP request timeout in seconds.

Bases: BaseModel

Scheduler controls for always-on DRR pagination fairness.

Bases: BaseModel

HTTP request specification for a fetch job.

Attributes:

Name Type Description
method HttpMethod

HTTP method to use (default: HttpMethod.GET).

url str

Target URL for the request.

params dict[str, JSONValue]

Query parameters as key-value pairs (default: empty dict).

headers dict[str, str]

HTTP headers as key-value pairs (default: empty dict).

json_body dict[str, JSONValue] | None

JSON payload for POST/PUT requests (default: None).

data bytes | None

Raw bytes payload for requests (default: None).

auth RequestAuth

Authentication strategy to apply (default: RequestAuth()).

idempotent bool

Whether the request is safe to retry without side effects. Defaults to True; set to False to disable automatic retries for non-idempotent operations.

Bases: BaseModel

Unit of work for the fetch pipeline.

Attributes:

Name Type Description
provider str

Data provider name (e.g., 'sharadar').

dataset str

Dataset identifier (e.g., 'SEP', 'SF1').

symbol str | None

Target symbol or ticker if applicable (default: None).

spec RequestSpec

HTTP request specification details.

context Mapping[str, JSONValue]

Additional context for job execution and tracing (default: empty dict).

Bases: BaseModel

Polars frame packet passed from provider to sink.

Attributes:

Name Type Description
provider str

Data provider name.

table str

Target table name for storage.

frame DataFrame

Polars DataFrame containing the data.

observed_at datetime

Timestamp when the data was observed/fetched.

partition_date date | None

Optional date for partitioning logic (default: None).

context Mapping[str, JSONValue]

Metadata context passed along with the data (default: empty dict).

Bases: str, Enum

HTTP method for request execution.

Values

GET: HTTP GET method.
POST: HTTP POST method.

Bases: BaseModel

Authentication strategy attached to a request spec.

Attributes:

Name Type Description
kind str

Authentication type — 'none', 'bearer', 'header', or 'query' (default: 'none').

token str | None

Authentication token string if applicable (default: None).

header_name str | None

Name of the header to inject the token into (default: None).

query_param str | None

Name of the query parameter to inject the token into (default: None).

HTTP

The HTTP executor handles transport concerns for provider requests and library fetch dispatch.

Async HTTP Request Executor using httpx.

This class abstracts the low-level HTTP client details and maps RequestSpec objects to actual network requests. It handles authentication header injection and response status checking.

It also supports special schemes like yfinance:// to bypass HTTP and use internal libraries.

Notes
  • Error logs redact URLs using a sanitizer to avoid leaking sensitive query parameters (e.g., API keys) in messages.
  • Example: "https://api.example.com?token=SECRET&cursor=123" -> "[redacted]"

Initialize with an existing client.

Parameters:

Name Type Description Default
client HttpClientProtocol

BaseClient (or compatible interface) to use for requests.

required

fetch(spec) async

Execute a request and return response bytes.

Dispatches to specific fetch implementation based on URL scheme.

Parameters:

Name Type Description Default
spec RequestSpec

Fully defined request specification.

required

Returns:

Name Type Description
bytes bytes

The raw response body.

Raises:

Type Description
HTTPStatusError

If the server returns 4xx/5xx status code.

RequestError

If a network error occurs.

ValueError

If URL scheme is invalid.

TypeError

If library request parameters are invalid.

Retry

Retry helpers centralize backoff policy and retry execution behavior.

Create a tenacity AsyncRetrying controller from configuration.

Parameters:

Name Type Description Default
config RetryConfig

Retry configuration.

required
log_level int

Logging level for retry attempts.

WARNING
retry_on tuple[type[Exception], ...]

Tuple of exception types to retry on. Defaults to (httpx.TransportError,).

(TransportError,)

Returns:

Name Type Description
AsyncRetrying AsyncRetrying

Configured retry controller.

Writers

Writer APIs control how normalized frames are persisted or collected in memory.

Factory function to instantiate the appropriate Writer.

Selection Logic:
  • None: Returns InMemoryBufferWriter (in-memory).
  • String URI (duckdb://): Returns DuckDBWriter.
  • String path / Path object: Returns DuckDBWriter (treated as a file path).

Parameters:

Name Type Description Default
connect_db str | Path | None

Connection string, Path object, or None.

required

Returns:

Name Type Description
BaseWriter BaseWriter

An initialized writer instance.

Raises:

Type Description
NotImplementedError

If a URI scheme is unknown.
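The selection logic can be mirrored as a small dispatch function; this sketch returns a label rather than a writer instance, and the scheme handling is inferred from the rules above:

```python
from pathlib import Path


def resolve_writer_kind(connect_db) -> str:
    """Mirror of the selection logic above (label only, not a real writer)."""
    if connect_db is None:
        return "in-memory"                 # InMemoryBufferWriter
    uri = str(connect_db)
    if "://" in uri:
        scheme = uri.split("://", 1)[0]
        if scheme != "duckdb":
            raise NotImplementedError(f"unknown URI scheme: {scheme}")
        return "duckdb"                    # DuckDBWriter via URI
    return "duckdb"                        # bare string or Path -> file path
```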

Bases: ABC

Abstract Base Class for Data Writers.

The Writer component acts as the final destination in the data pipeline, responsible for persisting normalized FramePackets (Polars DataFrames) to a durable storage backend.

Key Responsibilities:
  1. Persistence: Efficiently writing data to disk (DuckDB, Parquet) or memory.
  2. Concurrency Safety: Managing thread/async safety for storage engines that require single-writer access (e.g., DuckDB).
  3. Bulk Processing: Implementing write_bulk to optimize throughput by reducing transaction overhead.
  4. Resource Management: Handling connections, file handles, and proper cleanup via async context managers (__aenter__, __aexit__).

Design Principles:
  • Schema-Agnostic: Writers receive already-normalized data. They trust the upstream SchemaMapper and do not perform schema validation.
  • Fail-Fast: Errors during write operations should propagate immediately to stop the pipeline or trigger retry logic.

__aenter__() async

Async context manager entry.

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

Ensures resources are closed even if an error occurs.

close() async

Close any open resources (connections, files).

Default implementation does nothing. Override if resources need cleanup.

collect_table(table_name, sort_cols=None)

Collect table data in memory for in-memory writers.

Parameters:

Name Type Description Default
table_name str

The name of the table to collect.

required
sort_cols list[str] | None

Optional list of columns to sort by.

None

Returns:

Type Description
DataFrame

pl.DataFrame: The collected DataFrame.

Raises:

Type Description
NotImplementedError

If the writer does not support in-memory collection.

flush() async

Flush any buffered data to storage.

Default implementation does nothing. Override if buffering is used.

write(packet) abstractmethod async

Persist a single data packet.

Parameters:

Name Type Description Default
packet FramePacket

The normalized data packet containing a Polars DataFrame.

required

Returns:

Name Type Description
WriteResult WriteResult

Summary including table name and written row count.

Raises:

Type Description
ComputeError

On data processing errors.

ValidationError

On schema validation errors.

Error

On database specific errors (if applicable).

write_bulk(packets) async

Persist a bulk of data packets.

The default implementation iterates write(); subclasses should override it for transactional/bulk optimization.

Parameters:

Name Type Description Default
packets list[FramePacket]

List of data packets.

required

Returns:

Type Description
list[WriteResult]

list[WriteResult]: List of results for each packet.
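The transactional override mentioned above can be sketched with sqlite3 standing in for the storage engine: all packets are written inside a single transaction instead of one per packet. Table and column names are illustrative only.

```python
import sqlite3


def write_bulk(conn: sqlite3.Connection, packets: list[tuple[str, list[tuple]]]) -> list[int]:
    results = []
    with conn:  # one transaction for the entire batch
        for table, rows in packets:
            # Sketch only: table names come from trusted schema config here.
            conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", rows)
            results.append(len(rows))
    return results


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE price (ticker TEXT, close REAL)")
counts = write_bulk(
    conn,
    [("price", [("AAPL", 189.5), ("MSFT", 402.1)]), ("price", [("NVDA", 720.0)])],
)
```

A failure anywhere in the batch rolls back the whole transaction, which matches the fail-fast principle stated at the top of this page.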

Bases: BaseWriter

DuckDB writer implementation.

Manages a local DuckDB database for storing collected data using Polars integration.

Attributes:

Name Type Description
lock Lock

Lock to ensure single-writer access to the database.

connection DuckDBPyConnection

Active DuckDB connection.

db_path str

Path to the DuckDB database file.

Features
  • Automatic table creation from Polars schema.
  • Upsert semantics based on primary keys.
  • Zero-copy data transfer between Polars and DuckDB.
  • Thread-safe execution using a single-threaded executor.
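The upsert semantics listed above can be illustrated with sqlite3 as a stand-in for DuckDB: a conflicting primary key updates the existing row rather than inserting a duplicate. Table and column names here are illustrative, not the library's actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE price (ticker TEXT, date TEXT, close REAL, "
    "PRIMARY KEY (ticker, date))"
)


def upsert(rows):
    # Same-key rows update in place instead of duplicating.
    conn.executemany(
        "INSERT INTO price VALUES (?, ?, ?) "
        "ON CONFLICT (ticker, date) DO UPDATE SET close = excluded.close",
        rows,
    )


upsert([("AAPL", "2024-01-02", 185.0)])
upsert([("AAPL", "2024-01-02", 185.6)])  # same PK: revises the stored close
count, close = conn.execute("SELECT COUNT(*), MAX(close) FROM price").fetchone()
```

DuckDB supports the same `INSERT ... ON CONFLICT DO UPDATE` form, which is what makes re-fetching a date range idempotent.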

Initialize DuckDB writer.

Parameters:

Name Type Description Default
db_path str | Path

Path to the DuckDB database file.

required
strict bool

If True, unsupported Polars types raise SchemaMapError. If False, fallback to VARCHAR.

False

close() async

Close the database connection.

Acquires the lock to ensure no write operations are in progress before closing the connection.

Raises:

Type Description
Error

If the connection close operation fails.

compact() async

Optimize database storage.

Runs VACUUM and CHECKPOINT to reclaim space and enforce compression.

Raises:

Type Description
Error

If VACUUM or CHECKPOINT commands fail.

flush() async

Flush is a no-op because writes are applied immediately.

write(packet) async

Write a single packet to DuckDB immediately. Stateless implementation (no buffering).

Parameters:

Name Type Description Default
packet FramePacket

FramePacket containing the table name and DataFrame to write.

required

Returns:

Name Type Description
WriteResult WriteResult

Result object containing table name and number of rows written.

Raises:

Type Description
Error

If DuckDB execution fails during upsert.

ValidationError

If schema validation fails during write.

PrimaryKeyMissingError

Required PK columns are missing in packet.

PrimaryKeyNullError

PK columns contain null values.

write_bulk(packets) async

Write a bulk of packets to DuckDB immediately. Stateless implementation (no buffering).

Parameters:

Name Type Description Default
packets list[FramePacket]

List of FramePackets to write.

required

Returns:

Type Description
list[WriteResult]

list[WriteResult]: List of result objects for each written packet.

Raises:

Type Description
Error

If DuckDB execution fails during batch upsert.

ValidationError

If schema validation fails during write.

PrimaryKeyMissingError

Required PK columns are missing in packets.

PrimaryKeyNullError

PK columns contain null values in packets.

Bases: BaseWriter

In-memory writer for buffering results.

Used when the user wants to get a DataFrame back directly without writing to disk. Accumulates all incoming packets in a dictionary of lists.

Notes
  • Not suitable for massive datasets that exceed memory.
  • Best used for small workloads or unit testing scenarios.
Example

Collect and sort buffered frames for a table:

writer = InMemoryBufferWriter()
await writer.write(packet)
df = writer.collect_table("price", sort_cols=["ticker", "date"])

collect_table(table, sort_cols=None)

Concatenate all buffered parts for a table into a single DataFrame.

Parameters:

Name Type Description Default
table str

Table name (e.g., 'price_bars').

required
sort_cols list[str] | None

Optional list of columns to sort by (e.g., from schema unique_key).

None

Returns:

Type Description
DataFrame

pl.DataFrame: Combined data.

Example

df = writer.collect_table("price", sort_cols=["ticker", "date"])
# df contains all buffered parts for 'price', optionally sorted

write(packet) async

Append packet to the in-memory buffer.

Thread-safe via threading lock. Applies deduplication if upsert_keys is configured, matching DuckDB semantics.
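The deduplication described above can be sketched with plain dicts standing in for Polars frames: when upsert keys are configured, a later row with the same key replaces the earlier one, mirroring the DuckDB upsert behaviour. The function name is illustrative.

```python
def dedupe(rows: list[dict], upsert_keys: list[str]) -> list[dict]:
    latest: dict[tuple, dict] = {}
    for row in rows:
        # Last write wins for each unique key tuple, as in an upsert.
        latest[tuple(row[k] for k in upsert_keys)] = row
    return list(latest.values())


rows = [
    {"ticker": "AAPL", "date": "2024-01-02", "close": 185.0},
    {"ticker": "AAPL", "date": "2024-01-02", "close": 185.6},
]
deduped = dedupe(rows, ["ticker", "date"])
```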

Bases: BaseModel

Writer write result.

Attributes:

Name Type Description
table str

The target table name written to.

rows int

Number of rows written.

partitions Mapping[str, str]

Partition key/value pairs created or updated (defaults to empty dict).
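A dataclass stand-in with the fields described above (the library's actual WriteResult is a pydantic BaseModel) shows the shape consumers receive:

```python
from dataclasses import dataclass, field
from typing import Mapping


@dataclass(frozen=True)
class WriteResult:
    table: str
    rows: int
    partitions: Mapping[str, str] = field(default_factory=dict)  # defaults empty


res = WriteResult(table="price_bars", rows=1200)
```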

Lifecycle

Lifecycle helpers create and finalize the shared state for a pipeline run.

Exceptions

These are the main public exception types you should catch around client usage and integration code.

Bases: Exception

Base exception for all Vertex Forager errors.

Raised when an operation within the Vertex Forager system encounters an error that does not fit a more specific category.

Bases: VertexForagerError

Invalid user or configuration input.

Raised when parameters, identifiers, or configuration values are invalid.

Bases: VertexForagerError

Network or external provider fetch failures.

Raised when HTTP/library calls return errors or unreachable resources.

Bases: VertexForagerError

Data transformation/normalization failures.

Raised during parsing or schema normalization when data cannot be shaped.

Bases: VertexForagerError

Persistence/write failures.

Raised by writers when storage operations fail.

Bases: VertexForagerError

Computation failures during data processing.

Bases: VertexForagerError

Schema or data validation failures.

Bases: ValidationError

Primary key column missing from data.

Indicates that required primary key columns are absent in the dataset being processed or written.

Parameters:

Name Type Description Default
table str

Target table name.

required
column str

Missing primary key column name(s).

required

Attributes:

Name Type Description
table

Target table name.

column

Missing primary key column name(s).

Bases: ValidationError

Primary key contains null values.

Indicates that the primary key column includes one or more nulls which would violate uniqueness constraints or upsert logic.

Parameters:

Name Type Description Default
table str

Target table name.

required
column str

Primary key column name.

required
null_count int

Number of nulls detected.

required

Attributes:

Name Type Description
table

Target table name.

column

Primary key column name.

null_count

Number of nulls detected.
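The pre-write checks these two exceptions describe can be sketched as follows; the exception classes here are minimal stand-ins with the documented constructor arguments, and the check operates on lists of dicts rather than Polars frames:

```python
class PrimaryKeyMissingError(Exception):
    def __init__(self, table: str, column: str):
        self.table, self.column = table, column
        super().__init__(f"{table}: missing PK column {column!r}")


class PrimaryKeyNullError(Exception):
    def __init__(self, table: str, column: str, null_count: int):
        self.table, self.column, self.null_count = table, column, null_count
        super().__init__(f"{table}: {null_count} null(s) in PK column {column!r}")


def check_primary_keys(table: str, rows: list[dict], pk_cols: list[str]) -> None:
    for col in pk_cols:
        # Absent column: the writer cannot build an upsert key at all.
        if rows and col not in rows[0]:
            raise PrimaryKeyMissingError(table, col)
        # Nulls in a PK column would break uniqueness/upsert semantics.
        nulls = sum(1 for r in rows if r.get(col) is None)
        if nulls:
            raise PrimaryKeyNullError(table, col, nulls)
```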

Bases: VertexForagerError

Dead Letter Queue (DLQ) spooling failure.

Raised when persisting failed packets to the DLQ fails. Carries counts that summarize partial rescue progress and remaining items.

Attributes:

Name Type Description
rescued

Number of items successfully rescued (written) before the spool attempt.

remaining

Number of items left to persist in the DLQ when the failure occurred.

original

Optional underlying exception that triggered the spool failure.

Example

raise DLQSpoolError(rescued=1, remaining=3, original=exc)

Core Errors

Core errors cover pipeline-level failures that sit below the top-level public exceptions.

Structured error information for pipeline runs.

Attributes:

Name Type Description
provider str

Data provider name (e.g., "sharadar", "yfinance").

dataset str

Dataset name (e.g., "sep", "actions").

symbol str

Symbol that failed (e.g., "AAPL", "MSFT").

exc_type str

Exception type as string (e.g., "httpx.HTTPStatusError").

message str

Error message.

retryable bool

Whether this error is retryable (True for 429/503/network errors).

from_exception(exc, provider, dataset, symbol) classmethod

Create a RunError from an exception.

Parameters:

Name Type Description Default
exc Exception

The exception that occurred.

required
provider str

Data provider name.

required
dataset str

Dataset name.

required
symbol str | None

Symbol that failed.

required

Returns:

Name Type Description
RunError RunError

Structured error information
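The retryable classification described above can be sketched as follows. The real implementation inspects provider-specific exception types; this stand-in keys off an optional status-code attribute and two built-in exception types, so every name here is an assumption except the documented fields.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunError:
    provider: str
    dataset: str
    symbol: Optional[str]
    exc_type: str
    message: str
    retryable: bool

    @classmethod
    def from_exception(cls, exc, provider, dataset, symbol):
        # 429/503 responses and network-level errors are worth retrying.
        status = getattr(exc, "status_code", None)
        retryable = status in (429, 503) or isinstance(
            exc, (ConnectionError, TimeoutError)
        )
        return cls(provider, dataset, symbol, type(exc).__name__, str(exc), retryable)


err = RunError.from_exception(TimeoutError("read timed out"), "sharadar", "sep", "AAPL")
```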

Utilities

Utility helpers provide small convenience functions for env parsing, ticker validation, and progress updates.

Convert a pipeline result object to a serializable dictionary.

Handles cases where the object is None or has different metric structures.

Validate a list of ticker symbols.

Parameters:

Name Type Description Default
symbols list[str] | tuple[str, ...]

Ticker symbols container (list or tuple of strings).

required

Raises:

Type Description
InputError

If symbols is not a list/tuple, empty, contains non-string items, or any item is empty or has leading/trailing whitespace.
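The validation rules listed above can be sketched directly; InputError and the function name are stand-ins for the library's own:

```python
class InputError(ValueError):
    pass


def validate_symbols(symbols) -> None:
    # Container must be a non-empty list or tuple.
    if not isinstance(symbols, (list, tuple)) or not symbols:
        raise InputError("symbols must be a non-empty list or tuple")
    for s in symbols:
        # Items must be non-empty strings with no surrounding whitespace.
        if not isinstance(s, str) or not s or s != s.strip():
            raise InputError(f"invalid ticker: {s!r}")
```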

Read a boolean environment variable.

Parameters:

Name Type Description Default
name str

Environment variable name.

required
default bool

Default value if not set.

False

Returns:

Type Description
bool

True if value is "1", "true", "yes", "on" (case-insensitive).

Read an integer environment variable.

Parameters:

Name Type Description Default
name str

Environment variable name.

required
default int | None

Default value if not set or invalid.

None

Returns:

Type Description
int | None

Integer value, or default if parsing fails.

Read a float environment variable.

Parameters:

Name Type Description Default
name str

Environment variable name.

required
default float | None

Default value if not set or invalid.

None

Returns:

Type Description
float | None

Float value, or default if parsing fails.
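The three env helpers above share one shape: read, parse under the documented rules, fall back to the default on absence or parse failure. A sketch (function names are assumptions; only the parsing rules come from this page):

```python
import os


def env_bool(name: str, default: bool = False) -> bool:
    val = os.environ.get(name)
    if val is None:
        return default
    # Truthy spellings per the documented rule, case-insensitive.
    return val.strip().lower() in ("1", "true", "yes", "on")


def env_int(name: str, default=None):
    try:
        return int(os.environ[name])
    except (KeyError, ValueError):  # not set, or not an integer
        return default


def env_float(name: str, default=None):
    try:
        return float(os.environ[name])
    except (KeyError, ValueError):  # not set, or not a float
        return default


os.environ["VF_DEBUG"] = "YES"
os.environ["VF_WORKERS"] = "8"
os.environ["VF_RATE"] = "not-a-number"
```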

Create a progress bar update callback.

Parameters:

Name Type Description Default
pbar tqdm

The tqdm progress bar instance.

required

Returns:

Type Description
Callable[[ProgressSnapshot], None]

Callable to update the progress bar.
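The factory above can be sketched with minimal stand-ins for the tqdm bar and for ProgressSnapshot (the snapshot's fields here are assumptions; the real type comes from the library). The callback advances the bar by the delta since the last snapshot, since tqdm's update() takes an increment:

```python
from dataclasses import dataclass


@dataclass
class ProgressSnapshot:  # stand-in: field names are assumptions
    completed: int
    total: int


class FakeBar:
    """Stand-in for a tqdm instance: n tracks cumulative progress."""

    def __init__(self):
        self.n = 0

    def update(self, delta: int) -> None:
        self.n += delta


def make_progress_callback(pbar):
    def on_progress(snapshot: ProgressSnapshot) -> None:
        # tqdm.update() is incremental, so convert the absolute count
        # in the snapshot into a delta from the bar's current position.
        pbar.update(snapshot.completed - pbar.n)

    return on_progress


bar = FakeBar()
cb = make_progress_callback(bar)
cb(ProgressSnapshot(completed=3, total=10))
cb(ProgressSnapshot(completed=7, total=10))
```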