
Analysis DAG Modules API

DAG (Directed Acyclic Graph) modules for data processing workflows.

Overview

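The chronos_lab.analysis.dag modules provide the functions from which Apache Hamilton graphs are built. In Hamilton, each function defines a node: the function's name is the node's name, and its parameter names declare which upstream nodes, inputs, or configuration values it depends on. For example, validate_ohlcv takes a parameter named standardize_ohlcv, so it runs on the output of standardize_ohlcv. Because dependencies are declared by name, the functions compose into a single directed acyclic graph of data transformations.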
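Name-based wiring can be illustrated without Hamilton itself. The standard-library sketch below is a toy resolver, not Hamilton's API: `execute`, `prices`, and `returns` are hypothetical names, and the sketch ignores Hamilton features such as default parameters, decorators, validation, and parallel execution.

```python
import inspect
from typing import Any, Callable, Dict


def execute(funcs: Dict[str, Callable[..., Any]], target: str,
            inputs: Dict[str, Any]) -> Any:
    """Toy resolver: compute `target` by matching parameter names to nodes.

    Each parameter is satisfied by a supplied input or, failing that, by the
    function registered under the same name (computed recursively, once).
    Assumes the graph is acyclic; a cycle would raise RecursionError.
    """
    cache: Dict[str, Any] = dict(inputs)

    def resolve(name: str) -> Any:
        if name not in cache:
            fn = funcs[name]  # KeyError: neither an input nor a node
            kwargs = {p: resolve(p) for p in inspect.signature(fn).parameters}
            cache[name] = fn(**kwargs)
        return cache[name]

    return resolve(target)


# Hypothetical nodes; the parameter name `prices` wires `returns` to `prices`.
def prices(raw: list) -> list:
    return [float(x) for x in raw]


def returns(prices: list) -> list:
    return [b / a - 1 for a, b in zip(prices, prices[1:])]


nodes = {"prices": prices, "returns": returns}
print(execute(nodes, "returns", {"raw": ["100", "110", "99"]}))
```

Hamilton layers graph validation, configuration, and pluggable executors on top of this idea; with the real library you build a driver from the modules rather than writing a resolver.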

Data Input/Output

chronos_lab.analysis.dag.io.ohlcv_from__yfinance

ohlcv_from__yfinance(ohlcv_from_config: Dict[str, Any]) -> pd.DataFrame

Fetch OHLCV data from Yahoo Finance using the provided configuration.

chronos_lab.analysis.dag.io.ohlcv_from__intrinio

ohlcv_from__intrinio(ohlcv_from_config: Dict[str, Any]) -> pd.DataFrame

Fetch OHLCV data from the Intrinio API using the provided configuration.

chronos_lab.analysis.dag.io.ohlcv_from__arcticdb

ohlcv_from__arcticdb(ohlcv_from_config: Dict[str, Any]) -> pd.DataFrame

Retrieve OHLCV data from ArcticDB storage using the provided configuration.

chronos_lab.analysis.dag.io.ohlcv_from__disabled

ohlcv_from__disabled(source_ohlcv: DataFrame) -> pd.DataFrame

Pass the source OHLCV DataFrame through unchanged, without fetching from an external source.
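The `ohlcv_from__*` functions appear to follow Hamilton's `__suffix` convention: every variant provides the same node, `ohlcv_from` (the parameter name that `standardize_ohlcv` consumes), and configuration decides which variant runs. In Hamilton this is usually done with the `@config.when(...)` decorator; the configuration keys chronos_lab uses are not documented here. The stdlib-only sketch below imitates the idea with a hypothetical `select_variant` helper and stub functions that return strings instead of DataFrames.

```python
from typing import Any, Callable, Dict, List


# Stand-in stubs named like the documented variants; strings replace DataFrames.
def ohlcv_from__yfinance(ohlcv_from_config: Dict[str, Any]) -> str:
    return f"fetched {ohlcv_from_config['symbols']} from Yahoo Finance"


def ohlcv_from__disabled(source_ohlcv: str) -> str:
    return source_ohlcv  # pass-through: no external fetch


def select_variant(funcs: List[Callable[..., Any]], node: str,
                   variant: str) -> Callable[..., Any]:
    """Return the function named `<node>__<variant>`.

    A toy stand-in for configuration-driven selection: all variants share
    the node name before the last `__`, and `variant` picks one of them.
    """
    by_suffix: Dict[str, Callable[..., Any]] = {}
    for fn in funcs:
        base, sep, suffix = fn.__name__.rpartition("__")
        if sep and base == node:
            by_suffix[suffix] = fn
    return by_suffix[variant]


variants = [ohlcv_from__yfinance, ohlcv_from__disabled]
fetch = select_variant(variants, "ohlcv_from", "yfinance")
print(fetch({"symbols": ["AAPL", "MSFT"]}))
```

The same pattern appears to apply to the paired `__enabled` / `__disabled` output functions listed below.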

chronos_lab.analysis.dag.io.analysis_to_dataset__enabled

analysis_to_dataset__enabled(analysis_result_dataset: DataFrame, to_dataset_config: Dict[str, Any]) -> Dict[str, Any]

Save analysis results to a named dataset with optional TTL configuration.

chronos_lab.analysis.dag.io.analysis_to_dataset__disabled

analysis_to_dataset__disabled(analysis_result_dataset: Optional[DataFrame]) -> Dict[str, Any]

Return an empty result when dataset output is disabled.

chronos_lab.analysis.dag.io.analysis_to_arcticdb__enabled

analysis_to_arcticdb__enabled(analysis_result_arcticdb: Dict[str, Any], to_arcticdb_config: Dict[str, Any]) -> Dict[str, Any]

Save analysis results to ArcticDB with configurable backend and symbol naming.

chronos_lab.analysis.dag.io.analysis_to_arcticdb__disabled

analysis_to_arcticdb__disabled(analysis_result_arcticdb: Optional[Dict[str, Any]]) -> Dict[str, Any]

Return an empty result when ArcticDB output is disabled.

Data Standardization

chronos_lab.analysis.dag.standardize.standardize_ohlcv

standardize_ohlcv(ohlcv_from: DataFrame, use_adjusted: bool = True) -> pd.DataFrame

Standardize an OHLCV DataFrame to consistent column names and validate its MultiIndex structure.
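One way to implement a `use_adjusted` switch is to rescale open, high, low, and close by the ratio of adjusted close to raw close, so each bar stays internally consistent. The sketch below assumes source columns such as `Adj Close` and a two-level row MultiIndex; the function name, the column names, and the rescaling approach are assumptions rather than the library's documented behavior.

```python
import pandas as pd

PRICE_COLS = ["open", "high", "low", "close"]


def standardize_ohlcv_sketch(df: pd.DataFrame, use_adjusted: bool = True) -> pd.DataFrame:
    """Hypothetical standardization: normalize names, optionally adjust prices."""
    if not isinstance(df.index, pd.MultiIndex) or df.index.nlevels != 2:
        raise ValueError("expected a two-level MultiIndex such as (date, symbol)")
    # "Adj Close" -> "adj_close", "Open" -> "open", etc.
    out = df.rename(columns=lambda c: str(c).strip().lower().replace(" ", "_"))
    if use_adjusted and "adj_close" in out.columns:
        # Scale every price by adj_close / close so OHLC relationships still hold.
        factor = out["adj_close"] / out["close"]
        for col in PRICE_COLS:
            out[col] = out[col] * factor
    return out[PRICE_COLS + ["volume"]]
```

Replacing only the close column with the adjusted close would be simpler, but it can leave the close below the unadjusted low and trip later validation.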

chronos_lab.analysis.dag.standardize.validate_ohlcv

validate_ohlcv(standardize_ohlcv: DataFrame) -> pd.DataFrame

Validate OHLCV data for required columns, missing values, and logical constraints.
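The three kinds of checks can be sketched with pandas as follows. The column names and the choice to raise `ValueError` (rather than drop or log offending rows) are assumptions; the actual function may handle violations differently.

```python
import pandas as pd

REQUIRED = ["open", "high", "low", "close", "volume"]


def validate_ohlcv_sketch(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical validation: required columns, no NaNs, sane OHLC bars."""
    missing = [c for c in REQUIRED if c not in df.columns]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    if df[REQUIRED].isna().any().any():
        raise ValueError("OHLCV contains missing values")
    # High must be the bar's maximum, low its minimum, and volume non-negative.
    bad = (
        (df["high"] < df[["open", "close", "low"]].max(axis=1))
        | (df["low"] > df[["open", "close", "high"]].min(axis=1))
        | (df["volume"] < 0)
    )
    if bad.any():
        raise ValueError(f"{int(bad.sum())} rows violate OHLC constraints")
    return df
```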

chronos_lab.analysis.dag.standardize.split_ohlcv_by_symbol

split_ohlcv_by_symbol(validate_ohlcv: DataFrame) -> Parallelizable[pd.DataFrame]

Split a MultiIndex OHLCV DataFrame into one DataFrame per symbol for parallel processing.
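In Hamilton, a node annotated `Parallelizable[T]` is written as a generator: each yielded item flows through the downstream nodes independently until a node annotated `Collect[T]` gathers the results, and the driver must be built with dynamic execution enabled. The sketch below shows only the splitting step in plain pandas; the function name and the index level name `symbol` are assumptions.

```python
from typing import Iterator

import pandas as pd


def split_by_symbol_sketch(df: pd.DataFrame, level: str = "symbol") -> Iterator[pd.DataFrame]:
    """Yield one DataFrame per value of the `level` index level."""
    # sort=True makes the symbol order deterministic.
    for _symbol, group in df.groupby(level=level, sort=True):
        yield group


idx = pd.MultiIndex.from_product([["2024-01-02", "2024-01-03"], ["AAA", "BBB"]],
                                 names=["date", "symbol"])
ohlcv = pd.DataFrame({"close": [1.0, 2.0, 1.1, 2.2]}, index=idx)
for part in split_by_symbol_sketch(ohlcv):
    print(part.index.get_level_values("symbol").unique().tolist(), len(part))
```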

Feature Engineering

chronos_lab.analysis.dag.features.ohlcv_features

ohlcv_features(split_ohlcv_by_symbol: DataFrame, ohlcv_features_list: List[str]) -> pd.DataFrame

Compute features from OHLCV data including returns, volume changes, and volatility.
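A minimal sketch of per-symbol features, assuming the input holds one symbol's rows sorted by date. The feature names `returns`, `volume_change`, and `volatility`, the rolling window, and the helper name are hypothetical; the real `ohlcv_features_list` values are not documented here.

```python
from typing import List

import pandas as pd


def ohlcv_features_sketch(df: pd.DataFrame, features: List[str], window: int = 20) -> pd.DataFrame:
    """Hypothetical features for ONE symbol's rows, assumed sorted by date."""
    out = df.copy()
    # Simple returns and volume changes relative to the previous bar.
    out["returns"] = out["close"] / out["close"].shift(1) - 1
    out["volume_change"] = out["volume"] / out["volume"].shift(1) - 1
    # Rolling standard deviation of returns as a volatility proxy.
    out["volatility"] = out["returns"].rolling(window).std()
    # Drop warm-up rows where a requested feature is still undefined.
    return out.dropna(subset=features)


bars = pd.DataFrame({"close": [100.0, 110.0, 121.0, 133.1], "volume": [10, 20, 20, 10]})
print(ohlcv_features_sketch(bars, ["returns", "volume_change", "volatility"], window=2))
```

Dropping warm-up rows matters downstream: Isolation Forest cannot be fit on rows containing NaN.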

Anomaly Detection

chronos_lab.analysis.dag.anomaly.detect_ohlcv_features_anomalies

detect_ohlcv_features_anomalies(ohlcv_features: DataFrame, ohlcv_features_list: List[str], isolation_forest_config: Dict[str, Any] = None) -> pd.DataFrame

Run Isolation Forest on the computed features to detect anomalies, assigning anomaly scores and ranks.
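A sketch using scikit-learn's `IsolationForest`, assuming `isolation_forest_config` holds keyword arguments for that estimator and is merged over some defaults. The defaults, the helper name, and the output columns `anomaly_score`, `is_anomaly`, and `anomaly_rank` are assumptions; chronos_lab's actual column names are not documented here.

```python
from typing import Any, Dict, List, Optional

import pandas as pd
from sklearn.ensemble import IsolationForest

DEFAULTS: Dict[str, Any] = {"n_estimators": 100, "contamination": "auto", "random_state": 42}


def detect_anomalies_sketch(features: pd.DataFrame, feature_cols: List[str],
                            config: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
    """Hypothetical scoring: higher anomaly_score means more anomalous."""
    params = {**DEFAULTS, **(config or {})}  # caller config overrides defaults
    X = features[feature_cols]
    model = IsolationForest(**params).fit(X)
    out = pd.DataFrame(index=features.index)
    # score_samples is higher for normal points; negate so higher = more anomalous.
    out["anomaly_score"] = -model.score_samples(X)
    out["is_anomaly"] = model.predict(X) == -1  # predict returns -1 for outliers
    # Rank 1 is the most anomalous row; "first" breaks ties by position.
    out["anomaly_rank"] = out["anomaly_score"].rank(ascending=False, method="first").astype(int)
    return out


pts = pd.DataFrame({"a": [i * 0.1 for i in range(20)] + [100.0],
                    "b": [(i % 5) * 0.1 for i in range(20)] + [100.0]})
print(detect_anomalies_sketch(pts, ["a", "b"]).sort_values("anomaly_rank").head(3))
```

Fixing `random_state` keeps scores and ranks reproducible across runs, which matters if results are persisted and compared over time.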

chronos_lab.analysis.dag.anomaly.ohlcv_by_symbol_with_features_anomalies

ohlcv_by_symbol_with_features_anomalies(ohlcv_features: DataFrame, detect_ohlcv_features_anomalies: DataFrame) -> pd.DataFrame

Join the anomaly detection results with the original OHLCV features DataFrame.
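Because the anomaly results are computed row by row from the features frame, an index-aligned join is enough to attach them. A minimal sketch, assuming both frames share the same index; the helper name and columns are hypothetical.

```python
import pandas as pd


def join_anomalies_sketch(features: pd.DataFrame, anomalies: pd.DataFrame) -> pd.DataFrame:
    """Attach anomaly columns to the feature rows they were computed from."""
    if not anomalies.index.isin(features.index).all():
        raise ValueError("anomaly rows must come from the features frame")
    # Left join keeps every feature row; rows without a score get NaN.
    return features.join(anomalies, how="left")


feats = pd.DataFrame({"returns": [0.01, -0.20, 0.02]}, index=[10, 11, 12])
scores = pd.DataFrame({"anomaly_score": [0.41, 0.73]}, index=[10, 11])
joined = join_anomalies_sketch(feats, scores)
print(joined)
```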

chronos_lab.analysis.dag.anomaly.analysis_result

analysis_result(ohlcv_by_symbol_with_features_anomalies: Collect[DataFrame]) -> pd.DataFrame

Collect the per-symbol anomaly results from parallel processing and concatenate them into a single DataFrame.
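In Hamilton, this parameter is annotated `Collect[DataFrame]` and receives the outputs of every parallel branch. The sketch below assumes those outputs arrive as an iterable of DataFrames with the same columns and a (date, symbol) MultiIndex; the function name and the column are hypothetical.

```python
from typing import Iterable

import pandas as pd


def collect_results_sketch(per_symbol: Iterable[pd.DataFrame]) -> pd.DataFrame:
    """Concatenate per-symbol results into one frame (Collect-style step)."""
    frames = list(per_symbol)
    if not frames:
        # pd.concat raises ValueError on an empty list, so return an empty frame.
        return pd.DataFrame()
    # Branches may finish in any order; sorting restores a stable index order.
    return pd.concat(frames).sort_index()


idx_a = pd.MultiIndex.from_tuples([("2024-01-02", "AAA")], names=["date", "symbol"])
idx_b = pd.MultiIndex.from_tuples([("2024-01-02", "BBB")], names=["date", "symbol"])
combined = collect_results_sketch([
    pd.DataFrame({"anomaly_score": [0.6]}, index=idx_b),
    pd.DataFrame({"anomaly_score": [0.4]}, index=idx_a),
])
print(combined)
```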

chronos_lab.analysis.dag.anomaly.analysis_result_dataset__enabled

analysis_result_dataset__enabled(analysis_result: DataFrame) -> pd.DataFrame

chronos_lab.analysis.dag.anomaly.analysis_result_dataset__disabled

analysis_result_dataset__disabled(analysis_result: DataFrame) -> pd.DataFrame

chronos_lab.analysis.dag.anomaly.analysis_result_arcticdb__enabled

analysis_result_arcticdb__enabled(analysis_result: DataFrame, to_arcticdb_config: Dict[str, Any]) -> Dict[str, Any]

chronos_lab.analysis.dag.anomaly.analysis_result_arcticdb__disabled

analysis_result_arcticdb__disabled(analysis_result: DataFrame) -> Dict[str, Any]