"""Backtest a UCB-ranked multi-coin long-only strategy on hourly OHLCV data.

Pipeline: fetch (or load cached) candles via ccxt, feed them to Backtrader,
rank coins every bar with an upper-confidence-bound score built from a
trend-signed, ATR-normalised bar return, and trade the top-ranked coins with
fixed sizing, min/max holding periods, stop-loss and take-profit.
"""
import logging
import math
import os
from datetime import datetime, timedelta

import backtrader as bt
import ccxt
import matplotlib.pyplot as plt  # For equity curve plot
import numpy as np
import pandas as pd

# Configure logging to both console and file
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),  # Console output
        logging.FileHandler('ucb_backtest.log', mode='w'),  # Log file (overwrite each run)
    ],
)
logger = logging.getLogger(__name__)

# Coin universe (14 USDT pairs)
coins = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'SOL/USDT', 'XRP/USDT', 'ADA/USDT', 'DOGE/USDT',
         'AVAX/USDT', 'SHIB/USDT', 'DOT/USDT', 'LINK/USDT', 'TRX/USDT', 'UNI/USDT', 'LTC/USDT']

exchange = ccxt.binance({'enableRateLimit': True})


class UCB:
    """Upper-confidence-bound scorer over a fixed number of arms.

    Each arm keeps a pull count and a running mean reward; the score is
    ``mean + c * sqrt(log(total_pulls + 1) / count)``.
    """

    def __init__(self, num_arms, c=2.0):
        """Create ``num_arms`` arms with zero pulls and exploration weight ``c``."""
        self.num_arms = num_arms
        self.counts = np.zeros(num_arms)
        self.mean_rewards = np.zeros(num_arms)
        self.total_pulls = 0
        self.c = c

    def compute_scores(self):
        """Return an array of UCB scores; arms never pulled score ``inf``."""
        scores = np.full(self.num_arms, np.inf)  # inf encourages exploration
        pulled = self.counts > 0
        if pulled.any():
            bonus = self.c * np.sqrt(math.log(self.total_pulls + 1) / self.counts[pulled])
            scores[pulled] = self.mean_rewards[pulled] + bonus
        return scores

    def update(self, arm, reward):
        """Record one ``reward`` observation for ``arm`` and update its running mean."""
        self.counts[arm] += 1
        self.total_pulls += 1
        self.mean_rewards[arm] += (reward - self.mean_rewards[arm]) / self.counts[arm]


def compute_atr(df, period=14):
    """Return the rolling-mean true range of ``df`` over ``period`` rows.

    ``df`` needs 'high', 'low' and 'close' columns. The result is NaN until
    ``period`` rows are available.
    """
    prev_close = df['close'].shift()
    # np.maximum is a binary ufunc (its third positional argument is ``out``),
    # so take the row-wise max over the three candidate ranges explicitly.
    true_range = pd.concat(
        [
            df['high'] - df['low'],
            (df['high'] - prev_close).abs(),
            (df['low'] - prev_close).abs(),
        ],
        axis=1,
    ).max(axis=1)
    return true_range.rolling(period).mean()


def compute_ema(df, short=12, long=26):
    """Add 'ema_short', 'ema_long' and 'trend' (+1 / -1) columns to ``df`` in place."""
    df['ema_short'] = df['close'].ewm(span=short, adjust=False).mean()
    df['ema_long'] = df['close'].ewm(span=long, adjust=False).mean()
    df['trend'] = np.where(df['ema_short'] > df['ema_long'], 1, -1)


def compute_reward(df):
    """Return the latest trend-signed, ATR-normalised open-to-close return.

    Mutates ``df`` by adding 'return', 'atr' and the EMA/trend columns.
    Returns 0 for an empty frame. The value is NaN when the ATR is not yet
    defined (fewer rows than the ATR period) or is zero; callers must handle
    that.
    """
    df['return'] = (df['close'] - df['open']) / df['open']
    df['atr'] = compute_atr(df)
    compute_ema(df)
    reward = df['return'] * df['trend'] / df['atr'].replace(0, np.nan)
    return reward.iloc[-1] if not reward.empty else 0  # Latest reward


