- import logging
- import math
- import os
- from datetime import datetime, timedelta
- import backtrader as bt
- import ccxt
- import numpy as np
- import pandas as pd
- import matplotlib.pyplot as plt # For equity curve plot
- # Configure logging to both console and file
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s',
- handlers=[
- logging.StreamHandler(), # Console output
- logging.FileHandler('ucb_backtest.log', mode='w') # Log file (overwrite each run)
- ]
- )
- logger = logging.getLogger(__name__)
- # Top 14 coins (as before); one UCB arm per symbol
- coins = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'SOL/USDT', 'XRP/USDT', 'ADA/USDT', 'DOGE/USDT',
- 'AVAX/USDT', 'SHIB/USDT', 'DOT/USDT', 'LINK/USDT', 'TRX/USDT', 'UNI/USDT', 'LTC/USDT']
- exchange = ccxt.binance({'enableRateLimit': True})
- # UCB Class (simplified for backtest; updates with rewards each bar)
- class UCB:
- def __init__(self, num_arms, c=2.0):
- self.num_arms = num_arms
- self.counts = np.zeros(num_arms)
- self.mean_rewards = np.zeros(num_arms)
- self.total_pulls = 0
- self.c = c
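- # UCB1 score for arm i after t total pulls:
- #   score_i = mean_reward_i + c * sqrt(ln(t + 1) / n_i)
- # where n_i is the pull count for arm i; unpulled arms get +inf so every coin is tried at least once.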
- def compute_scores(self):
- ucb_scores = np.zeros(self.num_arms)
- for i in range(self.num_arms):
- if self.counts[i] == 0:
- ucb_scores[i] = float('inf') # Encourage exploration
- else:
- ucb_scores[i] = self.mean_rewards[i] + self.c * math.sqrt(math.log(self.total_pulls + 1) / self.counts[i])
- return ucb_scores
- def update(self, arm, reward):
- self.counts[arm] += 1
- self.total_pulls += 1
- self.mean_rewards[arm] = (self.mean_rewards[arm] * (self.counts[arm] - 1) + reward) / self.counts[arm]
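- # Illustrative usage (not part of the backtest): with a fresh UCB(3), compute_scores()
- # returns [inf, inf, inf]; after update(0, 0.02), arm 0 scores 0.02 + c*sqrt(ln(2)/1)
- # while arms 1 and 2 stay at +inf until they have been pulled once.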
- # Feature Computation Functions (from previous; applied per coin's data)
- def compute_atr(df, period=14):
- high_low = df['high'] - df['low']
- high_close = np.abs(df['high'] - df['close'].shift())
- low_close = np.abs(df['low'] - df['close'].shift())
- # Element-wise max of the three true-range components. Note: np.maximum takes only two
- # input arrays (a third positional argument is treated as an output buffer), so the
- # components are combined pairwise.
- tr = np.maximum(high_low, np.maximum(high_close, low_close))
- atr = tr.rolling(period).mean()
- return atr
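- # Note: compute_atr uses the simple-moving-average variant of ATR (rolling mean of the
- # true range), not Wilder's smoothing; values are NaN until the rolling window fills.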
- def compute_ema(df, short=12, long=26):
- df['ema_short'] = df['close'].ewm(span=short, adjust=False).mean()
- df['ema_long'] = df['close'].ewm(span=long, adjust=False).mean()
- df['trend'] = np.where(df['ema_short'] > df['ema_long'], 1, -1)
- def compute_reward(df):
- df['return'] = (df['close'] - df['open']) / df['open']
- df['atr'] = compute_atr(df)
- compute_ema(df)
- # Volatility-normalized, trend-aligned bar return; a zero or NaN ATR yields NaN, which
- # is mapped to a neutral reward of 0 so it cannot poison the UCB running means.
- reward = df['return'] * df['trend'] / df['atr'].replace(0, np.nan)
- latest = reward.iloc[-1] if not reward.empty else 0.0
- return 0.0 if pd.isna(latest) else float(latest)  # Latest reward
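- # Note on compute_reward: df['return'] is a fraction while ATR is in quote-currency units,
- # so reward magnitudes are not directly comparable across coins with very different price levels.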
- # Fetch historical OHLCV data for a symbol (with caching)
- def fetch_historical_ohlcv(symbol, timeframe='1h', start_date=None, end_date=None, limit=1000, refresh=False):
- os.makedirs('dat', exist_ok=True)
- program_prefix = 'ucb_backtest'
- symbol_safe = symbol.replace('/', '-')
- start_str = start_date.strftime('%Y%m%d') if start_date else 'none'
- end_str = end_date.strftime('%Y%m%d') if end_date else 'none'
- filename = f"dat/{program_prefix}_{symbol_safe}_{timeframe}_{start_str}_{end_str}.csv"
-
- if not refresh and os.path.exists(filename):
- try:
- df = pd.read_csv(filename, index_col='timestamp', parse_dates=True)
- logger.info(f"Loaded cached data for {symbol} from {filename}")
- return df
- except Exception as e:
- logger.warning(f"Error loading cache for {symbol}: {str(e)}; fetching fresh data")
-
- try:
- since = int(start_date.timestamp() * 1000) if start_date else None
- ohlcv = []
- while True:
- data = exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
- if not data:
- break
- ohlcv.extend(data)
- since = data[-1][0] + 1
- if end_date and data[-1][0] >= int(end_date.timestamp() * 1000):
- break
- df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
- df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
- df.set_index('timestamp', inplace=True)
- if end_date:
- df = df[df.index <= end_date]
- logger.info(f"Fetched {len(df)} historical candles for {symbol}")
- df.to_csv(filename)
- logger.info(f"Saved data for {symbol} to {filename}")
- return df
- except Exception as e:
- logger.error(f"Error fetching historical data for {symbol}: {str(e)}")
- return pd.DataFrame()
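- # Example (illustrative): fetch_historical_ohlcv('BTC/USDT', '1h',
- #     start_date=datetime.now() - timedelta(days=7), end_date=datetime.now())
- # returns an OHLCV DataFrame indexed by timestamp and caches it under dat/.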
- # Custom Backtrader Strategy with UCB
- class UCBStrategy(bt.Strategy):
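- """UCB-driven rotation strategy.
- Per bar: (1) update UCB with each coin's latest reward; (2) manage exits for open
- positions (min/max hold, stop-loss, take-profit); (3) rank coins by UCB score; and
- (4) open fixed-size positions in any top-N coin not already held.
- """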
- params = (
- ('position_size_pct', 0.10), # Fixed 10% per position
- ('top_n', 10), # Select top-N coins by UCB score each bar
- ('min_hold_bars', 4), # Short-term: min 4 hours (1h bars)
- ('max_hold_bars', 36), # Mid-term: max 36 hours
- ('stop_loss_pct', 0.05), # 5% stop-loss
- ('take_profit_pct', 0.10), # 10% take-profit
- ('ucb_c', 2.0), # UCB exploration param
- )
- # Tuning note: min_hold_bars=6 with max_hold_bars=24 gave a 36.9% final portfolio return in an earlier run
- def __init__(self):
- self.ucb = UCB(len(coins), self.p.ucb_c)
- self.position_entry_bars = {} # Track bars since entry per position
- self.position_entry_prices = {} # Track entry price per position
- self.holdings_history = [] # To store holdings by date
- self.portfolio_value_history = [] # To store portfolio value by date
- self.pnl_per_coin = {coin: 0.0 for coin in coins} # Realized PnL per coin
- self.coin_value_history = [] # List of dicts: {'datetime': ..., 'BTC/USDT': ..., ...}
- def next(self):
- # Step 1: Update UCB with the latest reward for every coin.
- # compute_reward() needs a window of bars (ATR-14, EMA-26), so build a small lookback
- # DataFrame rather than a single-row frame, which would leave the ATR as NaN.
- lookback = 30
- rewards = []
- for i, data in enumerate(self.datas):
- n = min(len(data), lookback)
- df = pd.DataFrame({
- 'open': [data.open[-j] for j in range(n - 1, -1, -1)],
- 'high': [data.high[-j] for j in range(n - 1, -1, -1)],
- 'low': [data.low[-j] for j in range(n - 1, -1, -1)],
- 'close': [data.close[-j] for j in range(n - 1, -1, -1)],
- })
- reward = compute_reward(df)
- rewards.append(reward)
- self.ucb.update(i, reward) # Update UCB for every coin every bar
- # Step 2: Check exits for open positions
- for data in self.datas:
- coin = data._name
- pos = self.getposition(data).size
- if pos != 0:
- bars_held = self.position_entry_bars.get(coin, 0) + 1
- self.position_entry_bars[coin] = bars_held
- entry_price = self.position_entry_prices[coin]
- current_price = data.close[0]
- pnl_pct = (current_price - entry_price) / entry_price
- # Exit conditions
- if bars_held < self.p.min_hold_bars:
- continue # Enforce min hold
- if bars_held >= self.p.max_hold_bars or pnl_pct <= -self.p.stop_loss_pct or pnl_pct >= self.p.take_profit_pct:
- self.pnl_per_coin[coin] += (current_price - entry_price) * abs(pos)
- self.close(data)
- logger.info(
- f"Trade Closed: Code={coin}, Exit Time={data.datetime.datetime()}, "
- f"Exit Price={current_price:.4f}, PnL %={pnl_pct:.4f}, "
- f"Quantity Long={pos:.4f}, Quantity Closed={pos:.4f}"
- )
- del self.position_entry_bars[coin]
- del self.position_entry_prices[coin]
- # Step 3: Select top-N coins via UCB scores
- scores = self.ucb.compute_scores()
- top_indices = np.argsort(scores)[-self.p.top_n:]
- top_coins = [coins[i] for i in top_indices]
- # Step 4: Enter new positions if possible
- portfolio_value = self.broker.getvalue()
- cash = self.broker.getcash()
- for coin in top_coins:
- data = self.getdatabyname(coin)
- if self.getposition(data).size == 0 and cash >= portfolio_value * self.p.position_size_pct:
- price = data.close[0]
- size = (portfolio_value * self.p.position_size_pct) / price
- self.buy(data=data, size=size)
- self.position_entry_bars[coin] = 0
- self.position_entry_prices[coin] = price
- logger.info(
- f"Trade Executed: Code={coin}, Entry Time={data.datetime.datetime()}, "
- f"Entry Price={price:.4f}, Quantity Long={size:.4f}, Quantity Closed=0.0000"
- )
- # --- Record holdings and portfolio value ---
- current_datetime = self.datas[0].datetime.datetime()
- holdings = {data._name: self.getposition(data).size for data in self.datas}
- portfolio_value = self.broker.getvalue()
- self.holdings_history.append({'datetime': current_datetime, **holdings})
- self.portfolio_value_history.append({'datetime': current_datetime, 'portfolio_value': portfolio_value})
- coin_values = {'datetime': current_datetime}
- for data in self.datas:
- coin = data._name
- size = self.getposition(data).size
- price = data.close[0]
- coin_values[coin] = size * price # Market value of position
- self.coin_value_history.append(coin_values)
- def notify_order(self, order):
- if order.status in [order.Completed]:
- pass # Can add more logging if needed
- def run_ucb_backtest(
- position_size_pct=0.10,
- top_n=10,
- min_hold_bars=4,
- max_hold_bars=36,
- stop_loss_pct=0.05,
- take_profit_pct=0.10,
- ucb_c=2.0,
- initial_capital=10000.0,
- start_date=None,
- end_date=None,
- refresh=False,
- selected_exchange='binance'
- ):
- exchange_params = {
- 'binance': {
- 'commission': 0.001,
- 'slippage': 0.0,
- 'timeframe': '1h',
- },
- 'coinbase': {
- 'commission': 0.0015,
- 'slippage': 0.0,
- 'timeframe': '1h',
- },
- 'kraken': {
- 'commission': 0.0026,
- 'slippage': 0.0,
- 'timeframe': '1h',
- },
- # Add more exchanges as needed
- }
- # Resolve per-exchange trading costs and timeframe
- commission = exchange_params[selected_exchange]['commission']
- slippage = exchange_params[selected_exchange]['slippage']
- timeframe = exchange_params[selected_exchange]['timeframe']
- # Default to the last 15 days when no explicit date range is supplied;
- # otherwise respect the arguments passed in by the caller (e.g. main()).
- if end_date is None:
- end_date = datetime.now()
- if start_date is None:
- start_date = end_date - timedelta(days=15)
-
- logger.info(f"Starting backtest from {start_date} to {end_date} (refresh={refresh})")
-
- # Fetch/load data
- data_feeds = {}
- for coin in coins:
- df = fetch_historical_ohlcv(coin, timeframe=timeframe, start_date=start_date, end_date=end_date, refresh=refresh)
- if not df.empty:
- data_feeds[coin] = bt.feeds.PandasData(dataname=df, name=coin)
-
- if not data_feeds:
- logger.error("No data available; aborting")
- return
-
- # Set up Backtrader
- cerebro = bt.Cerebro()
- for coin, feed in data_feeds.items():
- cerebro.adddata(feed)
-
-
- cerebro.addstrategy(
- UCBStrategy,
- position_size_pct=position_size_pct,
- top_n=top_n,
- min_hold_bars=min_hold_bars,
- max_hold_bars=max_hold_bars,
- stop_loss_pct=stop_loss_pct,
- take_profit_pct=take_profit_pct,
- ucb_c=ucb_c
- )
- cerebro.broker.setcash(initial_capital)
- cerebro.broker.setcommission(commission=commission) # e.g. 0.001 = 0.1% for Binance
- # Optionally, set slippage
- cerebro.broker.set_slippage_perc(slippage)
- # Add analyzers
- cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
- cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
- cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
- cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
-
- # Run backtest
- results = cerebro.run()
- strat = results[0]
-
- # Convert to DataFrame
- holdings_df = pd.DataFrame(strat.holdings_history)
- portfolio_df = pd.DataFrame(strat.portfolio_value_history)
- coin_value_df = pd.DataFrame(strat.coin_value_history)
- # Save to CSV
- holdings_df.to_csv('holdings_by_date.csv', index=False)
- portfolio_df.to_csv('portfolio_value_by_date.csv', index=False)
- coin_value_df.to_csv('coin_value_by_date.csv', index=False)
- # Print metrics
- try:
- logger.info(f"Final Portfolio Value: {cerebro.broker.getvalue():.2f}")
- except Exception as e:
- logger.error(f"Error logging Final Portfolio Value: {e}")
- try:
- logger.info(f"Sharpe Ratio: {strat.analyzers.sharpe.get_analysis().get('sharperatio', 0):.2f}")
- except Exception as e:
- logger.error(f"Error logging Sharpe Ratio: {e}")
- try:
- logger.info(f"Max Drawdown: {strat.analyzers.drawdown.get_analysis().max.drawdown:.2f}%")
- except Exception as e:
- logger.error(f"Error logging Max Drawdown: {e}")
- try:
- logger.info(f"Total Return: {strat.analyzers.returns.get_analysis().rtot:.4f}")
- except Exception as e:
- logger.error(f"Error logging Total Return: {e}")
- try:
- logger.info(f"Number of Trades: {strat.analyzers.trades.get_analysis().total.total}")
- except Exception as e:
- logger.error(f"Error logging Number of Trades: {e}")
-
- # Collect metrics
- metrics = {}
- try:
- metrics['final_portfolio_value'] = cerebro.broker.getvalue()
- except Exception:
- metrics['final_portfolio_value'] = None
- try:
- metrics['sharpe'] = strat.analyzers.sharpe.get_analysis().get('sharperatio', 0)
- except Exception:
- metrics['sharpe'] = None
- try:
- metrics['max_drawdown'] = strat.analyzers.drawdown.get_analysis().max.drawdown
- except Exception:
- metrics['max_drawdown'] = None
- try:
- metrics['total_return'] = strat.analyzers.returns.get_analysis().rtot
- except Exception:
- metrics['total_return'] = None
- try:
- trade_analysis = strat.analyzers.trades.get_analysis()
- metrics['num_trades'] = trade_analysis.total.total if 'total' in trade_analysis and 'total' in trade_analysis.total else None
- metrics['num_wins'] = trade_analysis.won.total if 'won' in trade_analysis and 'total' in trade_analysis.won else None
- metrics['num_loss'] = trade_analysis.lost.total if 'lost' in trade_analysis and 'total' in trade_analysis.lost else None
- metrics['win_rate'] = (metrics['num_wins'] / metrics['num_trades']) if metrics['num_trades'] and metrics['num_wins'] is not None else None
- metrics['avg_win'] = trade_analysis.won.pnl.average if 'won' in trade_analysis and 'pnl' in trade_analysis.won and 'average' in trade_analysis.won.pnl else None
- metrics['avg_loss'] = trade_analysis.lost.pnl.average if 'lost' in trade_analysis and 'pnl' in trade_analysis.lost and 'average' in trade_analysis.lost.pnl else None
- except Exception:
- metrics['num_trades'] = None
- metrics['num_wins'] = None
- metrics['num_loss'] = None
- metrics['win_rate'] = None
- metrics['avg_win'] = None
- metrics['avg_loss'] = None
- return metrics
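- # Example (illustrative): a single run over the last 10 days with default sizing:
- #   metrics = run_ucb_backtest(top_n=5, start_date=datetime.now() - timedelta(days=10),
- #                              end_date=datetime.now(), selected_exchange='binance')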
- def main():
- # Define parameter grid for batch testing
- # (position_size_pct, top_n, min_hold_bars, max_hold_bars, stop_loss_pct, take_profit_pct, ucb_c, days)
- param_grid = [
- (0.10, 3, 4, 12, 0.05, 0.10, 2.0, 5),
- (0.10, 5, 6, 24, 0.03, 0.08, 1.5, 5),
- (0.20, 7, 8, 36, 0.04, 0.12, 2.5, 5),
- (0.10, 10, 4, 36, 0.05, 0.10, 1.0, 5),
- (0.12, 12, 5, 20, 0.02, 0.07, 2.2, 5),
- # Add more parameter sets as desired
- ]
- results = []
- for idx, (position_size_pct, top_n, min_hold_bars, max_hold_bars, stop_loss_pct, take_profit_pct, ucb_c, days) in enumerate(param_grid):
- logger.info(f"Running backtest {idx+1}/{len(param_grid)}: "
- f"pos_size={position_size_pct}, top_n={top_n}, min_hold={min_hold_bars}, max_hold={max_hold_bars}, "
- f"stop_loss={stop_loss_pct}, take_profit={take_profit_pct}, ucb_c={ucb_c}, days={days}")
- end_date = datetime.now()
- start_date = end_date - timedelta(days=days)
- metrics = run_ucb_backtest(
- position_size_pct=position_size_pct,
- top_n=top_n,
- min_hold_bars=min_hold_bars,
- max_hold_bars=max_hold_bars,
- stop_loss_pct=stop_loss_pct,
- take_profit_pct=take_profit_pct,
- ucb_c=ucb_c,
- initial_capital=10000.0,
- start_date=start_date,
- end_date=end_date,
- refresh=False,
- selected_exchange='binance'
- )
- # Add params to metrics for tracking (run_ucb_backtest returns None when no data was available)
- if metrics is None:
- metrics = {}
- metrics.update({
- 'position_size_pct': position_size_pct,
- 'top_n': top_n,
- 'min_hold_bars': min_hold_bars,
- 'max_hold_bars': max_hold_bars,
- 'stop_loss_pct': stop_loss_pct,
- 'take_profit_pct': take_profit_pct,
- 'ucb_c': ucb_c,
- 'days': days
- })
- results.append(metrics)
- # Convert results to DataFrame
- results_df = pd.DataFrame(results)
- results_df.to_csv('ucb_batch_results.csv', index=False)
- print("Batch backtest results saved to ucb_batch_results.csv")
- print(results_df)
- # Plot performance (final portfolio value) for each parameter set
- plt.figure(figsize=(12, 6))
- plt.bar(range(len(results_df)), results_df['final_portfolio_value'], tick_label=[
- f"top{row['top_n']}_min{row['min_hold_bars']}_max{row['max_hold_bars']}_ucb{row['ucb_c']}_d{row['days']}"
- for _, row in results_df.iterrows()
- ])
- plt.ylabel('Final Portfolio Value')
- plt.xlabel('Parameter Set')
- plt.title('Final Portfolio Value for Each Parameter Set')
- plt.xticks(rotation=45, ha='right')
- plt.tight_layout()
- plt.savefig("ucb_batch_performancef-btplot_top{top_n}_min{min_hold_bars}_max{max_hold_bars}_ucb{ucb_c}.png")
- plt.close()
- print("Performance plot saved to ucb_batch_performance.png")
- # Keep the original main for CLI usage
- if __name__ == "__main__":
- main()