import json import time import numpy as np import pandas as pd from scipy.optimize import minimize import yfinance as yf import psycopg2 from config import SQL_CONFIG class MVO(object): @staticmethod def portfolio_info(w, ret, market_ret, rf=0): # return and drawdown retPort = ret@w # T-dimensional array cum_ret = (retPort+1).cumprod() rolling_max=np.maximum.accumulate(cum_ret) mdd = np.max((rolling_max - cum_ret)/rolling_max) ## Sharpe Ratio stdPort = np.std(retPort) vol = stdPort*15.87451 annual_ret = np.mean(retPort) * 252 annual_sr = (annual_ret-rf) / vol ## alpha, beta cov = np.cov(retPort, market_ret) beta = cov[0, 1] / cov[1, 1] alpha = annual_ret - rf - beta*(np.mean(market_ret) * 252 - rf) R2 = cov[0, 1]**2/(cov[0, 0] * cov[1, 1]) ## n-day 95% VaR var10 = -annual_ret*(10/252) + 1.645*vol*(10/252)**(1/2) d = dict(annual_ret = annual_ret, vol=vol, mdd=mdd, annual_sr=annual_sr, beta=beta, alpha=alpha, var10=var10, R2=R2) return {key: round(d[key], 2) for key in d} @staticmethod def sharpe_ratio(w, ret): cov = np.cov(ret.T) # print(cov.shape, w.shape) retPort = ret@w # T-dimensional array stdPort = np.std(retPort) return np.mean(retPort)/stdPort @staticmethod def sharpe_grad(w, ret, cov): manual_ret = np.mean(ret, axis=0) # print(cov.shape, w.shape) retPort = ret@w # T-dimensional array stdPort = np.std(retPort) g1=manual_ret/stdPort g2=np.mean(retPort)*stdPort**(-3)*cov@w return g1-g2 @staticmethod def sortino_ratio(w, ret): retPort = ret@w # T-dimensional array stdPort = np.std(np.maximum(-retPort, 0)) return np.mean(retPort)/stdPort @staticmethod def sortino_grad(w, ret, cov_sor): manual_ret = np.mean(ret, axis=0) # print(cov.shape, w.shape) retPort = ret@w # T-dimensional arrayss stdPort = np.std(retPort) g1=manual_ret/stdPort g2=np.mean(retPort)*stdPort**(-3)*cov_sor@w return g1-g2 @staticmethod def sortino_ratio(w, ret): retPort = ret@w # T-dimensional array stdPort = np.std(np.maximum(-retPort, 0)) return np.mean(retPort)/stdPort @staticmethod def sortino_grad(w, ret, cov_sor): manual_ret = np.mean(ret, axis=0) # print(cov.shape, w.shape) retPort = ret@w # T-dimensional arrayss stdPort = np.std(retPort) g1=manual_ret/stdPort g2=np.mean(retPort)*stdPort**(-3)*cov_sor@w return g1-g2 # equivalent opt problem with min vol @staticmethod def volatility(w, ret): retPort = ret@w # T-dimensional array return np.std(retPort) @staticmethod def volatility_grad(w, ret, cov): retPort = ret@w # T-dimensional array stdPort = np.std(retPort) return cov@w/stdPort @staticmethod def quadratic_utility(w, ret, gamma): retPort = ret@w # T-dimensional array varPort = np.var(retPort) return np.mean(retPort) - 0.5*gamma*varPort @staticmethod def quadratic_utility_grad(w, ret, cov, gamma): manual_ret = np.mean(ret, axis=0) return manual_ret - gamma*cov@w @classmethod def opt(cls, ret, gamma=0, role="max_sharpe"): n = ret.shape[1] init=np.ones(n)/n if role=="max_sharpe": if n==1: cov=np.array(np.cov(ret.T)) else: cov=np.cov(ret.T) loss = lambda w: -cls.sharpe_ratio(w, ret) grad = lambda w: -cls.sharpe_grad(w, ret, cov) elif role=="max_sortino": if n==1: cov = np.cov(np.maximum(ret, 0).T) else: cov = np.array(np.cov(np.maximum(ret, 0).T)) loss = lambda w: -cls.sortino_ratio(w, ret) grad = lambda w: -cls.sortino_grad(w, ret, cov) elif role=="min_volatility": if n==1: cov=np.array(np.cov(ret.T)) else: cov=np.cov(ret.T) loss = lambda w: cls.volatility(w, ret) grad = lambda w: cls.volatility_grad(w, ret, cov) elif role=="quadratic_utility": if n==1: cov=np.array(np.cov(ret.T)) else: cov=np.cov(ret.T) loss = lambda w: -cls.quadratic_utility(w, ret, gamma) grad = lambda w: -cls.quadratic_utility_grad(w, ret, cov, gamma) else: return init if n==1: bnds = [[0,1]] else: bnds = [[0.03, 0.6] for i in range(n)] opts = {'maxiter': 1000, 'disp': False} cons = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1}) result = minimize(loss, init, method="SLSQP",\ options=opts, bounds=bnds, tol = None, jac = grad, constraints=cons) sol = result['x'] return np.round(sol, 2) def rolling_optimize(ret, lookback=126, backtest=126, role="max_sharpe", gamma=None): n, num = ret.shape period = (n - lookback)//backtest+1 weights = [] start = [] rets = [] for i in range(period): curr = i*backtest+lookback data_train = ret.iloc[curr-lookback:curr, :].to_numpy() data_test = ret.iloc[curr:curr+backtest, :] if len(data_test) == 0: break w = MVO.opt(data_train, role=role, gamma=gamma) start.append(data_test.index[0]) weights.append(w) rets.append(data_test.to_numpy()@w) weight = pd.DataFrame(weights, columns=ret.columns, index=pd.to_datetime(start)) rets = np.hstack(rets) equally_weighted = ret.iloc[lookback:, :].to_numpy()@np.ones(num)/num rets = pd.DataFrame(np.vstack([rets, equally_weighted]).T, columns=['Portfolio', 'Equally'], index=ret.index[lookback:]) return weight, rets def get_Data(tickers,start_date,end_date): all_data_df = pd.DataFrame() for ticker in tickers: df = yf.Ticker(ticker) data = yf.download(ticker, start=start_date, end=end_date) data.index = pd.to_datetime(data.index).strftime('%Y-%m-%d') file_name = ticker data.rename(columns={'Adj Close': file_name}, inplace=True) all_data_df = pd.concat([all_data_df, data[file_name]], axis=1, sort=False) all_data_df.reset_index(inplace=True) all_data_df = all_data_df.rename(columns={'index': 'Date'}) all_data_cleaned = all_data_df.dropna() all_data_cleaned.set_index('Date', inplace=True) all_data_cleaned.index = pd.to_datetime(all_data_cleaned.index, format='%Y-%m-%d') return all_data_cleaned def get_stock(stock_list: list,start_date , end_date): conn = psycopg2.connect(**SQL_CONFIG) sql1="SELECT ticker, date, price FROM stock_price where ticker = ANY(%s)" sql2="SELECT ticker, date, price FROM stock_price_tw where ticker = ANY(%s) ;" tw = [] us = [] for stock in stock_list: if stock[0].isdigit(): tw.append(stock) else: us.append(stock) with conn: with conn.cursor() as curs: curs.execute(sql1, (us,)) data_us= curs.fetchall() curs.execute(sql2, (tw,)) data_tw= curs.fetchall() data = data_us+data_tw dfStock = pd.DataFrame(data, columns=['ticker', 'date', 'price']) dfStock['date'] = pd.to_datetime(dfStock['date']) dfStock = dfStock.drop_duplicates() g = dfStock.groupby('ticker') port = pd.concat([g.get_group(t).set_index('date')['price'] for t in stock_list], axis=1, join='inner') port.columns=stock_list df = port.loc[start_date:end_date] return df def main(tickers, role, start_date, end_date, lookback, backtest, gamma = 0): try: data = get_stock(tickers,start_date,end_date) except: print("股票資料不合") return False n = len(data.index) if n < lookback+backtest+63: print("投資組合無法建立,資料長度與所選參數不符。") return False #限制回測最長時間為4年,非必要 elif n > 1009+lookback: port = data.iloc[-(1009+lookback):, :] else: pass length, num = data.shape ret = data.pct_change().dropna() weight, rets = rolling_optimize(ret, lookback, backtest, role=role, gamma=gamma) weight.index = weight.index.astype(str) rets.index = rets.index.astype(str) rets = rets.round(5) info = MVO.portfolio_info(np.array([1]), rets['Portfolio'].to_numpy().reshape(-1, 1), np.zeros(len(ret) - lookback)) info['assets'] = list(data.columns) info['weight'] = weight info['ret'] = rets #return bar rets.index.name = 'date' rets.index = pd.to_datetime(rets.index) ret_hist = rets.to_period('Q').groupby('date').apply(lambda x: (x + 1).prod() - 1) info['bar'] = ret_hist return info