| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- import logging
- import math
- import os # For file/path operations
- from datetime import datetime, timedelta
- import ccxt
- import matplotlib.pyplot as plt
- import matplotlib
- import numpy as np
- import pandas as pd
- matplotlib.use('TkAgg') # Or 'Qt5Agg' if you have Qt installed
# Console logging: timestamp, level and message for INFO and above.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# USDT-quoted pairs the backtest chooses between (14 symbols).
coins = [
    'BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'SOL/USDT', 'XRP/USDT',
    'ADA/USDT', 'DOGE/USDT', 'AVAX/USDT', 'SHIB/USDT', 'DOT/USDT',
    'LINK/USDT', 'TRX/USDT', 'UNI/USDT', 'LTC/USDT',
]

# Shared ccxt Binance client, created with ccxt's 'enableRateLimit' option switched on.
exchange = ccxt.binance({'enableRateLimit': True})
- # UCB Class (from previous implementation)
class UCB:
    """Upper-confidence-bound bandit over a fixed number of arms.

    State is held in two NumPy arrays indexed by arm: ``counts`` (pulls per
    arm) and ``mean_rewards`` (running average reward per arm), plus the
    scalar ``total_pulls``.
    """

    def __init__(self, num_arms, c=2.0):
        """Create a bandit with ``num_arms`` untried arms and exploration weight ``c``."""
        self.c = c
        self.num_arms = num_arms
        self.total_pulls = 0
        self.counts = np.zeros(num_arms)
        self.mean_rewards = np.zeros(num_arms)

    def select_arm(self):
        """Return the index of the arm to play next.

        The lowest-indexed arm that has never been pulled is returned first.
        Once every arm has been tried, the arm with the highest score
        ``mean + c * sqrt(log(total_pulls + 1) / pulls)`` is returned.
        """
        for arm in range(self.num_arms):
            if self.counts[arm] == 0:
                return arm

        # Every arm has at least one pull, so the division below is safe.
        log_term = math.log(self.total_pulls + 1)
        scores = np.array([
            self.mean_rewards[arm] + self.c * math.sqrt(log_term / self.counts[arm])
            for arm in range(self.num_arms)
        ])
        return np.argmax(scores)

    def update(self, arm, reward):
        """Record one pull of ``arm`` and fold ``reward`` into its running mean."""
        self.total_pulls += 1
        self.counts[arm] += 1
        pulls = self.counts[arm]
        reward_sum_before = self.mean_rewards[arm] * (pulls - 1)
        self.mean_rewards[arm] = (reward_sum_before + reward) / pulls
- # Feature Computation Functions (from previous implementation)
def compute_atr(df, period=14):
    """Return the Average True Range of an OHLC frame as a Series.

    The true range of a row is the largest of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``; the ATR is
    its simple rolling mean over ``period`` rows.

    Args:
        df: DataFrame with 'high', 'low' and 'close' columns. Not modified.
        period: Rolling window length in rows.

    Returns:
        Series aligned with ``df.index``. The first row has no previous
        close, so its true range is NaN and the first ``period`` ATR values
        are NaN.
    """
    prev_close = df['close'].shift()
    high_low = df['high'] - df['low']
    high_close = (df['high'] - prev_close).abs()
    low_close = (df['low'] - prev_close).abs()
    # np.maximum is a binary ufunc whose third positional argument is `out`,
    # so a three-way maximum has to be nested rather than passed as three args.
    true_range = np.maximum(high_low, np.maximum(high_close, low_close))
    return true_range.rolling(period).mean()
def compute_ema(df, short=12, long=26):
    """Add EMA and trend columns to ``df`` in place; returns None.

    Writes 'ema_short' and 'ema_long' (exponential moving averages of
    'close' with spans ``short`` and ``long``, ``adjust=False``) and 'trend',
    which is 1 where the short EMA is strictly above the long EMA and -1
    otherwise.
    """
    closes = df['close']
    fast = closes.ewm(span=short, adjust=False).mean()
    slow = closes.ewm(span=long, adjust=False).mean()
    df['ema_short'] = fast
    df['ema_long'] = slow
    df['trend'] = np.where(fast > slow, 1, -1)
def compute_rewards(df):
    """Attach return, ATR, EMA/trend and reward columns to ``df`` and drop NaN rows.

    ``df`` is modified in place (columns 'return', 'atr', 'ema_short',
    'ema_long', 'trend' and 'reward' are added); the returned frame is the
    result of ``dropna()`` on it.

    The reward is the open-to-close return of each candle, signed by the
    trend flag and divided by ATR. A zero ATR is replaced by NaN so those
    rows are dropped instead of producing a division by zero.
    """
    open_price = df['open']
    df['return'] = (df['close'] - open_price) / open_price
    df['atr'] = compute_atr(df)
    compute_ema(df)
    nonzero_atr = df['atr'].replace(0, np.nan)
    df['reward'] = df['return'] * df['trend'] / nonzero_atr
    return df.dropna()
