# Binance spot trading bot: pluggable signal strategies plus an order manager.
import logging
import os
import time
from abc import ABC, abstractmethod  # For abstract base class
from decimal import ROUND_DOWN, ROUND_HALF_EVEN, Decimal

import pandas as pd
from binance import ThreadedWebsocketManager
from binance.client import Client
from binance.enums import *
from twisted.internet import reactor
# Configure logging once at import time; every class below logs through `logger`.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Signal actions (extensible, enum-like string constants).
# A strategy places one of these under the 'action' key of the signal it returns.
ACTION_BUY = 'buy'
ACTION_SELL_TP = 'sell_tp'  # Take profit
ACTION_SELL_SL = 'sell_sl'  # Stop loss
ACTION_CANCEL = 'cancel'  # Cancel existing order
class BaseStrategy(ABC):
    """Abstract base class for strategies. Inherit and implement generate_signal()."""

    def __init__(self, params=None):
        """Store strategy-specific parameters.

        params: optional dict of settings (e.g., {'sma_periods': 50}); a
        missing or empty value becomes a fresh empty dict.
        """
        self.params = params or {}

    @abstractmethod
    def generate_signal(self, symbol, data, current_position):
        """
        Generate a signal based on data.
        - data: pd.DataFrame with OHLCV.
        - current_position: dict {'amount': float, 'entry_price': float}.
        Returns: dict like {'action': 'buy', 'amount': float, 'limit_price': float} or None.
        """

    def pre_process_data(self, data):
        """Optional hook: pre-process data before signal generation.

        The default implementation returns `data` unchanged.
        """
        return data

    def post_signal(self, signal):
        """Optional hook: post-process a signal (e.g., adjust for risk).

        The default implementation returns `signal` unchanged.
        """
        return signal
class SMAStrategy(BaseStrategy):
    """Example strategy: buy when the latest close is above its SMA, sell on TP/SL.

    Params read from ``self.params`` (default in parentheses):
    - sma_periods (50): rolling window length for the moving average.
    - buy_discount_pct (0.005): buy limit is placed this fraction below the last close.
    - sell_premium_pct (0.005): offset applied to the last close for sell limits
      (added for take profit, subtracted for stop loss).
    - take_profit_pct (0.05): exit once PnL is at or above this fraction.
    - stop_loss_pct (-0.03): exit once PnL is at or below this fraction.
    - position_size_usd (100): quote-currency size of a new entry.
    """

    def generate_signal(self, symbol, data, current_position):
        """Return a buy/sell signal dict for `symbol`, or None when there is nothing to do.

        - data: kline rows with the twelve columns named below (only 'close' is used).
        - current_position: dict with at least 'amount' and 'entry_price'.
        Returns the signal after passing it through ``post_signal()``.
        """
        data = self.pre_process_data(data)
        periods = self.params.get('sma_periods', 50)
        buy_discount_pct = self.params.get('buy_discount_pct', 0.005)
        take_profit_pct = self.params.get('take_profit_pct', 0.05)
        stop_loss_pct = self.params.get('stop_loss_pct', -0.03)
        sell_premium_pct = self.params.get('sell_premium_pct', 0.005)

        df = pd.DataFrame(data, columns=[
            'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time',
            'quote_av', 'trades', 'tb_base_av', 'tb_quote_av', 'ignore',
        ])
        if df.empty:
            # No candles at all: .iloc[-1] below would raise IndexError.
            return None

        closes = df['close'].astype(float)
        current_price = closes.iloc[-1]
        if not current_price > 0:
            # Rejects zero, negative and NaN prices; all of them would poison
            # the divisions below.
            return None

        held = current_position['amount']
        if held > 0:  # Check exit
            entry_price = current_position['entry_price']
            if not entry_price or entry_price <= 0:
                # Entry price unknown (e.g. the amount was synced from a wallet
                # balance). Dividing by it would give an infinite PnL ratio and
                # trigger a bogus take-profit, so emit nothing.
                return None
            pnl_pct = (current_price - entry_price) / entry_price
            if pnl_pct >= take_profit_pct:
                limit_price = current_price * (1 + sell_premium_pct)
                return self.post_signal({
                    'action': ACTION_SELL_TP,
                    'amount': held,
                    'limit_price': limit_price,
                })
            if pnl_pct <= stop_loss_pct:
                limit_price = current_price * (1 - sell_premium_pct)
                return self.post_signal({
                    'action': ACTION_SELL_SL,
                    'amount': held,
                    'limit_price': limit_price,
                    'stop_price': limit_price,
                })
            return None

        # Check entry. With fewer than `periods` rows the rolling mean is NaN
        # and the comparison is False, so no entry is signalled.
        sma = closes.rolling(window=periods).mean().iloc[-1]
        if current_price > sma:
            amount = self.params.get('position_size_usd', 100) / current_price
            limit_price = current_price * (1 - buy_discount_pct)
            return self.post_signal({
                'action': ACTION_BUY,
                'amount': amount,
                'limit_price': limit_price,
            })
        return None
