Overall Statistics
Total Orders
689
Average Win
0.64%
Average Loss
-0.95%
Compounding Annual Return
3.422%
Drawdown
70.000%
Expectancy
0.648
Start Equity
1000.00
End Equity
1228.10
Net Profit
22.810%
Sharpe Ratio
0.243
Sortino Ratio
0.304
Probabilistic Sharpe Ratio
1.296%
Loss Rate
1%
Win Rate
99%
Profit-Loss Ratio
0.67
Alpha
0.068
Beta
0.504
Annual Standard Deviation
0.496
Annual Variance
0.246
Information Ratio
0.033
Tracking Error
0.496
Treynor Ratio
0.239
Total Fees
$0.00
Estimated Strategy Capacity
$2600000000.00
Lowest Capacity Asset
XRPUSDT 18N
Portfolio Turnover
0.26%
from datetime import datetime, timedelta
import numpy as np
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *

class DCACryptoStrategy(QCAlgorithm):
    
    def Initialize(self):
        # Strategy configuration parameters
        self.mode = self.GetParameter("mode", "training")
        # Use SetStartDate and SetEndDate methods instead of direct assignment
        self.SetStartDate(2019, 1, 1)  # Year, Month, Day
        self.SetEndDate(2025, 3, 31)   # Year, Month, Day
        
        self.initial_cash = float(self.GetParameter("initial_cash", "1000"))
        self.allocation = float(self.GetParameter("allocation", "0.1"))  # 10% of capital per purchase
        self.sell_percentage = float(self.GetParameter("sell_percentage", "0.2"))  # 20% max sell
        self.trading_symbols = self.GetParameter("trading_symbols", "XRPUSDT,ETHUSDT").split(',')
        self.exchange = self.GetParameter("exchange", "binance")
        
        # Set initial cash
        self.SetCash(self.initial_cash)
        
        # Set up data resolution
        self.resolution = Resolution.Daily
        if self.mode == "live":
            self.resolution = Resolution.Hour
        
        # Dictionary to store symbols and their indicators
        self.symbols_data = {}
        
        # Add each crypto symbol
        for symbol_str in self.trading_symbols:
            symbol = self.AddCrypto(symbol_str.strip(), self.resolution, self.exchange).Symbol
            
            # Create indicators for price analysis
            sma20 = self.SMA(symbol, 20, Resolution.Daily)
            sma50 = self.SMA(symbol, 50, Resolution.Daily)
            rsi = self.RSI(symbol, 14, MovingAverageType.Simple, Resolution.Daily)
            
            # Store symbol data
            self.symbols_data[symbol] = {
                'symbol': symbol,
                'sma20': sma20,
                'sma50': sma50,
                'rsi': rsi,
                'last_trade_date': None,
                'price_history': []
            }
        
        # Set the warmup period to ensure indicators are ready
        self.SetWarmUp(50)
        
        # Schedule daily screening at market open
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(9, 30), self.DailyScreening)

    def OnData(self, data):
        """Event fired each time new data arrives"""
        if self.IsWarmingUp:
            return
            
        # Update price history for each symbol
        for symbol, symbol_data in self.symbols_data.items():
            if data.ContainsKey(symbol) and data[symbol] is not None:
                current_price = data[symbol].Close
                symbol_data['price_history'].append(current_price)
                
                # Keep only the last 30 days of price data
                if len(symbol_data['price_history']) > 30:
                    symbol_data['price_history'] = symbol_data['price_history'][-30:]
    
    def DailyScreening(self):
        """Daily screening to find buying opportunities"""
        if self.IsWarmingUp:
            return
            
        # Only execute one trade per day
        trade_executed = False
        
        # Sort symbols by buying opportunity score
        ranked_symbols = self.RankSymbols()
        
        for symbol_data in ranked_symbols:
            symbol = symbol_data['symbol']
            original_data = self.symbols_data[symbol]
            
            # Skip if no data or indicators aren't ready
            if not original_data['sma20'].IsReady or not original_data['sma50'].IsReady or not original_data['rsi'].IsReady:
                continue
                
            # Skip if we've already traded this symbol today
            current_date = self.Time.date()
            if original_data['last_trade_date'] == current_date:
                continue
                
            # Check if we should buy
            if self.ShouldBuy(symbol_data) and not trade_executed:
                self.ExecuteBuy(symbol)
                original_data['last_trade_date'] = current_date
                trade_executed = True
                
            # Check if we should sell some position
            elif self.ShouldSell(symbol_data) and not trade_executed:
                self.ExecuteSell(symbol)
                original_data['last_trade_date'] = current_date
                trade_executed = True
    
    def RankSymbols(self):
        """Rank symbols based on buying opportunity"""
        ranked = []
        
        for symbol, data in self.symbols_data.items():
            if not data['sma20'].IsReady or not data['sma50'].IsReady or not data['rsi'].IsReady:
                continue
                
            # Calculate buying score (lower is better)
            price = self.Securities[symbol].Price
            
            # RSI score (lower RSI = better buying opportunity)
            rsi_score = data['rsi'].Current.Value / 100
            
            # Price relative to recent lows
            if len(data['price_history']) > 0:
                min_price = min(data['price_history'])
                price_to_min_ratio = price / min_price if min_price > 0 else 1
            else:
                price_to_min_ratio = 1
                
            # SMA signals (price below SMA = better buying opportunity)
            sma_signal = 0
            if price < data['sma20'].Current.Value:
                sma_signal -= 0.3
            if price < data['sma50'].Current.Value:
                sma_signal -= 0.3
                
            # Combined score (lower is better for buying)
            buy_score = rsi_score + (price_to_min_ratio - 1) + sma_signal
            
            ranked.append({
                'symbol': symbol,
                'buy_score': buy_score,
                'rsi': data['rsi'].Current.Value,
                'sma20': data['sma20'].Current.Value,
                'sma50': data['sma50'].Current.Value,
                'price_history': data['price_history'],
                'last_trade_date': data['last_trade_date']
            })
            
        # Sort by buy score (lower is better)
        return sorted(ranked, key=lambda x: x['buy_score'])
    
    def ShouldBuy(self, symbol_data):
        """Determine if we should buy the cryptocurrency"""
        symbol = symbol_data['symbol']
        
        # Don't buy if we don't have enough cash
        available_cash = self.Portfolio.Cash
        allocation_amount = available_cash * self.allocation
        if allocation_amount < 10:  # Minimum order size
            return False
            
        current_price = self.Securities[symbol].Price
        rsi = symbol_data['rsi']
        
        # Buy conditions
        # 1. RSI indicating oversold (RSI < 30)
        # 2. Price is near recent lows
        # 3. We have sufficient cash allocation
        
        # Check if RSI indicates oversold
        oversold = rsi < 30
        
        # Check if price is near recent lows (bottom 20% of recent price range)
        near_lows = False
        if len(symbol_data['price_history']) > 5:
            min_price = min(symbol_data['price_history'])
            max_price = max(symbol_data['price_history'])
            price_range = max_price - min_price
            if price_range > 0:
                relative_position = (current_price - min_price) / price_range
                near_lows = relative_position < 0.2
        
        return oversold or near_lows
    
    def ShouldSell(self, symbol_data):
        """Determine if we should sell some of the cryptocurrency"""
        symbol = symbol_data['symbol']
        
        # Don't sell if we don't have a position
        if not self.Portfolio[symbol].Invested:
            return False
            
        current_price = self.Securities[symbol].Price
        rsi = symbol_data['rsi']
        
        # Sell conditions
        # 1. RSI indicating overbought (RSI > 70)
        # 2. Price is near recent highs
        # 3. We have a profitable position
        
        # Check if RSI indicates overbought
        overbought = rsi > 70
        
        # Check if price is near recent highs (top 20% of recent price range)
        near_highs = False
        if len(symbol_data['price_history']) > 5:
            min_price = min(symbol_data['price_history'])
            max_price = max(symbol_data['price_history'])
            price_range = max_price - min_price
            if price_range > 0:
                relative_position = (current_price - min_price) / price_range
                near_highs = relative_position > 0.8
        
        # Check if position is profitable
        avg_price = self.Portfolio[symbol].AveragePrice
        profitable = current_price > avg_price * 1.1  # 10% profit minimum
        
        return (overbought or near_highs) and profitable
    
    def ExecuteBuy(self, symbol):
        """Execute a buy order for the symbol"""
        available_cash = self.Portfolio.Cash
        allocation_amount = available_cash * self.allocation
        
        # Skip if allocation is too small
        if allocation_amount < 10:
            self.Debug(f"Skipping buy for {symbol} - allocation too small: ${allocation_amount}")
            return
            
        # Calculate quantity based on current price
        price = self.Securities[symbol].Price
        quantity = allocation_amount / price
        
        # Execute market order
        self.MarketOrder(symbol, quantity)
        self.Log(f"Buying {quantity} {symbol} at ${price} for ${allocation_amount}")
    
    def ExecuteSell(self, symbol):
        """Execute a sell order for the symbol"""
        if not self.Portfolio[symbol].Invested:
            return
            
        # Calculate quantity to sell (20% of holdings)
        holdings = self.Portfolio[symbol].Quantity
        sell_quantity = holdings * self.sell_percentage
        
        # Execute market order
        price = self.Securities[symbol].Price
        self.MarketOrder(symbol, -sell_quantity)
        self.Log(f"Selling {sell_quantity} {symbol} at ${price} for ${sell_quantity * price}")
    
    def OnEndOfAlgorithm(self):
        """Event fired at the end of the algorithm"""
        self.Log(f"Final Portfolio Value: ${self.Portfolio.TotalPortfolioValue}")
        
        # Log performance of each position
        for symbol in self.symbols_data:
            if self.Portfolio[symbol].Invested:
                qty = self.Portfolio[symbol].Quantity
                avg_price = self.Portfolio[symbol].AveragePrice
                current_price = self.Securities[symbol].Price
                profit_loss = (current_price - avg_price) * qty
                profit_loss_pct = ((current_price / avg_price) - 1) * 100
                
                self.Log(f"{symbol}: {qty} units, avg price: ${avg_price}, current: ${current_price}")
                self.Log(f"P&L: ${profit_loss} ({profit_loss_pct:.2f}%)")