Orchestrator Interactions

Version: 1.0 | Last Updated: 2025-01-18 | Status: Production

Overview

This document provides detailed diagrams and explanations of how orchestrators interact within the FinWiz Flow architecture.

Flow Execution Sequence

Complete Workflow

sequenceDiagram
    participant User
    participant Flow as FinwizFlow
    participant Val as ValidationOrchestrator
    participant Deep as DeepAnalysisOrchestrator
    participant Alt as AlternativesMatchingOrchestrator
    participant Disc as DiscoveryOrchestrator
    participant Rep as ReportingOrchestrator
    participant State as FinwizState

    User->>Flow: kickoff()
    Flow->>State: Initialize state

    Flow->>Val: validate_data_integration()
    Val->>State: Update validation status
    Val-->>Flow: Return validation result

    Flow->>Val: check_portfolio()
    Val->>State: Update portfolio data
    Val-->>Flow: Return portfolio result

    Flow->>Deep: run_deep_analysis_on_holdings()
    Deep->>State: Update deep_analysis_results
    Deep-->>Flow: Return analysis results

    Flow->>Alt: match_alternatives_for_holdings()
    Alt->>State: Update portfolio_alternatives
    Alt-->>Flow: Return alternatives

    Flow->>Disc: check_investment_discovery()
    Disc->>State: Update discovery_results
    Disc-->>Flow: Return discovery results

    Flow->>Rep: report()
    Rep->>State: Update final_report_path
    Rep-->>Flow: Return report path

    Flow-->>User: Return final result

Orchestrator Dependencies

graph TD
    Flow[FinwizFlow]

    Flow --> ErrorOrch[ErrorHandlingOrchestrator]
    Flow --> ProgOrch[ProgressTrackingOrchestrator]
    Flow --> UtilOrch[UtilityOrchestrator]
    Flow --> ValOrch[ValidationOrchestrator]
    Flow --> DeepOrch[DeepAnalysisOrchestrator]
    Flow --> AltOrch[AlternativesMatchingOrchestrator]
    Flow --> DiscOrch[DiscoveryOrchestrator]
    Flow --> RepOrch[ReportingOrchestrator]

    DeepOrch --> ErrorOrch
    DeepOrch --> ProgOrch
    DeepOrch --> UtilOrch

    AltOrch --> UtilOrch

    DiscOrch --> ErrorOrch

    RepOrch --> UtilOrch

    ValOrch --> UtilOrch

    style Flow fill:#e1f5ff
    style ErrorOrch fill:#ffe1e1
    style ProgOrch fill:#e1ffe1
    style UtilOrch fill:#fff5e1
    style ValOrch fill:#f5e1ff
    style DeepOrch fill:#e1fff5
    style AltOrch fill:#ffe1f5
    style DiscOrch fill:#f5ffe1
    style RepOrch fill:#e1e1ff

Orchestrator Interactions

1. Deep Analysis Flow

sequenceDiagram
    participant Flow
    participant Deep as DeepAnalysisOrchestrator
    participant Error as ErrorHandlingOrchestrator
    participant Prog as ProgressTrackingOrchestrator
    participant Util as UtilityOrchestrator
    participant Crew as CrewFactory
    participant State as FinwizState

    Flow->>Deep: run_deep_analysis_on_holdings(holdings)

    loop For each holding
        Deep->>Error: execute_crew_with_error_handling()
        Error->>Crew: Execute deep analysis crew
        Crew-->>Error: Return crew result
        Error-->>Deep: Return wrapped result

        Deep->>Util: parse_crew_output_for_holding()
        Util-->>Deep: Return parsed data

        Deep->>State: Store analysis result

        Deep->>Prog: update_progress()
        Prog->>State: Update progress metrics
    end

    Deep->>State: Store all results
    Deep-->>Flow: Return analysis results

2. Alternative Matching Flow

sequenceDiagram
    participant Flow
    participant Alt as AlternativesMatchingOrchestrator
    participant Util as UtilityOrchestrator
    participant State as FinwizState

    Flow->>Alt: match_alternatives_for_holdings(holdings, discovery)

    loop For each holding
        Alt->>Alt: Check grade < B

        alt Grade < B
            Alt->>Util: parse_crew_output_for_holding()
            Util-->>Alt: Return parsed alternatives
            Alt->>State: Store alternatives for holding
        else Grade >= B
            Alt->>Alt: Skip (no alternatives needed)
        end
    end

    Alt->>State: Store all alternatives
    Alt-->>Flow: Return alternatives map
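
The grade check in the loop above can be sketched as follows. This is a minimal illustration, assuming plain letter grades from A to F; `GRADE_ORDER`, `needs_alternatives`, and `select_holdings_needing_alternatives` are hypothetical names, not part of the FinWiz codebase.

```python
# Hypothetical ranking of letter grades, best first; the real FinWiz scale may differ.
GRADE_ORDER = ["A", "B", "C", "D", "F"]


def needs_alternatives(grade: str, threshold: str = "B") -> bool:
    """Return True when `grade` is strictly worse than `threshold`."""
    return GRADE_ORDER.index(grade) > GRADE_ORDER.index(threshold)


def select_holdings_needing_alternatives(grades: dict[str, str]) -> list[str]:
    """Keep only tickers graded below B, mirroring the alt/else branch in the diagram."""
    return [ticker for ticker, grade in grades.items() if needs_alternatives(grade)]


selected = select_holdings_needing_alternatives({"AAPL": "A", "XYZ": "C", "QQQ": "B"})
```

Holdings graded B or better are skipped, so only `XYZ` would be passed on for alternative matching in this example.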

3. Discovery Flow

sequenceDiagram
    participant Flow
    participant Disc as DiscoveryOrchestrator
    participant Error as ErrorHandlingOrchestrator
    participant Crew as CrewFactory
    participant State as FinwizState

    Flow->>Disc: check_investment_discovery()

    par Parallel Discovery
        Disc->>Error: execute_crew_with_error_handling(crypto_crew)
        Error->>Crew: Execute crypto discovery
        Crew-->>Error: Return crypto results
        Error-->>Disc: Return wrapped results
    and
        Disc->>Error: execute_crew_with_error_handling(stock_crew)
        Error->>Crew: Execute stock discovery
        Crew-->>Error: Return stock results
        Error-->>Disc: Return wrapped results
    and
        Disc->>Error: execute_crew_with_error_handling(etf_crew)
        Error->>Crew: Execute ETF discovery
        Crew-->>Error: Return ETF results
        Error-->>Disc: Return wrapped results
    end

    Disc->>Disc: Consolidate all results
    Disc->>State: Store discovery results
    Disc-->>Flow: Return consolidated results
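
One way to realize the parallel block above is to run each blocking crew in a worker thread and gather the wrapped results. The sketch below is an assumption-laden illustration: `run_wrapped` is a simplified stand-in for `execute_crew_with_error_handling`, and the crew callables are dummies rather than real FinWiz crews.

```python
import asyncio
from typing import Any, Callable


def run_wrapped(name: str, crew_func: Callable[[], Any]) -> dict[str, Any]:
    """Simplified stand-in for execute_crew_with_error_handling: never raises."""
    try:
        return {"crew": name, "success": True, "data": crew_func()}
    except Exception as exc:
        return {"crew": name, "success": False, "error": str(exc)}


async def run_discovery(crews: dict[str, Callable[[], Any]]) -> dict[str, Any]:
    """Run each blocking crew in a worker thread and consolidate results by crew name."""
    wrapped = await asyncio.gather(
        *(asyncio.to_thread(run_wrapped, name, func) for name, func in crews.items())
    )
    return {result["crew"]: result for result in wrapped}


def failing_etf_crew() -> list[str]:
    """Dummy crew that simulates an unavailable data source."""
    raise RuntimeError("ETF data source unavailable")


results = asyncio.run(
    run_discovery(
        {
            "crypto": lambda: ["BTC"],
            "stock": lambda: ["MSFT"],
            "etf": failing_etf_crew,
        }
    )
)
```

Because every crew is wrapped before it is gathered, a failure in one discovery branch shows up as a `success: False` entry instead of cancelling the other two.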

4. Reporting Flow

sequenceDiagram
    participant Flow
    participant Rep as ReportingOrchestrator
    participant Util as UtilityOrchestrator
    participant State as FinwizState
    participant FS as FileSystem

    Flow->>Rep: report()

    Rep->>State: Get crew export paths
    Rep->>Rep: consolidate_reports()

    loop For each crew export
        Rep->>FS: Read export file
        FS-->>Rep: Return export data
        Rep->>Util: parse_crew_output()
        Util-->>Rep: Return parsed data
    end

    Rep->>Rep: generate_final_report()
    Rep->>Rep: generate_html_from_export()

    Rep->>FS: Write HTML report
    Rep->>State: Store report path
    Rep-->>Flow: Return report path
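
The read, consolidate, and write steps above might look like the following sketch. It assumes each crew export is a JSON object with `crew` and `summary` keys, which is a guess about the export format; `generate_html_report` is a hypothetical helper that combines the two generation steps from the diagram.

```python
import html
import json
from pathlib import Path


def consolidate_reports(export_paths: list[Path]) -> list[dict]:
    """Read each crew export (assumed to be a JSON object) into a list."""
    return [json.loads(path.read_text(encoding="utf-8")) for path in export_paths]


def generate_html_report(sections: list[dict], output_path: Path) -> Path:
    """Render one <section> per export and write the page to output_path."""
    body = "\n".join(
        f"<section><h2>{html.escape(str(s.get('crew', 'unknown')))}</h2>"
        f"<p>{html.escape(str(s.get('summary', '')))}</p></section>"
        for s in sections
    )
    output_path.write_text(f"<html><body>\n{body}\n</body></html>", encoding="utf-8")
    return output_path
```

Escaping every value with `html.escape` keeps crew output that contains `&` or `<` from corrupting the generated page.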

State Management

State Flow

graph LR
    Init[Initialize State] --> Val[Validation]
    Val --> Deep[Deep Analysis]
    Deep --> Alt[Alternative Matching]
    Alt --> Disc[Discovery]
    Disc --> Rep[Reporting]
    Rep --> Final[Final State]

    style Init fill:#e1f5ff
    style Val fill:#f5e1ff
    style Deep fill:#e1fff5
    style Alt fill:#ffe1f5
    style Disc fill:#f5ffe1
    style Rep fill:#e1e1ff
    style Final fill:#ffe1e1

State Updates by Orchestrator

| Orchestrator | State Fields Updated |
|---|---|
| ValidationOrchestrator | validation_complete, portfolio_data |
| DeepAnalysisOrchestrator | deep_analysis_results, deep_analysis_success, deep_analysis_error |
| AlternativesMatchingOrchestrator | portfolio_alternatives |
| DiscoveryOrchestrator | discovery_results |
| ReportingOrchestrator | final_report_path, crew_export_paths |
| ProgressTrackingOrchestrator | holdings_processed, total_holdings, progress_percentage |

Error Handling Flow

sequenceDiagram
    participant Orch as Any Orchestrator
    participant Error as ErrorHandlingOrchestrator
    participant Crew as CrewFactory
    participant State as FinwizState

    Orch->>Error: execute_crew_with_error_handling(crew_func)

    Error->>Crew: Execute crew

    alt Success
        Crew-->>Error: Return result
        Error->>Error: Wrap success result
        Error-->>Orch: Return {success: true, data: result}
    else Failure
        Crew-->>Error: Raise exception
        Error->>Error: generate_error_summary()
        Error->>State: Store error info
        Error-->>Orch: Return {success: false, error: info}
    end

    Orch->>Orch: Handle result
    Orch->>State: Update state accordingly
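
The success and failure branches above amount to a wrapper that converts exceptions into a result envelope. The sketch below shows one possible shape. The `errors` list parameter is a hypothetical stand-in for the error fields on `FinwizState`, and the keys inside the error summary are assumptions.

```python
import traceback
from typing import Any, Callable


def generate_error_summary(crew_name: str, exc: Exception) -> dict[str, str]:
    """Collect the context a caller needs to log or display the failure."""
    return {
        "crew": crew_name,
        "type": type(exc).__name__,
        "message": str(exc),
        "traceback": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }


def execute_crew_with_error_handling(
    crew_func: Callable[[], Any], crew_name: str, errors: list[dict[str, str]]
) -> dict[str, Any]:
    """Run crew_func and always return an envelope instead of raising."""
    try:
        return {"success": True, "data": crew_func()}
    except Exception as exc:  # broad by design: a crew failure must not stop the flow
        info = generate_error_summary(crew_name, exc)
        errors.append(info)  # stands in for "Store error info" on FinwizState
        return {"success": False, "error": info}


def broken_crew() -> None:
    """Dummy crew that always fails."""
    raise ValueError("bad ticker")


errors: list[dict[str, str]] = []
ok = execute_crew_with_error_handling(lambda: 42, "demo", errors)
failed = execute_crew_with_error_handling(broken_crew, "demo", errors)
```

Callers then branch on `result["success"]` rather than using `try`/`except`, which keeps the handling logic identical across orchestrators.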

Progress Tracking Flow

sequenceDiagram
    participant Deep as DeepAnalysisOrchestrator
    participant Prog as ProgressTrackingOrchestrator
    participant State as FinwizState
    participant FS as FileSystem

    Deep->>Prog: update_progress(processed, total)
    Prog->>Prog: Calculate percentage
    Prog->>State: Update progress fields
    Prog->>Prog: Log progress

    opt Save Metrics
        Deep->>Prog: save_batch_metrics_to_file(metrics, path)
        Prog->>FS: Write metrics file
    end
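
The percentage calculation and state update could be sketched like this. `ProgressState` is a hypothetical subset of `FinwizState` containing only the three progress fields listed in the state table, and rounding to one decimal place is an assumption.

```python
from dataclasses import dataclass


@dataclass
class ProgressState:
    """Hypothetical subset of FinwizState holding only the progress fields."""

    holdings_processed: int = 0
    total_holdings: int = 0
    progress_percentage: float = 0.0


def update_progress(state: ProgressState, processed: int, total: int) -> float:
    """Record the counts and derive the percentage, guarding against an empty portfolio."""
    state.holdings_processed = processed
    state.total_holdings = total
    state.progress_percentage = round(100.0 * processed / total, 1) if total else 0.0
    return state.progress_percentage


state = ProgressState()
percentage = update_progress(state, processed=3, total=8)
```

The `if total else 0.0` guard avoids a division by zero when the portfolio has no holdings.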

Data Parsing Flow

sequenceDiagram
    participant Orch as Any Orchestrator
    participant Util as UtilityOrchestrator
    participant Crew as CrewOutput

    Orch->>Util: parse_crew_output_for_holding(output, ticker)
    Util->>Crew: Access raw output
    Util->>Util: Extract ticker data
    Util->>Util: Parse JSON/Pydantic
    Util->>Util: Validate structure
    Util-->>Orch: Return parsed data

    Orch->>Util: calculate_grade_distribution(holdings)
    Util->>Util: Aggregate grades
    Util-->>Orch: Return distribution

    Orch->>Util: extract_sec_filing_urls(output)
    Util->>Util: Parse URLs
    Util->>Util: validate_and_fix_sec_urls()
    Util-->>Orch: Return validated URLs
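
As a rough illustration of the first two calls in the diagram, the sketch below assumes the raw crew output is a JSON object keyed by ticker and that each entry carries a `grade` field. Both are guesses about the data shape, and the function bodies are not the actual `UtilityOrchestrator` implementation.

```python
import json
from collections import Counter
from typing import Any


def parse_crew_output_for_holding(raw_output: str, ticker: str) -> dict[str, Any]:
    """Parse raw JSON, pick the entry for `ticker`, and check its structure."""
    payload = json.loads(raw_output)
    entry = payload.get(ticker) if isinstance(payload, dict) else None
    if not isinstance(entry, dict) or "grade" not in entry:
        raise ValueError(f"No valid analysis found for {ticker}")
    return {"ticker": ticker, **entry}


def calculate_grade_distribution(holdings: list[dict[str, Any]]) -> dict[str, int]:
    """Count holdings per grade; holdings without a grade are tallied as 'N/A'."""
    return dict(Counter(h.get("grade", "N/A") for h in holdings))


parsed = parse_crew_output_for_holding('{"AAPL": {"grade": "A"}}', "AAPL")
distribution = calculate_grade_distribution(
    [parsed, {"ticker": "XYZ", "grade": "C"}, {"ticker": "NEW"}]
)
```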

Lazy Loading Pattern

sequenceDiagram
    participant User
    participant Flow as FinwizFlow
    participant Prop as @property
    participant Orch as Orchestrator

    User->>Flow: Access flow.deep_analysis_orch
    Flow->>Prop: Call property getter

    alt First Access
        Prop->>Prop: Check if _deep_analysis_orch is None
        Prop->>Orch: Create DeepAnalysisOrchestrator(state, **deps)
        Orch-->>Prop: Return instance
        Prop->>Flow: Store in _deep_analysis_orch
        Prop-->>User: Return orchestrator
    else Subsequent Access
        Prop->>Prop: Check if _deep_analysis_orch is None
        Prop->>Flow: Read cached _deep_analysis_orch
        Prop-->>User: Return orchestrator
    end
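
In Python, the pattern in this diagram is commonly written as a property that fills a private attribute on first access. The classes below are simplified stand-ins that reuse the names from the diagram. The real constructors also take injected dependencies, which are omitted here.

```python
from typing import Optional


class DeepAnalysisOrchestrator:
    """Stand-in for the real orchestrator; it only records its constructor arguments."""

    def __init__(self, state: dict, **deps: object) -> None:
        self.state = state
        self.deps = deps


class FinwizFlow:
    """Minimal flow that creates its orchestrator only when first requested."""

    def __init__(self) -> None:
        self.state: dict = {}
        self._deep_analysis_orch: Optional[DeepAnalysisOrchestrator] = None

    @property
    def deep_analysis_orch(self) -> DeepAnalysisOrchestrator:
        # First access: build and cache. Later accesses: return the cached instance.
        if self._deep_analysis_orch is None:
            self._deep_analysis_orch = DeepAnalysisOrchestrator(self.state)
        return self._deep_analysis_orch


flow = FinwizFlow()
created_before_access = flow._deep_analysis_orch is not None
first = flow.deep_analysis_orch
second = flow.deep_analysis_orch
```

Both accesses return the same object, so state attached to the orchestrator survives between flow steps.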

Dependency Injection

graph TD
    Flow[FinwizFlow]
    Deps[OrchestratorDependencies]

    Flow --> Deps

    Deps --> CF[CrewFactory]
    Deps --> IM[IntegrationManager]
    Deps --> EH[ErrorHandler]
    Deps --> SM[StateManager]
    Deps --> RC[ResilienceConfig]
    Deps --> BC[BatchPrefetchConfig]

    Orch1[ErrorHandlingOrchestrator]
    Orch2[DeepAnalysisOrchestrator]
    Orch3[ReportingOrchestrator]

    Deps -.-> Orch1
    Deps -.-> Orch2
    Deps -.-> Orch3

    style Flow fill:#e1f5ff
    style Deps fill:#ffe1e1
    style Orch1 fill:#e1ffe1
    style Orch2 fill:#e1fff5
    style Orch3 fill:#e1e1ff
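
A dependency bundle like the one in this graph can be modelled as a frozen dataclass that is handed to each orchestrator. The field names below are snake_case guesses derived from the node labels, the types are left as `Any`, and the constructor signature is an assumption rather than the actual FinWiz API.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class OrchestratorDependencies:
    """Bundle of shared collaborators; field types are left as Any in this sketch."""

    crew_factory: Any
    integration_manager: Any
    error_handler: Any
    state_manager: Any
    resilience_config: Any
    batch_prefetch_config: Any


class ReportingOrchestrator:
    """Receives the whole bundle and keeps only the collaborators it needs."""

    def __init__(self, state: dict, deps: OrchestratorDependencies) -> None:
        self.state = state
        self.error_handler = deps.error_handler


# Placeholder strings stand in for the real collaborator objects.
deps = OrchestratorDependencies("cf", "im", "eh", "sm", "rc", "bc")
orch = ReportingOrchestrator({}, deps)
```

Freezing the dataclass prevents one orchestrator from swapping out a collaborator that the others also rely on.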

Communication Patterns

1. Synchronous Communication

Most orchestrator interactions are synchronous:

Python
# Flow calls orchestrator
result = self.deep_analysis_orch.run_deep_analysis_on_holdings(holdings)

# Orchestrator calls another orchestrator
wrapped_result = self.error_handler_orch.execute_crew_with_error_handling(
    crew_func, "crew_name"
)

2. State-Based Communication

Orchestrators communicate through shared state:

Python
# Orchestrator A updates state
self.state.deep_analysis_results = results

# Orchestrator B reads state
results = self.state.deep_analysis_results

3. Return Value Communication

Flow methods return data for downstream listeners:

Python
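# Assumes CrewAI Flows; both functions are methods of the FinwizFlow class
from typing import Any

from crewai.flow.flow import listen

@listen("check_portfolio")
def analyze_holdings(self) -> dict[str, Any]:
    # Assumption: check_portfolio stored the holdings in state.portfolio_data
    holdings = self.state.portfolio_data
    results = self.deep_analysis_orch.run_deep_analysis_on_holdings(holdings)
    return {"analysis": results}  # Passed to next listener

@listen("analyze_holdings")
def match_alternatives(self, analysis_data: dict[str, Any]) -> dict[str, Any]:
    results = analysis_data["analysis"]  # Received from upstream
    # Process and return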

Performance Considerations

Lazy Loading Benefits

graph LR
    A[Flow Created] --> B{Orchestrator Accessed?}
    B -->|No| C[No Memory Used]
    B -->|Yes| D[Orchestrator Created]
    D --> E[Cached for Reuse]
    E --> F{Accessed Again?}
    F -->|Yes| G[Return Cached Instance]
    F -->|No| H[Cached Instance Retained]

Parallel Execution

Discovery crews can run in parallel:

Python
# Parallel execution using asyncio
import asyncio

# check_crypto, check_stock, and check_etf are assumed to be coroutine functions
async def run_discovery():
    crypto_task = asyncio.create_task(check_crypto())
    stock_task = asyncio.create_task(check_stock())
    etf_task = asyncio.create_task(check_etf())

    results = await asyncio.gather(
        crypto_task, stock_task, etf_task
    )
    return consolidate_results(results)

Best Practices

1. Orchestrator Communication

DO:

  • Use state for persistent data
  • Return data for downstream listeners
  • Use error handling orchestrator for crew execution
  • Keep orchestrators focused on single responsibility

DON'T:

  • Create circular dependencies
  • Store large data in memory
  • Skip error handling
  • Mix responsibilities

2. State Management

DO:

  • Update state through orchestrators
  • Use Pydantic models for type safety
  • Validate state updates
  • Document state fields

DON'T:

  • Modify state directly from Flow
  • Use unstructured state
  • Skip validation
  • Create state inconsistencies

3. Error Handling

DO:

  • Wrap crew executions with error handling
  • Log errors with context
  • Return structured error info
  • Update state with error status

DON'T:

  • Ignore errors
  • Raise unhandled exceptions
  • Lose error context
  • Skip error logging
