You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

254 lines
8.8 KiB

import json
import time
import numpy as np
import pandas as pd
from scipy.optimize import minimize
import yfinance as yf
import psycopg2
from config import SQL_CONFIG
class MVO(object):
@staticmethod
def portfolio_info(w, ret, market_ret, rf=0):
# return and drawdown
retPort = ret@w # T-dimensional array
cum_ret = (retPort+1).cumprod()
rolling_max=np.maximum.accumulate(cum_ret)
mdd = np.max((rolling_max - cum_ret)/rolling_max)
## Sharpe Ratio
stdPort = np.std(retPort)
vol = stdPort*15.87451
annual_ret = np.mean(retPort) * 252
annual_sr = (annual_ret-rf) / vol
## alpha, beta
cov = np.cov(retPort, market_ret)
beta = cov[0, 1] / cov[1, 1]
alpha = annual_ret - rf - beta*(np.mean(market_ret) * 252 - rf)
R2 = cov[0, 1]**2/(cov[0, 0] * cov[1, 1])
## n-day 95% VaR
var10 = -annual_ret*(10/252) + 1.645*vol*(10/252)**(1/2)
d = dict(annual_ret = annual_ret,
vol=vol,
mdd=mdd,
annual_sr=annual_sr,
beta=beta,
alpha=alpha,
var10=var10,
R2=R2)
return {key: round(d[key], 2) for key in d}
@staticmethod
def sharpe_ratio(w, ret):
cov = np.cov(ret.T)
# print(cov.shape, w.shape)
retPort = ret@w # T-dimensional array
stdPort = np.std(retPort)
return np.mean(retPort)/stdPort
@staticmethod
def sharpe_grad(w, ret, cov):
manual_ret = np.mean(ret, axis=0)
# print(cov.shape, w.shape)
retPort = ret@w # T-dimensional array
stdPort = np.std(retPort)
g1=manual_ret/stdPort
g2=np.mean(retPort)*stdPort**(-3)*cov@w
return g1-g2
@staticmethod
def sortino_ratio(w, ret):
retPort = ret@w # T-dimensional array
stdPort = np.std(np.maximum(-retPort, 0))
return np.mean(retPort)/stdPort
@staticmethod
def sortino_grad(w, ret, cov_sor):
manual_ret = np.mean(ret, axis=0)
# print(cov.shape, w.shape)
retPort = ret@w # T-dimensional arrayss
stdPort = np.std(retPort)
g1=manual_ret/stdPort
g2=np.mean(retPort)*stdPort**(-3)*cov_sor@w
return g1-g2
@staticmethod
def sortino_ratio(w, ret):
retPort = ret@w # T-dimensional array
stdPort = np.std(np.maximum(-retPort, 0))
return np.mean(retPort)/stdPort
@staticmethod
def sortino_grad(w, ret, cov_sor):
manual_ret = np.mean(ret, axis=0)
# print(cov.shape, w.shape)
retPort = ret@w # T-dimensional arrayss
stdPort = np.std(retPort)
g1=manual_ret/stdPort
g2=np.mean(retPort)*stdPort**(-3)*cov_sor@w
return g1-g2
# equivalent opt problem with min vol
@staticmethod
def volatility(w, ret):
retPort = ret@w # T-dimensional array
return np.std(retPort)
@staticmethod
def volatility_grad(w, ret, cov):
retPort = ret@w # T-dimensional array
stdPort = np.std(retPort)
return cov@w/stdPort
@staticmethod
def quadratic_utility(w, ret, gamma):
retPort = ret@w # T-dimensional array
varPort = np.var(retPort)
return np.mean(retPort) - 0.5*gamma*varPort
@staticmethod
def quadratic_utility_grad(w, ret, cov, gamma):
manual_ret = np.mean(ret, axis=0)
return manual_ret - gamma*cov@w
@classmethod
def opt(cls, ret, gamma=0, role="max_sharpe"):
n = ret.shape[1]
init=np.ones(n)/n
if role=="max_sharpe":
if n==1:
cov=np.array(np.cov(ret.T))
else:
cov=np.cov(ret.T)
loss = lambda w: -cls.sharpe_ratio(w, ret)
grad = lambda w: -cls.sharpe_grad(w, ret, cov)
elif role=="max_sortino":
if n==1:
cov = np.cov(np.maximum(ret, 0).T)
else:
cov = np.array(np.cov(np.maximum(ret, 0).T))
loss = lambda w: -cls.sortino_ratio(w, ret)
grad = lambda w: -cls.sortino_grad(w, ret, cov)
elif role=="min_volatility":
if n==1:
cov=np.array(np.cov(ret.T))
else:
cov=np.cov(ret.T)
loss = lambda w: cls.volatility(w, ret)
grad = lambda w: cls.volatility_grad(w, ret, cov)
elif role=="quadratic_utility":
if n==1:
cov=np.array(np.cov(ret.T))
else:
cov=np.cov(ret.T)
loss = lambda w: -cls.quadratic_utility(w, ret, gamma)
grad = lambda w: -cls.quadratic_utility_grad(w, ret, cov, gamma)
else:
return init
if n==1:
bnds = [[0,1]]
else:
bnds = [[0.03, 0.6] for i in range(n)]
opts = {'maxiter': 1000, 'disp': False}
cons = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1})
result = minimize(loss, init, method="SLSQP",\
options=opts, bounds=bnds, tol = None, jac = grad, constraints=cons)
sol = result['x']
return np.round(sol, 2)
def rolling_optimize(ret, lookback=126, backtest=126, role="max_sharpe", gamma=None):
n, num = ret.shape
period = (n - lookback)//backtest+1
weights = []
start = []
rets = []
for i in range(period):
curr = i*backtest+lookback
data_train = ret.iloc[curr-lookback:curr, :].to_numpy()
data_test = ret.iloc[curr:curr+backtest, :]
if len(data_test) == 0:
break
w = MVO.opt(data_train, role=role, gamma=gamma)
start.append(data_test.index[0])
weights.append(w)
rets.append(data_test.to_numpy()@w)
weight = pd.DataFrame(weights, columns=ret.columns, index=pd.to_datetime(start))
rets = np.hstack(rets)
equally_weighted = ret.iloc[lookback:, :].to_numpy()@np.ones(num)/num
rets = pd.DataFrame(np.vstack([rets, equally_weighted]).T, columns=['Portfolio', 'Equally'], index=ret.index[lookback:])
return weight, rets
def get_Data(tickers,start_date,end_date):
all_data_df = pd.DataFrame()
for ticker in tickers:
df = yf.Ticker(ticker)
data = yf.download(ticker, start=start_date, end=end_date)
data.index = pd.to_datetime(data.index).strftime('%Y-%m-%d')
file_name = ticker
data.rename(columns={'Adj Close': file_name}, inplace=True)
all_data_df = pd.concat([all_data_df, data[file_name]], axis=1, sort=False)
all_data_df.reset_index(inplace=True)
all_data_df = all_data_df.rename(columns={'index': 'Date'})
all_data_cleaned = all_data_df.dropna()
all_data_cleaned.set_index('Date', inplace=True)
all_data_cleaned.index = pd.to_datetime(all_data_cleaned.index, format='%Y-%m-%d')
return all_data_cleaned
def get_stock(stock_list: list,start_date , end_date):
conn = psycopg2.connect(**SQL_CONFIG)
sql1="SELECT ticker, date, price FROM stock_price where ticker = ANY(%s)"
sql2="SELECT ticker, date, price FROM stock_price_tw where ticker = ANY(%s) ;"
tw = []
us = []
for stock in stock_list:
if stock[0].isdigit():
tw.append(stock)
else:
us.append(stock)
with conn:
with conn.cursor() as curs:
curs.execute(sql1, (us,))
data_us= curs.fetchall()
curs.execute(sql2, (tw,))
data_tw= curs.fetchall()
data = data_us+data_tw
dfStock = pd.DataFrame(data, columns=['ticker', 'date', 'price'])
dfStock['date'] = pd.to_datetime(dfStock['date'])
dfStock = dfStock.drop_duplicates()
g = dfStock.groupby('ticker')
port = pd.concat([g.get_group(t).set_index('date')['price'] for t in stock_list], axis=1, join='inner')
port.columns=stock_list
df = port.loc[start_date:end_date]
return df
def main(tickers, role, start_date, end_date, lookback, backtest, gamma = 0):
try:
data = get_stock(tickers,start_date,end_date)
except:
print("股票資料不合")
return False
n = len(data.index)
if n < lookback+backtest+63:
print("投資組合無法建立,資料長度與所選參數不符。")
return False
#限制回測最長時間為4年,非必要
elif n > 1009+lookback:
port = data.iloc[-(1009+lookback):, :]
else:
pass
length, num = data.shape
ret = data.pct_change().dropna()
weight, rets = rolling_optimize(ret, lookback, backtest, role=role, gamma=gamma)
weight.index = weight.index.astype(str)
rets.index = rets.index.astype(str)
rets = rets.round(5)
info = MVO.portfolio_info(np.array([1]), rets['Portfolio'].to_numpy().reshape(-1, 1), np.zeros(len(ret) - lookback))
info['assets'] = list(data.columns)
info['weight'] = weight
info['ret'] = rets
#return bar
rets.index.name = 'date'
rets.index = pd.to_datetime(rets.index)
ret_hist = rets.to_period('Q').groupby('date').apply(lambda x: (x + 1).prod() - 1)
info['bar'] = ret_hist
return info