Skip to content

Complex Workflow: Finance Analyst

This use case demonstrates a real-world scenario: Financial Market Analysis.

It combines:

1. Multiple Frameworks: Agno, LangChain, CrewAI, and PydanticAI working together.
2. Tool Use: Agents equipped with search tools (Tavily).
3. Parallelism: Fetching Gold, Crypto, and Stock data simultaneously.
4. Synthesis: A final advisor agent that aggregates all reports into a conclusion.
5. Cost Calculation: Tracking the cost of execution.

use_case/finance_workflow.py

import asyncio
import os
import sys
from typing import Optional, Any, List
from dotenv import load_dotenv

# Load variables from a .env file into the process environment before the
# os.getenv() lookups further down are evaluated.
load_dotenv()
# Put the sibling ../src directory on the import path so the kibo_core import
# that follows can resolve (presumably kibo_core lives there — confirm).
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src'))

from kibo_core import AgentConfig, create_agent

# Base URL handed to every agent as `base_url`; override with KIBO_PROXY_URL.
PROXY_URL = os.environ.get("KIBO_PROXY_URL", "http://localhost:4000")
# Model identifier used for all agents in this workflow.
MODEL_NAME = "openai/gpt-4o-mini"
# Tavily search API key; None when TAVILY_API_KEY is not set.
TAVILY_KEY = os.environ.get("TAVILY_API_KEY")

def get_agno_tool(api_key: str):
    """Build the Agno Tavily toolkit for the given API key.

    Returns ``None`` when ``api_key`` is empty or when the Agno Tavily
    integration cannot be imported.
    """
    toolkit = None
    if api_key:
        try:
            from agno.tools.tavily import TavilyTools

            toolkit = TavilyTools(api_key=api_key)
        except ImportError:
            # The Agno Tavily integration is unavailable; the caller gets None.
            toolkit = None
    return toolkit

def get_langchain_tool(api_key: str):
    """Return a LangChain search tool: Tavily when possible, otherwise a mock.

    Args:
        api_key: Tavily API key. When empty, the Tavily tool is not attempted.

    Returns:
        A ``TavilySearchResults`` tool when ``api_key`` is set and
        ``langchain_community`` can be imported; otherwise a mock ``Tool``
        that always answers ``"Mock Data: Market is stable."``; or ``None``
        when ``langchain`` itself cannot be imported.
    """
    if api_key:
        try:
            from langchain_community.tools.tavily_search import TavilySearchResults

            return TavilySearchResults(api_key=api_key)
        except ImportError:
            # Best effort: the real tool is unavailable, use the mock below.
            pass

    try:
        from langchain.tools import Tool
    except ImportError:
        # Same contract as get_agno_tool / get_crewai_tool: report "no tool"
        # with None instead of raising; make_agent drops None entries.
        return None

    return Tool(name="search", func=lambda x: "Mock Data: Market is stable.", description="Search mock")

def get_crewai_tool(api_key: str):
    """Return a CrewAI search tool: Tavily-backed when possible, else a mock.

    With a non-empty ``api_key`` and both ``crewai`` and ``tavily`` importable,
    the result is a tool whose ``_run`` performs a basic-depth Tavily search.
    Otherwise a mock tool returning ``"Mock Data: Stocks up."`` is produced,
    and ``None`` is returned when even the mock cannot be built because
    ``crewai`` is missing.
    """

    def build_tavily_tool():
        # Imports stay local so a missing package surfaces as ImportError here.
        from crewai.tools import BaseTool
        from tavily import TavilyClient

        class TavilyCrewTool(BaseTool):
            name: str = "TavilySearch"
            description: str = "Search web."

            def _run(self, query: str) -> str:
                # A client is created per call, using the key captured above.
                return str(TavilyClient(api_key=api_key).search(query, search_depth="basic"))

        return TavilyCrewTool()

    def build_mock_tool():
        from crewai.tools import BaseTool

        class MockTool(BaseTool):
            name: str = "Search"
            description: str = "Mock search"

            def _run(self, q: str) -> str:
                return "Mock Data: Stocks up."

        return MockTool()

    if api_key:
        try:
            live_tool = build_tavily_tool()
        except ImportError:
            live_tool = None
        if live_tool:
            return live_tool

    try:
        return build_mock_tool()
    except ImportError:
        return None

def make_agent(name: str, task: str, engine: str, tools: Optional[List[Any]] = None) -> Any:
    """Create an agent through ``create_agent`` with this script's defaults.

    Args:
        name: Agent name; also interpolated into the agent description.
        task: Instruction text for the agent.
        engine: Value passed as ``AgentConfig.agent``.
        tools: Optional tools; falsy entries (such as ``None`` from a tool
            factory that found nothing to build) are dropped.

    Returns:
        Whatever ``create_agent`` returns for the assembled ``AgentConfig``.
    """
    candidate_tools = () if tools is None else tools
    usable_tools = [tool for tool in candidate_tools if tool]

    agent_config = AgentConfig(
        name=name,
        description=f"Specialist agent for {name}",
        instructions=task,
        agent=engine,
        model=MODEL_NAME,
        config={
            "base_url": PROXY_URL,
            "temperature": 0.0,  # zero temperature, aiming for repeatable output
            "tools": usable_tools,
        },
    )
    # Falls back to a placeholder key when OPENAI_API_KEY is not set.
    openai_key = os.getenv("OPENAI_API_KEY", "sk-dummy")
    return create_agent(agent_config, api_key=openai_key)

