Batch Processing Guide¶
Overview¶
FinWiz's Batch Processing System speeds up portfolio analysis by fetching market data in parallel and running analysis crews concurrently. It delivers 10-20x performance improvements, reducing analysis time from hours to minutes while maintaining analysis quality.
Architecture¶
High-Level Flow¶
Portfolio Holdings (66 tickers)
↓
┌─────────────────────────────────────────────────────┐
│ Phase 1: Batch Data Pre-Fetching (2-5 seconds) │
│ │
│ Yahoo Finance API: All 66 tickers in parallel │
│ Alpha Vantage API: Rate-limited requests (optional)│
│ │
│ Result: Pre-fetched data cache for all tickers │
└─────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────┐
│ Phase 2: Concurrent Crew Execution (15-35 min) │
│ │
│ Batch 1: [AAPL, MSFT, GOOGL, TSLA, NVDA] │
│ Batch 2: [AMZN, META, NFLX, CRM, ADBE] │
│ Batch 3: [ORCL, INTC, AMD, QCOM, AVGO] │
│ ... │
│ Batch 14: [Final remaining tickers] │
│ │
│ Each batch: 5 crews running in parallel │
│ Zero API latency (uses pre-fetched data) │
└─────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────┐
│ Phase 3: Results Consolidation (< 1 minute) │
│ │
│ Collect all crew results │
│ Generate performance metrics │
│ Create consolidated portfolio analysis │
└─────────────────────────────────────────────────────┘
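The three phases above can be sketched with plain asyncio. In the sketch below, `fetch` and `crew` are trivial stand-ins for FinWiz's real data fetchers and analysis crews (assumed names, not the actual API):

```python
import asyncio

async def run_pipeline(tickers: list[str], batch_size: int = 5) -> dict:
    """Toy three-phase pipeline: prefetch -> batched crews -> consolidate."""

    async def fetch(ticker: str) -> dict:
        await asyncio.sleep(0)  # stands in for a real API call
        return {"ticker": ticker, "failed": False}

    async def crew(ticker: str) -> dict:
        await asyncio.sleep(0)  # stands in for a full crew run
        return {"ticker": ticker, "score": len(ticker)}

    # Phase 1: pre-fetch every ticker concurrently
    fetched = await asyncio.gather(*(fetch(t) for t in tickers))
    cache = {d["ticker"]: d for d in fetched}

    # Phase 2: run crews batch by batch, each batch fully concurrent
    results: dict = {}
    for i in range(0, len(tickers), batch_size):
        batch = tickers[i:i + batch_size]
        batch_results = await asyncio.gather(*(crew(t) for t in batch))
        for t, r in zip(batch, batch_results):
            results[t] = r

    # Phase 3: consolidate
    failed = sum(1 for d in cache.values() if d["failed"])
    return {"analyzed": len(results), "failed": failed, "results": results}
```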
Key Components¶
1. BatchDataPreFetcher¶
The BatchDataPreFetcher class handles parallel data fetching:
from finwiz.utils.batch_data_prefetcher import BatchDataPreFetcher

# Initialize prefetcher
prefetcher = BatchDataPreFetcher(
    tickers=["AAPL", "MSFT", "GOOGL"],
    session_id="analysis-session-123"
)

# Fetch all data in parallel (inside an async context)
prefetched_data = await prefetcher.prefetch_all_data()

# Data structure:
# {
#     "AAPL": {
#         "yahoo_finance": {...},
#         "alpha_vantage": {...},  # Optional
#         "failed": False
#     },
#     "MSFT": {...},
#     "GOOGL": {...}
# }
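Downstream code can use the `failed` flag to separate usable tickers from failures before any crews run. For example, with a hypothetical result dict shaped like the comment above:

```python
# Hypothetical prefetch result mirroring the documented structure
prefetched_data = {
    "AAPL": {"yahoo_finance": {"price": 195.0}, "failed": False},
    "MSFT": {"yahoo_finance": {"price": 410.0}, "failed": False},
    "GOOG": {"failed": True, "error": "rate limited"},
}

# Split into usable tickers and failures before handing off to crews
ok_tickers = [t for t, d in prefetched_data.items() if not d.get("failed", False)]
failed_tickers = [t for t in prefetched_data if t not in ok_tickers]
```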
2. Concurrent Crew Execution¶
The Flow orchestrator manages concurrent crew execution:
# Flow method for batch execution
@listen("check_portfolio")
async def execute_deep_analysis_with_prefetch(self) -> dict[str, Any]:
    """Execute deep analysis with batch processing."""
    # Phase 1: batch data pre-fetching
    prefetcher = BatchDataPreFetcher(tickers=underperforming_tickers)
    prefetched_data = await prefetcher.prefetch_all_data()

    # Phase 2: concurrent crew execution in batches
    batch_size = get_batch_size()
    batches = create_batches(underperforming_tickers, batch_size)
    deep_analysis_results: dict[str, Any] = {}
    for batch_num, batch_tickers in enumerate(batches, start=1):
        # Execute the whole batch concurrently
        batch_results = await asyncio.gather(*[
            execute_deep_analysis_crew(ticker, prefetched_data[ticker])
            for ticker in batch_tickers
        ])
        # Collect batch results
        for ticker, result in zip(batch_tickers, batch_results):
            deep_analysis_results[ticker] = result

    return {"deep_analysis_results": deep_analysis_results}
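`create_batches` is referenced above but not shown. A minimal chunking helper (an assumption about its behavior, not necessarily FinWiz's actual implementation) could look like:

```python
def create_batches(items: list[str], batch_size: int) -> list[list[str]]:
    """Split tickers into fixed-size chunks; the last batch may be smaller."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
```

With 66 tickers and a batch size of 5, this yields 14 batches, matching the diagram above.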
Configuration¶
Environment Variables¶
| Variable | Default | Description | Impact |
|---|---|---|---|
| `BATCH_PREFETCH_ENABLED` | `true` | Enable/disable batch processing | 10-20x performance improvement |
| `ALPHA_VANTAGE_RATE_LIMIT` | `5` | Alpha Vantage API calls per minute | Controls secondary data source speed |
| `BATCH_PREFETCH_MIN_HOLDINGS` | `10` | Minimum holdings to trigger batch mode | Avoids overhead for small portfolios |
| `DEEP_ANALYSIS_BATCH_SIZE` | `5` | Concurrent crew execution batch size | Balances speed vs memory usage |
| `ENABLE_ALPHA_VANTAGE` | `false` | Use Alpha Vantage as secondary source | Adds ~13 minutes for 66 tickers |
Configuration Examples¶
Optimal Performance (Recommended):
# Maximum speed with Yahoo Finance only
BATCH_PREFETCH_ENABLED=true
DEEP_ANALYSIS_BATCH_SIZE=5
ENABLE_ALPHA_VANTAGE=false
ALPHA_VANTAGE_RATE_LIMIT=5
BATCH_PREFETCH_MIN_HOLDINGS=10
Premium Alpha Vantage Setup:
# For users with premium Alpha Vantage API
BATCH_PREFETCH_ENABLED=true
DEEP_ANALYSIS_BATCH_SIZE=8
ENABLE_ALPHA_VANTAGE=true
ALPHA_VANTAGE_RATE_LIMIT=75 # Premium tier
BATCH_PREFETCH_MIN_HOLDINGS=5
Memory-Constrained Environment:
# Smaller batches for limited memory
BATCH_PREFETCH_ENABLED=true
DEEP_ANALYSIS_BATCH_SIZE=3
ENABLE_ALPHA_VANTAGE=false
BATCH_PREFETCH_MIN_HOLDINGS=15
Debugging/Development:
# Disable batch processing for debugging
BATCH_PREFETCH_ENABLED=false
# Falls back to sequential mode (1 ticker at a time)
Automatic Configuration Loading¶
The system automatically loads and validates configuration:
from finwiz.config.batch_prefetch_config import get_batch_prefetch_config
# Load configuration with validation and logging
config = get_batch_prefetch_config(log_config=True)
# Configuration is automatically validated:
# - Rate limits must be >= 1 and <= 100
# - Minimum holdings must be >= 1
# - Boolean values accept multiple formats (true/false, 1/0, yes/no, on/off)
print(f"Batch mode enabled: {config.enabled}")
print(f"Alpha Vantage rate limit: {config.alpha_vantage_rate_limit}")
print(f"Minimum holdings for batch: {config.min_holdings_for_batch}")
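The flexible boolean parsing described above (true/false, 1/0, yes/no, on/off) can be sketched as follows. This `parse_bool` is an illustrative stand-in, not FinWiz's actual validator:

```python
def parse_bool(value: str) -> bool:
    """Parse a config boolean, accepting true/false, 1/0, yes/no, on/off."""
    truthy = {"true", "1", "yes", "on"}
    falsy = {"false", "0", "no", "off"}
    v = value.strip().lower()
    if v in truthy:
        return True
    if v in falsy:
        return False
    raise ValueError(f"Not a recognized boolean value: {value!r}")
```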
Data Sources¶
Primary Source: Yahoo Finance¶
Always Enabled - Provides comprehensive data for all analysis needs:
- Company Information: Name, sector, industry, market cap
- Fundamental Data: P/E ratio, ROE, debt-to-equity, revenue growth
- Price Data: Current price, 52-week high/low, volume
- Historical Data: Price history, dividend history, splits
- Technical Data: Moving averages, volatility, beta
Performance Characteristics:
- Speed: ~2-5 seconds for 66 tickers
- Rate Limit: 600 requests/minute (very generous)
- Coverage: All essential data for comprehensive analysis
- Reliability: High uptime, consistent data quality
Secondary Source: Alpha Vantage (Optional)¶
Disabled by Default - Provides additional fundamental data:
- Enhanced Fundamentals: Detailed earnings estimates, analyst ratings
- Additional Metrics: Advanced financial ratios, sector comparisons
- Earnings Data: Quarterly earnings, earnings surprises
Performance Characteristics:
- Speed: ~13 minutes for 66 tickers (free tier)
- Rate Limit: 5 calls/minute (free), 75 calls/minute (premium)
- Coverage: Supplementary data (Yahoo Finance covers essentials)
- Recommendation: Disable for optimal performance
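A limit of 5 calls/minute can be enforced with a minimal sleep-based limiter that spaces requests at least 12 seconds apart. The `MinIntervalLimiter` below is an illustrative sketch, not FinWiz's actual rate limiter:

```python
import time

class MinIntervalLimiter:
    """Allow at most `calls_per_minute` calls by sleeping between them."""

    def __init__(self, calls_per_minute: int):
        self.interval = 60.0 / calls_per_minute
        self._last = 0.0

    def wait(self) -> None:
        """Block until the minimum interval since the last call has passed."""
        now = time.monotonic()
        delay = self._last + self.interval - now
        if delay > 0:
            time.sleep(delay)
        self._last = time.monotonic()
```

Calling `limiter.wait()` before each Alpha Vantage request keeps the client under the configured rate.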
Data Source Comparison¶
| Metric | Yahoo Finance | Alpha Vantage |
|---|---|---|
| Speed (66 tickers) | 2-5 seconds | 13 minutes (free) / 1 minute (premium) |
| Rate Limit | 600/minute | 5/minute (free) / 75/minute (premium) |
| Essential Data | ✅ Complete | ✅ Complete |
| Additional Data | ❌ Limited | ✅ Extensive |
| Cost | Free | Free tier / Premium |
| Recommendation | Always use | Disable unless needed |
Performance Benchmarks¶
Execution Time Comparison¶
| Portfolio Size | Sequential Mode | Batch Mode | Speedup Factor |
|---|---|---|---|
| 10 holdings | 50-100 minutes | 2-5 minutes | 10-20x |
| 30 holdings | 2.5-5 hours | 5-15 minutes | 10-20x |
| 66 holdings | 5.5-11 hours | 20-40 minutes | 16-20x |
| 100 holdings | 8.3-16.7 hours | 17-50 minutes | 10-20x |
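The table's estimates follow from simple arithmetic: sequential time scales with the number of tickers, while batch mode pays one prefetch cost plus one crew pass per batch. The toy model below uses illustrative per-ticker and per-batch times, not measured values:

```python
import math

def estimate_minutes(num_tickers: int, per_ticker_min: float = 5.0,
                     batch_size: int = 5, per_batch_min: float = 2.2,
                     prefetch_min: float = 0.1) -> tuple[float, float]:
    """Return (sequential_minutes, batch_mode_minutes) under this toy model."""
    sequential = num_tickers * per_ticker_min
    num_batches = math.ceil(num_tickers / batch_size)
    batch_mode = prefetch_min + num_batches * per_batch_min
    return sequential, batch_mode
```

For 66 tickers this gives roughly 330 sequential minutes versus about 31 minutes in batch mode, in line with the ~10x speedups above.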
Detailed Performance Breakdown (66 Holdings)¶
| Phase | Sequential Mode | Batch Mode | Improvement |
|---|---|---|---|
| Data Fetching | 330-660 minutes | 2-5 minutes | 66-132x faster |
| Crew Execution | 330-660 minutes | 15-35 minutes | 9-19x faster |
| Total Time | 660-1320 minutes | 20-40 minutes | 16-33x faster |
Memory Usage¶
| Portfolio Size | Peak Memory Usage | Average Memory Usage |
|---|---|---|
| 10 holdings | 150-200 MB | 100-150 MB |
| 30 holdings | 250-350 MB | 200-250 MB |
| 66 holdings | 400-500 MB | 300-400 MB |
| 100 holdings | 500-600 MB | 400-500 MB |
Batch Size Optimization¶
Automatic Batch Sizing¶
The system automatically determines optimal batch sizes:
def get_recommended_batch_size(portfolio_size: int) -> int:
    """Get recommended batch size based on portfolio size."""
    if portfolio_size <= 10:
        return min(3, portfolio_size)  # Small portfolios: quality over speed
    elif portfolio_size <= 30:
        return min(5, portfolio_size // 3)  # Medium portfolios: balanced
    elif portfolio_size <= 100:
        return min(8, portfolio_size // 8)  # Large portfolios: speed optimization
    else:
        return min(12, portfolio_size // 15)  # Very large: maximum parallelization
Batch Size Guidelines¶
| Portfolio Size | Recommended Batch Size | Rationale |
|---|---|---|
| 1-10 holdings | 3 | Small portfolios benefit from quality focus |
| 11-30 holdings | 5 | Balanced approach for medium portfolios |
| 31-100 holdings | 8 | Speed optimization for large portfolios |
| 101+ holdings | 12 | Maximum parallelization for very large portfolios |
Memory-Based Batch Sizing¶
The system monitors memory usage and adjusts batch sizes:
import psutil
from finwiz.utils.memory_manager import MemoryManager

def adjust_batch_size_for_memory(base_batch_size: int) -> int:
    """Adjust batch size based on available memory."""
    memory_manager = MemoryManager()
    available_memory_gb = memory_manager.get_available_memory_gb()

    if available_memory_gb < 2.0:
        # Low memory: reduce batch size
        return max(1, base_batch_size // 2)
    elif available_memory_gb > 8.0:
        # High memory: can increase batch size
        return min(15, base_batch_size * 2)
    else:
        # Normal memory: use base batch size
        return base_batch_size
Error Handling & Resilience¶
Partial Failure Handling¶
The system gracefully handles individual ticker failures:
# Example: Yahoo Finance batch fetch with error handling
async def _fetch_yahoo_finance_batch(self, tickers: list[str]) -> dict[str, Any]:
    """Fetch Yahoo Finance data for multiple tickers with error handling."""
    results = {}
    failed_tickers = []

    try:
        # Attempt batch download
        data = yf.download(tickers, period="1y", group_by="ticker")
        for ticker in tickers:
            try:
                # Process individual ticker data
                ticker_data = self._process_yahoo_data(ticker, data)
                results[ticker] = {
                    "yahoo_finance": ticker_data,
                    "failed": False
                }
            except Exception as e:
                # Individual ticker failed - continue with others
                logger.error(f"Failed to process Yahoo Finance data for {ticker}: {e}")
                failed_tickers.append(ticker)
                results[ticker] = {"failed": True, "error": str(e)}
    except Exception as e:
        # Entire batch failed - mark all as failed
        logger.error(f"Yahoo Finance batch download failed: {e}")
        for ticker in tickers:
            results[ticker] = {"failed": True, "error": str(e)}
            failed_tickers.append(ticker)

    # Log summary
    if failed_tickers:
        logger.warning(f"Yahoo Finance batch: {len(failed_tickers)} failed out of {len(tickers)}")
        logger.warning(f"Failed tickers: {failed_tickers}")

    return results
Complete Failure Fallback¶
If batch processing fails completely, the system falls back to sequential mode:
def _fallback_to_sequential_mode(self, reason: str) -> dict[str, Any]:
    """Fall back to sequential analysis mode."""
    logger.warning(f"Falling back to sequential mode: {reason}")

    # Update state to indicate fallback
    self.state.batch_prefetch_enabled = False
    self.state.fallback_reason = reason
    self.state.fallback_timestamp = datetime.now()

    # Execute sequential analysis
    return self._run_deep_analysis_sequential()
Failure Detection Logic¶
The system detects various failure scenarios:
def _should_fallback_to_sequential(self, prefetched_data: dict) -> tuple[bool, str]:
    """Determine if we should fall back to sequential mode."""
    total_tickers = len(prefetched_data)

    # Fallback if no data was fetched at all
    if total_tickers == 0:
        return True, "No tickers to analyze"

    failed_tickers = sum(1 for data in prefetched_data.values() if data.get("failed", False))
    failure_rate = failed_tickers / total_tickers

    # Fallback if the failure rate is too high
    if failure_rate > 0.5:  # More than 50% failed
        return True, f"High failure rate: {failure_rate:.1%} ({failed_tickers}/{total_tickers})"

    # Continue with batch mode
    return False, ""
Performance Monitoring¶
Comprehensive Metrics Tracking¶
The system tracks detailed performance metrics:
from dataclasses import dataclass, field

@dataclass
class BatchPrefetchMetrics:
    """Comprehensive batch processing metrics."""

    # Basic counts
    total_tickers: int = 0
    successful_tickers: int = 0
    failed_tickers: int = 0

    # Timing metrics
    prefetch_duration_seconds: float = 0.0
    crew_execution_duration_seconds: float = 0.0
    total_duration_seconds: float = 0.0

    # Performance metrics
    time_savings_percentage: float = 0.0
    estimated_sequential_time_seconds: float = 0.0

    # Batch configuration
    batch_size: int = 5
    total_batches: int = 0

    # Resource usage
    memory_usage_mb: float = 0.0
    peak_memory_usage_mb: float = 0.0

    # Error tracking
    failed_ticker_list: list[str] = field(default_factory=list)
    error_summary: dict[str, int] = field(default_factory=dict)
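The savings percentage and speedup factor follow directly from the two duration fields. A small helper (a sketch of the arithmetic, not part of the metrics class) makes the relationship explicit:

```python
def derive_performance(total_s: float, estimated_sequential_s: float) -> tuple[float, float]:
    """Return (time_savings_percentage, speedup_factor) from two durations."""
    savings = (1.0 - total_s / estimated_sequential_s) * 100.0
    speedup = estimated_sequential_s / total_s
    return round(savings, 1), round(speedup, 1)
```

For example, a 1845.5-second batch run against an estimated 19800-second sequential run works out to roughly 90.7% savings, a 10.7x speedup.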
Real-Time Performance Logging¶
The system provides detailed logging during execution:
2025-01-25 10:30:00 - INFO - Starting batch data pre-fetch for 66 tickers
2025-01-25 10:30:00 - INFO - Yahoo Finance: Fetching data for all 66 tickers in parallel
2025-01-25 10:30:04 - INFO - Yahoo Finance: Completed in 4.2 seconds (66/66 successful)
2025-01-25 10:30:04 - INFO - Alpha Vantage: Skipped (ENABLE_ALPHA_VANTAGE=false)
2025-01-25 10:30:04 - INFO - Batch pre-fetch completed: 66 successful, 0 failed
2025-01-25 10:30:04 - INFO - Starting concurrent crew execution (batch size: 5)
2025-01-25 10:30:04 - INFO - Batch 1/14: Processing AAPL, MSFT, GOOGL, TSLA, NVDA
2025-01-25 10:32:18 - INFO - Batch 1/14: Completed in 134.2 seconds
2025-01-25 10:32:18 - INFO - Batch 2/14: Processing AMZN, META, NFLX, CRM, ADBE
...
2025-01-25 11:00:45 - INFO - All batches completed successfully
2025-01-25 11:00:45 - INFO - Performance Summary:
2025-01-25 11:00:45 - INFO - Total time: 30.75 minutes
2025-01-25 11:00:45 - INFO - Estimated sequential time: 5.5 hours
2025-01-25 11:00:45 - INFO - Time savings: 90.7% (10.7x faster)
2025-01-25 11:00:45 - INFO - Memory usage: 456 MB peak
Performance Metrics File¶
Detailed metrics are saved to JSON for analysis:
{
  "batch_prefetch_metrics": {
    "session_id": "portfolio-analysis-20250125-103000",
    "timestamp": "2025-01-25T10:30:00Z",
    "total_tickers": 66,
    "successful_tickers": 66,
    "failed_tickers": 0,
    "prefetch_duration_seconds": 4.2,
    "crew_execution_duration_seconds": 1841.3,
    "total_duration_seconds": 1845.5,
    "time_savings_percentage": 90.7,
    "estimated_sequential_time_seconds": 19800.0,
    "batch_size": 5,
    "total_batches": 14,
    "memory_usage_mb": 456.7,
    "peak_memory_usage_mb": 523.1,
    "failed_ticker_list": [],
    "error_summary": {},
    "data_sources": {
      "yahoo_finance": {
        "enabled": true,
        "successful_tickers": 66,
        "failed_tickers": 0,
        "duration_seconds": 4.2
      },
      "alpha_vantage": {
        "enabled": false,
        "successful_tickers": 0,
        "failed_tickers": 0,
        "duration_seconds": 0.0
      }
    }
  }
}
Best Practices¶
Production Deployment¶
- Use Default Configuration: The default settings are optimized for most use cases
- Monitor Memory Usage: Set up alerts for high memory usage during batch processing
- Disable Alpha Vantage: Yahoo Finance provides all essential data for most analyses
- Monitor Performance Metrics: Track batch processing performance over time
- Set Up Error Alerts: Monitor for high failure rates or fallback events
Development and Testing¶
- Test with Small Portfolios: Start with 5-10 holdings to verify configuration
- Monitor Logs: Watch batch processing logs for errors or performance issues
- Test Fallback Scenarios: Verify sequential mode works when batch processing fails
- Memory Profiling: Profile memory usage with different batch sizes
- API Rate Limit Testing: Test with different rate limit configurations
Optimization Strategies¶
- Portfolio Size-Based Configuration:
def optimize_for_portfolio_size(portfolio_size: int) -> dict[str, str]:
    """Optimize configuration based on portfolio size."""
    if portfolio_size <= 10:
        return {
            "DEEP_ANALYSIS_BATCH_SIZE": "3",
            "BATCH_PREFETCH_MIN_HOLDINGS": "5"
        }
    elif portfolio_size <= 50:
        return {
            "DEEP_ANALYSIS_BATCH_SIZE": "5",
            "BATCH_PREFETCH_MIN_HOLDINGS": "10"
        }
    else:
        return {
            "DEEP_ANALYSIS_BATCH_SIZE": "8",
            "BATCH_PREFETCH_MIN_HOLDINGS": "15"
        }
- Memory-Based Configuration:
def optimize_for_memory(available_memory_gb: float) -> dict[str, str]:
    """Optimize configuration based on available memory."""
    if available_memory_gb < 4.0:
        return {"DEEP_ANALYSIS_BATCH_SIZE": "3"}
    elif available_memory_gb > 16.0:
        return {"DEEP_ANALYSIS_BATCH_SIZE": "12"}
    else:
        return {"DEEP_ANALYSIS_BATCH_SIZE": "5"}
- API Tier-Based Configuration:
def optimize_for_api_tier(has_premium_alpha_vantage: bool) -> dict[str, str]:
    """Optimize configuration based on API tier."""
    if has_premium_alpha_vantage:
        return {
            "ENABLE_ALPHA_VANTAGE": "true",
            "ALPHA_VANTAGE_RATE_LIMIT": "75",
            "DEEP_ANALYSIS_BATCH_SIZE": "8"
        }
    else:
        return {
            "ENABLE_ALPHA_VANTAGE": "false",
            "DEEP_ANALYSIS_BATCH_SIZE": "5"
        }
Troubleshooting¶
Common Issues¶
Issue: Batch processing is slower than expected
# Check configuration
echo "BATCH_PREFETCH_ENABLED: $BATCH_PREFETCH_ENABLED"
echo "DEEP_ANALYSIS_BATCH_SIZE: $DEEP_ANALYSIS_BATCH_SIZE"
echo "ENABLE_ALPHA_VANTAGE: $ENABLE_ALPHA_VANTAGE"
# Solution: Disable Alpha Vantage if enabled
export ENABLE_ALPHA_VANTAGE=false
Issue: High memory usage during batch processing
# Check current memory usage
free -h
# Solution: Reduce batch size
export DEEP_ANALYSIS_BATCH_SIZE=3
Issue: Frequent fallback to sequential mode
# Check logs for fallback reasons
grep "Falling back to sequential mode" logs/finwiz.log
# Common causes and solutions:
# 1. Network issues: Check internet connection
# 2. API rate limits: Reduce ALPHA_VANTAGE_RATE_LIMIT
# 3. Memory issues: Reduce DEEP_ANALYSIS_BATCH_SIZE
Issue: Individual tickers failing consistently
# Check for specific ticker issues
grep "Failed to process.*data for" logs/finwiz.log
# Common causes:
# 1. Invalid ticker symbols: Verify ticker exists
# 2. Delisted stocks: Remove from portfolio
# 3. API issues: Check API status
Performance Debugging¶
Monitor Batch Processing Performance:
from finwiz.utils.performance_monitor import PerformanceMonitor

monitor = PerformanceMonitor()

# Track batch processing performance
with monitor.track_batch_processing() as tracker:
    # Run batch processing
    result = execute_batch_processing()

# View performance metrics
metrics = tracker.get_metrics()
print(f"Total time: {metrics.total_duration_seconds:.1f}s")
print(f"Time savings: {metrics.time_savings_percentage:.1f}%")
print(f"Memory usage: {metrics.memory_usage_mb:.1f}MB")
Analyze Performance Metrics:
import json
from pathlib import Path

# Load performance metrics from file
metrics_file = Path("output/reports/session-123/batch_prefetch_metrics.json")
with open(metrics_file) as f:
    metrics = json.load(f)

# Analyze performance
batch_metrics = metrics["batch_prefetch_metrics"]
print(f"Speedup factor: {batch_metrics['estimated_sequential_time_seconds'] / batch_metrics['total_duration_seconds']:.1f}x")
print(f"Failure rate: {batch_metrics['failed_tickers'] / batch_metrics['total_tickers']:.1%}")
print(f"Memory efficiency: {batch_metrics['memory_usage_mb'] / batch_metrics['total_tickers']:.1f}MB per ticker")
Future Enhancements¶
Planned Improvements¶
- Adaptive Batch Sizing: Automatically adjust batch size based on system performance
- Intelligent Data Source Selection: Choose optimal data sources based on requirements
- Distributed Processing: Scale batch processing across multiple machines
- Advanced Caching: Implement sophisticated caching strategies for repeated analyses
- Real-Time Optimization: Adjust configuration based on real-time performance metrics
Advanced Features¶
- Machine Learning Optimization: Use ML to predict optimal batch configurations
- Multi-Tier Processing: Different processing tiers based on analysis requirements
- Dynamic Resource Allocation: Automatically allocate resources based on workload
- Predictive Scaling: Scale resources based on predicted workload
- Cost Optimization: Optimize for cost efficiency across different API tiers
Conclusion¶
FinWiz's Batch Processing System provides:
- Dramatic Performance Improvements: 10-20x faster portfolio analysis
- Intelligent Resource Management: Automatic optimization based on system resources
- Robust Error Handling: Graceful degradation and comprehensive error recovery
- Flexible Configuration: Easy customization for different use cases and environments
- Comprehensive Monitoring: Detailed performance metrics and logging
- Production-Ready: Tested and optimized for real-world portfolio analysis
This system enables FinWiz to scale from small portfolios to large institutional-grade analyses while maintaining high performance and reliability.
Version: 1.0 | Last Updated: 2025-01-25