import logging
import math
import os
from datetime import datetime, timedelta

import backtrader as bt
import ccxt
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt  # For equity curve plot

# Configure logging to both console and file.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),                           # Console output
        logging.FileHandler('ucb_backtest.log', mode='w')  # Log file (overwrite each run)
    ]
)
logger = logging.getLogger(__name__)

# Candidate universe: 14 large-cap USDT pairs.
coins = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'SOL/USDT', 'XRP/USDT',
         'ADA/USDT', 'DOGE/USDT', 'AVAX/USDT', 'SHIB/USDT', 'DOT/USDT',
         'LINK/USDT', 'TRX/USDT', 'UNI/USDT', 'LTC/USDT']

exchange = ccxt.binance({'enableRateLimit': True})


class UCB:
    """Upper Confidence Bound (UCB1) bandit over the coin universe.

    Each coin is an arm; `update` folds a per-bar reward into that arm's
    running mean, and `compute_scores` returns mean + exploration bonus.
    """

    def __init__(self, num_arms, c=2.0):
        self.num_arms = num_arms
        self.counts = np.zeros(num_arms)        # pulls per arm
        self.mean_rewards = np.zeros(num_arms)  # running mean reward per arm
        self.total_pulls = 0
        self.c = c                              # exploration coefficient

    def compute_scores(self):
        """Return the UCB score for every arm (inf for never-pulled arms)."""
        ucb_scores = np.zeros(self.num_arms)
        for i in range(self.num_arms):
            if self.counts[i] == 0:
                ucb_scores[i] = float('inf')  # Encourage exploration
            else:
                ucb_scores[i] = self.mean_rewards[i] + self.c * math.sqrt(
                    math.log(self.total_pulls + 1) / self.counts[i])
        return ucb_scores

    def update(self, arm, reward):
        """Incrementally update the running mean reward of `arm`."""
        self.counts[arm] += 1
        self.total_pulls += 1
        self.mean_rewards[arm] = (
            self.mean_rewards[arm] * (self.counts[arm] - 1) + reward
        ) / self.counts[arm]


# Feature computation functions (applied per coin's data).

def compute_atr(df, period=14):
    """Average True Range over `period` bars.

    Returns a Series aligned with df's index; the first `period - 1`
    values (and row 0, which lacks a previous close) are NaN.
    """
    high_low = df['high'] - df['low']
    high_close = np.abs(df['high'] - df['close'].shift())
    low_close = np.abs(df['low'] - df['close'].shift())
    # BUG FIX: np.maximum(a, b, c) treats the 3rd positional argument as the
    # `out` buffer, so `low_close` was being overwritten, not compared.
    # True range is the element-wise max of all three components.
    tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
    atr = tr.rolling(period).mean()
    return atr


def compute_ema(df, short=12, long=26):
    """Add EMA crossover columns in place: ema_short, ema_long, trend (+1/-1)."""
    df['ema_short'] = df['close'].ewm(span=short, adjust=False).mean()
    df['ema_long'] = df['close'].ewm(span=long, adjust=False).mean()
    df['trend'] = np.where(df['ema_short'] > df['ema_long'], 1, -1)


def compute_reward(df):
    """Trend-aligned, volatility-normalized return of the latest bar.

    Mutates df (adds return/atr/ema columns). Returns 0.0 when the reward
    cannot be computed (empty frame, or NaN from too-short ATR window —
    e.g. the single-row frames built per bar in UCBStrategy.next).
    """
    df['return'] = (df['close'] - df['open']) / df['open']
    df['atr'] = compute_atr(df)
    compute_ema(df)
    reward = df['return'] * df['trend'] / df['atr'].replace(0, np.nan)
    if reward.empty:
        return 0.0
    last = reward.iloc[-1]
    # BUG FIX: previously NaN rewards leaked into UCB.update and poisoned
    # the running means (NaN propagates through every later mean).
    return 0.0 if pd.isna(last) else last