class TradingBot:
    """Drive a strategy over Binance spot symbols using limit orders.

    The bot tracks one position and at most one open order per configured
    symbol. ``run()`` asks the strategy for a signal per symbol and places the
    resulting order; ``poll_updates()`` then reconciles order state and
    balances over REST. Fills are also applied as they arrive on the user-data
    websocket through ``process_user_message()``.

    NOTE(review): ``process_user_message`` is registered as a callback on
    ``ThreadedWebsocketManager``, so it presumably runs on a different thread
    than ``run()``; ``self.positions`` is not protected by a lock.
    """

    # Values used for any key missing from the caller's config.
    # 'poll_interval' is accepted but not read by any method of this class.
    DEFAULT_CONFIG = {
        'symbols': ['BTCUSDT'],
        'timeframe': '1h',
        'poll_interval': 30,
        'loop_interval': 60,
        'order_timeout': 300,  # Seconds before canceling unfilled orders
    }

    # Order statuses after which an order can no longer fill.
    _CLOSED_STATUSES = frozenset(
        {'FILLED', 'CANCELED', 'EXPIRED', 'EXPIRED_IN_MATCH', 'REJECTED'}
    )

    def __init__(self, api_key, api_secret, strategy, config=None):
        """Create the REST client and websocket manager, load symbol rules, start streaming.

        - strategy: object exposing ``generate_signal()`` and ``params``.
        - config: optional dict; missing keys fall back to DEFAULT_CONFIG. An
          optional 'testnet' key (default False) selects the Binance testnet.
        Raises whatever ``load_symbols_info()`` raises if exchange info cannot be fetched.
        """
        # Merge over the defaults so a partial config cannot cause KeyError later.
        self.config = {**self.DEFAULT_CONFIG, **(config or {})}
        testnet = bool(self.config.get('testnet', False))
        self.client = Client(api_key, api_secret, testnet=testnet)
        self.bm = ThreadedWebsocketManager(api_key, api_secret, testnet=testnet)
        self.strategy = strategy
        self.positions = {
            sym: {'amount': 0, 'entry_price': 0, 'order_id': None, 'order_timestamp': None}
            for sym in self.config['symbols']
        }
        self.symbols_info = {}  # Stores exchangeInfo for each symbol
        self.load_symbols_info()  # Fetch exchangeInfo on startup
        self.conn_key = None
        # Best-effort credential check: a failure is logged, not fatal.
        try:
            account = self.client.get_account()
        except Exception as e:
            logger.error(f"Account check failed: {e}")
        else:
            # Log a single flag instead of dumping every balance to stdout.
            logger.info(f"Account access verified (canTrade={account.get('canTrade')})")
        self.start_websockets()

    def load_symbols_info(self):
        """Fetch and cache exchangeInfo for all configured symbols.

        Logs and re-raises any error from the request or from reading the response.
        """
        try:
            info = self.client.get_exchange_info()
            # Index once instead of rescanning the full symbol list per configured symbol.
            by_symbol = {entry['symbol']: entry for entry in info['symbols']}
            for symbol in self.config['symbols']:
                sym_info = by_symbol.get(symbol)
                if sym_info is None:
                    logger.warning(f"Symbol {symbol} not found in exchange info")
                    continue
                self.symbols_info[symbol] = sym_info
                logger.info(f"Loaded symbol info for {symbol}")
            logger.info("Exchange info loaded successfully.")
        except Exception as e:
            logger.error(f"Failed to load exchange info: {e}")
            raise

    @staticmethod
    def _format_decimal(value):
        """Render a number as plain fixed-point text, never exponent notation."""
        return format(Decimal(str(value)).normalize(), 'f')

    def _assets_for(self, symbol):
        """Return (base_asset, quote_asset) for `symbol`.

        Uses the cached exchange info when it carries the asset names; otherwise
        falls back to stripping a four-character quote suffix and assuming USDT.
        """
        info = self.symbols_info.get(symbol) or {}
        base = info.get('baseAsset') or symbol[:-4]  # e.g., 'BTC' from 'BTCUSDT'
        quote = info.get('quoteAsset') or 'USDT'
        return base, quote

    def _clear_order(self, pos):
        """Forget the tracked open order of one position entry."""
        pos['order_id'] = None
        pos['order_timestamp'] = None

    def validate_order_params(self, symbol, amount, price):
        """
        Validate order amount/price against Binance rules.
        Returns: (valid, adjusted_amount, adjusted_price)

        Price is rounded to the nearest tick; amount is rounded DOWN to the lot
        step so a sell never asks for more than was requested. The arithmetic
        is done in Decimal so the returned floats carry no binary rounding
        residue. On failure the result is (False, None, None).
        """
        sym_info = self.symbols_info.get(symbol)
        if sym_info is None:
            logger.error(f"No symbol info for {symbol}")
            return False, None, None
        filters = {f['filterType']: f for f in sym_info['filters']}

        price_d = Decimal(str(price))
        amount_d = Decimal(str(amount))
        if not (price_d.is_finite() and amount_d.is_finite()):
            logger.warning(f"Non-finite order values for {symbol}: amount={amount}, price={price}")
            return False, None, None

        # --- Price Validation ---
        tick_size = Decimal(filters['PRICE_FILTER']['tickSize'])
        if tick_size > 0:  # A zero tick size means the rule is disabled
            price_d = (price_d / tick_size).to_integral_value(rounding=ROUND_HALF_EVEN) * tick_size

        # --- Lot Size Validation ---
        lot_filter = filters['LOT_SIZE']
        step_size = Decimal(lot_filter['stepSize'])
        if step_size > 0:
            amount_d = (amount_d / step_size).to_integral_value(rounding=ROUND_DOWN) * step_size
        min_qty = Decimal(lot_filter.get('minQty', '0'))
        if price_d <= 0 or amount_d <= 0 or amount_d < min_qty:
            logger.warning(
                f"Order size/price invalid for {symbol} after rounding: "
                f"amount={amount_d}, price={price_d}"
            )
            return False, None, None

        # --- Min Notional Check ---
        # Exchange info may carry the rule as NOTIONAL or as the older MIN_NOTIONAL.
        notional_rule = filters.get('NOTIONAL') or filters.get('MIN_NOTIONAL')
        min_notional = Decimal(notional_rule['minNotional']) if notional_rule else Decimal(0)
        notional = price_d * amount_d
        if notional < min_notional:
            logger.warning(
                f"Order notional too small for {symbol}: "
                f"Need {min_notional}, got {notional}"
            )
            return False, None, None
        return True, float(amount_d), float(price_d)

    def start_websockets(self):
        """Start the websocket manager and subscribe to the user-data stream."""
        self.bm.start()
        self.conn_key = self.bm.start_user_socket(callback=self.process_user_message)

    def process_user_message(self, msg):
        """Apply an 'executionReport' user-stream event to the tracked position.

        Events of any other type, or for symbols that are not tracked, are ignored.
        """
        if msg.get('e') != 'executionReport':
            return
        symbol = msg['s']
        order_id = msg['i']
        status = msg['X']
        side = msg['S']
        filled_amount = float(msg['z'])
        logger.info(f"[{symbol}] Order Update: ID={order_id}, Status={status}, Side={side}, Filled={filled_amount}")
        pos = self.positions.get(symbol)
        if pos is None:
            return

        # 'z' is cumulative per order, so adding it on every partial fill would
        # double count; 'l' is the quantity of this execution only.
        last_qty = float(msg['l'])
        if status in ('FILLED', 'PARTIALLY_FILLED') and last_qty > 0:
            last_price = float(msg['L'])
            held = pos['amount']
            if side == 'BUY':
                total = held + last_qty
                if held > 0 and pos['entry_price'] > 0:
                    # Volume-weighted average across fills.
                    pos['entry_price'] = (held * pos['entry_price'] + last_qty * last_price) / total
                else:
                    pos['entry_price'] = last_price
                pos['amount'] = total
            elif side == 'SELL':
                pos['amount'] = max(held - last_qty, 0.0)

        # Only release the slot for the order we are tracking, so reports for
        # other orders on the account cannot unblock a second order.
        if status in self._CLOSED_STATUSES and pos['order_id'] == order_id:
            self._clear_order(pos)

    def fetch_data(self, symbol):
        """Fetch recent klines for `symbol`: one more candle than the strategy's 'sma_periods' (default 50)."""
        return self.client.get_klines(
            symbol=symbol,
            interval=self.config['timeframe'],
            limit=self.strategy.params.get('sma_periods', 50) + 1,
        )

    def place_limit_order(self, symbol, signal):
        """Validate a signal and submit it as a LIMIT (or STOP_LOSS_LIMIT) order.

        - signal: dict with 'action', 'amount', 'limit_price' and optional 'stop_price'.
          ACTION_BUY buys; any other action except ACTION_CANCEL sells.
        Returns the exchange order id, or None if validation or submission failed.
        """
        action = signal['action']
        if action == ACTION_CANCEL:
            # A cancel request carries no order to place; never turn it into a SELL.
            logger.error(f"[{symbol}] Cannot place an order for action {action}")
            return None
        amount = signal['amount']
        limit_price = signal['limit_price']
        stop_price = signal.get('stop_price')

        # Validate parameters
        is_valid, adj_amount, adj_price = self.validate_order_params(symbol, amount, limit_price)
        if not is_valid:
            logger.error(f"Invalid order params for {symbol}")
            return None

        # Adjust stop price if needed
        if stop_price:
            stop_valid, _, stop_price = self.validate_order_params(symbol, amount, stop_price)
            if not stop_valid:
                # Do not silently degrade a stop order into a plain limit order.
                logger.error(f"Invalid stop price for {symbol}")
                return None

        # Proceed with order placement
        try:
            # Numbers are sent as fixed-point strings: a float such as 1e-05
            # would otherwise be serialised in exponent notation.
            order_params = {
                'symbol': symbol,
                'side': SIDE_BUY if action == ACTION_BUY else SIDE_SELL,
                'type': ORDER_TYPE_LIMIT,
                'timeInForce': TIME_IN_FORCE_GTC,
                'quantity': self._format_decimal(adj_amount),
                'price': self._format_decimal(adj_price),
            }
            if stop_price:
                order_params.update({
                    'type': ORDER_TYPE_STOP_LOSS_LIMIT,
                    'stopPrice': self._format_decimal(stop_price),
                })
            order = self.client.create_order(**order_params)
            logger.info(
                f"[{symbol}] Placed Order: {action}, "
                f"Amount={adj_amount}, Price={adj_price}"
            )
            return order['orderId']
        except Exception as e:
            logger.error(f"[{symbol}] Order placement failed: {e}")
            return None

    def cancel_order(self, symbol, order_id):
        """Cancel an order; return True on success, False if the request failed (error is logged)."""
        try:
            self.client.cancel_order(symbol=symbol, orderId=order_id)
        except Exception as e:
            logger.error(f"[{symbol}] Cancel error: {e}")
            return False
        logger.info(f"[{symbol}] Canceled Order: ID={order_id}")
        return True

    def poll_updates(self):
        """Reconcile each symbol over REST: order status, order timeout, wallet balance.

        Errors are logged per symbol and do not stop the remaining symbols.
        """
        for symbol in self.config['symbols']:
            try:
                pos = self.positions[symbol]
                if pos['order_id']:
                    order = self.client.get_order(symbol=symbol, orderId=pos['order_id'])
                    logger.info(f"[{symbol}] Polled Order: ID={order['orderId']}, Status={order['status']}, Filled={order['executedQty']}")
                    if order['status'] in self._CLOSED_STATUSES:
                        self._clear_order(pos)

                # Cancel if timed out. The slot is released only when the cancel
                # succeeded, so a failed request is retried on the next poll.
                if pos['order_id'] and pos['order_timestamp']:
                    age = time.time() - pos['order_timestamp']
                    if age > self.config['order_timeout'] and self.cancel_order(symbol, pos['order_id']):
                        self._clear_order(pos)

                base_asset, quote_asset = self._assets_for(symbol)
                balance = self.client.get_asset_balance(asset=base_asset)
                quote_balance = self.client.get_asset_balance(asset=quote_asset)
                logger.info(f"[{symbol}] Polled Position: Asset={balance['free']}, {quote_asset}={quote_balance['free']}")
                pos['amount'] = float(balance['free'])  # Sync position
            except Exception as e:
                logger.error(f"[{symbol}] Polling error: {e}")

    def _trade_symbol(self, symbol):
        """Fetch data for one symbol, ask the strategy for a signal and act on it."""
        data = self.fetch_data(symbol)
        pos = self.positions[symbol]
        signal = self.strategy.generate_signal(symbol, data, pos)
        if not signal:
            return
        if signal.get('action') == ACTION_CANCEL:
            if pos['order_id'] is not None and self.cancel_order(symbol, pos['order_id']):
                self._clear_order(pos)
            return
        if pos['order_id'] is None:  # No active order
            order_id = self.place_limit_order(symbol, signal)
            if order_id is not None:
                pos['order_id'] = order_id
                pos['order_timestamp'] = time.time()

    def run(self):
        """Run the trading loop until interrupted, then stop the websocket manager."""
        try:
            while True:
                try:
                    for symbol in self.config['symbols']:
                        # Isolate failures so one symbol cannot starve the others.
                        try:
                            self._trade_symbol(symbol)
                        except Exception as e:
                            logger.error(f"[{symbol}] Error in main loop: {e}")
                    self.poll_updates()
                    time.sleep(self.config['loop_interval'])
                except Exception as e:
                    logger.error(f"Error in main loop: {e}")
                    time.sleep(10)  # Retry
        finally:
            # Reached on KeyboardInterrupt/SystemExit: shut the socket thread down.
            self.bm.stop()
