import ccxt
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import math
from scipy.stats import beta
import backtrader as bt
import os

# Top 14 coins (symbols against USDT)
coins = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'SOL/USDT', 'XRP/USDT', 'ADA/USDT',
         'DOGE/USDT', 'AVAX/USDT', 'SHIB/USDT', 'DOT/USDT', 'LINK/USDT', 'TRX/USDT',
         'UNI/USDT', 'LTC/USDT']

exchange = ccxt.binance({'enableRateLimit': True})


def fetch_ohlcv(symbol, timeframe='1h', days=90):
    # Cache downloads: build a filename from symbol, timeframe and lookback
    safe_symbol = symbol.replace('/', '_')
    filename = f"ohlcv_{safe_symbol}_{timeframe}_{days}d.csv"
    if os.path.exists(filename):
        return pd.read_csv(filename, parse_dates=['timestamp'], index_col='timestamp')
    since = int((datetime.now() - timedelta(days=days)).timestamp() * 1000)
    ohlcv = exchange.fetch_ohlcv(symbol, timeframe, since=since)
    df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df.set_index('timestamp', inplace=True)
    df.to_csv(filename)
    return df


def compute_atr(df, period=14):
    high_low = df['high'] - df['low']
    high_close = np.abs(df['high'] - df['close'].shift())
    low_close = np.abs(df['low'] - df['close'].shift())
    # True range is the element-wise max of the three series. Note that
    # np.maximum(a, b, c) treats the third argument as `out`, so the original
    # three-argument call silently computed the wrong values.
    tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
    atr = tr.rolling(period).mean()
    return atr


def compute_ema(df, short=12, long=26):
    # Mutates df in place; adds EMA columns and a +1/-1 trend flag
    df['ema_short'] = df['close'].ewm(span=short, adjust=False).mean()
    df['ema_long'] = df['close'].ewm(span=long, adjust=False).mean()
    df['trend'] = np.where(df['ema_short'] > df['ema_long'], 1, -1)  # 1 = uptrend


def compute_rewards(df):
    df['return'] = (df['close'] - df['open']) / df['open']
    df['atr'] = compute_atr(df)
    compute_ema(df)
    # Trend-aligned return, normalized by volatility (an ATR of 0 becomes NaN, not inf)
    df['reward'] = df['return'] * df['trend'] / df['atr'].replace(0, np.nan)
    if len(df) > 0:
        print('%d %f' % (len(df), df['reward'].iloc[-1]))
    else:
        print('DataFrame is empty, no rewards computed.')
    return df.dropna()


class ThompsonSampling:
    def __init__(self, num_arms):
        self.alpha = np.ones(num_arms)  # Successes + 1
        self.beta = np.ones(num_arms)   # Failures + 1

    def select_arm(self):
        samples = [beta.rvs(a, b) for a, b in zip(self.alpha, self.beta)]
        return np.argmax(samples)

    def update(self, arm, reward):
        # Treat reward > 0 as a success
        self.alpha[arm] += 1 if reward > 0 else 0
        self.beta[arm] += 0 if reward > 0 else 1


class UCB:
    def __init__(self, num_arms, c=2.0):  # c is the exploration constant
        self.num_arms = num_arms
        self.counts = np.zeros(num_arms)  # Trades per coin
        self.mean_rewards = np.zeros(num_arms)
        self.total_pulls = 0
        self.c = c

    def select_arm(self):
        ucb_scores = np.zeros(self.num_arms)
        for i in range(self.num_arms):
            if self.counts[i] == 0:
                return i  # Explore unpulled arms first
            ucb_scores[i] = self.mean_rewards[i] + self.c * math.sqrt(
                math.log(self.total_pulls) / self.counts[i])
        return np.argmax(ucb_scores)

    def update(self, arm, reward):
        self.counts[arm] += 1
        self.total_pulls += 1
        self.mean_rewards[arm] = (self.mean_rewards[arm] * (self.counts[arm] - 1)
                                  + reward) / self.counts[arm]
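
# ThompsonSampling above is defined but never exercised by the script. The sketch
# below is a hypothetical sanity check, not part of the original pipeline: it runs
# both policies on synthetic Gaussian rewards so their pull counts can be compared.
# The name _demo_bandits and all of its parameters are assumptions for illustration.
def _demo_bandits(num_arms=3, steps=500, seed=42):
    rng = np.random.default_rng(seed)
    true_means = rng.uniform(-0.1, 0.2, num_arms)  # Hypothetical per-arm mean rewards
    ts, ucb = ThompsonSampling(num_arms), UCB(num_arms)
    for policy in (ts, ucb):
        for _ in range(steps):
            arm = policy.select_arm()
            policy.update(arm, rng.normal(true_means[arm], 0.05))
    # Per-arm pull counts: the best arm should dominate under both policies
    print('TS pulls: ', ts.alpha + ts.beta - 2)
    print('UCB pulls:', ucb.counts)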
class BanditStrategy(bt.Strategy):
    def __init__(self):
        self.ucb = UCB(len(coins))  # One arm per coin
        self.coin_map = {i: coin for i, coin in enumerate(coins)}  # Arm index -> coin symbol
        # self.datas[0] is the first coin, etc.

    def next(self):
        # Current reward for every coin, from the custom 'reward' line ([0] = current bar)
        current_rewards = [d.reward[0] for d in self.datas]
        # Select an arm (coin) using UCB
        arm = self.ucb.select_arm()
        selected_data = self.datas[arm]
        # self.position only refers to datas[0]; with multiple feeds, query the
        # position of the selected coin explicitly
        pos = self.getposition(selected_data)
        if self.ucb.total_pulls > 0:
            ucb_score = self.ucb.mean_rewards[arm] + self.ucb.c * math.sqrt(
                math.log(self.ucb.total_pulls) / (self.ucb.counts[arm] + 1e-6)
            )
        else:
            ucb_score = self.ucb.mean_rewards[arm]
        # Signal logic: enter if the score clears a threshold (e.g., 0.05) and we are flat
        threshold = 0.05
        if ucb_score > threshold and not pos:
            self.buy(data=selected_data, size=1)  # Fixed size; see the atr_position_size sketch below
            print(f"Entering trade on {self.coin_map[arm]} at {selected_data.close[0]}")
        # Exit logic: vol-adjusted profit target (e.g., 5-10%); note that this only
        # fires on bars where the held coin is re-selected by the bandit
        if pos:
            entry_price = pos.price
            current_price = selected_data.close[0]
            atr = selected_data.atr[0]  # Custom ATR line
            profit_target = 0.05 + 0.05 * atr  # Higher target for more volatile coins
            if (current_price - entry_price) / entry_price > profit_target:
                self.sell(data=selected_data, size=pos.size)
                print(f"Exiting trade on {self.coin_map[arm]} at {current_price}")
        # Update UCB with the realized reward for the selected arm
        self.ucb.update(arm, current_rewards[arm])


class PandasCustom(bt.feeds.PandasData):
    lines = ('reward', 'atr',)  # Extra feature lines; add more if needed
    params = (
        ('reward', -1),  # -1 means auto-detect the column by name
        ('atr', -1),
    )
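
# BanditStrategy buys a fixed size=1 above. A minimal volatility-aware alternative,
# sketched here as a hypothetical helper that is not wired into the strategy: size
# the trade so that a one-ATR adverse move costs roughly risk_frac of current cash.
# The name atr_position_size and the risk_frac default are assumptions.
def atr_position_size(cash, atr, risk_frac=0.01):
    # units * ATR ~= cash * risk_frac  =>  units = cash * risk_frac / ATR
    return (cash * risk_frac) / max(atr, 1e-9)
    # Example use inside next(): size=atr_position_size(self.broker.getcash(),
    #                                                   selected_data.atr[0])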
def main():
    # Fetch data for all coins
    data = {coin: fetch_ohlcv(coin, '1h', 30) for coin in coins}
    # Compute the reward column for every coin
    for coin in data:
        print(f"Computing rewards for {coin}...")
        data[coin] = compute_rewards(data[coin])

    # Standalone usage example: simulate UCB over the historical reward series
    # (independent of the backtest below). Feeds can have different lengths, so
    # iterate to the longest one and skip coins whose history is shorter.
    ucb = UCB(len(coins))
    max_len = max(len(df) for df in data.values())
    for t in range(max_len):
        arm = ucb.select_arm()
        df = data[coins[arm]]
        if len(df) > 0 and 'reward' in df.columns and t < len(df):
            ucb.update(arm, df.iloc[t]['reward'])
        # else: skip this step for this coin

    # Set up Cerebro and add the data feeds
    cerebro = bt.Cerebro()
    cerebro.addstrategy(BanditStrategy)
    cerebro.broker.setcash(100000.0)  # Starting capital
    cerebro.broker.setcommission(commission=0.001)  # 0.1% fees

    # Add each coin's data as a PandasCustom feed
    for coin, df in data.items():
        # Ensure the DF is sorted and has no duplicate timestamps
        # (DataFrame.drop_duplicates compares row values, not the index)
        df = df.sort_index()
        df = df[~df.index.duplicated(keep='first')]
        data_feed = PandasCustom(
            dataname=df,
            datetime=None,  # Use the index
            open='open',
            high='high',
            low='low',
            close='close',
            volume='volume',
            # The custom lines ('reward', 'atr') are declared on PandasCustom and
            # auto-detected by column name; access them as self.data.reward, etc.
        )
        cerebro.adddata(data_feed, name=coin)  # Name for reference
        # Resample if needed (e.g., to daily); hourly is fine here
        # cerebro.resampledata(data_feed, timeframe=bt.TimeFrame.Days)

    # Run the backtest
    print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue())
    cerebro.run()
    print('Final Portfolio Value: %.2f' % cerebro.broker.getvalue())
    # Optional: plot results
    # cerebro.plot()


if __name__ == "__main__":
    main()
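
# Hedged extension (an assumption, not in the original script): backtrader
# analyzers can be attached before cerebro.run() to report risk-adjusted stats
# alongside the raw portfolio value, e.g.:
#     cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
#     cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
#     results = cerebro.run()
#     print(results[0].analyzers.sharpe.get_analysis())
#     print(results[0].analyzers.drawdown.get_analysis())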