def fetch_historical_ohlcv(symbol, timeframe='1h', start_date=None, end_date=None,
                           limit=1000, refresh=False):
    """Fetch OHLCV candles for ``symbol``, using a CSV cache under ``dat/``.

    Args:
        symbol: Market symbol such as 'BTC/USDT'.
        timeframe: Candle timeframe string passed to the exchange.
        start_date: Optional datetime of the first candle to request.
        end_date: Optional datetime after which candles are dropped.
        limit: Page size for each exchange request.
        refresh: When True, ignore any cached CSV and fetch again.

    Returns:
        DataFrame indexed by 'timestamp' with open/high/low/close/volume
        columns, or an empty DataFrame when fetching fails.
    """
    os.makedirs('dat', exist_ok=True)
    program_prefix = 'ucb_backtest'
    symbol_safe = symbol.replace('/', '-')
    start_str = start_date.strftime('%Y%m%d') if start_date else 'none'
    end_str = end_date.strftime('%Y%m%d') if end_date else 'none'
    filename = f"dat/{program_prefix}_{symbol_safe}_{timeframe}_{start_str}_{end_str}.csv"

    if not refresh and os.path.exists(filename):
        try:
            df = pd.read_csv(filename, index_col='timestamp', parse_dates=True)
            logger.info(f"Loaded cached data for {symbol} from {filename}")
            return df
        except Exception as e:
            logger.warning(f"Error loading cache for {symbol}: {str(e)}; fetching fresh data")

    try:
        since = int(start_date.timestamp() * 1000) if start_date else None
        end_ms = int(end_date.timestamp() * 1000) if end_date else None
        ohlcv = []
        while True:
            data = exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
            if not data:
                break
            ohlcv.extend(data)
            next_since = data[-1][0] + 1
            if end_ms is not None and data[-1][0] >= end_ms:
                break
            if since is not None and next_since <= since:
                break  # Cursor did not advance; avoid looping forever
            since = next_since

        df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)
        if end_ms is not None:
            # Compare in the same epoch-derived clock as the index rather than
            # against the caller's naive local datetime.
            df = df[df.index <= pd.to_datetime(end_ms, unit='ms')]
        logger.info(f"Fetched {len(df)} historical candles for {symbol}")

        if df.empty:
            # Do not cache an empty result, or every later run would reload it.
            logger.warning(f"No candles returned for {symbol}; nothing cached")
            return df

        df.to_csv(filename)
        logger.info(f"Saved data for {symbol} to {filename}")
        return df
    except Exception as e:
        logger.error(f"Error fetching historical data for {symbol}: {str(e)}")
        return pd.DataFrame()