def fetch_historical_ohlcv(symbol, timeframe='1h', start_date=None, end_date=None,
                           limit=1000, refresh=False):
    """Fetch OHLCV candles for `symbol`, with a CSV cache under dat/.

    Returns a DataFrame indexed by timestamp, or an empty DataFrame on error.
    Set refresh=True to bypass the cache.
    """
    os.makedirs('dat', exist_ok=True)
    program_prefix = 'ucb_backtest'
    symbol_safe = symbol.replace('/', '-')
    start_str = start_date.strftime('%Y%m%d') if start_date else 'none'
    end_str = end_date.strftime('%Y%m%d') if end_date else 'none'
    filename = f"dat/{program_prefix}_{symbol_safe}_{timeframe}_{start_str}_{end_str}.csv"

    if not refresh and os.path.exists(filename):
        try:
            df = pd.read_csv(filename, index_col='timestamp', parse_dates=True)
            logger.info(f"Loaded cached data for {symbol} from {filename}")
            return df
        except Exception as e:
            logger.warning(f"Error loading cache for {symbol}: {str(e)}; fetching fresh data")

    try:
        since = int(start_date.timestamp() * 1000) if start_date else None
        ohlcv = []
        while True:
            data = exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
            if not data:
                break
            ohlcv.extend(data)
            since = data[-1][0] + 1  # resume just after the last candle
            if end_date and data[-1][0] >= int(end_date.timestamp() * 1000):
                break
        df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)
        if end_date:
            df = df[df.index <= end_date]
        logger.info(f"Fetched {len(df)} historical candles for {symbol}")
        df.to_csv(filename)
        logger.info(f"Saved data for {symbol} to {filename}")
        return df
    except Exception as e:
        logger.error(f"Error fetching historical data for {symbol}: {str(e)}")
        return pd.DataFrame()


