0530 update add tpm function

master
josephwang 11 months ago
parent 3f515e9a88
commit 5d4de19c39
  1. 228
      TPM.py
  2. 54
      function_calling.py

228
TPM.py

@ -0,0 +1,228 @@
import json
import time
import numpy as np
import pandas as pd
from scipy.optimize import minimize
import yfinance as yf
class MVO(object):
@staticmethod
def portfolio_info(w, ret, market_ret, rf=0):
# return and drawdown
retPort = ret@w # T-dimensional array
cum_ret = (retPort+1).cumprod()
rolling_max=np.maximum.accumulate(cum_ret)
mdd = np.max((rolling_max - cum_ret)/rolling_max)
## Sharpe Ratio
stdPort = np.std(retPort)
vol = stdPort*15.87451
annual_ret = np.mean(retPort) * 252
annual_sr = (annual_ret-rf) / vol
## alpha, beta
cov = np.cov(retPort, market_ret)
beta = cov[0, 1] / cov[1, 1]
alpha = annual_ret - rf - beta*(np.mean(market_ret) * 252 - rf)
R2 = cov[0, 1]**2/(cov[0, 0] * cov[1, 1])
## n-day 95% VaR
var10 = -annual_ret*(10/252) + 1.645*vol*(10/252)**(1/2)
d = dict(annual_ret = annual_ret,
vol=vol,
mdd=mdd,
annual_sr=annual_sr,
beta=beta,
alpha=alpha,
var10=var10,
R2=R2)
return {key: round(d[key], 2) for key in d}
@staticmethod
def sharpe_ratio(w, ret):
cov = np.cov(ret.T)
# print(cov.shape, w.shape)
retPort = ret@w # T-dimensional array
stdPort = np.std(retPort)
return np.mean(retPort)/stdPort
@staticmethod
def sharpe_grad(w, ret, cov):
manual_ret = np.mean(ret, axis=0)
# print(cov.shape, w.shape)
retPort = ret@w # T-dimensional array
stdPort = np.std(retPort)
g1=manual_ret/stdPort
g2=np.mean(retPort)*stdPort**(-3)*cov@w
return g1-g2
@staticmethod
def sortino_ratio(w, ret):
retPort = ret@w # T-dimensional array
stdPort = np.std(np.maximum(-retPort, 0))
return np.mean(retPort)/stdPort
@staticmethod
def sortino_grad(w, ret, cov_sor):
manual_ret = np.mean(ret, axis=0)
# print(cov.shape, w.shape)
retPort = ret@w # T-dimensional arrayss
stdPort = np.std(retPort)
g1=manual_ret/stdPort
g2=np.mean(retPort)*stdPort**(-3)*cov_sor@w
return g1-g2
@staticmethod
def sortino_ratio(w, ret):
retPort = ret@w # T-dimensional array
stdPort = np.std(np.maximum(-retPort, 0))
return np.mean(retPort)/stdPort
@staticmethod
def sortino_grad(w, ret, cov_sor):
manual_ret = np.mean(ret, axis=0)
# print(cov.shape, w.shape)
retPort = ret@w # T-dimensional arrayss
stdPort = np.std(retPort)
g1=manual_ret/stdPort
g2=np.mean(retPort)*stdPort**(-3)*cov_sor@w
return g1-g2
# equivalent opt problem with min vol
@staticmethod
def volatility(w, ret):
retPort = ret@w # T-dimensional array
return np.std(retPort)
@staticmethod
def volatility_grad(w, ret, cov):
retPort = ret@w # T-dimensional array
stdPort = np.std(retPort)
return cov@w/stdPort
@staticmethod
def quadratic_utility(w, ret, gamma):
retPort = ret@w # T-dimensional array
varPort = np.var(retPort)
return np.mean(retPort) - 0.5*gamma*varPort
@staticmethod
def quadratic_utility_grad(w, ret, cov, gamma):
manual_ret = np.mean(ret, axis=0)
return manual_ret - gamma*cov@w
@classmethod
def opt(cls, ret, gamma=0, role="max_sharpe"):
n = ret.shape[1]
init=np.ones(n)/n
if role=="max_sharpe":
if n==1:
cov=np.array(np.cov(ret.T))
else:
cov=np.cov(ret.T)
loss = lambda w: -cls.sharpe_ratio(w, ret)
grad = lambda w: -cls.sharpe_grad(w, ret, cov)
elif role=="max_sortino":
if n==1:
cov = np.cov(np.maximum(ret, 0).T)
else:
cov = np.array(np.cov(np.maximum(ret, 0).T))
loss = lambda w: -cls.sortino_ratio(w, ret)
grad = lambda w: -cls.sortino_grad(w, ret, cov)
elif role=="min_volatility":
if n==1:
cov=np.array(np.cov(ret.T))
else:
cov=np.cov(ret.T)
loss = lambda w: cls.volatility(w, ret)
grad = lambda w: cls.volatility_grad(w, ret, cov)
elif role=="quadratic_utility":
if n==1:
cov=np.array(np.cov(ret.T))
else:
cov=np.cov(ret.T)
loss = lambda w: -cls.quadratic_utility(w, ret, gamma)
grad = lambda w: -cls.quadratic_utility_grad(w, ret, cov, gamma)
else:
return init
if n==1:
bnds = [[0,1]]
else:
bnds = [[0.03, 0.6] for i in range(n)]
opts = {'maxiter': 1000, 'disp': False}
cons = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1})
result = minimize(loss, init, method="SLSQP",\
options=opts, bounds=bnds, tol = None, jac = grad, constraints=cons)
sol = result['x']
return np.round(sol, 2)
def rolling_optimize(ret, lookback=126, backtest=126, role="max_sharpe", gamma=None):
n, num = ret.shape
period = (n - lookback)//backtest+1
weights = []
start = []
rets = []
for i in range(period):
curr = i*backtest+lookback
data_train = ret.iloc[curr-lookback:curr, :].to_numpy()
data_test = ret.iloc[curr:curr+backtest, :]
if len(data_test) == 0:
break
w = MVO.opt(data_train, role=role, gamma=gamma)
start.append(data_test.index[0])
weights.append(w)
rets.append(data_test.to_numpy()@w)
weight = pd.DataFrame(weights, columns=ret.columns, index=pd.to_datetime(start))
rets = np.hstack(rets)
equally_weighted = ret.iloc[lookback:, :].to_numpy()@np.ones(num)/num
rets = pd.DataFrame(np.vstack([rets, equally_weighted]).T, columns=['Portfolio', 'Equally'], index=ret.index[lookback:])
return weight, rets
def get_Data(tickers,start_date,end_date):
all_data_df = pd.DataFrame()
for ticker in tickers:
df = yf.Ticker(ticker)
data = yf.download(ticker, start=start_date, end=end_date)
data.index = pd.to_datetime(data.index).strftime('%Y-%m-%d')
file_name = ticker
data.rename(columns={'Adj Close': file_name}, inplace=True)
all_data_df = pd.concat([all_data_df, data[file_name]], axis=1, sort=False)
all_data_df.reset_index(inplace=True)
all_data_df = all_data_df.rename(columns={'index': 'Date'})
all_data_cleaned = all_data_df.dropna()
all_data_cleaned.set_index('Date', inplace=True)
all_data_cleaned.index = pd.to_datetime(all_data_cleaned.index, format='%Y-%m-%d')
return all_data_cleaned
def main(tickers, role, start_date, end_date, lookback, backtest, gamma = 0):
try:
data = get_Data(tickers,start_date,end_date)
except:
print("股票資料不合")
return False
n = len(data.index)
if n < lookback+backtest+63:
print("投資組合無法建立,資料長度與所選參數不符。")
return False
#限制回測最長時間為4年,非必要
elif n > 1009+lookback:
port = data.iloc[-(1009+lookback):, :]
else:
pass
length, num = data.shape
ret = data.pct_change().dropna()
weight, rets = rolling_optimize(ret, lookback, backtest, role=role, gamma=gamma)
weight.index = weight.index.astype(str)
rets.index = rets.index.astype(str)
rets = rets.round(5)
info = MVO.portfolio_info(np.array([1]), rets['Portfolio'].to_numpy().reshape(-1, 1), np.zeros(len(ret) - lookback))
info['assets'] = list(data.columns)
info['weight'] = weight
info['ret'] = rets
#return bar
rets.index.name = 'date'
rets.index = pd.to_datetime(rets.index)
ret_hist = rets.to_period('Q').groupby('date').apply(lambda x: (x + 1).prod() - 1)
info['bar'] = ret_hist
return info

