| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- import logging
- import math
- import os
- from datetime import datetime, timedelta
- import backtrader as bt
- import ccxt
- import numpy as np
- import pandas as pd
- import matplotlib.pyplot as plt # For equity curve plot
- # Configure logging (outputs to console)
- # logging.basicConfig(
- # level=logging.INFO,
- # format='%(asctime)s - %(levelname)s - %(message)s'
- # )
- # logger = logging.getLogger(__name__)
- # Configure logging to both console and file
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s',
- handlers=[
- logging.StreamHandler(), # Console output
- logging.FileHandler('ucb_backtest.log', mode='w') # Log file (overwrite each run)
- ]
- )
- logger = logging.getLogger(__name__)
- # Top 15 coins (as before)
- coins = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'SOL/USDT', 'XRP/USDT', 'ADA/USDT', 'DOGE/USDT',
- 'AVAX/USDT', 'SHIB/USDT', 'DOT/USDT', 'LINK/USDT', 'TRX/USDT', 'UNI/USDT', 'LTC/USDT']
- exchange = ccxt.binance({'enableRateLimit': True})
- # UCB Class (simplified for backtest; updates with rewards each bar)
- class UCB:
- def __init__(self, num_arms, c=2.0):
- self.num_arms = num_arms
- self.counts = np.zeros(num_arms)
- self.mean_rewards = np.zeros(num_arms)
- self.total_pulls = 0
- self.c = c
- def compute_scores(self):
- ucb_scores = np.zeros(self.num_arms)
- for i in range(self.num_arms):
- if self.counts[i] == 0:
- ucb_scores[i] = float('inf') # Encourage exploration
- else:
- ucb_scores[i] = self.mean_rewards[i] + self.c * math.sqrt(math.log(self.total_pulls + 1) / self.counts[i])
- return ucb_scores
- def update(self, arm, reward):
- self.counts[arm] += 1
- self.total_pulls += 1
- self.mean_rewards[arm] = (self.mean_rewards[arm] * (self.counts[arm] - 1) + reward) / self.counts[arm]
- # Feature Computation Functions (from previous; applied per coin's data)
- def compute_atr(df, period=14):
- high_low = df['high'] - df['low']
- high_close = np.abs(df['high'] - df['close'].shift())
- low_close = np.abs(df['low'] - df['close'].shift())
- tr = np.maximum(high_low, high_close, low_close)
- atr = tr.rolling(period).mean()
- return atr
- def compute_ema(df, short=12, long=26):
- df['ema_short'] = df['close'].ewm(span=short, adjust=False).mean()
- df['ema_long'] = df['close'].ewm(span=long, adjust=False).mean()
- df['trend'] = np.where(df['ema_short'] > df['ema_long'], 1, -1)
- def compute_reward(df):
- df['return'] = (df['close'] - df['open']) / df['open']
- df['atr'] = compute_atr(df)
- compute_ema(df)
- reward = df['return'] * df['trend'] / df['atr'].replace(0, np.nan)
- return reward.iloc[-1] if not reward.empty else 0 # Latest reward
- # Fetch historical OHLCV data for a symbol (with caching)
- def fetch_historical_ohlcv(symbol, timeframe='1h', start_date=None, end_date=None, limit=1000, refresh=False):
- os.makedirs('dat', exist_ok=True)
- program_prefix = 'ucb_backtest'
- symbol_safe = symbol.replace('/', '-')
- start_str = start_date.strftime('%Y%m%d') if start_date else 'none'
- end_str = end_date.strftime('%Y%m%d') if end_date else 'none'
- filename = f"dat/{program_prefix}_{symbol_safe}_{timeframe}_{start_str}_{end_str}.csv"
-
- if not refresh and os.path.exists(filename):
- try:
- df = pd.read_csv(filename, index_col='timestamp', parse_dates=True)
- logger.info(f"Loaded cached data for {symbol} from {filename}")
- return df
- except Exception as e:
- logger.warning(f"Error loading cache for {symbol}: {str(e)}; fetching fresh data")
-
- try:
- since = int(start_date.timestamp() * 1000) if start_date else None
- ohlcv = []
- while True:
- data = exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
- if not data:
- break
- ohlcv.extend(data)
- since = data[-1][0] + 1
- if end_date and data[-1][0] >= int(end_date.timestamp() * 1000):
- break
- df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
- df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
- df.set_index('timestamp', inplace=True)
- if end_date:
- df = df[df.index <= end_date]
- logger.info(f"Fetched {len(df)} historical candles for {symbol}")
- df.to_csv(filename)
- logger.info(f"Saved data for {symbol} to {filename}")
- return df
- except Exception as e:
- logger.error(f"Error fetching historical data for {symbol}: {str(e)}")
- return pd.DataFrame()
- # Custom Backtrader Strategy with UCB
- class UCBStrategy(bt.Strategy):
- params = (
- ('position_size_pct', 0.10), # Fixed 10% per position
- ('top_n', 10), # Select top-3 coins
- ('min_hold_bars', 4), # Short-term: Min 4 hours
- ('max_hold_bars', 36), # Mid-term: Max 24 hours
- ('stop_loss_pct', 0.05), # 5% stop-loss
- ('take_profit_pct', 0.10), # 10% take-profit
- ('ucb_c', 2.0), # UCB exploration param
- )
- # hold duration min 6: max 24: final port 36.9%
- def __init__(self):
- self.ucb = UCB(len(coins), self.p.ucb_c)
- self.position_entry_bars = {} # Track bars since entry per position
- self.position_entry_prices = {} # Track entry price per position
- self.holdings_history = [] # To store holdings by date
- self.portfolio_value_history = [] # To store portfolio value by date
- self.pnl_per_coin = {coin: 0.0 for coin in coins} # <-- Add this line
- self.coin_value_history = [] # List of dicts: {'datetime': ..., 'BTC/USDT': ..., ...}
- def next(self):
- # Step 1: Update UCB with latest rewards for all coins
- rewards = []
- for i, data in enumerate(self.datas):
- df = pd.DataFrame({
- 'open': [data.open[0]],
- 'high': [data.high[0]],
- 'low': [data.low[0]],
- 'close': [data.close[0]],
- }, index=[data.datetime.datetime()])
- reward = compute_reward(df)
- rewards.append(reward)
- self.ucb.update(i, reward) # Update UCB for every coin every bar
- # Step 2: Check exits for open positions
- for data in self.datas:
- coin = data._name
- pos = self.getposition(data).size
- if pos != 0:
- bars_held = self.position_entry_bars.get(coin, 0) + 1
- self.position_entry_bars[coin] = bars_held
- entry_price = self.position_entry_prices[coin]
- current_price = data.close[0]
- pnl_pct = (current_price - entry_price) / entry_price
- # Exit conditions
- if bars_held < self.p.min_hold_bars:
- continue # Enforce min hold
- if bars_held >= self.p.max_hold_bars or pnl_pct <= -self.p.stop_loss_pct or pnl_pct >= self.p.take_profit_pct:
- self.pnl_per_coin[coin] += (current_price - entry_price) * abs(pos)
- self.close(data)
- logger.info(
- f"Trade Closed: Code={coin}, Exit Time={data.datetime.datetime()}, "
- f"Exit Price={current_price:.4f}, PnL %={pnl_pct:.4f}, "
- f"Quantity Long={pos:.4f}, Quantity Closed={pos:.4f}"
- )
- del self.position_entry_bars[coin]
- del self.position_entry_prices[coin]
- # Step 3: Select top-N coins via UCB scores
- scores = self.ucb.compute_scores()
- top_indices = np.argsort(scores)[-self.p.top_n:]
- top_coins = [coins[i] for i in top_indices]
- # Step 4: Enter new positions if possible
- portfolio_value = self.broker.getvalue()
- cash = self.broker.getcash()
- for coin in top_coins:
- data = self.getdatabyname(coin)
- if self.getposition(data).size == 0 and cash >= portfolio_value * self.p.position_size_pct:
- price = data.close[0]
- size = (portfolio_value * self.p.position_size_pct) / price
- self.buy(data=data, size=size)
- self.position_entry_bars[coin] = 0
- self.position_entry_prices[coin] = price
- logger.info(
- f"Trade Executed: Code={coin}, Entry Time={data.datetime.datetime()}, "
- f"Entry Price={price:.4f}, Quantity Long={size:.4f}, Quantity Closed=0.0000"
- )
- # --- Record holdings and portfolio value ---
- current_datetime = self.datas[0].datetime.datetime()
- holdings = {data._name: self.getposition(data).size for data in self.datas}
- portfolio_value = self.broker.getvalue()
- self.holdings_history.append({'datetime': current_datetime, **holdings})
- self.portfolio_value_history.append({'datetime': current_datetime, 'portfolio_value': portfolio_value})
- coin_values = {'datetime': current_datetime}
- for data in self.datas:
- coin = data._name
- size = self.getposition(data).size
- price = data.close[0]
- coin_values[coin] = size * price # Market value of position
- self.coin_value_history.append(coin_values)
- def notify_order(self, order):
- if order.status in [order.Completed]:
- pass # Can add more logging if needed
- # Entry point
- def main():
- exchange_params = {
- 'binance': {
- 'commission': 0.001,
- 'slippage': 0.0,
- 'timeframe': '1h',
- },
- 'coinbase': {
- 'commission': 0.0015,
- 'slippage': 0.0,
- 'timeframe': '1h',
- },
- 'kraken': {
- 'commission': 0.0026,
- 'slippage': 0.0,
- 'timeframe': '1h',
- },
- # Add more exchanges as needed
- }
- # Use parameters in your setup
- selected_exchange = 'binance' # Set this as needed
- commission = exchange_params[selected_exchange]['commission']
- slippage = exchange_params[selected_exchange]['slippage']
- timeframe = exchange_params[selected_exchange]['timeframe']
- # Backtest parameters
- end_date = datetime.now()
- start_date = end_date - timedelta(days=15) # Last 1 year
- initial_capital = 10000.0
- refresh = False # Set to True to force fresh data fetch
-
- logger.info(f"Starting backtest from {start_date} to {end_date} (refresh={refresh})")
-
- # Fetch/load data
- data_feeds = {}
- for coin in coins:
- df = fetch_historical_ohlcv(coin, start_date=start_date, end_date=end_date, refresh=refresh)
- if not df.empty:
- data_feeds[coin] = bt.feeds.PandasData(dataname=df, name=coin)
-
- if not data_feeds:
- logger.error("No data available; aborting")
- return
-
- # Set up Backtrader
- cerebro = bt.Cerebro()
- for coin, feed in data_feeds.items():
- cerebro.adddata(feed)
-
-
- cerebro.addstrategy(
- UCBStrategy,
- position_size_pct=0.15,
- top_n=5,
- min_hold_bars=6,
- max_hold_bars=24,
- stop_loss_pct=0.03,
- take_profit_pct=0.08,
- ucb_c=1.5
- )
- cerebro.broker.setcash(initial_capital)
- cerebro.broker.setcommission(commission=0.001) # 0.1%
- cerebro.broker.setcommission(commission=commission)
- # Optionally, set slippage
- cerebro.broker.set_slippage_perc(slippage)
- # Add analyzers
- cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
- cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
- cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
- cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
-
- # Run backtest
- results = cerebro.run()
- strat = results[0]
-
- # Convert to DataFrame
- holdings_df = pd.DataFrame(strat.holdings_history)
- portfolio_df = pd.DataFrame(strat.portfolio_value_history)
- coin_value_df = pd.DataFrame(strat.coin_value_history)
- # Save to CSV
- holdings_df.to_csv('holdings_by_date.csv', index=False)
- portfolio_df.to_csv('portfolio_value_by_date.csv', index=False)
- # Print metrics
- try:
- logger.info(f"Final Portfolio Value: {cerebro.broker.getvalue():.2f}")
- except Exception as e:
- logger.error(f"Error logging Final Portfolio Value: {e}")
- try:
- logger.info(f"Sharpe Ratio: {strat.analyzers.sharpe.get_analysis().get('sharperatio', 0):.2f}")
- except Exception as e:
- logger.error(f"Error logging Sharpe Ratio: {e}")
- try:
- logger.info(f"Max Drawdown: {strat.analyzers.drawdown.get_analysis().max.drawdown:.2f}%")
- except Exception as e:
- logger.error(f"Error logging Max Drawdown: {e}")
- try:
- logger.info(f"Total Return: {strat.analyzers.returns.get_analysis().rtot:.4f}")
- except Exception as e:
- logger.error(f"Error logging Total Return: {e}")
- try:
- logger.info(f"Number of Trades: {strat.analyzers.trades.get_analysis().total.total}")
- except Exception as e:
- logger.error(f"Error logging Number of Trades: {e}")
-
- # Plot trades (entries/exits) on candlestick charts for each coin
- cerebro.plot(style='candle', iplot=False, numfigs=1) # Generates one figure with subplots per coin
-
- # Plot net P/L per coin as a bar chart
- pnl_data = strat.pnl_per_coin
- fig, ax = plt.subplots(figsize=(12, 6))
- ax.bar(pnl_data.keys(), pnl_data.values(), color=['green' if v > 0 else 'red' for v in pnl_data.values()])
- ax.set_title('Net P/L per Coin at End of Backtest')
- ax.set_xlabel('Coin')
- ax.set_ylabel('Net P/L ($)')
- ax.grid(True)
- plt.xticks(rotation=45, ha='right')
- plt.tight_layout()
- plt.show()
- plt.figure(figsize=(14, 7))
- plt.plot(coin_value_df['datetime'], coin_value_df.drop('datetime', axis=1).sum(axis=1), label='Total Portfolio Value', color='black', linewidth=2)
- for coin in coins:
- if coin in coin_value_df.columns:
- plt.plot(coin_value_df['datetime'], coin_value_df[coin], label=coin, alpha=0.6, linewidth=1)
- plt.title('Portfolio Value Over Time (with Individual Coin Holdings)')
- plt.xlabel('Date')
- plt.ylabel('Value ($)')
- plt.legend(loc='upper left', fontsize='small', ncol=2)
- plt.grid(True)
- plt.tight_layout()
- plt.show()
-
- if __name__ == "__main__":
- main()
|