class UCBStrategy(bt.Strategy):
    """Long-only Backtrader strategy that buys the top UCB-ranked feeds each bar."""

    params = (
        ('position_size_pct', 0.10),  # Fraction of portfolio value per position
        ('top_n', 10),  # Number of top-ranked coins eligible for entry
        ('min_hold_bars', 4),  # Minimum bars to hold before any exit
        ('max_hold_bars', 36),  # Force exit after this many bars
        ('stop_loss_pct', 0.05),  # 5% stop-loss
        ('take_profit_pct', 0.10),  # 10% take-profit
        ('ucb_c', 2.0),  # UCB exploration param
        ('reward_lookback', 100),  # Trailing bars used to compute each reward
    )

    def __init__(self):
        # One arm per loaded data feed, so arm indices always match self.datas
        # even when some coins in the global list had no data.
        self.ucb = UCB(len(self.datas), self.p.ucb_c)
        self.position_entry_bars = {}  # Bars since entry, per coin
        self.position_entry_prices = {}  # Entry price, per coin
        self.holdings_history = []  # Position sizes by date
        self.portfolio_value_history = []  # Portfolio value by date
        self.pnl_per_coin = {data._name: 0.0 for data in self.datas}
        self.coin_value_history = []  # List of dicts: {'datetime': ..., 'BTC/USDT': ..., ...}

    def _latest_reward(self, data):
        """Compute the reward for ``data`` from its trailing window of bars."""
        size = min(len(data), self.p.reward_lookback)
        if size <= 0:
            return float('nan')
        window = pd.DataFrame({
            'open': np.array(data.open.get(size=size)),
            'high': np.array(data.high.get(size=size)),
            'low': np.array(data.low.get(size=size)),
            'close': np.array(data.close.get(size=size)),
        })
        return compute_reward(window)

    def next(self):
        """Update UCB rewards, process exits, enter top-ranked coins, record state."""
        # Step 1: Update UCB with the latest reward for every feed
        for i, data in enumerate(self.datas):
            reward = self._latest_reward(data)
            # During warm-up the ATR is undefined and the reward is NaN; a NaN
            # would permanently poison the arm's running mean, so skip it.
            if math.isfinite(reward):
                self.ucb.update(i, reward)

        # Step 2: Check exits for open positions
        for data in self.datas:
            coin = data._name
            position = self.getposition(data)
            pos = position.size
            if pos == 0:
                continue

            bars_held = self.position_entry_bars.get(coin, 0) + 1
            self.position_entry_bars[coin] = bars_held
            # Fall back to the broker's average price if tracking is missing.
            entry_price = self.position_entry_prices.get(coin, position.price)
            current_price = data.close[0]
            pnl_pct = (current_price - entry_price) / entry_price

            if bars_held < self.p.min_hold_bars:
                continue  # Enforce min hold

            should_exit = (
                bars_held >= self.p.max_hold_bars
                or pnl_pct <= -self.p.stop_loss_pct
                or pnl_pct >= self.p.take_profit_pct
            )
            if should_exit:
                realized = (current_price - entry_price) * abs(pos)
                self.pnl_per_coin[coin] = self.pnl_per_coin.get(coin, 0.0) + realized
                self.close(data)
                logger.info(
                    f"Trade Closed: Code={coin}, Exit Time={data.datetime.datetime()}, "
                    f"Exit Price={current_price:.4f}, PnL %={pnl_pct:.4f}, "
                    f"Quantity Long={pos:.4f}, Quantity Closed={pos:.4f}"
                )
                self.position_entry_bars.pop(coin, None)
                self.position_entry_prices.pop(coin, None)

        # Step 3: Select top-N feeds via UCB scores, best first
        scores = self.ucb.compute_scores()
        ranked = np.argsort(scores)[::-1][:self.p.top_n]

        # Step 4: Enter new positions while cash remains
        portfolio_value = self.broker.getvalue()
        target_value = portfolio_value * self.p.position_size_pct
        available_cash = self.broker.getcash()
        for i in ranked:
            data = self.datas[i]
            coin = data._name
            if self.getposition(data).size != 0 or available_cash < target_value:
                continue
            price = data.close[0]
            size = target_value / price
            self.buy(data=data, size=size)
            # Orders fill on a later bar, so reserve the cash here to avoid
            # committing the same cash to several entries in one bar.
            available_cash -= target_value
            self.position_entry_bars[coin] = 0
            self.position_entry_prices[coin] = price
            logger.info(
                f"Trade Executed: Code={coin}, Entry Time={data.datetime.datetime()}, "
                f"Entry Price={price:.4f}, Quantity Long={size:.4f}, Quantity Closed=0.0000"
            )

        # --- Record holdings and portfolio value ---
        current_datetime = self.datas[0].datetime.datetime()
        holdings = {data._name: self.getposition(data).size for data in self.datas}
        self.holdings_history.append({'datetime': current_datetime, **holdings})
        self.portfolio_value_history.append(
            {'datetime': current_datetime, 'portfolio_value': self.broker.getvalue()}
        )
        coin_values = {'datetime': current_datetime}
        for data in self.datas:
            # Market value of position
            coin_values[data._name] = self.getposition(data).size * data.close[0]
        self.coin_value_history.append(coin_values)

    def notify_order(self, order):
        """Log failed orders and drop entry tracking for buys that never filled."""
        if order.status in (order.Canceled, order.Margin, order.Rejected):
            coin = order.data._name
            logger.warning(f"Order failed: Code={coin}, Status={order.getstatusname()}")
            if order.isbuy():
                self.position_entry_bars.pop(coin, None)
                self.position_entry_prices.pop(coin, None)


def _log_metric(label, compute):
    """Log ``label: compute()``; log an error instead of raising if it fails."""
    try:
        logger.info(f"{label}: {compute()}")
    except Exception as e:
        logger.error(f"Error logging {label}: {e}")


def _report_metrics(cerebro, strat):
    """Log final value, Sharpe ratio, drawdown, total return and trade count."""

    def sharpe_text():
        value = strat.analyzers.sharpe.get_analysis().get('sharperatio')
        return "n/a" if value is None else f"{value:.2f}"

    _log_metric("Final Portfolio Value", lambda: f"{cerebro.broker.getvalue():.2f}")
    _log_metric("Sharpe Ratio", sharpe_text)
    _log_metric(
        "Max Drawdown",
        lambda: f"{strat.analyzers.drawdown.get_analysis().max.drawdown:.2f}%",
    )
    _log_metric("Total Return", lambda: f"{strat.analyzers.returns.get_analysis().rtot:.4f}")
    _log_metric(
        "Number of Trades",
        lambda: f"{strat.analyzers.trades.get_analysis().total.total}",
    )


