Python Pipeline Integration Reference
Overview
This reference documents the integration modules that connect the components of the Pure Python Pipeline.
Module: aplus_discovery_integrator.py
Function: integrate_aplus_discovery_with_deep_analysis
Integrates A+ discovery with deep analysis results by reading JSON exports.
Signature:
Parameters:
session_id (str): Session identifier for finding analysis files
Returns:
dict[str, Any]: Discovery integration results containing:
- has_a_plus_analysis (bool): Whether A+ analysis exists
- total_opportunities_found (int): Number of A+ opportunities
- aplus_holdings (list): List of A+ holding details
- total_analyzed (int): Total holdings analyzed
- session_id (str): Session identifier
- integration_timestamp (str): ISO timestamp
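The documented return keys can be written down as a typed structure, which makes it easy to check a result before using it. The sketch below is illustrative only: `DiscoveryIntegrationResult` and `missing_keys` are hypothetical names, not part of the module, and the element type of `aplus_holdings` is an assumption.

```python
from typing import Any, TypedDict


class DiscoveryIntegrationResult(TypedDict):
    """Hypothetical type mirroring the documented return keys."""

    has_a_plus_analysis: bool
    total_opportunities_found: int
    aplus_holdings: list[dict[str, Any]]  # element shape is an assumption
    total_analyzed: int
    session_id: str
    integration_timestamp: str


def missing_keys(result: dict[str, Any]) -> list[str]:
    """Return the documented keys that are absent from a result dict."""
    expected = DiscoveryIntegrationResult.__annotations__
    return [key for key in expected if key not in result]
```

A caller could log `missing_keys(discovery_results)` before indexing into the dict, since a result produced after an error may not carry every key.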
Example:
from finwiz.integration.aplus_discovery_integrator import integrate_aplus_discovery_with_deep_analysis

discovery_results = integrate_aplus_discovery_with_deep_analysis(
    session_id="analysis_session_123"
)

if discovery_results["has_a_plus_analysis"]:
    print(f"Found {discovery_results['total_opportunities_found']} A+ opportunities")
    for holding in discovery_results["aplus_holdings"]:
        print(f"  - {holding['ticker']}: Grade {holding['grade']}")
Data Sources:
- Scans output/stock/*.json for stock analysis files
- Scans output/etf/*.json for ETF analysis files
- Scans output/crypto/*.json for crypto analysis files
- Reads output/deep_analysis_consolidated_{session_id}.json if available
Filtering Logic:
- Only includes holdings with grade "A+" or "A"
- Excludes holdings with grades "B", "C", "D", "F"
- Removes duplicate tickers
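The filtering rules above can be sketched as a small function. This is a minimal illustration, not the module's actual code: `filter_top_holdings` is a hypothetical name, and keeping the first occurrence of a duplicate ticker is an assumption, since the reference does not say which duplicate survives.

```python
from typing import Any

ACCEPTED_GRADES = {"A+", "A"}  # grades the reference says are included


def filter_top_holdings(holdings: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Keep A+/A holdings and drop repeated tickers (first occurrence wins)."""
    seen: set[str] = set()
    kept: list[dict[str, Any]] = []
    for holding in holdings:
        ticker = holding.get("ticker")
        if ticker is None or holding.get("grade") not in ACCEPTED_GRADES:
            continue  # missing ticker, or grade B/C/D/F/unknown
        if ticker in seen:
            continue  # duplicate ticker
        seen.add(ticker)
        kept.append(holding)
    return kept
```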
Error Handling:
- Returns empty results if directories don't exist
- Logs warnings for unreadable files
- Continues processing even if individual files fail
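One way to get the tolerant behaviour described above is to treat a missing directory as empty and to catch read and parse errors per file. The sketch below assumes the directory layout listed under Data Sources; `load_analysis_files` is a hypothetical helper, not the function the module exposes.

```python
import json
import logging
from pathlib import Path
from typing import Any

logger = logging.getLogger(__name__)


def load_analysis_files(
    output_dir: Path, asset_dirs: tuple[str, ...] = ("stock", "etf", "crypto")
) -> list[dict[str, Any]]:
    """Load every readable JSON file under the asset directories."""
    records: list[dict[str, Any]] = []
    for name in asset_dirs:
        directory = output_dir / name
        if not directory.is_dir():
            continue  # missing directory contributes no results
        for path in sorted(directory.glob("*.json")):
            try:
                records.append(json.loads(path.read_text(encoding="utf-8")))
            except (OSError, ValueError) as exc:
                # json.JSONDecodeError and UnicodeDecodeError are ValueErrors
                logger.warning("Skipping unreadable file %s: %s", path, exc)
    return records
```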
Module: backtesting_pipeline_connector.py
Function: connect_backtesting_to_discovery_results
Connects backtesting pipeline to discovery results and executes backtesting for A+ candidates.
Signature:
Parameters:
session_id (str): Session identifier for finding discovery files
Returns:
dict[str, Any]: Backtesting execution results containing:
- backtesting_executed (bool): Whether backtesting was executed
- candidates_count (int): Number of candidates tested
- candidates (list): List of candidate details
- results (list): List of backtesting results
- execution_time_seconds (float): Total execution time
- results_file (str): Path to results JSON file
- session_id (str): Session identifier
Example:
from finwiz.integration.backtesting_pipeline_connector import connect_backtesting_to_discovery_results

backtesting_results = connect_backtesting_to_discovery_results(
    session_id="analysis_session_123"
)

if backtesting_results["backtesting_executed"]:
    print(f"Backtested {backtesting_results['candidates_count']} candidates")
    for result in backtesting_results["results"]:
        print(f"  {result['ticker']}: {result['annual_return']:.1%} return, "
              f"Sharpe {result['sharpe_ratio']:.2f}")
Data Sources:
- Calls integrate_aplus_discovery_with_deep_analysis() to get candidates
- Falls back to reading discovery JSON files if integration fails
- Removes duplicate candidates
Backtesting Metrics:
- annual_return (float): Annualized return
- sharpe_ratio (float): Risk-adjusted return metric
- max_drawdown (float): Maximum peak-to-trough decline
- win_rate (float): Percentage of winning trades
- backtest_period (str): Time period tested
- status (str): Execution status
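To make two of these metrics concrete, the sketch below computes a maximum drawdown from an equity curve and a win rate from per-trade returns using their standard definitions. It is not taken from the backtesting module: the function names are hypothetical, the equity values are assumed to be positive, and both results are expressed as fractions, whereas the module may report percentages.

```python
def max_drawdown(equity_curve: list[float]) -> float:
    """Largest peak-to-trough decline, as a fraction of the running peak."""
    if not equity_curve:
        return 0.0
    peak = equity_curve[0]
    worst = 0.0
    for value in equity_curve:
        peak = max(peak, value)  # highest value seen so far
        worst = max(worst, (peak - value) / peak)
    return worst


def win_rate(trade_returns: list[float]) -> float:
    """Fraction of trades with a strictly positive return."""
    if not trade_returns:
        return 0.0
    return sum(r > 0 for r in trade_returns) / len(trade_returns)
```

For an equity curve of 100, 120, 90, 130, 117 the deepest decline is from 120 to 90, so `max_drawdown` returns 0.25.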
Output Files:
- Saves results to output/backtesting_results_{session_id}.json
- Includes candidates, results, and execution metadata
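Writing the results file can be sketched as follows. Only the file name pattern comes from this reference; `save_backtesting_results`, its parameters, and the payload layout are assumptions made for illustration.

```python
import json
from pathlib import Path
from typing import Any


def save_backtesting_results(
    payload: dict[str, Any], session_id: str, output_dir: Path = Path("output")
) -> Path:
    """Write the payload to backtesting_results_{session_id}.json and return its path."""
    output_dir.mkdir(parents=True, exist_ok=True)
    path = output_dir / f"backtesting_results_{session_id}.json"
    path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    return path
```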
Error Handling:
- Returns backtesting_executed: false if no candidates found
- Logs errors and continues processing
- Includes error details in return value
Module: aplus_extractor.py
Class: APlusDataExtractor
Extracts and processes A+ opportunity data from discovery crew outputs.
Constructor:
Parameters:
output_dir (Path): Base output directory containing crew outputs
Methods:
extract_aplus_opportunities
Extracts A+ opportunities from all discovery crew files.
Signature:
Returns:
APlusOpportunityCollection: Collection of A+ opportunities, or None if extraction fails
Example:
from finwiz.integration.aplus_extractor import APlusDataExtractor

extractor = APlusDataExtractor()
collection = extractor.extract_aplus_opportunities()

if collection:
    print(f"Stock opportunities: {len(collection.stock_opportunities)}")
    print(f"ETF opportunities: {len(collection.etf_opportunities)}")
    print(f"Crypto opportunities: {len(collection.crypto_opportunities)}")
    print(f"Confidence score: {collection.confidence_score:.2f}")
Data Sources:
- Reads output/discovery/a_plus_stocks.json
- Reads output/discovery/a_plus_etfs.json
- Reads output/discovery/a_plus_crypto.json
- Reads output/discovery/discovery_latest.json for market context
Extraction Process:
- Loads JSON files from discovery directory
- Cleans JSON content (fixes Python-style numeric literals)
- Extracts opportunities with grades "A+" or "A"
- Calculates composite scores from fundamentals
- Generates discovery summary
- Calculates confidence score
- Extracts allocation recommendations and replacement notes
- Extracts market context and backtesting metrics
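The cleaning step is the least obvious part of this process, so here is one possible reading of it. The sketch assumes that "Python-style numeric literals" means digit-group underscores such as 2_500_000, which are valid in Python but not in JSON; the extractor may handle other cases. `clean_numeric_literals` is a hypothetical name.

```python
import json
import re

# Match either a complete JSON string (left untouched) or a number that
# contains underscore digit separators (rewritten).
_TOKEN = re.compile(r'"(?:\\.|[^"\\])*"|\d+(?:_\d+)+')


def clean_numeric_literals(text: str) -> str:
    """Remove underscores from numeric literals outside of JSON strings."""

    def fix(match: re.Match) -> str:
        token = match.group(0)
        return token if token.startswith('"') else token.replace("_", "")

    return _TOKEN.sub(fix, text)


raw = '{"ticker": "X_1_2", "market_cap": 2_500_000}'
data = json.loads(clean_numeric_literals(raw))
```

Matching whole strings first keeps values such as "X_1_2" intact, which a plain search-and-replace on underscores would corrupt.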
validate_aplus_opportunities
Validates A+ opportunities collection for completeness and quality.
Signature:
def validate_aplus_opportunities(
    self,
    collection: APlusOpportunityCollection
) -> tuple[bool, list[str]]
Parameters:
collection (APlusOpportunityCollection): Collection to validate
Returns:
tuple[bool, list[str]]: (is_valid, list_of_validation_errors)
Validation Checks:
- At least one opportunity found
- Confidence score ≥ 0.5
- Discovery summary ≥ 50 characters
- Allocation recommendations provided
- No duplicate symbols
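The checks above map naturally onto a function that collects error messages and reports success only when none were found. The sketch below uses a hypothetical stand-in class, because the fields of APlusOpportunityCollection are not fully listed in this reference; the attribute names `symbols` and `allocation_recommendations` and the error wording are assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class OpportunityCollectionSketch:
    """Hypothetical stand-in for APlusOpportunityCollection."""

    symbols: list[str] = field(default_factory=list)
    confidence_score: float = 0.0
    discovery_summary: str = ""
    allocation_recommendations: list[str] = field(default_factory=list)


def validate_collection(c: OpportunityCollectionSketch) -> tuple[bool, list[str]]:
    """Apply the five documented checks and return (is_valid, errors)."""
    errors: list[str] = []
    if not c.symbols:
        errors.append("No opportunities found")
    if c.confidence_score < 0.5:
        errors.append("Confidence score below 0.5")
    if len(c.discovery_summary) < 50:
        errors.append("Discovery summary shorter than 50 characters")
    if not c.allocation_recommendations:
        errors.append("No allocation recommendations")
    if len(set(c.symbols)) != len(c.symbols):
        errors.append("Duplicate symbols")
    return (not errors, errors)
```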
Example:
extractor = APlusDataExtractor()
collection = extractor.extract_aplus_opportunities()

if collection:
    is_valid, errors = extractor.validate_aplus_opportunities(collection)
    if is_valid:
        print("✅ Validation passed")
    else:
        print("❌ Validation failed:")
        for error in errors:
            print(f"  - {error}")
Integration Patterns
Pattern 1: Complete Pipeline Execution
Execute all pipeline components in sequence:
from finwiz.scoring.portfolio_deep_analyzer import analyze_portfolio_with_python
from finwiz.integration.aplus_discovery_integrator import integrate_aplus_discovery_with_deep_analysis
from finwiz.integration.backtesting_pipeline_connector import connect_backtesting_to_discovery_results
from finwiz.reporting.python_report_generator import generate_python_report

# portfolio_holdings, portfolio_review and session_id are supplied by the caller

# Step 1: Deep Analysis
analysis_results = analyze_portfolio_with_python(
    holdings=portfolio_holdings,
    session_id=session_id
)

# Step 2: A+ Discovery
discovery_results = integrate_aplus_discovery_with_deep_analysis(
    session_id=session_id
)

# Step 3: Backtesting
backtesting_results = connect_backtesting_to_discovery_results(
    session_id=session_id
)

# Step 4: Report Generation
report_path = generate_python_report(
    portfolio_review=portfolio_review,
    deep_analysis_results=analysis_results,
    session_id=session_id
)
Pattern 2: Conditional Execution
Execute components conditionally based on results:
# Always run deep analysis
analysis_results = analyze_portfolio_with_python(
    holdings=portfolio_holdings,
    session_id=session_id
)

# Only run discovery if analysis succeeded
if analysis_results["successful_analyses"] > 0:
    discovery_results = integrate_aplus_discovery_with_deep_analysis(
        session_id=session_id
    )

    # Only run backtesting if A+ opportunities found
    # (nested, so discovery_results is always defined here)
    if discovery_results["has_a_plus_analysis"]:
        backtesting_results = connect_backtesting_to_discovery_results(
            session_id=session_id
        )
Pattern 3: Error Handling
Implement comprehensive error handling:
import logging

logger = logging.getLogger(__name__)

try:
    # Deep Analysis
    analysis_results = analyze_portfolio_with_python(
        holdings=portfolio_holdings,
        session_id=session_id
    )
except Exception as e:
    logger.error(f"Deep analysis failed: {e}")
    analysis_results = {"successful_analyses": 0}

try:
    # A+ Discovery
    discovery_results = integrate_aplus_discovery_with_deep_analysis(
        session_id=session_id
    )
except Exception as e:
    logger.error(f"A+ discovery failed: {e}")
    discovery_results = {"has_a_plus_analysis": False}

# Default covers both a skipped and a failed backtesting run
backtesting_results = {"backtesting_executed": False}
try:
    # Backtesting
    if discovery_results.get("has_a_plus_analysis"):
        backtesting_results = connect_backtesting_to_discovery_results(
            session_id=session_id
        )
except Exception as e:
    logger.error(f"Backtesting failed: {e}")

# Always generate report (even with partial data)
report_path = generate_python_report(
    portfolio_review=portfolio_review,
    deep_analysis_results=analysis_results,
    session_id=session_id
)
File Structure
Input Files
output/
├── stock/
│   ├── AAPL_{session_id}.json
│   ├── MSFT_{session_id}.json
│   └── ...
├── etf/
│   ├── SPY_{session_id}.json
│   └── ...
├── crypto/
│   ├── BTC_{session_id}.json
│   └── ...
└── deep_analysis_consolidated_{session_id}.json
Output Files
output/
├── aplus_discovery_{session_id}.json
├── backtesting_results_{session_id}.json
└── finwiz_family_financial_plan.html
Performance Characteristics
Execution Times
| Component | Typical Time | Notes |
|---|---|---|
| A+ Discovery Integration | <1 second | File I/O only |
| Backtesting Pipeline | 2-5 seconds | Depends on candidate count |
| A+ Data Extraction | <1 second | JSON parsing |
Memory Usage
| Component | Typical Memory | Notes |
|---|---|---|
| A+ Discovery Integration | <10 MB | Minimal memory footprint |
| Backtesting Pipeline | <50 MB | Depends on candidate count |
| A+ Data Extraction | <20 MB | JSON data in memory |
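The figures in these tables can be checked on a given machine with the standard library. The helper below is a generic measurement sketch, not part of the pipeline: `measure` is a hypothetical name, and `tracemalloc` only tracks memory allocated through Python, so its peak may be lower than the process-level memory use.

```python
import time
import tracemalloc
from typing import Any, Callable


def measure(func: Callable[..., Any], *args: Any, **kwargs: Any) -> tuple[Any, float, float]:
    """Call func once and return (result, elapsed_seconds, peak_megabytes)."""
    tracemalloc.start()
    start = time.perf_counter()
    try:
        result = func(*args, **kwargs)
    finally:
        elapsed = time.perf_counter() - start
        _, peak_bytes = tracemalloc.get_traced_memory()
        tracemalloc.stop()
    return result, elapsed, peak_bytes / (1024 * 1024)
```

For example, `measure(integrate_aplus_discovery_with_deep_analysis, session_id=session_id)` would return the integration result together with its timing and peak traced memory.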
Error Codes
A+ Discovery Integration
- has_a_plus_analysis: false - No A+ opportunities found
- total_opportunities_found: 0 - Analysis completed but no A+ grades
- error key present - Exception occurred during integration
Backtesting Pipeline
- backtesting_executed: false - Backtesting not executed
- reason: "No A+ candidates available" - No candidates to test
- error key present - Exception occurred during execution
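Because these conditions are signalled through keys in the returned dict rather than through exceptions, callers need to inspect the result. The sketch below shows one way to turn a backtesting result into a short status string; `describe_backtesting_result` is a hypothetical helper, and checking the error key first is an assumed precedence.

```python
from typing import Any


def describe_backtesting_result(result: dict[str, Any]) -> str:
    """Summarise a backtesting result dict using the documented keys."""
    if "error" in result:
        return f"failed: {result['error']}"
    if not result.get("backtesting_executed", False):
        return f"skipped: {result.get('reason', 'no reason given')}"
    return f"ok: {result.get('candidates_count', 0)} candidates tested"
```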
Troubleshooting
Issue: No A+ Opportunities Found
Symptoms:
- has_a_plus_analysis: false
- total_opportunities_found: 0
Possible Causes:
- Deep analysis not completed
- No holdings achieved A+ or A grade
- JSON files not in expected directories
- Session ID mismatch
Solutions:
- Verify deep analysis completed successfully
- Check analysis results for grades
- Verify output directory structure
- Ensure consistent session_id usage
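The directory and session ID checks can be automated. The sketch below assumes the layout shown under File Structure, where per-asset files are named with a `_{session_id}.json` suffix; `diagnose_inputs` is a hypothetical helper and not part of the pipeline.

```python
from pathlib import Path


def diagnose_inputs(session_id: str, output_dir: Path = Path("output")) -> list[str]:
    """Return a list of problems with the expected input files for a session."""
    problems: list[str] = []
    for name in ("stock", "etf", "crypto"):
        directory = output_dir / name
        if not directory.is_dir():
            problems.append(f"missing directory: {directory}")
        elif not any(directory.glob(f"*_{session_id}.json")):
            problems.append(f"no files for session {session_id!r} in {directory}")
    consolidated = output_dir / f"deep_analysis_consolidated_{session_id}.json"
    if not consolidated.is_file():
        # The consolidated file is optional, so treat this as a hint only.
        problems.append(f"consolidated file not found: {consolidated}")
    return problems
```

An empty list means every expected input is present for that session. A "no files for session" entry for a directory that does contain JSON files usually points to a session ID mismatch.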
Issue: Backtesting Not Executing
Symptoms:
- backtesting_executed: false
- reason: "No A+ candidates available"
Possible Causes:
- A+ discovery found no opportunities
- Discovery integration failed
- Candidate list empty
Solutions:
- Run A+ discovery integration first
- Check discovery results for candidates
- Verify discovery JSON files exist