- # Fetch historical OHLCV data for a symbol (with caching)
def fetch_historical_ohlcv(symbol, timeframe='1h', start_date=None, end_date=None, limit=1000, refresh=False):
    """Return OHLCV candles for ``symbol`` as a DataFrame indexed by timestamp.

    Results are cached as CSV files under ``dat/``, keyed by symbol,
    timeframe and the start/end dates (day resolution). A cached file is
    used unless ``refresh`` is true or the file cannot be read.

    Args:
        symbol: Market symbol as passed to the exchange, e.g. 'BTC/USDT'.
        timeframe: Candle interval string passed to ``exchange.fetch_ohlcv``.
        start_date: Optional datetime of the first candle to request.
        end_date: Optional datetime; candles after it are discarded.
        limit: Page size passed to ``exchange.fetch_ohlcv``.
        refresh: When true, ignore any cached file and fetch again.

    Returns:
        DataFrame with columns open, high, low, close, volume and a
        'timestamp' index. An empty DataFrame is returned when the fetch
        fails or yields no candles.
    """
    # Create dat folder if it doesn't exist
    os.makedirs('dat', exist_ok=True)

    # Generate safe filename with program prefix
    program_prefix = 'ucb_backtest'
    symbol_safe = symbol.replace('/', '-')
    start_str = start_date.strftime('%Y%m%d') if start_date else 'none'
    end_str = end_date.strftime('%Y%m%d') if end_date else 'none'
    filename = f"dat/{program_prefix}_{symbol_safe}_{timeframe}_{start_str}_{end_str}.csv"

    if not refresh and os.path.exists(filename):
        try:
            df = pd.read_csv(filename, index_col='timestamp', parse_dates=True)
        except Exception as e:
            # An unreadable cache file is not fatal: fall through to a fresh fetch.
            logger.warning(f"Error loading cache for {symbol}: {str(e)}; fetching fresh data")
        else:
            logger.info(f"Loaded cached data for {symbol} from {filename}")
            return df

    # Fetch fresh data
    try:
        # Both bounds are converted to epoch milliseconds the same way, so the
        # request start and the end cut-off share one time basis regardless of
        # whether the datetimes are naive or timezone-aware.
        since = int(start_date.timestamp() * 1000) if start_date else None
        end_ms = int(end_date.timestamp() * 1000) if end_date else None

        ohlcv = []
        while True:
            data = exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
            if not data:
                break
            ohlcv.extend(data)
            last_ts = data[-1][0]
            if end_ms is not None and last_ts >= end_ms:
                break
            if since is not None and last_ts < since:
                # The cursor would not move forward; stop instead of looping forever.
                break
            since = last_ts + 1  # Next batch starts after last timestamp

        if end_ms is not None:
            ohlcv = [candle for candle in ohlcv if candle[0] <= end_ms]

        df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)
        logger.info(f"Fetched {len(df)} historical candles for {symbol}")

        if df.empty:
            # Do not write an empty file: it would be served from the cache on
            # every later non-refresh run.
            logger.warning(f"No candles returned for {symbol}; cache not written")
            return df

        # Save to cache
        df.to_csv(filename)
        logger.info(f"Saved data for {symbol} to {filename}")
        return df
    except Exception as e:
        # Best-effort boundary: any exchange/network/IO failure yields an
        # empty frame so the caller can skip this symbol.
        logger.error(f"Error fetching historical data for {symbol}: {str(e)}")
        return pd.DataFrame()
- # Backtest function (added refresh param)
def _performance_metrics(portfolio_values, period_returns, start_date, end_date):
    # Summarise an equity curve and its per-period returns into the backtest result dict.
    initial_capital = portfolio_values[0]
    final_capital = portfolio_values[-1]

    total_return = (final_capital - initial_capital) / initial_capital
    days = (end_date - start_date).days
    annualized_return = (1 + total_return) ** (365 / days) - 1 if days > 0 else 0

    returns = np.array(period_returns)
    volatility = np.std(returns)
    # Annualised with 8760 periods per year, i.e. one period per hour.
    sharpe_ratio = np.mean(returns) / volatility * np.sqrt(8760) if volatility != 0 else 0

    # The equity curve starts at the initial capital, so a loss on the very
    # first trade counts as drawdown from that starting peak.
    equity = np.array(portfolio_values)
    max_drawdown = np.min(equity / np.maximum.accumulate(equity)) - 1

    return {
        'total_return': total_return,
        'annualized_return': annualized_return,
        'sharpe_ratio': sharpe_ratio,
        'max_drawdown': max_drawdown,
        'final_capital': final_capital,
        'num_trades': len(period_returns)
    }


