| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- import ccxt
- import pandas as pd
- import numpy as np
- from datetime import datetime, timedelta
- import math
- from scipy.stats import beta
- import backtrader as bt
- import os
- # Top 15 coins (symbols against USDT)
- coins = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'SOL/USDT', 'XRP/USDT', 'ADA/USDT', 'DOGE/USDT',
- 'AVAX/USDT', 'SHIB/USDT', 'DOT/USDT', 'LINK/USDT', 'TRX/USDT', 'UNI/USDT', 'LTC/USDT']
- exchange = ccxt.binance({'enableRateLimit': True})
- def fetch_ohlcv(symbol, timeframe='1h', days=90):
- # Create a filename based on symbol and timeframe
- safe_symbol = symbol.replace('/', '_')
- filename = f"ohlcv_{safe_symbol}_{timeframe}_{days}d.csv"
- if os.path.exists(filename):
- df = pd.read_csv(filename, parse_dates=['timestamp'], index_col='timestamp')
- return df
- since = int((datetime.now() - timedelta(days=days)).timestamp() * 1000)
- ohlcv = exchange.fetch_ohlcv(symbol, timeframe, since=since)
- df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
- df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
- df.set_index('timestamp', inplace=True)
- df.to_csv(filename)
- return df
- def compute_atr(df, period=14):
- high_low = df['high'] - df['low']
- high_close = np.abs(df['high'] - df['close'].shift())
- low_close = np.abs(df['low'] - df['close'].shift())
- tr = np.maximum(high_low, high_close, low_close)
- atr = tr.rolling(period).mean()
- return atr
- def compute_ema(df, short=12, long=26):
- df['ema_short'] = df['close'].ewm(span=short, adjust=False).mean()
- df['ema_long'] = df['close'].ewm(span=long, adjust=False).mean()
- df['trend'] = np.where(df['ema_short'] > df['ema_long'], 1, -1) # 1 = uptrend
- def compute_rewards(df):
- df['return'] = (df['close'] - df['open']) / df['open']
- df['atr'] = compute_atr(df)
- compute_ema(df)
- df['reward'] = df['return'] * df['trend'] / df['atr'].replace(0, np.nan) # Adjust by vol and trend
- if len(df) > 0:
- print('%d %f' % (len(df), df['reward'].iloc[len(df)-1]))
- else:
- print('DataFrame is empty, no rewards computed.')
- return df.dropna()
- class ThompsonSampling:
- def __init__(self, num_arms):
- self.alpha = np.ones(num_arms) # Successes +1
- self.beta = np.ones(num_arms) # Failures +1
- def select_arm(self):
- samples = [beta.rvs(a, b) for a, b in zip(self.alpha, self.beta)]
- return np.argmax(samples)
- def update(self, arm, reward): # Assume reward >0 is success
- self.alpha[arm] += 1 if reward > 0 else 0
- self.beta[arm] += 0 if reward > 0 else 1
- class UCB:
- def __init__(self, num_arms, c=2.0): # c is exploration constant
- self.num_arms = num_arms
- self.counts = np.zeros(num_arms) # Trades per coin
- self.mean_rewards = np.zeros(num_arms)
- self.total_pulls = 0
- self.c = c
- def select_arm(self):
- ucb_scores = np.zeros(self.num_arms)
- for i in range(self.num_arms):
- if self.counts[i] == 0:
- return i # Explore unpulled arms first
- ucb_scores[i] = self.mean_rewards[i] + self.c * math.sqrt(math.log(self.total_pulls) / self.counts[i])
- return np.argmax(ucb_scores)
- def update(self, arm, reward):
- self.counts[arm] += 1
- self.total_pulls += 1
- self.mean_rewards[arm] = (self.mean_rewards[arm] * (self.counts[arm] - 1) + reward) / self.counts[arm]
- class BanditStrategy(bt.Strategy):
- def __init__(self):
- self.ucb = UCB(len(coins)) # One arm per coin
- self.coin_map = {i: coin for i, coin in enumerate(coins)} # Map arm index to coin symbol
- # self.datas[0] is first coin, etc.
- def next(self):
- # Get current rewards for all coins (from custom 'reward' column)
- current_rewards = [self.datas[i].reward[0] for i in range(len(self.datas))] # [0] is current bar
-
- # Select arm (coin) using UCB
- arm = self.ucb.select_arm()
-
- # Get the data for the selected coin
- selected_data = self.datas[arm]
- # ucb_score = self.ucb.mean_rewards[arm] + self.ucb.c * math.sqrt(math.log(self.ucb.total_pulls ) / (self.ucb.counts[arm] + 1e-6))
- if self.ucb.total_pulls > 0:
- ucb_score = self.ucb.mean_rewards[arm] + self.ucb.c * math.sqrt(
- math.log(self.ucb.total_pulls) / (self.ucb.counts[arm] + 1e-6)
- )
- else:
- ucb_score = self.ucb.mean_rewards[arm]
- # Signal logic: Enter if score > threshold (e.g., 0.05)
- threshold = 0.05
- if ucb_score > threshold and not self.position: # Not already in position
- self.buy(data=selected_data, size=1) # Buy 1 unit (adjust for portfolio size)
- print(f"Entering trade on {self.coin_map[arm]} at {selected_data.close[0]}")
-
- # Exit logic: If in position, check for vol-adjusted profit target (e.g., 5-10%)
- if self.position:
- entry_price = self.position.price
- current_price = selected_data.close[0]
- atr = selected_data.atr[0] # Use custom ATR column
- profit_target = 0.05 + 0.05 * atr # Vol-adjusted (e.g., higher for volatile coins)
- if (current_price - entry_price) / entry_price > profit_target:
- self.sell(data=selected_data, size=self.position.size)
- print(f"Exiting trade on {self.coin_map[arm]} at {current_price}")
-
- # Update UCB with the realized reward for the selected arm
- realized_reward = current_rewards[arm] # Or calculate from trade if executed
- self.ucb.update(arm, realized_reward)
- class PandasCustom(bt.feeds.PandasData):
- lines = ('reward', 'atr',) # Add more if needed
- params = (
- ('reward', -1), # -1 means auto-detect column
- ('atr', -1),
- )
- def main():
- # Fetch data for all coins
-
- data = {coin: fetch_ohlcv(coin, '1h', 30) for coin in coins}
- # Apply to all data
- for coin in data:
- print(f"Computing rewards for {coin}...")
- data[coin] = compute_rewards(data[coin])
- # Usage example (simulate over time steps)
- ucb = UCB(len(coins))
- # for t in range(len(data[coins[0]])): # Assume aligned timestamps
- # arm = ucb.select_arm()
- # coin = coins[arm]
- # reward = data[coin].iloc[t]['reward'] # Or simulate trade reward
- # ucb.update(arm, reward)
- # min_len = min(len(df) for df in data.values()) # Ensure no out-of-bounds
- # print (min_len)
- # for t in range(min_len):
- # arm = ucb.select_arm()
- # coin = coins[arm]
- # reward = data[coin].iloc[t]['reward']
- # ucb.update(arm, reward)
- max_len = max(len(df) for df in data.values()) # Use the max length
- # for t in range(max_len):
- # arm = ucb.select_arm()
- # coin = coins[arm]
- # df = data[coin]
- # if t < len(df):
- # reward = df.iloc[t]['reward']
- # ucb.update(arm, reward)
- # # else: skip this step for this coin
- for t in range(max_len):
- arm = ucb.select_arm()
- coin = coins[arm]
- df = data[coin]
- # Check if DataFrame is non-empty and has 'reward' column
- if len(df) > 0 and 'reward' in df.columns and t < len(df):
- reward = df.iloc[t]['reward']
- ucb.update(arm, reward)
- # else: skip this step for this coin
- # Setup Cerebro and add data feeds
- cerebro = bt.Cerebro()
- cerebro.addstrategy(BanditStrategy)
- cerebro.broker.setcash(100000.0) # Starting capital
- cerebro.broker.setcommission(0.001) # 0.1% fees
- # Add each coin's data as a PandasData feed
- for i, (coin, df) in enumerate(data.items()):
- # Ensure DF is sorted and has no duplicates
- df = df.sort_index().drop_duplicates()
- #data_feed = bt.feeds.PandasData(
- data_feed = PandasCustom(
- dataname=df,
- datetime=None, # Uses index
- open='open',
- high='high',
- low='low',
- close='close',
- volume='volume',
- # Add custom lines for features (accessible as self.data.reward, etc.)
- #reward=-1, # -1 means auto-detect column
- #atr=-1,
- # Add more if needed (e.g., ema_short=-1)
- )
- cerebro.adddata(data_feed, name=coin) # Name for reference
- # Resample if needed (e.g., to daily), but hourly is fine
- # cerebro.resampledata(data_feed, timeframe=bt.TimeFrame.Days) # Optional
- # Run backtest
- print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue())
- cerebro.run()
- print('Final Portfolio Value: %.2f' % cerebro.broker.getvalue())
- # Optional: Plot results
- # cerebro.plot()
- if __name__ == "__main__":
- main()
|