import time import logging import pandas as pd from binance.client import Client from binance.enums import * from binance import ThreadedWebsocketManager from twisted.internet import reactor from abc import ABC, abstractmethod # For abstract base class # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Signal Actions (extensible enum-like) ACTION_BUY = 'buy' ACTION_SELL_TP = 'sell_tp' # Take profit ACTION_SELL_SL = 'sell_sl' # Stop loss ACTION_CANCEL = 'cancel' # Cancel existing order class BaseStrategy(ABC): """Abstract base class for strategies. Inherit and implement generate_signal().""" def __init__(self, params=None): self.params = params or {} # Strategy-specific params (e.g., {'sma_periods': 50}) @abstractmethod def generate_signal(self, symbol, data, current_position): """ Generate a signal based on data. - data: pd.DataFrame with OHLCV. - current_position: dict {'amount': float, 'entry_price': float}. Returns: dict like {'action': 'buy', 'amount': float, 'limit_price': float} or None. """ pass def pre_process_data(self, data): """Optional hook: Pre-process data before signal generation.""" return data def post_signal(self, signal): """Optional hook: Post-process signal (e.g., adjust for risk).""" return signal class SMAStrategy(BaseStrategy): """Example strategy: Buy if price > SMA, sell on TP/SL.""" def generate_signal(self, symbol, data, current_position): data = self.pre_process_data(data) periods = self.params.get('sma_periods', 50) buy_discount_pct = self.params.get('buy_discount_pct', 0.005) take_profit_pct = self.params.get('take_profit_pct', 0.05) stop_loss_pct = self.params.get('stop_loss_pct', -0.03) df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_av', 'trades', 'tb_base_av', 'tb_quote_av', 'ignore']) df['close'] = df['close'].astype(float) sma = df['close'].rolling(window=periods).mean().iloc[-1] current_price = df['close'].iloc[-1] if current_position['amount'] > 0: # Check exit pnl_pct = (current_price - current_position['entry_price']) / current_position['entry_price'] if pnl_pct >= take_profit_pct: limit_price = current_price * (1 + self.params.get('sell_premium_pct', 0.005)) return self.post_signal({'action': ACTION_SELL_TP, 'amount': current_position['amount'], 'limit_price': limit_price}) elif pnl_pct <= stop_loss_pct: limit_price = current_price * (1 - self.params.get('sell_premium_pct', 0.005)) return self.post_signal({'action': ACTION_SELL_SL, 'amount': current_position['amount'], 'limit_price': limit_price, 'stop_price': limit_price}) else: # Check entry if current_price > sma: amount = self.params.get('position_size_usd', 100) / current_price limit_price = current_price * (1 - buy_discount_pct) return self.post_signal({'action': ACTION_BUY, 'amount': amount, 'limit_price': limit_price}) return None class TradingBot: def __init__(self, api_key, api_secret, strategy, config=None): self.client = Client(api_key, api_secret, testnet=False) # Set testnet=True for testing self.bm = ThreadedWebsocketManager(api_key, api_secret, testnet=False) self.strategy = strategy self.config = config or { 'symbols': ['BTCUSDT'], 'timeframe': '1h', 'poll_interval': 30, 'loop_interval': 60, 'order_timeout': 300 # Seconds before canceling unfilled orders } self.positions = {sym: {'amount': 0, 'entry_price': 0, 'order_id': None, 'order_timestamp': None} for sym in self.config['symbols']} self.symbols_info = {} # Stores exchangeInfo for each symbol self.load_symbols_info() # Fetch exchangeInfo on startup self.conn_key = None try: balance = self.client.get_account() print(balance) # Should print account details except Exception as e: print(e) self.start_websockets() def load_symbols_info(self): """Fetch and cache exchangeInfo for all configured symbols.""" try: info = self.client.get_exchange_info() for symbol in self.config['symbols']: for sym_info in info['symbols']: if sym_info['symbol'] == symbol: self.symbols_info[symbol] = sym_info logger.info(f"Loaded symbol info for {symbol}") break logger.info("Exchange info loaded successfully.") except Exception as e: logger.error(f"Failed to load exchange info: {e}") raise def validate_order_params(self, symbol, amount, price): """ Validate order amount/price against Binance rules. Returns: (valid, adjusted_amount, adjusted_price) """ if symbol not in self.symbols_info: logger.error(f"No symbol info for {symbol}") return False, None, None sym_info = self.symbols_info[symbol] filters = {f['filterType']: f for f in sym_info['filters']} # --- Price Validation --- tick_size = float(filters['PRICE_FILTER']['tickSize']) price = round(price / tick_size) * tick_size # Round to nearest tick # --- Lot Size Validation --- lot_size = float(filters['LOT_SIZE']['stepSize']) amount = round(amount / lot_size) * lot_size # Round to nearest step # --- Min Notional Check --- min_notional = float(filters['MIN_NOTIONAL']['minNotional']) if price * amount < min_notional: logger.warning( f"Order notional too small for {symbol}: " f"Need {min_notional}, got {price * amount}" ) return False, None, None return True, amount, price def start_websockets(self): #self.conn_key = self.bm.user_socket(self.process_user_message) self.bm.start() self.bm.start_user_socket(callback=self.process_user_message) def process_user_message(self, msg): if msg['e'] == 'executionReport': symbol = msg['s'] order_id = msg['i'] status = msg['X'] side = msg['S'] filled_amount = float(msg['z']) logger.info(f"[{symbol}] Order Update: ID={order_id}, Status={status}, Side={side}, Filled={filled_amount}") if symbol in self.positions: pos = self.positions[symbol] if status in ['FILLED', 'PARTIALLY_FILLED']: if side == 'BUY': pos['amount'] += filled_amount pos['entry_price'] = float(msg['L']) elif side == 'SELL': pos['amount'] -= filled_amount if status == 'FILLED': pos['order_id'] = None pos['order_timestamp'] = None elif status == 'CANCELED': pos['order_id'] = None pos['order_timestamp'] = None def fetch_data(self, symbol): return self.client.get_klines(symbol=symbol, interval=self.config['timeframe'], limit=self.strategy.params.get('sma_periods', 50) + 1) def place_limit_order(self, symbol, signal): action = signal['action'] amount = signal['amount'] limit_price = signal['limit_price'] stop_price = signal.get('stop_price') # Validate parameters is_valid, adj_amount, adj_price = self.validate_order_params( symbol, amount, limit_price ) if not is_valid: logger.error(f"Invalid order params for {symbol}") return None # Adjust stop price if needed if stop_price: _, _, adj_stop_price = self.validate_order_params( symbol, amount, stop_price ) stop_price = adj_stop_price # Proceed with order placement try: order_params = { 'symbol': symbol, 'side': SIDE_BUY if action == ACTION_BUY else SIDE_SELL, 'type': ORDER_TYPE_LIMIT, 'timeInForce': TIME_IN_FORCE_GTC, 'quantity': adj_amount, 'price': adj_price } if stop_price: order_params.update({ 'type': ORDER_TYPE_STOP_LOSS_LIMIT, 'stopPrice': stop_price }) order = self.client.create_order(**order_params) logger.info( f"[{symbol}] Placed Order: {action}, " f"Amount={adj_amount}, Price={adj_price}" ) return order['orderId'] except Exception as e: logger.error(f"[{symbol}] Order placement failed: {e}") return None def cancel_order(self, symbol, order_id): try: self.client.cancel_order(symbol=symbol, orderId=order_id) logger.info(f"[{symbol}] Canceled Order: ID={order_id}") except Exception as e: logger.error(f"[{symbol}] Cancel error: {e}") def poll_updates(self): for symbol in self.config['symbols']: try: pos = self.positions[symbol] if pos['order_id']: order = self.client.get_order(symbol=symbol, orderId=pos['order_id']) logger.info(f"[{symbol}] Polled Order: ID={order['orderId']}, Status={order['status']}, Filled={order['executedQty']}") if order['status'] == 'FILLED': pos['order_id'] = None pos['order_timestamp'] = None # Cancel if timed out if pos['order_timestamp'] and time.time() - pos['order_timestamp'] > self.config['order_timeout']: self.cancel_order(symbol, pos['order_id']) pos['order_id'] = None pos['order_timestamp'] = None balance = self.client.get_asset_balance(asset=symbol[:-4]) # e.g., 'BTC' from 'BTCUSDT' usdt_balance = self.client.get_asset_balance(asset='USDT') logger.info(f"[{symbol}] Polled Position: Asset={balance['free']}, USDT={usdt_balance['free']}") pos['amount'] = float(balance['free']) # Sync position except Exception as e: logger.error(f"[{symbol}] Polling error: {e}") def run(self): while True: try: for symbol in self.config['symbols']: data = self.fetch_data(symbol) current_position = self.positions[symbol] signal = self.strategy.generate_signal(symbol, data, current_position) if signal and current_position['order_id'] is None: # No active order order_id = self.place_limit_order(symbol, signal) if order_id: self.positions[symbol]['order_id'] = order_id self.positions[symbol]['order_timestamp'] = time.time() self.poll_updates() time.sleep(self.config['loop_interval']) except Exception as e: logger.error(f"Error in main loop: {e}") time.sleep(10) # Retry if __name__ == "__main__": # Example usage: SMA Strategy with custom params strategy = SMAStrategy(params={ 'sma_periods': 50, 'buy_discount_pct': 0.005, 'sell_premium_pct': 0.005, 'take_profit_pct': 0.05, 'stop_loss_pct': -0.03, 'position_size_usd': 100 }) bot = TradingBot( 'BJbjlf7yTcso2VS7lxMKQbybqDkzJt68CIYDdi005orbvxtgUJGYg5Jz5S3YED43', '9AMYjDGpfmxS8k7kNuwoVnwgJX9HSDaERYyR3Wl9qHsyviMef3mo8pyn6OfWTsHD', strategy=strategy, config={ 'symbols': ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT', 'XRPUSDT', 'ADAUSDT', 'DOGEUSDT', 'AVAXUSDT', 'SHIBUSDT', 'DOTUSDT', 'LINKUSDT', 'TRXUSDT', 'UNIUSDT', 'LTCUSDT'], # Multi-symbol support 'timeframe': '1h', 'poll_interval': 30, 'loop_interval': 60, 'order_timeout': 300 } ) bot.run()