if __name__ == "__main__":
    # Credentials are read from the environment so they never live in source
    # control. Any key pair that was ever committed to this file must be
    # treated as compromised and revoked on the exchange.
    api_key = os.environ.get('BINANCE_API_KEY')
    api_secret = os.environ.get('BINANCE_API_SECRET')
    if not api_key or not api_secret:
        raise SystemExit("Set BINANCE_API_KEY and BINANCE_API_SECRET before starting the bot.")

    # Example usage: SMA Strategy with custom params
    strategy = SMAStrategy(params={
        'sma_periods': 50,
        'buy_discount_pct': 0.005,
        'sell_premium_pct': 0.005,
        'take_profit_pct': 0.05,
        'stop_loss_pct': -0.03,
        'position_size_usd': 100,
    })
    bot = TradingBot(
        api_key,
        api_secret,
        strategy=strategy,
        config={
            # Multi-symbol support
            'symbols': ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT', 'XRPUSDT', 'ADAUSDT', 'DOGEUSDT',
                        'AVAXUSDT', 'SHIBUSDT', 'DOTUSDT', 'LINKUSDT', 'TRXUSDT', 'UNIUSDT', 'LTCUSDT'],
            'timeframe': '1h',
            'poll_interval': 30,
            'loop_interval': 60,
            'order_timeout': 300,
        },
    )
    bot.run()
|