|
|
|
import json
|
|
|
|
import time
|
|
|
|
import numpy as np
|
|
|
|
import pandas as pd
|
|
|
|
from scipy.optimize import minimize
|
|
|
|
import yfinance as yf
|
|
|
|
import psycopg2
|
|
|
|
from config import SQL_CONFIG
|
|
|
|
class MVO(object):
|
|
|
|
@staticmethod
|
|
|
|
def portfolio_info(w, ret, market_ret, rf=0):
|
|
|
|
# return and drawdown
|
|
|
|
retPort = ret@w # T-dimensional array
|
|
|
|
cum_ret = (retPort+1).cumprod()
|
|
|
|
rolling_max=np.maximum.accumulate(cum_ret)
|
|
|
|
mdd = np.max((rolling_max - cum_ret)/rolling_max)
|
|
|
|
|
|
|
|
## Sharpe Ratio
|
|
|
|
stdPort = np.std(retPort)
|
|
|
|
vol = stdPort*15.87451
|
|
|
|
annual_ret = np.mean(retPort) * 252
|
|
|
|
annual_sr = (annual_ret-rf) / vol
|
|
|
|
|
|
|
|
## alpha, beta
|
|
|
|
cov = np.cov(retPort, market_ret)
|
|
|
|
beta = cov[0, 1] / cov[1, 1]
|
|
|
|
alpha = annual_ret - rf - beta*(np.mean(market_ret) * 252 - rf)
|
|
|
|
R2 = cov[0, 1]**2/(cov[0, 0] * cov[1, 1])
|
|
|
|
|
|
|
|
## n-day 95% VaR
|
|
|
|
var10 = -annual_ret*(10/252) + 1.645*vol*(10/252)**(1/2)
|
|
|
|
d = dict(annual_ret = annual_ret,
|
|
|
|
vol=vol,
|
|
|
|
mdd=mdd,
|
|
|
|
annual_sr=annual_sr,
|
|
|
|
beta=beta,
|
|
|
|
alpha=alpha,
|
|
|
|
var10=var10,
|
|
|
|
R2=R2)
|
|
|
|
return {key: round(d[key], 2) for key in d}
|
|
|
|
@staticmethod
|
|
|
|
def sharpe_ratio(w, ret):
|
|
|
|
cov = np.cov(ret.T)
|
|
|
|
# print(cov.shape, w.shape)
|
|
|
|
retPort = ret@w # T-dimensional array
|
|
|
|
stdPort = np.std(retPort)
|
|
|
|
return np.mean(retPort)/stdPort
|
|
|
|
@staticmethod
|
|
|
|
def sharpe_grad(w, ret, cov):
|
|
|
|
manual_ret = np.mean(ret, axis=0)
|
|
|
|
# print(cov.shape, w.shape)
|
|
|
|
retPort = ret@w # T-dimensional array
|
|
|
|
stdPort = np.std(retPort)
|
|
|
|
g1=manual_ret/stdPort
|
|
|
|
g2=np.mean(retPort)*stdPort**(-3)*cov@w
|
|
|
|
return g1-g2
|
|
|
|
@staticmethod
|
|
|
|
def sortino_ratio(w, ret):
|
|
|
|
retPort = ret@w # T-dimensional array
|
|
|
|
stdPort = np.std(np.maximum(-retPort, 0))
|
|
|
|
return np.mean(retPort)/stdPort
|
|
|
|
@staticmethod
|
|
|
|
def sortino_grad(w, ret, cov_sor):
|
|
|
|
manual_ret = np.mean(ret, axis=0)
|
|
|
|
# print(cov.shape, w.shape)
|
|
|
|
retPort = ret@w # T-dimensional arrayss
|
|
|
|
stdPort = np.std(retPort)
|
|
|
|
g1=manual_ret/stdPort
|
|
|
|
g2=np.mean(retPort)*stdPort**(-3)*cov_sor@w
|
|
|
|
return g1-g2
|
|
|
|
@staticmethod
|
|
|
|
def sortino_ratio(w, ret):
|
|
|
|
retPort = ret@w # T-dimensional array
|
|
|
|
stdPort = np.std(np.maximum(-retPort, 0))
|
|
|
|
return np.mean(retPort)/stdPort
|
|
|
|
@staticmethod
|
|
|
|
def sortino_grad(w, ret, cov_sor):
|
|
|
|
manual_ret = np.mean(ret, axis=0)
|
|
|
|
# print(cov.shape, w.shape)
|
|
|
|
retPort = ret@w # T-dimensional arrayss
|
|
|
|
stdPort = np.std(retPort)
|
|
|
|
g1=manual_ret/stdPort
|
|
|
|
g2=np.mean(retPort)*stdPort**(-3)*cov_sor@w
|
|
|
|
return g1-g2
|
|
|
|
# equivalent opt problem with min vol
|
|
|
|
@staticmethod
|
|
|
|
def volatility(w, ret):
|
|
|
|
retPort = ret@w # T-dimensional array
|
|
|
|
return np.std(retPort)
|
|
|
|
@staticmethod
|
|
|
|
def volatility_grad(w, ret, cov):
|
|
|
|
retPort = ret@w # T-dimensional array
|
|
|
|
stdPort = np.std(retPort)
|
|
|
|
return cov@w/stdPort
|
|
|
|
@staticmethod
|
|
|
|
def quadratic_utility(w, ret, gamma):
|
|
|
|
retPort = ret@w # T-dimensional array
|
|
|
|
varPort = np.var(retPort)
|
|
|
|
return np.mean(retPort) - 0.5*gamma*varPort
|
|
|
|
@staticmethod
|
|
|
|
def quadratic_utility_grad(w, ret, cov, gamma):
|
|
|
|
manual_ret = np.mean(ret, axis=0)
|
|
|
|
return manual_ret - gamma*cov@w
|
|
|
|
@classmethod
|
|
|
|
def opt(cls, ret, gamma=0, role="max_sharpe"):
|
|
|
|
n = ret.shape[1]
|
|
|
|
init=np.ones(n)/n
|
|
|
|
if role=="max_sharpe":
|
|
|
|
if n==1:
|
|
|
|
cov=np.array(np.cov(ret.T))
|
|
|
|
else:
|
|
|
|
cov=np.cov(ret.T)
|
|
|
|
loss = lambda w: -cls.sharpe_ratio(w, ret)
|
|
|
|
grad = lambda w: -cls.sharpe_grad(w, ret, cov)
|
|
|
|
elif role=="max_sortino":
|
|
|
|
if n==1:
|
|
|
|
cov = np.cov(np.maximum(ret, 0).T)
|
|
|
|
else:
|
|
|
|
cov = np.array(np.cov(np.maximum(ret, 0).T))
|
|
|
|
loss = lambda w: -cls.sortino_ratio(w, ret)
|
|
|
|
grad = lambda w: -cls.sortino_grad(w, ret, cov)
|
|
|
|
elif role=="min_volatility":
|
|
|
|
if n==1:
|
|
|
|
cov=np.array(np.cov(ret.T))
|
|
|
|
else:
|
|
|
|
cov=np.cov(ret.T)
|
|
|
|
loss = lambda w: cls.volatility(w, ret)
|
|
|
|
grad = lambda w: cls.volatility_grad(w, ret, cov)
|
|
|
|
elif role=="quadratic_utility":
|
|
|
|
if n==1:
|
|
|
|
cov=np.array(np.cov(ret.T))
|
|
|
|
else:
|
|
|
|
cov=np.cov(ret.T)
|
|
|
|
loss = lambda w: -cls.quadratic_utility(w, ret, gamma)
|
|
|
|
grad = lambda w: -cls.quadratic_utility_grad(w, ret, cov, gamma)
|
|
|
|
else:
|
|
|
|
return init
|
|
|
|
if n==1:
|
|
|
|
bnds = [[0,1]]
|
|
|
|
else:
|
|
|
|
bnds = [[0.03, 0.6] for i in range(n)]
|
|
|
|
opts = {'maxiter': 1000, 'disp': False}
|
|
|
|
cons = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1})
|
|
|
|
result = minimize(loss, init, method="SLSQP",\
|
|
|
|
options=opts, bounds=bnds, tol = None, jac = grad, constraints=cons)
|
|
|
|
sol = result['x']
|
|
|
|
return np.round(sol, 2)
|
|
|
|
def rolling_optimize(ret, lookback=126, backtest=126, role="max_sharpe", gamma=None):
|
|
|
|
n, num = ret.shape
|
|
|
|
period = (n - lookback)//backtest+1
|
|
|
|
weights = []
|
|
|
|
start = []
|
|
|
|
rets = []
|
|
|
|
for i in range(period):
|
|
|
|
curr = i*backtest+lookback
|
|
|
|
data_train = ret.iloc[curr-lookback:curr, :].to_numpy()
|
|
|
|
data_test = ret.iloc[curr:curr+backtest, :]
|
|
|
|
if len(data_test) == 0:
|
|
|
|
break
|
|
|
|
w = MVO.opt(data_train, role=role, gamma=gamma)
|
|
|
|
start.append(data_test.index[0])
|
|
|
|
weights.append(w)
|
|
|
|
rets.append(data_test.to_numpy()@w)
|
|
|
|
weight = pd.DataFrame(weights, columns=ret.columns, index=pd.to_datetime(start))
|
|
|
|
rets = np.hstack(rets)
|
|
|
|
equally_weighted = ret.iloc[lookback:, :].to_numpy()@np.ones(num)/num
|
|
|
|
rets = pd.DataFrame(np.vstack([rets, equally_weighted]).T, columns=['Portfolio', 'Equally'], index=ret.index[lookback:])
|
|
|
|
return weight, rets
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_Data(tickers,start_date,end_date):
|
|
|
|
all_data_df = pd.DataFrame()
|
|
|
|
|
|
|
|
for ticker in tickers:
|
|
|
|
df = yf.Ticker(ticker)
|
|
|
|
data = yf.download(ticker, start=start_date, end=end_date)
|
|
|
|
|
|
|
|
data.index = pd.to_datetime(data.index).strftime('%Y-%m-%d')
|
|
|
|
file_name = ticker
|
|
|
|
data.rename(columns={'Adj Close': file_name}, inplace=True)
|
|
|
|
|
|
|
|
all_data_df = pd.concat([all_data_df, data[file_name]], axis=1, sort=False)
|
|
|
|
|
|
|
|
all_data_df.reset_index(inplace=True)
|
|
|
|
all_data_df = all_data_df.rename(columns={'index': 'Date'})
|
|
|
|
all_data_cleaned = all_data_df.dropna()
|
|
|
|
|
|
|
|
all_data_cleaned.set_index('Date', inplace=True)
|
|
|
|
all_data_cleaned.index = pd.to_datetime(all_data_cleaned.index, format='%Y-%m-%d')
|
|
|
|
return all_data_cleaned
|
|
|
|
def get_stock(stock_list: list,start_date , end_date):
|
|
|
|
conn = psycopg2.connect(**SQL_CONFIG)
|
|
|
|
sql1="SELECT ticker, date, price FROM stock_price where ticker = ANY(%s)"
|
|
|
|
sql2="SELECT ticker, date, price FROM stock_price_tw where ticker = ANY(%s) ;"
|
|
|
|
tw = []
|
|
|
|
us = []
|
|
|
|
for stock in stock_list:
|
|
|
|
if stock[0].isdigit():
|
|
|
|
tw.append(stock)
|
|
|
|
else:
|
|
|
|
us.append(stock)
|
|
|
|
with conn:
|
|
|
|
with conn.cursor() as curs:
|
|
|
|
curs.execute(sql1, (us,))
|
|
|
|
data_us= curs.fetchall()
|
|
|
|
curs.execute(sql2, (tw,))
|
|
|
|
data_tw= curs.fetchall()
|
|
|
|
data = data_us+data_tw
|
|
|
|
dfStock = pd.DataFrame(data, columns=['ticker', 'date', 'price'])
|
|
|
|
dfStock['date'] = pd.to_datetime(dfStock['date'])
|
|
|
|
dfStock = dfStock.drop_duplicates()
|
|
|
|
g = dfStock.groupby('ticker')
|
|
|
|
port = pd.concat([g.get_group(t).set_index('date')['price'] for t in stock_list], axis=1, join='inner')
|
|
|
|
port.columns=stock_list
|
|
|
|
df = port.loc[start_date:end_date]
|
|
|
|
return df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(tickers, role, start_date, end_date, lookback, backtest, gamma = 0):
|
|
|
|
try:
|
|
|
|
data = get_stock(tickers,start_date,end_date)
|
|
|
|
except:
|
|
|
|
print("股票資料不合")
|
|
|
|
return False
|
|
|
|
n = len(data.index)
|
|
|
|
if n < lookback+backtest+63:
|
|
|
|
print("投資組合無法建立,資料長度與所選參數不符。")
|
|
|
|
return False
|
|
|
|
#限制回測最長時間為4年,非必要
|
|
|
|
elif n > 1009+lookback:
|
|
|
|
port = data.iloc[-(1009+lookback):, :]
|
|
|
|
else:
|
|
|
|
pass
|
|
|
|
|
|
|
|
length, num = data.shape
|
|
|
|
ret = data.pct_change().dropna()
|
|
|
|
weight, rets = rolling_optimize(ret, lookback, backtest, role=role, gamma=gamma)
|
|
|
|
weight.index = weight.index.astype(str)
|
|
|
|
rets.index = rets.index.astype(str)
|
|
|
|
rets = rets.round(5)
|
|
|
|
|
|
|
|
info = MVO.portfolio_info(np.array([1]), rets['Portfolio'].to_numpy().reshape(-1, 1), np.zeros(len(ret) - lookback))
|
|
|
|
info['assets'] = list(data.columns)
|
|
|
|
info['weight'] = weight
|
|
|
|
info['ret'] = rets
|
|
|
|
#return bar
|
|
|
|
rets.index.name = 'date'
|
|
|
|
rets.index = pd.to_datetime(rets.index)
|
|
|
|
ret_hist = rets.to_period('Q').groupby('date').apply(lambda x: (x + 1).prod() - 1)
|
|
|
|
info['bar'] = ret_hist
|
|
|
|
|
|
|
|
return info
|