import logging import math import os # For file/path operations from datetime import datetime, timedelta import ccxt import matplotlib.pyplot as plt import matplotlib import numpy as np import pandas as pd matplotlib.use('TkAgg') # Or 'Qt5Agg' if you have Qt installed # Configure logging (outputs to console) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Top 15 coins (as before) coins = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'SOL/USDT', 'XRP/USDT', 'ADA/USDT', 'DOGE/USDT', 'AVAX/USDT', 'SHIB/USDT', 'DOT/USDT', 'LINK/USDT', 'TRX/USDT', 'UNI/USDT', 'LTC/USDT'] exchange = ccxt.binance({'enableRateLimit': True}) # UCB Class (from previous implementation) class UCB: def __init__(self, num_arms, c=2.0): self.num_arms = num_arms self.counts = np.zeros(num_arms) self.mean_rewards = np.zeros(num_arms) self.total_pulls = 0 self.c = c def select_arm(self): ucb_scores = np.zeros(self.num_arms) for i in range(self.num_arms): if self.counts[i] == 0: return i ucb_scores[i] = self.mean_rewards[i] + self.c * math.sqrt(math.log(self.total_pulls + 1) / self.counts[i]) return np.argmax(ucb_scores) def update(self, arm, reward): self.counts[arm] += 1 self.total_pulls += 1 self.mean_rewards[arm] = (self.mean_rewards[arm] * (self.counts[arm] - 1) + reward) / self.counts[arm] # Feature Computation Functions (from previous implementation) def compute_atr(df, period=14): high_low = df['high'] - df['low'] high_close = np.abs(df['high'] - df['close'].shift()) low_close = np.abs(df['low'] - df['close'].shift()) tr = np.maximum(high_low, high_close, low_close) atr = tr.rolling(period).mean() return atr def compute_ema(df, short=12, long=26): df['ema_short'] = df['close'].ewm(span=short, adjust=False).mean() df['ema_long'] = df['close'].ewm(span=long, adjust=False).mean() df['trend'] = np.where(df['ema_short'] > df['ema_long'], 1, -1) def compute_rewards(df): df['return'] = (df['close'] - df['open']) / df['open'] df['atr'] = compute_atr(df) compute_ema(df) df['reward'] = df['return'] * df['trend'] / df['atr'].replace(0, np.nan) return df.dropna() # Fetch historical OHLCV data for a symbol (with caching) def fetch_historical_ohlcv(symbol, timeframe='1h', start_date=None, end_date=None, limit=1000, refresh=False): # Create dat folder if it doesn't exist os.makedirs('dat', exist_ok=True) # Generate safe filename with program prefix program_prefix = 'ucb_backtest' symbol_safe = symbol.replace('/', '-') start_str = start_date.strftime('%Y%m%d') if start_date else 'none' end_str = end_date.strftime('%Y%m%d') if end_date else 'none' filename = f"dat/{program_prefix}_{symbol_safe}_{timeframe}_{start_str}_{end_str}.csv" if not refresh and os.path.exists(filename): try: df = pd.read_csv(filename, index_col='timestamp', parse_dates=True) logger.info(f"Loaded cached data for {symbol} from {filename}") return df except Exception as e: logger.warning(f"Error loading cache for {symbol}: {str(e)}; fetching fresh data") # Fetch fresh data try: since = int(start_date.timestamp() * 1000) if start_date else None ohlcv = [] while True: data = exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit) if not data: break ohlcv.extend(data) since = data[-1][0] + 1 # Next batch starts after last timestamp if end_date and data[-1][0] >= int(end_date.timestamp() * 1000): break df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') df.set_index('timestamp', inplace=True) if end_date: df = df[df.index <= end_date] logger.info(f"Fetched {len(df)} historical candles for {symbol}") # Save to cache df.to_csv(filename) logger.info(f"Saved data for {symbol} to {filename}") return df except Exception as e: logger.error(f"Error fetching historical data for {symbol}: {str(e)}") return pd.DataFrame() # Backtest function (added refresh param) def backtest_ucb(start_date, end_date, initial_capital=10000.0, refresh=False): # Fetch historical data for all coins (with caching) historical_data = {} for coin in coins: df = fetch_historical_ohlcv(coin, start_date=start_date, end_date=end_date, refresh=refresh) if not df.empty: historical_data[coin] = compute_rewards(df) if not historical_data: logger.error("No historical data available; aborting backtest") return None # Find common timestamps across all coins (align data) all_timestamps = sorted(set.intersection(*(set(df.index) for df in historical_data.values()))) logger.info(f"Backtesting over {len(all_timestamps)} aligned periods") # Initialize UCB and portfolio tracking ucb = UCB(len(coins)) portfolio_values = [initial_capital] selected_coins = [] period_returns = [] current_capital = initial_capital for i in range(1, len(all_timestamps)): # Start from second period to have prior data current_time = all_timestamps[i] prev_time = all_timestamps[i-1] # Get data slices up to current time for reward computation current_data = {} for coin in coins: if coin in historical_data: df_slice = historical_data[coin].loc[:current_time] if not df_slice.empty: current_data[coin] = df_slice if not current_data: continue # Select arm (coin) arm = ucb.select_arm() coin = coins[arm] if coin not in current_data or current_data[coin].empty: logger.warning(f"No data for selected coin {coin} at {current_time}; skipping") continue # Simulate trade: Get return from prev to current close prev_close = historical_data[coin].loc[prev_time, 'close'] if prev_time in historical_data[coin].index else None current_close = historical_data[coin].loc[current_time, 'close'] if prev_close is None: continue period_return = (current_close - prev_close) / prev_close reward = current_data[coin].loc[current_time, 'reward'] # Use computed reward for UCB update # Calculate quantity (simulated: full allocation) quantity = current_capital / prev_close if prev_close != 0 else 0 # Log trade details logger.info(f"Trade Executed: Code={coin}, Entry Time={prev_time}, Entry Price={prev_close:.4f}, Quantity={quantity:.4f}") logger.info(f"Trade Closed: Exit Time={current_time}, Exit Price={current_close:.4f}, Realized Return={period_return:.4f}, Reward={reward:.4f}") # Update portfolio current_capital *= (1 + period_return) portfolio_values.append(current_capital) period_returns.append(period_return) selected_coins.append(coin) # Update UCB with realized reward ucb.update(arm, reward) logger.debug(f"Period {current_time}: Selected {coin}, Return: {period_return:.4f}, Reward: {reward:.4f}, Capital: {current_capital:.2f}") # Compute performance metrics if not period_returns: logger.error("No trades executed; aborting metrics") return None total_return = (current_capital - initial_capital) / initial_capital num_periods = len(period_returns) days = (end_date - start_date).days annualized_return = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0 sharpe_ratio = np.mean(period_returns) / np.std(period_returns) * np.sqrt(8760) if np.std(period_returns) != 0 else 0 # Annualized, 8760 hours/year max_drawdown = np.min(np.cumprod(1 + np.array(period_returns)) / np.maximum.accumulate(np.cumprod(1 + np.array(period_returns)))) - 1 results = { 'total_return': total_return, 'annualized_return': annualized_return, 'sharpe_ratio': sharpe_ratio, 'max_drawdown': max_drawdown, 'final_capital': current_capital, 'num_trades': num_periods } logger.info(f"Backtest Results: {results}") # Plot equity curve plt.figure(figsize=(10, 6)) plt.plot(all_timestamps[:len(portfolio_values)], portfolio_values, label='Portfolio Value') plt.title('UCB Strategy Equity Curve') plt.xlabel('Date') plt.ylabel('Portfolio Value') plt.legend() plt.grid(True) plt.show() return results # Entry point def main(): # Backtest parameters (adjust as needed) end_date = datetime.now() start_date = end_date - timedelta(days=180) # Last 1 year initial_capital = 10000.0 refresh = True # Set to True to force fresh data fetch logger.info(f"Starting backtest from {start_date} to {end_date} (refresh={refresh})") backtest_ucb(start_date, end_date, initial_capital, refresh) if __name__ == "__main__": main()