def backtest_ucb(start_date, end_date, initial_capital=10000.0, refresh=False):
    """Backtest the UCB coin-selection strategy and plot its equity curve.

    For every pair of consecutive timestamps shared by all coins with data,
    the bandit picks one coin, the whole capital is held in it from the
    previous close to the current close, and the bandit is updated with
    that coin's precomputed 'reward' at the current timestamp.

    Args:
        start_date: Start of the backtest window (datetime).
        end_date: End of the backtest window (datetime).
        initial_capital: Starting portfolio value.
        refresh: Passed to ``fetch_historical_ohlcv`` to bypass its cache.

    Returns:
        Dict with keys 'total_return', 'annualized_return', 'sharpe_ratio',
        'max_drawdown', 'final_capital' and 'num_trades', or None when no
        data is available or no trade was executed.
    """
    # Fetch historical data for all coins (with caching)
    historical_data = {}
    for coin in coins:
        df = fetch_historical_ohlcv(coin, start_date=start_date, end_date=end_date, refresh=refresh)
        if df.empty:
            continue
        rewards = compute_rewards(df)
        if rewards.empty:
            # Too little history for the indicators; keeping this coin would
            # empty the timestamp intersection for every other coin.
            logger.warning(f"No usable rows for {coin} after feature computation; excluding it")
            continue
        historical_data[coin] = rewards

    if not historical_data:
        logger.error("No historical data available; aborting backtest")
        return None

    # Arms map onto coins that actually have data. Sizing the bandit by the
    # full coin list would leave a permanently untried arm for each missing
    # coin, which select_arm() would then return on every period.
    tradable = [coin for coin in coins if coin in historical_data]

    # Find common timestamps across all coins (align data)
    all_timestamps = sorted(set.intersection(*(set(historical_data[coin].index) for coin in tradable)))
    logger.info(f"Backtesting over {len(all_timestamps)} aligned periods")

    # Initialize UCB and portfolio tracking
    ucb = UCB(len(tradable))
    portfolio_values = [initial_capital]
    # Timestamp of each equity point, kept alongside the values so the plot
    # stays aligned even when a period is skipped.
    equity_times = all_timestamps[:1]
    selected_coins = []
    period_returns = []

    current_capital = initial_capital
    for prev_time, current_time in zip(all_timestamps, all_timestamps[1:]):
        # Select arm (coin)
        arm = ucb.select_arm()
        coin = tradable[arm]
        frame = historical_data[coin]

        # Simulate trade: Get return from prev to current close
        prev_close = frame.loc[prev_time, 'close']
        current_close = frame.loc[current_time, 'close']
        if prev_close == 0:
            logger.warning(f"Zero entry price for {coin} at {prev_time}; skipping")
            continue
        period_return = (current_close - prev_close) / prev_close
        reward = frame.loc[current_time, 'reward']  # Use computed reward for UCB update
        # Calculate quantity (simulated: full allocation)
        quantity = current_capital / prev_close
        # Log trade details
        logger.info(f"Trade Executed: Code={coin}, Entry Time={prev_time}, Entry Price={prev_close:.4f}, Quantity={quantity:.4f}")
        logger.info(f"Trade Closed: Exit Time={current_time}, Exit Price={current_close:.4f}, Realized Return={period_return:.4f}, Reward={reward:.4f}")

        # Update portfolio
        current_capital *= (1 + period_return)
        portfolio_values.append(current_capital)
        equity_times.append(current_time)
        period_returns.append(period_return)
        selected_coins.append(coin)

        # Update UCB with realized reward
        ucb.update(arm, reward)
        logger.debug(f"Period {current_time}: Selected {coin}, Return: {period_return:.4f}, Reward: {reward:.4f}, Capital: {current_capital:.2f}")

    # Compute performance metrics
    if not period_returns:
        logger.error("No trades executed; aborting metrics")
        return None

    results = _performance_metrics(portfolio_values, period_returns, start_date, end_date)
    logger.info(f"Backtest Results: {results}")

    # Plot equity curve
    plt.figure(figsize=(10, 6))
    plt.plot(equity_times, portfolio_values, label='Portfolio Value')
    plt.title('UCB Strategy Equity Curve')
    plt.xlabel('Date')
    plt.ylabel('Portfolio Value')
    plt.legend()
    plt.grid(True)
    plt.show()

    return results
- # Entry point
def main():
    """Run the UCB backtest over the most recent 60 days."""
    initial_capital = 10000.0
    refresh = True  # True forces a fresh data fetch instead of using cached files

    end_date = datetime.now()
    lookback = timedelta(days=60)  # 60-day backtest window
    start_date = end_date - lookback

    logger.info(f"Starting backtest from {start_date} to {end_date} (refresh={refresh})")
    backtest_ucb(start_date, end_date, initial_capital=initial_capital, refresh=refresh)


if __name__ == "__main__":
    main()
|