# Memory Management for Batch Processing

This document describes the memory management system for batch data pre-fetching and crew execution in FinWiz.
## Data Source Priority

**IMPORTANT:** Yahoo Finance is the PRIMARY data source and is ALWAYS used.

1. **Yahoo Finance (PRIMARY): ALWAYS ENABLED**
   - Provides all essential data: company info, fundamentals, price, history
   - Performance: ~2-5 seconds for 66 tickers
   - Rate limit: 600 requests/minute (10/second)
   - Recommended configuration
2. **Alpha Vantage (OPTIONAL): DISABLED BY DEFAULT**
   - Adds minimal value over Yahoo Finance
   - Performance: ~13 minutes for 66 tickers
   - Rate limit: 5 calls/minute (free tier)
   - Not recommended: Yahoo Finance provides all essential data
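The ~13-minute Alpha Vantage figure follows directly from the free-tier rate limit, assuming one API call per ticker:

```python
tickers = 66
calls_per_minute = 5  # Alpha Vantage free tier

# One call per ticker, rate-limited to 5 calls per minute
print(tickers / calls_per_minute)  # → 13.2 (minutes)
```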
## Overview
The memory management system ensures that batch processing stays within acceptable memory limits (< 500 MB total) by:
- Monitoring memory usage during pre-fetch and execution
- Logging memory metrics for performance analysis
- Cleaning up cache after Flow completion
- Validating memory constraints to ensure limits are met
## Requirements
- 17.70: Monitor memory usage during pre-fetch and execution
- 17.71: Implement cache cleanup after Flow completion
- 17.72: Add memory usage logging to metrics
- 17.73: Validate memory constraints (< 500 MB total)
- 17.74: Memory limit enforcement and warnings
## Architecture

### Components

1. **MemoryManager** (`src/finwiz/utils/memory_manager.py`): core memory monitoring and management
   - Memory usage tracking and logging
   - Cache cleanup functionality
   - Memory constraint validation
2. **BatchDataPreFetcher** (`src/finwiz/utils/batch_data_prefetcher.py`): integrates MemoryManager for monitoring
   - Tracks memory during batch operations
   - Provides memory metrics and cleanup methods
3. **Flow Integration** (to be implemented in the Flow orchestrator)
   - Monitors memory during crew execution
   - Cleans up cache after Flow completion
   - Includes memory metrics in performance reports
## Usage

### Basic Usage with BatchDataPreFetcher

```python
from finwiz.utils.batch_data_prefetcher import BatchDataPreFetcher

# Initialize prefetcher (automatically creates memory manager).
# Yahoo Finance is ALWAYS used (primary source);
# Alpha Vantage is DISABLED by default (recommended).
prefetcher = BatchDataPreFetcher(
    session_id="session-123",
    enable_alpha_vantage=False,  # Recommended: Yahoo Finance only
)

# Pre-fetch data (memory is monitored automatically).
# Yahoo Finance provides all essential data in ~2-5 seconds.
data = prefetcher.prefetch_all_data(["AAPL", "MSFT", "GOOGL"])

# Get memory metrics
metrics = prefetcher.get_memory_metrics()
print(f"Peak memory: {metrics['peak_memory_mb']} MB")
print(f"Within limit: {metrics['within_limit']}")

# Validate memory constraints
if prefetcher.validate_memory_constraints():
    print("✓ Memory usage within limits")
else:
    print("✗ Memory limit exceeded")

# Clean up cache after completion
cleanup_result = prefetcher.cleanup_cache()
print(f"Freed {cleanup_result['disk_freed_mb']} MB")
```
### Direct MemoryManager Usage

```python
from finwiz.utils.memory_manager import get_memory_manager

# Create (or retrieve) the memory manager for this session
memory_manager = get_memory_manager(session_id="session-123")

# Monitor memory at different stages
memory_manager.monitor_memory("initialization")
# ... perform operations ...
memory_manager.monitor_memory("data-processing")
# ... more operations ...
memory_manager.monitor_memory("completion")

# Get comprehensive metrics
metrics = memory_manager.get_memory_metrics()

# Validate constraints
is_valid = memory_manager.validate_memory_constraints()

# Clean up cache
cleanup_result = memory_manager.cleanup_cache()
```
### Flow Integration Example

```python
import logging
from typing import Any

from crewai.flow.flow import Flow, listen  # Flow framework (assumed import path)

from finwiz.utils.batch_data_prefetcher import BatchDataPreFetcher

# FinwizState and DeepAnalysisCrew are project-local (imports omitted here)
logger = logging.getLogger(__name__)


class FinwizFlow(Flow[FinwizState]):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.memory_manager = None
        self.prefetcher = None

    @listen("check_portfolio")
    def execute_deep_analysis_with_prefetch(self) -> dict[str, Any]:
        """Execute deep analysis with batch pre-fetching and memory monitoring."""
        # Initialize prefetcher with memory management
        session_id = self.state.session_id
        self.prefetcher = BatchDataPreFetcher(
            session_id=session_id,
            enable_alpha_vantage=False,  # Yahoo Finance only for speed
        )

        # Get memory manager reference
        self.memory_manager = self.prefetcher.memory_manager

        # Monitor memory at start
        self.memory_manager.monitor_memory("deep-analysis-start")

        # Get holdings to analyze
        holdings = self._get_underperforming_holdings()
        tickers = [h["ticker"] for h in holdings]

        # Pre-fetch all data in batch
        logger.info(f"Pre-fetching data for {len(tickers)} holdings...")
        prefetched_data = self.prefetcher.prefetch_all_data(tickers)

        # Monitor memory after pre-fetch
        self.memory_manager.monitor_memory("pre-fetch-complete")

        # Execute crews with pre-fetched data
        results = {}
        for ticker in tickers:
            ticker_data = prefetched_data.get(ticker, {})

            # Create and execute crew with pre-fetched data
            crew = DeepAnalysisCrew()
            crew.set_prefetched_data(ticker_data)
            result = crew.crew().kickoff(inputs={"ticker": ticker})
            results[ticker] = result

            # Monitor memory periodically
            if len(results) % 10 == 0:
                self.memory_manager.monitor_memory(f"crew-execution-{len(results)}")

        # Monitor memory after all crews complete
        self.memory_manager.monitor_memory("all-crews-complete")

        # Get memory metrics for reporting
        memory_metrics = self.prefetcher.get_memory_metrics()

        # Validate memory constraints
        if not self.prefetcher.validate_memory_constraints():
            logger.warning("Memory constraints were exceeded during execution")

        # Store metrics in state
        self.state.batch_prefetch_metrics = {
            "memory": memory_metrics,
            "tickers_analyzed": len(results),
            "successful": sum(1 for r in results.values() if r.get("success", False)),
        }

        return {"results": results, "memory_metrics": memory_metrics}

    def cleanup(self):
        """Clean up resources after Flow completion."""
        if self.prefetcher:
            logger.info("Cleaning up batch prefetch cache...")
            cleanup_result = self.prefetcher.cleanup_cache()
            logger.info(
                f"Cache cleanup complete: {cleanup_result['files_removed']} files, "
                f"{cleanup_result['disk_freed_mb']} MB freed"
            )
```
## Memory Monitoring

### Automatic Monitoring Points

The `BatchDataPreFetcher` automatically monitors memory at these stages:

- `pre-fetch-start`: before any data fetching begins
- `yahoo-finance-complete`: after the Yahoo Finance batch fetch
- `alpha-vantage-complete`: after the Alpha Vantage batch fetch (if enabled)
- `cache-save-complete`: after saving data to cache
### Manual Monitoring

You can add custom monitoring points by calling `monitor_memory()` with a descriptive stage name, e.g. `memory_manager.monitor_memory("my-custom-stage")`.
### Memory Metrics

Each monitoring point captures:

- `stage`: description of the stage
- `memory_mb`: current memory usage in MB
- `memory_bytes`: current memory usage in bytes
- `delta_mb`: change from initial memory in MB
- `peak_mb`: peak memory usage so far in MB
- `within_limit`: whether memory is within the 500 MB limit
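A minimal, self-contained sketch of how such a sample can be computed. The real `MemoryManager` measures process memory; here the stdlib `tracemalloc` module (which tracks only Python-heap allocations) stands in so the sketch runs anywhere:

```python
import tracemalloc

MAX_MEMORY_MB = 500  # mirrors MAX_MEMORY_MB in memory_manager.py

tracemalloc.start()
_initial_mb = tracemalloc.get_traced_memory()[0] / (1024 * 1024)

def monitor_memory(stage: str) -> dict:
    """Capture one monitoring sample (toy version of MemoryManager.monitor_memory)."""
    current, peak = tracemalloc.get_traced_memory()  # (current, peak) in bytes
    memory_mb = current / (1024 * 1024)
    return {
        "stage": stage,
        "memory_mb": round(memory_mb, 2),
        "memory_bytes": current,
        "delta_mb": round(memory_mb - _initial_mb, 2),
        "peak_mb": round(peak / (1024 * 1024), 2),
        "within_limit": memory_mb < MAX_MEMORY_MB,
    }

sample = monitor_memory("pre-fetch-start")
print(sorted(sample.keys()))
```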
## Memory Constraints

### Limits

- Maximum memory: 500 MB (configurable via `MAX_MEMORY_MB` in `memory_manager.py`)
- Warning threshold: 80% of maximum (400 MB)
- Error threshold: 100% of maximum (500 MB)
### Warnings and Errors

The system automatically logs:

- a **warning** at the 80% threshold: approaching the memory limit
- an **error** at the 100% threshold: memory limit exceeded
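The threshold logic amounts to a simple classification. A sketch (the constant names here are illustrative; the real values live in `memory_manager.py`):

```python
MAX_MEMORY_MB = 500
WARNING_THRESHOLD = 0.8  # assumed name; 80% of the maximum

def check_thresholds(memory_mb: float) -> str:
    """Classify a memory reading against the warning/error thresholds."""
    if memory_mb >= MAX_MEMORY_MB:
        return "error"    # limit exceeded: log an error
    if memory_mb >= MAX_MEMORY_MB * WARNING_THRESHOLD:
        return "warning"  # approaching the limit (>= 400 MB): log a warning
    return "ok"

print(check_thresholds(350.0))  # → ok
print(check_thresholds(420.0))  # → warning
print(check_thresholds(510.0))  # → error
```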
### Validation

After Flow completion, validate memory constraints:

```python
if prefetcher.validate_memory_constraints():
    logger.info("✓ Memory constraints validated")
else:
    logger.error("✗ Memory constraints violated")
```
## Cache Cleanup

### Automatic Cleanup

Cache cleanup should be called after Flow completion; the `cleanup()` method in the Flow Integration example above shows the recommended pattern.
### Cleanup Metrics

The cleanup operation returns:

- `cache_dir`: path to the cache directory
- `disk_freed_mb`: disk space freed in MB
- `files_removed`: number of files removed
- `success`: whether cleanup succeeded
### Manual Cleanup

You can also call `cleanup_cache()` on the memory manager directly, without going through the prefetcher.
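Conceptually, cleanup deletes the session's cache directory and reports what was removed. A self-contained sketch of that behavior (the real implementation lives in `memory_manager.py`; this standalone function is an illustration, not its API):

```python
import shutil
from pathlib import Path

def cleanup_cache(cache_dir: Path) -> dict:
    """Remove a session cache directory and report cleanup metrics."""
    if not cache_dir.exists():
        return {"cache_dir": str(cache_dir), "disk_freed_mb": 0.0,
                "files_removed": 0, "success": True}

    # Measure what is about to be removed, then delete the whole tree
    files = [p for p in cache_dir.rglob("*") if p.is_file()]
    freed_mb = sum(p.stat().st_size for p in files) / (1024 * 1024)
    shutil.rmtree(cache_dir)

    return {
        "cache_dir": str(cache_dir),
        "disk_freed_mb": round(freed_mb, 2),
        "files_removed": len(files),
        "success": True,
    }
```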
## Performance Metrics

### Memory Metrics Structure

```python
{
    "initial_memory_mb": 100.0,
    "peak_memory_mb": 150.0,
    "final_memory_mb": 120.0,
    "memory_increase_mb": 20.0,
    "max_memory_limit_mb": 500,
    "within_limit": True,
    "peak_usage_percent": 30.0,
    "samples": [
        {
            "stage": "pre-fetch-start",
            "memory_mb": 100.0,
            "delta_mb": 0.0,
            "peak_mb": 100.0,
            "within_limit": True,
        },
        # ... more samples ...
    ],
    "sample_count": 10,
}
```
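The summary fields are all derivable from the sample list. A self-contained sketch of that aggregation (a toy stand-in for the real `get_memory_metrics()`):

```python
MAX_MEMORY_MB = 500

def summarize(samples: list[dict]) -> dict:
    """Derive the summary fields above from a list of monitoring samples."""
    initial = samples[0]["memory_mb"]
    final = samples[-1]["memory_mb"]
    peak = max(s["memory_mb"] for s in samples)
    return {
        "initial_memory_mb": initial,
        "peak_memory_mb": peak,
        "final_memory_mb": final,
        "memory_increase_mb": round(final - initial, 2),
        "max_memory_limit_mb": MAX_MEMORY_MB,
        "within_limit": peak < MAX_MEMORY_MB,
        "peak_usage_percent": round(peak / MAX_MEMORY_MB * 100, 1),
        "sample_count": len(samples),
    }

samples = [
    {"stage": "pre-fetch-start", "memory_mb": 100.0},
    {"stage": "pre-fetch-complete", "memory_mb": 150.0},
    {"stage": "completion", "memory_mb": 120.0},
]
print(summarize(samples)["peak_usage_percent"])  # → 30.0
```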
### Including in Performance Reports

Memory metrics should be included in batch execution reports:

```python
import json

# Save to batch_execution_metrics.json
metrics = {
    "execution": {
        "total_time": 120.5,
        "tickers_analyzed": 66,
        "successful": 64,
    },
    "memory": prefetcher.get_memory_metrics(),
    "cleanup": cleanup_result,
}

with open("batch_execution_metrics.json", "w") as f:
    json.dump(metrics, f, indent=2)
```
## Best Practices

### 1. Initialize Early

Create the memory manager at the start of batch processing:

```python
prefetcher = BatchDataPreFetcher(session_id=session_id)
# Memory manager is automatically created
```
### 2. Monitor Key Stages

Add monitoring at important stages with `memory_manager.monitor_memory("stage-name")`.

### 3. Check Constraints

Validate memory constraints after completion with `prefetcher.validate_memory_constraints()`.

### 4. Always Clean Up

Clean up cache after Flow completion with `prefetcher.cleanup_cache()`.

### 5. Include in Metrics

Include memory metrics in performance reports via `prefetcher.get_memory_metrics()` (see Performance Metrics above).
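Together these practices form a try/finally skeleton so that cleanup runs even if a crew fails mid-batch. A self-contained sketch with hypothetical stand-in functions (the real calls are the prefetcher methods shown in the Usage section):

```python
def prefetch(tickers):
    """Stand-in for prefetcher.prefetch_all_data (hypothetical)."""
    return {t: {} for t in tickers}

def run_crews(data):
    """Stand-in for the crew-execution loop (hypothetical)."""
    return {t: {"success": True} for t in data}

def cleanup():
    """Stand-in for prefetcher.cleanup_cache (hypothetical)."""
    print("cache cleaned")

def run_batch(tickers):
    try:
        data = prefetch(tickers)   # practice 1: prefetcher created up front
        results = run_crews(data)  # practice 2: monitoring happens in the loop
        return results             # practices 3 & 5: validate + report here
    finally:
        cleanup()                  # practice 4: cleanup runs even on failure

print(len(run_batch(["AAPL", "MSFT"])))  # → 2
```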
## Troubleshooting

### High Memory Usage

If memory usage is high:

- Check batch size: reduce the number of concurrent crews
- Disable Alpha Vantage: use Yahoo Finance only
- Monitor stages: identify which stage uses the most memory
- Force garbage collection: call `gc.collect()` between stages
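The garbage-collection tip in practice: forcing a full collection between stages releases reference cycles promptly instead of waiting for the collector. The stage bodies here are illustrative placeholders:

```python
import gc

def run_stage(name):
    """Stand-in for one processing stage (hypothetical)."""
    return [0] * 100_000  # allocate, then drop, some temporary data

for stage in ["pre-fetch", "crew-execution", "reporting"]:
    run_stage(stage)
    collected = gc.collect()  # force a full collection between stages
    print(f"{stage}: gc.collect() returned {collected}")
```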
### Memory Limit Exceeded

If the memory limit is exceeded:

- Review logs: check which stage exceeded the limit
- Reduce batch size: process fewer tickers at once
- Increase the limit: adjust `MAX_MEMORY_MB` if the system allows
- Optimize data structures: reduce the data held in memory
### Cleanup Failures

If cache cleanup fails:

- Check permissions: ensure write access to the cache directory
- Check disk space: ensure sufficient free space
- Manual cleanup: remove the cache directory manually
- Review logs: check error messages for details
## Testing

### Unit Tests

Test memory management functionality:

```python
from pathlib import Path

from finwiz.utils.memory_manager import MemoryManager


def test_memory_monitoring():
    """Test memory monitoring at different stages."""
    manager = MemoryManager(session_id="test-123")

    # Monitor at different stages
    sample1 = manager.monitor_memory("stage-1")
    assert sample1["within_limit"]

    sample2 = manager.monitor_memory("stage-2")
    assert sample2["peak_mb"] >= sample1["memory_mb"]


def test_cache_cleanup():
    """Test cache cleanup functionality."""
    manager = MemoryManager(session_id="test-123")

    # Create some cache files
    cache_dir = Path("cache/batch_data/test-123")
    cache_dir.mkdir(parents=True, exist_ok=True)
    (cache_dir / "test.json").write_text("{}")

    # Clean up
    result = manager.cleanup_cache()
    assert result["success"]
    assert not cache_dir.exists()


def test_memory_constraints():
    """Test memory constraint validation."""
    manager = MemoryManager(session_id="test-123")

    # Should be within limits initially
    assert manager.validate_memory_constraints()
```
### Integration Tests

Test with actual batch processing:

```python
from finwiz.utils.batch_data_prefetcher import BatchDataPreFetcher


def test_batch_prefetch_with_memory_management():
    """Test batch prefetch with memory monitoring."""
    prefetcher = BatchDataPreFetcher(session_id="test-123")

    # Pre-fetch data
    data = prefetcher.prefetch_all_data(["AAPL", "MSFT"])

    # Check memory metrics
    metrics = prefetcher.get_memory_metrics()
    assert metrics["within_limit"]
    assert metrics["peak_memory_mb"] < 500

    # Validate constraints
    assert prefetcher.validate_memory_constraints()

    # Clean up
    cleanup = prefetcher.cleanup_cache()
    assert cleanup["success"]
```
## References

- Requirements: 17.70, 17.71, 17.72, 17.73, 17.74
- Implementation: `src/finwiz/utils/memory_manager.py`
- Integration: `src/finwiz/utils/batch_data_prefetcher.py`
- Configuration: `src/finwiz/config/batch_prefetch_config.py`

**Version:** 1.0 | **Last Updated:** 2025-01-25 | **Status:** Implemented