class UCBStrategy(bt.Strategy):
    """Backtrader strategy: each bar, update the bandit, exit stale/stopped
    positions, then open fixed-size longs in the top-N UCB-scored coins."""

    params = (
        ('position_size_pct', 0.10),  # Fixed 10% of portfolio per position
        ('top_n', 10),                # Number of coins selected per bar
        ('min_hold_bars', 4),         # Short-term: min hold (bars/hours)
        ('max_hold_bars', 36),        # Mid-term: max hold (bars/hours)
        ('stop_loss_pct', 0.05),      # 5% stop-loss
        ('take_profit_pct', 0.10),    # 10% take-profit
        ('ucb_c', 2.0),               # UCB exploration coefficient
    )

    def __init__(self):
        self.ucb = UCB(len(coins), self.p.ucb_c)
        self.position_entry_bars = {}     # bars since entry, keyed by coin
        self.position_entry_prices = {}   # entry price, keyed by coin
        self.holdings_history = []        # per-bar holdings snapshots
        self.portfolio_value_history = [] # per-bar portfolio value
        self.pnl_per_coin = {coin: 0.0 for coin in coins}
        self.coin_value_history = []      # per-bar market value per coin

    def next(self):
        # Step 1: update UCB with the latest single-bar reward for every coin.
        # NOTE(review): a one-row frame cannot produce a meaningful ATR/EMA,
        # so compute_reward returns 0.0 here; selection is driven mainly by
        # the exploration bonus. Consider feeding a rolling window instead.
        for i, data in enumerate(self.datas):
            df = pd.DataFrame({
                'open': [data.open[0]],
                'high': [data.high[0]],
                'low': [data.low[0]],
                'close': [data.close[0]],
            }, index=[data.datetime.datetime()])
            reward = compute_reward(df)
            self.ucb.update(i, reward)  # Update UCB for every coin every bar

        # Step 2: check exits for open positions.
        for data in self.datas:
            coin = data._name
            pos = self.getposition(data).size
            if pos != 0:
                bars_held = self.position_entry_bars.get(coin, 0) + 1
                self.position_entry_bars[coin] = bars_held
                entry_price = self.position_entry_prices[coin]
                current_price = data.close[0]
                pnl_pct = (current_price - entry_price) / entry_price
                if bars_held < self.p.min_hold_bars:
                    continue  # Enforce minimum hold
                if (bars_held >= self.p.max_hold_bars
                        or pnl_pct <= -self.p.stop_loss_pct
                        or pnl_pct >= self.p.take_profit_pct):
                    self.pnl_per_coin[coin] += (current_price - entry_price) * abs(pos)
                    self.close(data)
                    logger.info(
                        f"Trade Closed: Code={coin}, Exit Time={data.datetime.datetime()}, "
                        f"Exit Price={current_price:.4f}, PnL %={pnl_pct:.4f}, "
                        f"Quantity Long={pos:.4f}, Quantity Closed={pos:.4f}"
                    )
                    del self.position_entry_bars[coin]
                    del self.position_entry_prices[coin]

        # Step 3: select top-N coins by UCB score.
        scores = self.ucb.compute_scores()
        top_indices = np.argsort(scores)[-self.p.top_n:]
        top_coins = [coins[i] for i in top_indices]

        # Step 4: enter new positions while cash allows.
        # NOTE(review): broker cash only updates on order execution (next bar),
        # so several same-bar entries may be approved against the same cash.
        portfolio_value = self.broker.getvalue()
        cash = self.broker.getcash()
        for coin in top_coins:
            data = self.getdatabyname(coin)
            if (self.getposition(data).size == 0
                    and cash >= portfolio_value * self.p.position_size_pct):
                price = data.close[0]
                size = (portfolio_value * self.p.position_size_pct) / price
                self.buy(data=data, size=size)
                self.position_entry_bars[coin] = 0
                self.position_entry_prices[coin] = price
                logger.info(
                    f"Trade Executed: Code={coin}, Entry Time={data.datetime.datetime()}, "
                    f"Entry Price={price:.4f}, Quantity Long={size:.4f}, Quantity Closed=0.0000"
                )

        # Record holdings, portfolio value and per-coin market value.
        current_datetime = self.datas[0].datetime.datetime()
        holdings = {data._name: self.getposition(data).size for data in self.datas}
        portfolio_value = self.broker.getvalue()
        self.holdings_history.append({'datetime': current_datetime, **holdings})
        self.portfolio_value_history.append(
            {'datetime': current_datetime, 'portfolio_value': portfolio_value})
        coin_values = {'datetime': current_datetime}
        for data in self.datas:
            coin = data._name
            size = self.getposition(data).size
            price = data.close[0]
            coin_values[coin] = size * price  # Market value of position
        self.coin_value_history.append(coin_values)

    def notify_order(self, order):
        if order.status in [order.Completed]:
            pass  # Can add more logging if needed