def _plot_results(cerebro, strat, portfolio_df, coin_value_df):
    """Show the trade chart, per-coin net P/L bars and the value-over-time plot."""
    # Plot trades (entries/exits) on candlestick charts for each coin
    cerebro.plot(style='candle', iplot=False, numfigs=1)

    # Plot net P/L per coin as a bar chart
    pnl_data = strat.pnl_per_coin
    labels = list(pnl_data.keys())
    values = list(pnl_data.values())
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.bar(labels, values, color=['green' if v > 0 else 'red' for v in values])
    ax.set_title('Net P/L per Coin at End of Backtest')
    ax.set_xlabel('Coin')
    ax.set_ylabel('Net P/L ($)')
    ax.grid(True)
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()

    if portfolio_df.empty or coin_value_df.empty:
        logger.warning("No recorded history; skipping portfolio value plot")
        return

    plt.figure(figsize=(14, 7))
    # Use the broker's portfolio value (cash + positions) for the total line;
    # summing the per-coin columns would leave out cash.
    plt.plot(portfolio_df['datetime'], portfolio_df['portfolio_value'],
             label='Total Portfolio Value', color='black', linewidth=2)
    for coin in coins:
        if coin in coin_value_df.columns:
            plt.plot(coin_value_df['datetime'], coin_value_df[coin],
                     label=coin, alpha=0.6, linewidth=1)
    plt.title('Portfolio Value Over Time (with Individual Coin Holdings)')
    plt.xlabel('Date')
    plt.ylabel('Value ($)')
    plt.legend(loc='upper left', fontsize='small', ncol=2)
    plt.grid(True)
    plt.tight_layout()
    plt.show()


def main():
    """Load data, run the UCB backtest, save histories, log metrics and plot."""
    exchange_params = {
        'binance': {
            'commission': 0.001,
            'slippage': 0.0,
            'timeframe': '1h',
        },
        'coinbase': {
            'commission': 0.0015,
            'slippage': 0.0,
            'timeframe': '1h',
        },
        'kraken': {
            'commission': 0.0026,
            'slippage': 0.0,
            'timeframe': '1h',
        },
        # Add more exchanges as needed
    }

    selected_exchange = 'binance'  # Set this as needed
    commission = exchange_params[selected_exchange]['commission']
    slippage = exchange_params[selected_exchange]['slippage']
    timeframe = exchange_params[selected_exchange]['timeframe']

    # Backtest parameters
    end_date = datetime.now()
    start_date = end_date - timedelta(days=15)  # Last 15 days
    initial_capital = 10000.0
    refresh = False  # Set to True to force fresh data fetch

    logger.info(f"Starting backtest from {start_date} to {end_date} (refresh={refresh})")

    # Fetch/load data
    data_feeds = {}
    for coin in coins:
        df = fetch_historical_ohlcv(
            coin,
            timeframe=timeframe,
            start_date=start_date,
            end_date=end_date,
            refresh=refresh,
        )
        if not df.empty:
            data_feeds[coin] = bt.feeds.PandasData(dataname=df, name=coin)

    if not data_feeds:
        logger.error("No data available; aborting")
        return

    # Set up Backtrader
    cerebro = bt.Cerebro()
    for feed in data_feeds.values():
        cerebro.adddata(feed)
    cerebro.addstrategy(
        UCBStrategy,
        position_size_pct=0.15,
        top_n=5,
        min_hold_bars=6,
        max_hold_bars=24,
        stop_loss_pct=0.03,
        take_profit_pct=0.08,
        ucb_c=1.5,
    )
    cerebro.broker.setcash(initial_capital)
    cerebro.broker.setcommission(commission=commission)
    cerebro.broker.set_slippage_perc(slippage)

    # Add analyzers
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')

    # Run backtest
    results = cerebro.run()
    strat = results[0]

    # Convert to DataFrame
    holdings_df = pd.DataFrame(strat.holdings_history)
    portfolio_df = pd.DataFrame(strat.portfolio_value_history)
    coin_value_df = pd.DataFrame(strat.coin_value_history)

    # Save to CSV
    holdings_df.to_csv('holdings_by_date.csv', index=False)
    portfolio_df.to_csv('portfolio_value_by_date.csv', index=False)

    _report_metrics(cerebro, strat)
    _plot_results(cerebro, strat, portfolio_df, coin_value_df)


if __name__ == "__main__":
    main()