投資組合大擂台 Ver. 2
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

249 lines
8.5 KiB

"""
LLM Investment Advisor Service
提供投資策略的AI分析服務包含
- 投資建議生成
- 風險評估
- 市場洞察
- Prompt工程管理
"""
import os
import json
import time
import logging
from typing import Dict, Any, Optional, Tuple
from functools import lru_cache
import time
import openai
from openai import OpenAI
from config_openai import OPENAI_CONFIG, RATE_LIMITS
# 設定日誌
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PromptManager:
"""管理不同的Prompt模板"""
def __init__(self):
self.system_prompt = self._get_system_prompt()
def _get_system_prompt(self) -> str:
"""系統提示詞"""
return """你是一位經驗豐富的專業投資顧問,擁有超過15年的投資經驗和深厚的金融知識。
請基於提供的投資組合數據提供專業客觀且實用的投資建議
請從以下幾個面向進行分析
1. 整體表現評估年化報酬風險指標
2. 風險收益特性分析夏普比率最大回落
3. 市場環境適配性評估
4. 具體的改進建議和操作建議
5. 風險管理和再平衡建議
請確保你的回答
- 專業且易懂避免過度技術術語
- 基於數據事實客觀分析
- 提供可操作的具體建議
- 考慮台灣市場的特殊性如果適用"""
def build_strategy_context(self, strategy_data: Dict[str, Any]) -> str:
"""將策略資料轉換為結構化context"""
return f"""
策略基本資訊
- 策略編號{strategy_data.get('id', 'N/A')}
- 策略名稱{strategy_data.get('name', 'N/A')}
- 投資目標{strategy_data.get('role', 'N/A')}
- 建立時間{strategy_data.get('date', 'N/A')}
- 建立者{strategy_data.get('username', 'N/A')}
績效指標
- 年化報酬率{strategy_data.get('annual_ret', 0):.2%}
- 年化波動率{strategy_data.get('vol', 0):.2%}
- 年化夏普比率{strategy_data.get('annual_sr', 0):.2f}
- 最大回落MDD{strategy_data.get('mdd', 0):.2%}
- Alpha值{strategy_data.get('alpha', 0):.4f}
- Beta值{strategy_data.get('beta', 0):.4f}
- VaR (10%){strategy_data.get('var10', 0):.2%}
- R-squared{strategy_data.get('r2', 0):.4f}
- Gamma值{strategy_data.get('gamma', 0):.4f}
投資組合配置
- 包含資產{', '.join(strategy_data.get('assets', []))}
- 市場類型{'台灣市場' if strategy_data.get('tw', True) else '美國市場'}
"""
def get_investment_advice_prompt(self, strategy_data: Dict[str, Any]) -> str:
"""生成投資建議的完整Prompt"""
context = self.build_strategy_context(strategy_data)
return f"""{self.system_prompt}
{context}
請提供詳細的投資建議分析包含
1. 整體表現評估該策略的強項和弱點
2. 風險評估當前風險水平的評價和建議
3. 市場適配性該策略在當前市場環境下的適配程度
4. 改進建議具體的可操作改進建議
5. 未來展望未來3-6個月的投資建議
請用繁體中文回答結構清晰建議具體可行"""
class LLMInvestmentAdvisor:
"""LLM投資顧問主類"""
def __init__(self, api_key: Optional[str] = None):
"""初始化LLM服務"""
self.api_key = api_key or OPENAI_CONFIG['api_key']
if not self.api_key or self.api_key == 'your-api-key-here':
raise ValueError("OpenAI API key is required. Please set OPENAI_API_KEY environment variable.")
self.client = OpenAI(
api_key=self.api_key,
timeout=OPENAI_CONFIG.get('timeout', 30)
)
self.prompt_manager = PromptManager()
self.model = OPENAI_CONFIG.get('model', 'gpt-4')
self.max_tokens = OPENAI_CONFIG.get('max_tokens', 2000)
self.temperature = OPENAI_CONFIG.get('temperature', 0.7)
# 快取設定
self.cache: Dict[str, Tuple[str, float]] = {}
self.cache_timeout = 3600 # 1小時快取
def _is_cache_valid(self, cache_time: float) -> bool:
"""檢查快取是否有效"""
return time.time() - cache_time < self.cache_timeout
@lru_cache(maxsize=100)
def generate_advice(self, strategy_id: str, strategy_data: Dict[str, Any]) -> str:
"""生成投資建議
Args:
strategy_id: 策略ID
strategy_data: 策略資料字典
Returns:
str: 投資建議文本
"""
cache_key = f"advice_{strategy_id}_{hash(str(strategy_data))}"
# 檢查快取
if cache_key in self.cache:
advice, cache_time = self.cache[cache_key]
if self._is_cache_valid(cache_time):
logger.info(f"Returning cached advice for strategy {strategy_id}")
return advice
try:
# 構建prompt
prompt = self.prompt_manager.get_investment_advice_prompt(strategy_data)
# 調用OpenAI API
response = self._call_openai_with_retry(prompt)
# 快取結果
self.cache[cache_key] = (response, time.time())
logger.info(f"Generated new advice for strategy {strategy_id}")
return response
except Exception as e:
logger.error(f"Error generating advice for strategy {strategy_id}: {str(e)}")
return self._get_fallback_advice(strategy_data)
def _call_openai_with_retry(self, prompt: str) -> str:
"""調用OpenAI API,包含重試機制
Args:
prompt: 完整的prompt
Returns:
str: API回應內容
"""
max_retries = RATE_LIMITS.get('max_retries', 3)
retry_delay = RATE_LIMITS.get('retry_delay', 2)
for attempt in range(max_retries):
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "你是一位專業的投資顧問。"},
{"role": "user", "content": prompt}
],
max_tokens=self.max_tokens,
temperature=self.temperature,
top_p=0.9
)
return response.choices[0].message.content.strip()
except openai.RateLimitError as e:
if attempt < max_retries - 1:
# 指數退避:1s, 2s, 4s
wait_time = retry_delay ** attempt
logger.warning(f"Rate limit exceeded, retrying in {wait_time}s... (attempt {attempt + 1}/{max_retries})")
time.sleep(wait_time)
continue
else:
logger.error(f"Rate limit exceeded after {max_retries} attempts")
raise e
except openai.APIError as e:
if attempt < max_retries - 1:
wait_time = retry_delay ** attempt
logger.warning(f"API error, retrying in {wait_time}s... (attempt {attempt + 1}/{max_retries}): {str(e)}")
time.sleep(wait_time)
continue
else:
logger.error(f"API error after {max_retries} attempts: {str(e)}")
raise e
except Exception as e:
logger.error(f"Unexpected error calling OpenAI: {str(e)}")
raise e
def _get_fallback_advice(self, strategy_data: Dict[str, Any]) -> str:
"""獲取fallback投資建議"""
annual_ret = strategy_data.get('annual_ret', 0)
vol = strategy_data.get('vol', 0)
sharpe = strategy_data.get('annual_sr', 0)
return f"""
基於您的投資策略數據我提供以下初步分析
📊 **表現評估**
- 年化報酬率{annual_ret:.2%} - {'表現良好' if annual_ret > 0.1 else '有改進空間'}
- 年化波動率{vol:.2%} - {'風險適中' if vol < 0.2 else '風險較高'}
- 夏普比率{sharpe:.2f} - {'風險調整後報酬優良' if sharpe > 1 else '有改進空間'}
💡 **初步建議**
1. 持續監控市場變化
2. 定期檢視投資組合配置
3. 考慮分散投資降低風險
*注意此為預設建議如需詳細分析請稍後再試*
"""
def clear_cache(self):
"""清除快取"""
self.cache.clear()
self.generate_advice.cache_clear()
logger.info("LLM advice cache cleared")
# 創建全域實例
llm_advisor = None
def get_llm_advisor() -> LLMInvestmentAdvisor:
"""獲取LLM顧問實例"""
global llm_advisor
if llm_advisor is None:
llm_advisor = LLMInvestmentAdvisor()
return llm_advisor