def run_ucb_backtest(
    position_size_pct=0.10,
    top_n=10,
    min_hold_bars=4,
    max_hold_bars=36,
    stop_loss_pct=0.05,
    take_profit_pct=0.10,
    ucb_c=2.0,
    initial_capital=10000.0,
    start_date=None,
    end_date=None,
    refresh=False,
    selected_exchange='binance'
):
    """Run one UCB backtest and return a dict of performance metrics.

    start_date/end_date default to the last 15 days when not supplied.
    Returns an empty dict when no market data could be loaded.
    """
    exchange_params = {
        'binance':  {'commission': 0.001,  'slippage': 0.0, 'timeframe': '1h'},
        'coinbase': {'commission': 0.0015, 'slippage': 0.0, 'timeframe': '1h'},
        'kraken':   {'commission': 0.0026, 'slippage': 0.0, 'timeframe': '1h'},
        # Add more exchanges as needed
    }
    params = exchange_params[selected_exchange]
    commission = params['commission']
    slippage = params['slippage']
    timeframe = params['timeframe']

    # BUG FIX: these were hard-coded after the signature, silently discarding
    # the caller's arguments (including the `days` in main's parameter grid).
    # Now the passed-in values are respected, defaulting only when None.
    if end_date is None:
        end_date = datetime.now()
    if start_date is None:
        start_date = end_date - timedelta(days=15)

    logger.info(f"Starting backtest from {start_date} to {end_date} (refresh={refresh})")

    # Fetch/load data.
    data_feeds = {}
    for coin in coins:
        df = fetch_historical_ohlcv(coin, timeframe=timeframe, start_date=start_date,
                                    end_date=end_date, refresh=refresh)
        if not df.empty:
            data_feeds[coin] = bt.feeds.PandasData(dataname=df, name=coin)
    if not data_feeds:
        logger.error("No data available; aborting")
        # BUG FIX: was `return` (None); main() calls metrics.update() on the result.
        return {}

    # Set up Backtrader.
    cerebro = bt.Cerebro()
    for coin, feed in data_feeds.items():
        cerebro.adddata(feed)
    cerebro.addstrategy(
        UCBStrategy,
        position_size_pct=position_size_pct,
        top_n=top_n,
        min_hold_bars=min_hold_bars,
        max_hold_bars=max_hold_bars,
        stop_loss_pct=stop_loss_pct,
        take_profit_pct=take_profit_pct,
        ucb_c=ucb_c
    )
    cerebro.broker.setcash(initial_capital)
    # BUG FIX: commission was set twice (hard-coded 0.001, then per-exchange);
    # a single per-exchange call is sufficient.
    cerebro.broker.setcommission(commission=commission)
    cerebro.broker.set_slippage_perc(slippage)

    # Add analyzers.
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')

    # Run backtest.
    results = cerebro.run()
    strat = results[0]

    # Persist per-bar history.
    holdings_df = pd.DataFrame(strat.holdings_history)
    portfolio_df = pd.DataFrame(strat.portfolio_value_history)
    coin_value_df = pd.DataFrame(strat.coin_value_history)
    holdings_df.to_csv('holdings_by_date.csv', index=False)
    portfolio_df.to_csv('portfolio_value_by_date.csv', index=False)

    # Log metrics (each guarded: analyzers can return incomplete structures).
    try:
        logger.info(f"Final Portfolio Value: {cerebro.broker.getvalue():.2f}")
    except Exception as e:
        logger.error(f"Error logging Final Portfolio Value: {e}")
    try:
        logger.info(f"Sharpe Ratio: {strat.analyzers.sharpe.get_analysis().get('sharperatio', 0):.2f}")
    except Exception as e:
        logger.error(f"Error logging Sharpe Ratio: {e}")
    try:
        logger.info(f"Max Drawdown: {strat.analyzers.drawdown.get_analysis().max.drawdown:.2f}%")
    except Exception as e:
        logger.error(f"Error logging Max Drawdown: {e}")
    try:
        logger.info(f"Total Return: {strat.analyzers.returns.get_analysis().rtot:.4f}")
    except Exception as e:
        logger.error(f"Error logging Total Return: {e}")
    try:
        logger.info(f"Number of Trades: {strat.analyzers.trades.get_analysis().total.total}")
    except Exception as e:
        logger.error(f"Error logging Number of Trades: {e}")

    # Collect metrics for the caller.
    metrics = {}
    try:
        metrics['final_portfolio_value'] = cerebro.broker.getvalue()
    except Exception:
        metrics['final_portfolio_value'] = None
    try:
        metrics['sharpe'] = strat.analyzers.sharpe.get_analysis().get('sharperatio', 0)
    except Exception:
        metrics['sharpe'] = None
    try:
        metrics['max_drawdown'] = strat.analyzers.drawdown.get_analysis().max.drawdown
    except Exception:
        metrics['max_drawdown'] = None
    try:
        metrics['total_return'] = strat.analyzers.returns.get_analysis().rtot
    except Exception:
        metrics['total_return'] = None
    try:
        trade_analysis = strat.analyzers.trades.get_analysis()
        metrics['num_trades'] = (trade_analysis.total.total
                                 if 'total' in trade_analysis and 'total' in trade_analysis.total
                                 else None)
        metrics['num_wins'] = (trade_analysis.won.total
                               if 'won' in trade_analysis and 'total' in trade_analysis.won
                               else None)
        metrics['num_loss'] = (trade_analysis.lost.total
                               if 'lost' in trade_analysis and 'total' in trade_analysis.lost
                               else None)
        metrics['win_rate'] = ((metrics['num_wins'] / metrics['num_trades'])
                               if metrics['num_trades'] and metrics['num_wins'] is not None
                               else None)
        metrics['avg_win'] = (trade_analysis.won.pnl.average
                              if 'won' in trade_analysis and 'pnl' in trade_analysis.won
                              and 'average' in trade_analysis.won.pnl
                              else None)
        metrics['avg_loss'] = (trade_analysis.lost.pnl.average
                               if 'lost' in trade_analysis and 'pnl' in trade_analysis.lost
                               and 'average' in trade_analysis.lost.pnl
                               else None)
    except Exception:
        metrics['num_trades'] = None
        metrics['num_wins'] = None
        metrics['num_loss'] = None
        metrics['win_rate'] = None
        metrics['avg_win'] = None
        metrics['avg_loss'] = None
    return metrics