def get_cost(
    res: Any,
    prompt_price_per_million: float = 0.15,
    completion_price_per_million: float = 0.60,
) -> float:
    """Extract, or estimate, the cost of one agent run from its metadata.

    Lookup order:
      1. ``res.metadata["cost"]``
      2. ``response_cost`` and then ``cost`` inside the usage record
         (``metadata["token_usage"]``, falling back to ``metadata["usage"]``)
      3. an estimate from prompt/completion token counts and the given prices

    Args:
        res: Result object exposing a ``metadata`` mapping, or an exception /
            falsy value for a run that produced nothing.
        prompt_price_per_million: Price per one million prompt tokens.
        completion_price_per_million: Price per one million completion tokens.
            The defaults are the rates this function used before they became
            parameters (presumably gpt-4o-mini pricing — confirm).

    Returns:
        The cost as a float; ``0.0`` when nothing usable is available.
    """
    if not res or isinstance(res, Exception):
        return 0.0

    # Tolerate results without metadata, or with metadata set to None.
    metadata = getattr(res, "metadata", None) or {}

    if metadata.get("cost") is not None:
        return float(metadata["cost"])

    # The trailing `or {}` covers a "usage" key that is present but None.
    usage = metadata.get("token_usage") or metadata.get("usage") or {}

    # Mappings are used as they are. Checking this first matters: a dict
    # subclass instance can carry its own (empty) __dict__, which must not
    # replace the real data.
    if not isinstance(usage, dict):
        if hasattr(usage, "request_tokens"):
            usage = {
                "prompt_tokens": usage.request_tokens,
                "completion_tokens": getattr(usage, "response_tokens", None),
            }
        elif hasattr(usage, "input_tokens"):
            usage = {
                "prompt_tokens": usage.input_tokens,
                "completion_tokens": getattr(usage, "output_tokens", None),
                "cost": getattr(usage, "cost", None),
            }
        else:
            # Generic object: read its attributes; objects without __dict__
            # contribute nothing.
            usage = getattr(usage, "__dict__", None) or {}

    for cost_key in ("response_cost", "cost"):
        if usage.get(cost_key) is not None:
            return float(usage[cost_key])

    # Token counts may be missing or explicitly None; treat both as zero.
    prompt_tokens = usage.get("prompt_tokens") or 0
    completion_tokens = usage.get("completion_tokens") or 0
    return (
        prompt_tokens * prompt_price_per_million
        + completion_tokens * completion_price_per_million
    ) / 1_000_000

async def main():
    """Run the finance workflow: three research agents, then one advisor.

    Builds the Gold, Crypto and Stock agents, starts one prompt on each via
    ``run_async``, and waits for every handle's blocking ``result()`` in a
    worker thread. Failed tasks are reported and skipped; successful reports
    are concatenated into the context given to the advisor agent. The cost of
    each step is accumulated and printed at the end.
    """
    print(f" Starting Kibo Workflow [Model: {MODEL_NAME}]")
    if not TAVILY_KEY:
        print(" Running in MOCK mode (No TAVILY_API_KEY)")

    # Each research agent is paired with the prompt it will receive.
    jobs = [
        (
            make_agent("GoldAgent", "Find current Gold/XAU price (USD).", "agno", [get_agno_tool(TAVILY_KEY)]),
            "Price of Gold today?",
        ),
        (
            make_agent("CryptoAgent", "Summarize top Blockchain news.", "langchain", [get_langchain_tool(TAVILY_KEY)]),
            "Blockchain news summary.",
        ),
        (
            make_agent("StockAgent", "Get prices: NVDA, MSFT, GOOG.", "crewai", [get_crewai_tool(TAVILY_KEY)]),
            "Stock prices for NVDA, MSFT, GOOG.",
        ),
    ]

    print("\n Launching parallel tasks...")
    pending = [agent.run_async(prompt) for agent, prompt in jobs]

    # return_exceptions=True keeps one failing task from cancelling the rest.
    waiters = [asyncio.to_thread(handle.result) for handle in pending]
    outcomes = await asyncio.gather(*waiters, return_exceptions=True)

    report_sections = []
    total_cost = 0.0

    for task_number, outcome in enumerate(outcomes, start=1):
        if isinstance(outcome, Exception):
            print(f" Task {task_number} failed: {outcome}")
            continue

        task_cost = get_cost(outcome)
        total_cost += task_cost
        adapter_name = outcome.metadata.get('adapter', 'Unknown')
        print(f" {adapter_name} done (${task_cost:.6f})")
        report_sections.append(f"\n--- {adapter_name} Report ---\n{outcome.output_data}\n")

    context = "".join(report_sections)

    print("\n Synthesizing advice...")
    advisor = make_agent(
        "Advisor",
        "Analyze reports. Suggest portfolio allocation (Stocks/Crypto/Gold). Explain why.",
        "pydantic_ai",
    )

    final = advisor.run(f"Market Data:\n{context}")

    total_cost += get_cost(final)

    print("\n=== FINAL STRATEGY ===")
    print(final.output_data)
    print(f"\n💵 Total Cost: ${total_cost:.6f}")

# Script entry point: only when executed directly (not imported), run the
# async workflow to completion on a new event loop.
if __name__ == "__main__":
    asyncio.run(main())