Overall Statistics
Total Orders
695
Average Win
0.75%
Average Loss
-2.11%
Compounding Annual Return
5.349%
Drawdown
70.000%
Expectancy
0.338
Start Equity
2000.00
End Equity
2749.29
Net Profit
37.464%
Sharpe Ratio
0.459
Sortino Ratio
0.691
Probabilistic Sharpe Ratio
2.757%
Loss Rate
1%
Win Rate
99%
Profit-Loss Ratio
0.36
Alpha
0.224
Beta
0.483
Annual Standard Deviation
0.598
Annual Variance
0.358
Information Ratio
0.284
Tracking Error
0.599
Treynor Ratio
0.569
Total Fees
$0.00
Estimated Strategy Capacity
$3300000000.00
Lowest Capacity Asset
XRPUSDT 18N
Portfolio Turnover
0.29%
from datetime import datetime, timedelta
import numpy as np
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from QuantConnect.Orders import OrderStatus

class DCACryptoStrategy(QCAlgorithm):
    
    def Initialize(self):
        # Strategy configuration parameters
        self.mode = self.GetParameter("mode", "training")
        # Use SetStartDate and SetEndDate methods instead of direct assignment
        self.SetStartDate(2019, 1, 1)  # Year, Month, Day
        self.SetEndDate(2025, 3, 31)   # Year, Month, Day
        
        self.initial_cash = float(self.GetParameter("initial_cash", "1000"))
        self.allocation = float(self.GetParameter("allocation", "0.1"))  # 10% of capital per purchase
        self.sell_percentage = float(self.GetParameter("sell_percentage", "0.2"))  # 20% max sell
        self.trading_symbols = self.GetParameter("trading_symbols", "XRPUSDT,ETHUSDT").split(',')
        self.exchange = self.GetParameter("exchange", "binance")
        
        # Track our buying activities to limit to one buy per day
        self.last_buy_date = None
        
        # Set initial cash
        self.SetCash(self.initial_cash)
        
        # Add USDT as a base currency with conversion rate of 1.0
        self.SetCash("USDT", self.initial_cash, 1.0)
        
        # Set up data resolution
        self.resolution = Resolution.Daily
        if self.mode == "live":
            self.resolution = Resolution.Hour
        
        # Dictionary to store symbols and their indicators
        self.symbols_data = {}
        
        # Add each crypto symbol
        for symbol_str in self.trading_symbols:
            symbol = self.AddCrypto(symbol_str.strip(), self.resolution, self.exchange).Symbol
            
            # Create indicators for price analysis using QCAlgorithm helper methods
            sma20 = self.SMA(symbol, 20, Resolution.Daily)
            sma50 = self.SMA(symbol, 50, Resolution.Daily)
            rsi = self.RSI(symbol, 14, MovingAverageType.Simple, Resolution.Daily)
            
            # Store symbol data
            self.symbols_data[symbol] = {
                'symbol': symbol,
                'sma20': sma20,
                'sma50': sma50,
                'rsi': rsi,
                'price_history': []
            }
        
        # Set the warmup period to ensure indicators are ready
        self.SetWarmUp(50)
        
        # Schedule daily screening at market open
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(9, 30), self.DailyScreening)

    def OnData(self, data):
        """Event fired each time new data arrives"""
        if self.IsWarmingUp:
            return
            
        # Update price history for each symbol
        for symbol, symbol_data in self.symbols_data.items():
            if data.ContainsKey(symbol) and data[symbol] is not None:
                current_price = data[symbol].Close
                symbol_data['price_history'].append(current_price)
                
                # Keep only the last 30 days of price data
                if len(symbol_data['price_history']) > 30:
                    symbol_data['price_history'] = symbol_data['price_history'][-30:]
    
    def DailyScreening(self):
        """Daily screening to find buying opportunities"""
        if self.IsWarmingUp:
            return
            
        # Check if we've already made a trade today
        current_date = self.Time.date()
        if self.last_buy_date == current_date:
            return
            
        # Sort symbols by buying opportunity score
        ranked_symbols = self.RankSymbols()
        
        # First check if we should sell any positions
        for symbol_data in ranked_symbols:
            symbol = symbol_data['symbol']
            original_data = self.symbols_data[symbol]
            
            # Skip if indicators aren't ready
            if not self.IndicatorsReady(original_data):
                continue
                
            # Check if we should sell some position
            if self.ShouldSell(symbol_data):
                if self.ExecuteSell(symbol):
                    self.last_buy_date = current_date
                    return  # Only one trade per day
        
        # If we didn't sell anything, check for buying opportunities
        for symbol_data in ranked_symbols:
            symbol = symbol_data['symbol']
            original_data = self.symbols_data[symbol]
            
            # Skip if indicators aren't ready
            if not self.IndicatorsReady(original_data):
                continue
                
            # Check if we should buy
            if self.ShouldBuy(symbol_data):
                if self.ExecuteBuy(symbol):
                    self.last_buy_date = current_date
                    return  # Only one trade per day
    
    def IndicatorsReady(self, symbol_data):
        """Check if all indicators for a symbol are ready"""
        return (symbol_data['sma20'].IsReady and 
                symbol_data['sma50'].IsReady and 
                symbol_data['rsi'].IsReady)
    
    def RankSymbols(self):
        """Rank symbols based on buying opportunity"""
        ranked = []
        
        for symbol, data in self.symbols_data.items():
            if not self.IndicatorsReady(data):
                continue
                
            # Calculate buying score (lower is better)
            price = self.Securities[symbol].Price
            
            # RSI score (lower RSI = better buying opportunity)
            rsi_score = data['rsi'].Current.Value / 100
            
            # Price relative to recent lows
            if len(data['price_history']) > 0:
                min_price = min(data['price_history'])
                price_to_min_ratio = price / min_price if min_price > 0 else 1
            else:
                price_to_min_ratio = 1
                
            # SMA signals (price below SMA = better buying opportunity)
            sma_signal = 0
            if price < data['sma20'].Current.Value:
                sma_signal -= 0.3
            if price < data['sma50'].Current.Value:
                sma_signal -= 0.3
                
            # Combined score (lower is better for buying)
            buy_score = rsi_score + (price_to_min_ratio - 1) + sma_signal
            
            ranked.append({
                'symbol': symbol,
                'buy_score': buy_score,
                'rsi': data['rsi'].Current.Value,
                'sma20': data['sma20'].Current.Value,
                'sma50': data['sma50'].Current.Value,
                'price_history': data['price_history'],
                'current_price': price
            })
            
        # Sort by buy score (lower is better)
        return sorted(ranked, key=lambda x: x['buy_score'])
    
    def ShouldBuy(self, symbol_data):
        """Determine if we should buy the cryptocurrency"""
        symbol = symbol_data['symbol']
        
        # Don't buy if we don't have enough cash
        available_cash = self.Portfolio.Cash
        allocation_amount = available_cash * self.allocation
        
        # Check minimum order size based on exchange requirements
        min_order_size = self.GetMinimumOrderSize(symbol)
        if allocation_amount < min_order_size:
            self.Debug(f"Allocation too small for {symbol}: ${allocation_amount} < ${min_order_size}")
            return False
            
        current_price = symbol_data['current_price']
        rsi = symbol_data['rsi']
        
        # Buy conditions
        # 1. RSI indicating oversold (RSI < 30)
        # 2. Price is near recent lows
        # 3. We have sufficient cash allocation
        
        # Check if RSI indicates oversold
        oversold = rsi < 30
        
        # Check if price is near recent lows (bottom 20% of recent price range)
        near_lows = False
        if len(symbol_data['price_history']) > 5:
            min_price = min(symbol_data['price_history'])
            max_price = max(symbol_data['price_history'])
            price_range = max_price - min_price
            if price_range > 0:
                relative_position = (current_price - min_price) / price_range
                near_lows = relative_position < 0.2
        
        return oversold or near_lows
    
    def ShouldSell(self, symbol_data):
        """Determine if we should sell some of the cryptocurrency"""
        symbol = symbol_data['symbol']
        
        # Don't sell if we don't have a position
        if not self.Portfolio[symbol].Invested:
            return False
            
        current_price = symbol_data['current_price']
        rsi = symbol_data['rsi']
        
        # Sell conditions
        # 1. RSI indicating overbought (RSI > 70)
        # 2. Price is near recent highs
        # 3. We have a profitable position
        
        # Check if RSI indicates overbought
        overbought = rsi > 70
        
        # Check if price is near recent highs (top 20% of recent price range)
        near_highs = False
        if len(symbol_data['price_history']) > 5:
            min_price = min(symbol_data['price_history'])
            max_price = max(symbol_data['price_history'])
            price_range = max_price - min_price
            if price_range > 0:
                relative_position = (current_price - min_price) / price_range
                near_highs = relative_position > 0.8
        
        # Check if position is profitable
        avg_price = self.Portfolio[symbol].AveragePrice
        profitable = current_price > avg_price * 1.1  # 10% profit minimum
        
        return (overbought or near_highs) and profitable
    
    def GetMinimumOrderSize(self, symbol):
        """Calculate minimum order size based on exchange rules"""
        # Default minimum in dollars (adjust as needed for your exchange)
        return 10.0
    
    def CalculateOrderQuantity(self, symbol, dollar_amount):
        """Calculate the appropriate order quantity respecting lot sizes"""
        price = self.Securities[symbol].Price
        
        # Default quantity calculation
        raw_quantity = dollar_amount / price
        
        # Get the lot size for this symbol (usually available from exchange info)
        # For crypto exchanges, lot size is typically 0.0001 for ETH, etc.
        lot_size = self.GetLotSize(symbol)
        
        # Round down to the nearest lot size
        quantity = np.floor(raw_quantity / lot_size) * lot_size
        
        return quantity
    
    def GetLotSize(self, symbol):
        """Get the lot size for a symbol"""
        # This would ideally come from exchange information
        # For simplicity, using a default value based on the symbol
        symbol_str = str(symbol)
        if "ETH" in symbol_str:
            return 0.0001  # Typical ETH lot size
        else:
            return 1.0  # Typical for many other cryptos like XRP
    
    def ExecuteBuy(self, symbol):
        """Execute a buy order for the symbol"""
        available_cash = self.Portfolio.Cash
        allocation_amount = available_cash * self.allocation
        
        # Get minimum order size for this symbol
        min_order_size = self.GetMinimumOrderSize(symbol)
        
        # Skip if allocation is too small
        if allocation_amount < min_order_size:
            self.Debug(f"Skipping buy for {symbol} - allocation too small: ${allocation_amount}")
            return False
            
        # Calculate quantity based on current price and lot size
        quantity = self.CalculateOrderQuantity(symbol, allocation_amount)
        
        # Skip if quantity is too small
        if quantity <= 0:
            self.Debug(f"Skipping buy for {symbol} - calculated quantity too small: {quantity}")
            return False
            
        # Calculate actual cost
        price = self.Securities[symbol].Price
        actual_cost = quantity * price
        
        # Make sure we have enough cash (with a small buffer)
        if actual_cost > available_cash * 0.99:
            self.Debug(f"Skipping buy for {symbol} - not enough cash: need ${actual_cost}, have ${available_cash}")
            return False
            
        # Execute market order
        order_ticket = self.MarketOrder(symbol, quantity)
        
        # Log the order regardless of status
        self.Log(f"Buying {quantity} {symbol} at ${price} for ${actual_cost}")
        
        # Return true to indicate we attempted an order
        return True
    
    def ExecuteSell(self, symbol):
        """Execute a sell order for the symbol"""
        if not self.Portfolio[symbol].Invested:
            return False
            
        # Calculate quantity to sell (20% of holdings)
        holdings = self.Portfolio[symbol].Quantity
        sell_quantity = holdings * self.sell_percentage
        
        # Ensure we're respecting lot sizes
        lot_size = self.GetLotSize(symbol)
        sell_quantity = np.floor(sell_quantity / lot_size) * lot_size
        
        # Skip if quantity is too small
        if sell_quantity <= 0:
            self.Debug(f"Skipping sell for {symbol} - calculated quantity too small: {sell_quantity}")
            return False
            
        # Execute market order
        price = self.Securities[symbol].Price
        order_ticket = self.MarketOrder(symbol, -sell_quantity)
        
        # Log the order regardless of status
        self.Log(f"Selling {sell_quantity} {symbol} at ${price} for ${sell_quantity * price}")
        
        # Return true to indicate we attempted an order
        return True
    
    def OnEndOfAlgorithm(self):
        """Event fired at the end of the algorithm"""
        self.Log(f"Final Portfolio Value: ${self.Portfolio.TotalPortfolioValue}")
        
        # Log performance of each position
        for symbol in self.symbols_data:
            if self.Portfolio[symbol].Invested:
                qty = self.Portfolio[symbol].Quantity
                avg_price = self.Portfolio[symbol].AveragePrice
                current_price = self.Securities[symbol].Price
                profit_loss = (current_price - avg_price) * qty
                profit_loss_pct = ((current_price / avg_price) - 1) * 100
                
                self.Log(f"{symbol}: {qty} units, avg price: ${avg_price}, current: ${current_price}")
                self.Log(f"P&L: ${profit_loss} ({profit_loss_pct:.2f}%)")