@ -1,12 +1,19 @@
import json
import time
from openai import OpenAI
from tenacity import retry, wait_random_exponential, stop_after_attempt
import sys
from TPM import main
from datetime import datetime, timedelta
GPT_MODEL = "gpt-4-turbo"
client = OpenAI(api_key="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")
def count_date(period):
today = datetime.today() - timedelta(days=1)
half_year_ago = today - timedelta(days=period)
half_year_ago_formatted = half_year_ago.strftime("%Y-%m-%d")
return today.strftime("%Y-%m-%d"), half_year_ago_formatted
@retry(wait=wait_random_exponential(multiplier=1, max=40), stop=stop_after_attempt(3))
def chat_completion_request(messages, tools=None, tool_choice=None, model=GPT_MODEL):
try:
@ -22,9 +29,18 @@ def chat_completion_request(messages, tools=None, tool_choice=None, model=GPT_MO
print(f"Exception: {e}")
sys.exit(1) # Add this line
def get_backtest(symbols, sliding_window="six months", frequency="semi-annually", function="sharpe ratio"):
symbol = " , ".join(symbols)
return f"Backtest result for {symbol} Annualized return: 10%, Annualized Sharpe ratio: 1.5, Annualized volatility: 20%, Maximum drawdown: 5%, Alpha: 0.1, Beta: 0.8, VaR10: 5%, R2: 0.9"
def get_tpm_backtest(symbols, sliding_window=126, frequency=126, function="max_sharpe",period="2022-01-01 to 2022-12-31"):
if period.isalnum() == False:
period = period.split(" to ")
start_date = period[0]
end_date = period[1]
else:
end_date, start_date = count_date(int(period))
print(symbols, sliding_window, frequency, function, period , start_date, end_date)
result = main(symbols, role=function, start_date=start_date, end_date=end_date, lookback=int(sliding_window), backtest=int(frequency))
if result == False:
return "投資組合無法建立,資料長度與所選參數不符。"
return f"Backtest result for {symbols} Annualized return: {result['annual_ret']}, Sharpe ratio: {result['annual_sr']}, Annualized volatility: {result['vol']}, Maximum drawdown: {result['mdd']}, for the period {start_date} to {end_date}"
def backest_main(query):
tools = [
{
@ -37,7 +53,7 @@ def backest_main(query):
"properties": {
"symbol": {
"type": "array",
"description": "An array of multiple portfolio symbol to be backtested if the symbol is Taiwan Stock exchage the code ex: TSMC to 2330.TW , if there is multiple symbol return a python list format",
"description": "An array of multiple portfolio symbol to be backtested if the symbol is Taiwan Stock exchage the code ex: TSMC to 2330.TW ,GOOGLE to GOOG , APPLE to AAPL , if there is multiple symbol return a python list format",
"items": {
"type": "string",
"description": "The symbol of the stock",
@ -45,21 +61,25 @@ def backest_main(query):
},
"sliding window": {
"type": "string",
"enum": ["one month", "three months", "six months", "one year"],
"description": "The sliding window size to be backtested in one month, three months , six months, one year",
"enum": ["21", "63", "126", "252"],
"description": "The sliding window size to be backtested in one month:21, three months:63 , six months:126, one year:252",
},
"frequency": {
"type": "string",
"enum": ["monthly", "quarterly", "semi-annually", "annually"],
"description": "The optimize frequency to be backtested in monthly, quarterly, semi-annually, annually",
"enum": ["21", "63", "126", "252"],
"description": "The optimize frequency to be backtested in monthly:21, quarterly:63, semi-annually:126, annually:252",
},
"function": {
"type": "string",
"enum": ["sharpe ratio", "sortino ratio", "volatility", "utility function"],
"enum": ["max_sharpe", "max_sortino", "min_volatility", "quadratic_utility"],
"description": "The optimize function to be backtested in sharpe ratio, sortino ratio,volatility, utility function",
},
"period": {
"type": "string",
"description": "The period of the backtest to be calculated If the user specifies a time period for backtesting, return it in the format ‘YYYY-MM-DD to YYYY-MM-DD’. If the user specifies the past year, return ‘360’. If the user specifies the past six months, return ‘180",
},
},
"required": ["symbol", "sliding window", "frequency", "function"],
"required": ["symbol", "sliding window", "frequency", "function","period"]
},
}
}
@ -68,7 +88,7 @@ def backest_main(query):
messages = []
sec_message = []
messages.append({"role": "system", "content": "You are a software developer who is writing a function to get the portfolio backtesting result by using different multiple symbol, sliding window, optimize frequency and optimize function , "
"only the symbol is required, the sliding window, frequency and function are optional. The sliding window size can be default by six months. The optimize frequency can be default by semi-annually. The optimize function can be default by sharpe ratio."})
"only the symbol is required, the sliding window, frequency and function are optional. The sliding window size can be default by six months. The optimize frequency can be default by semi-annually. The optimize function can be default by sharpe ratio. "})
messages.append({"role": "user", "content": query})
chat_response = chat_completion_request(messages,tools=tools)
@ -76,7 +96,7 @@ def backest_main(query):
assistant_messages = chat_response.choices[0].message.tool_calls
messages.append(assistant_messages)
available_functions = {
"get_backtest": get_backtest,
"get_backtest": get_tpm_backtest,
} # only one function in this example, but you can have multiple
for tool_call in assistant_messages:
function_name = tool_call.function.name
@ -87,13 +107,13 @@ def backest_main(query):
sliding_window=function_args.get("sliding window"),
frequency=function_args.get("frequency"),
function=function_args.get("function"),
period=function_args.get("period")
)
sec_message.append({"role": "system","content": "You are a professional financial analyst. The user will provide you with some results from their backtesting of an investment portfolio using the Efficient Frontier calculation. These results include annualized return, annualized Sharpe ratio, annualized volatility, maximum drawdown, Alpha, Beta, VaR10, and R2. Please provide professional advice in Traditional Chinese based on these reports. "})
sec_message.append({"role": "system","content": "You are a professional financial analyst. The user will provide you with some results from their backtesting of an investment portfolio using the Efficient Frontier calculation. These results include annualized return, annualized Sharpe ratio, annualized volatility, maximum drawdown. Please provide professional advice in 繁體中文 based on these reports. "})
sec_message.append({"role": "user","content": function_response})
result_messages = chat_completion_request(sec_message)
return result_messages.choices[0].message.content
query = "我想要使用台積電和蘋果股票來進行最大夏普比率的回測"
query = "我想要使用GOOGLE和台積電來進行最大夏普比率2019/7/1至2023/9/22的回測"
article = backest_main(query)
print(article)
Loading…
Cancel
Save