def main():
    """Batch-run the backtest over a parameter grid, save and plot results."""
    # (position_size_pct, top_n, min_hold_bars, max_hold_bars,
    #  stop_loss_pct, take_profit_pct, ucb_c, days)
    param_grid = [
        (0.10, 3, 4, 12, 0.05, 0.10, 2.0, 5),
        (0.10, 5, 6, 24, 0.03, 0.08, 1.5, 5),
        (0.20, 7, 8, 36, 0.04, 0.12, 2.5, 5),
        (0.10, 10, 4, 36, 0.05, 0.10, 1.0, 5),
        (0.12, 12, 5, 20, 0.02, 0.07, 2.2, 5),
        # Add more parameter sets as desired
    ]
    results = []
    for idx, (position_size_pct, top_n, min_hold_bars, max_hold_bars,
              stop_loss_pct, take_profit_pct, ucb_c, days) in enumerate(param_grid):
        logger.info(f"Running backtest {idx+1}/{len(param_grid)}: "
                    f"pos_size={position_size_pct}, top_n={top_n}, min_hold={min_hold_bars}, "
                    f"max_hold={max_hold_bars}, stop_loss={stop_loss_pct}, "
                    f"take_profit={take_profit_pct}, ucb_c={ucb_c}, days={days}")
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        metrics = run_ucb_backtest(
            position_size_pct=position_size_pct,
            top_n=top_n,
            min_hold_bars=min_hold_bars,
            max_hold_bars=max_hold_bars,
            stop_loss_pct=stop_loss_pct,
            take_profit_pct=take_profit_pct,
            ucb_c=ucb_c,
            initial_capital=10000.0,
            start_date=start_date,
            end_date=end_date,
            refresh=False,
            selected_exchange='binance'
        )
        # Add params to metrics for tracking.
        metrics.update({
            'position_size_pct': position_size_pct,
            'top_n': top_n,
            'min_hold_bars': min_hold_bars,
            'max_hold_bars': max_hold_bars,
            'stop_loss_pct': stop_loss_pct,
            'take_profit_pct': take_profit_pct,
            'ucb_c': ucb_c,
            'days': days
        })
        results.append(metrics)

    results_df = pd.DataFrame(results)
    results_df.to_csv('ucb_batch_results.csv', index=False)
    print("Batch backtest results saved to ucb_batch_results.csv")
    print(results_df)

    # Plot final portfolio value for each parameter set.
    plt.figure(figsize=(12, 6))
    plt.bar(range(len(results_df)), results_df['final_portfolio_value'], tick_label=[
        f"top{row['top_n']}_min{row['min_hold_bars']}_max{row['max_hold_bars']}"
        f"_ucb{row['ucb_c']}_d{row['days']}"
        for _, row in results_df.iterrows()
    ])
    plt.ylabel('Final Portfolio Value')
    plt.xlabel('Parameter Set')
    plt.title('Final Portfolio Value for Each Parameter Set')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    # BUG FIX: filename was a mangled non-f-string with literal braces and
    # out-of-scope variables; use the name the print statement advertises.
    plt.savefig("ucb_batch_performance.png")
    plt.close()
    print("Performance plot saved to ucb_batch_performance.png")


if __name__ == "__main__":
    main()