| Overall Statistics |
|
Total Orders 689 Average Win 0.64% Average Loss -0.95% Compounding Annual Return 3.422% Drawdown 70.000% Expectancy 0.648 Start Equity 1000.00 End Equity 1228.10 Net Profit 22.810% Sharpe Ratio 0.243 Sortino Ratio 0.304 Probabilistic Sharpe Ratio 1.296% Loss Rate 1% Win Rate 99% Profit-Loss Ratio 0.67 Alpha 0.068 Beta 0.504 Annual Standard Deviation 0.496 Annual Variance 0.246 Information Ratio 0.033 Tracking Error 0.496 Treynor Ratio 0.239 Total Fees $0.00 Estimated Strategy Capacity $2600000000.00 Lowest Capacity Asset XRPUSDT 18N Portfolio Turnover 0.26% |
from datetime import datetime, timedelta
import numpy as np
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
class DCACryptoStrategy(QCAlgorithm):
def Initialize(self):
# Strategy configuration parameters
self.mode = self.GetParameter("mode", "training")
# Use SetStartDate and SetEndDate methods instead of direct assignment
self.SetStartDate(2019, 1, 1) # Year, Month, Day
self.SetEndDate(2025, 3, 31) # Year, Month, Day
self.initial_cash = float(self.GetParameter("initial_cash", "1000"))
self.allocation = float(self.GetParameter("allocation", "0.1")) # 10% of capital per purchase
self.sell_percentage = float(self.GetParameter("sell_percentage", "0.2")) # 20% max sell
self.trading_symbols = self.GetParameter("trading_symbols", "XRPUSDT,ETHUSDT").split(',')
self.exchange = self.GetParameter("exchange", "binance")
# Set initial cash
self.SetCash(self.initial_cash)
# Set up data resolution
self.resolution = Resolution.Daily
if self.mode == "live":
self.resolution = Resolution.Hour
# Dictionary to store symbols and their indicators
self.symbols_data = {}
# Add each crypto symbol
for symbol_str in self.trading_symbols:
symbol = self.AddCrypto(symbol_str.strip(), self.resolution, self.exchange).Symbol
# Create indicators for price analysis
sma20 = self.SMA(symbol, 20, Resolution.Daily)
sma50 = self.SMA(symbol, 50, Resolution.Daily)
rsi = self.RSI(symbol, 14, MovingAverageType.Simple, Resolution.Daily)
# Store symbol data
self.symbols_data[symbol] = {
'symbol': symbol,
'sma20': sma20,
'sma50': sma50,
'rsi': rsi,
'last_trade_date': None,
'price_history': []
}
# Set the warmup period to ensure indicators are ready
self.SetWarmUp(50)
# Schedule daily screening at market open
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(9, 30), self.DailyScreening)
def OnData(self, data):
"""Event fired each time new data arrives"""
if self.IsWarmingUp:
return
# Update price history for each symbol
for symbol, symbol_data in self.symbols_data.items():
if data.ContainsKey(symbol) and data[symbol] is not None:
current_price = data[symbol].Close
symbol_data['price_history'].append(current_price)
# Keep only the last 30 days of price data
if len(symbol_data['price_history']) > 30:
symbol_data['price_history'] = symbol_data['price_history'][-30:]
def DailyScreening(self):
"""Daily screening to find buying opportunities"""
if self.IsWarmingUp:
return
# Only execute one trade per day
trade_executed = False
# Sort symbols by buying opportunity score
ranked_symbols = self.RankSymbols()
for symbol_data in ranked_symbols:
symbol = symbol_data['symbol']
original_data = self.symbols_data[symbol]
# Skip if no data or indicators aren't ready
if not original_data['sma20'].IsReady or not original_data['sma50'].IsReady or not original_data['rsi'].IsReady:
continue
# Skip if we've already traded this symbol today
current_date = self.Time.date()
if original_data['last_trade_date'] == current_date:
continue
# Check if we should buy
if self.ShouldBuy(symbol_data) and not trade_executed:
self.ExecuteBuy(symbol)
original_data['last_trade_date'] = current_date
trade_executed = True
# Check if we should sell some position
elif self.ShouldSell(symbol_data) and not trade_executed:
self.ExecuteSell(symbol)
original_data['last_trade_date'] = current_date
trade_executed = True
def RankSymbols(self):
"""Rank symbols based on buying opportunity"""
ranked = []
for symbol, data in self.symbols_data.items():
if not data['sma20'].IsReady or not data['sma50'].IsReady or not data['rsi'].IsReady:
continue
# Calculate buying score (lower is better)
price = self.Securities[symbol].Price
# RSI score (lower RSI = better buying opportunity)
rsi_score = data['rsi'].Current.Value / 100
# Price relative to recent lows
if len(data['price_history']) > 0:
min_price = min(data['price_history'])
price_to_min_ratio = price / min_price if min_price > 0 else 1
else:
price_to_min_ratio = 1
# SMA signals (price below SMA = better buying opportunity)
sma_signal = 0
if price < data['sma20'].Current.Value:
sma_signal -= 0.3
if price < data['sma50'].Current.Value:
sma_signal -= 0.3
# Combined score (lower is better for buying)
buy_score = rsi_score + (price_to_min_ratio - 1) + sma_signal
ranked.append({
'symbol': symbol,
'buy_score': buy_score,
'rsi': data['rsi'].Current.Value,
'sma20': data['sma20'].Current.Value,
'sma50': data['sma50'].Current.Value,
'price_history': data['price_history'],
'last_trade_date': data['last_trade_date']
})
# Sort by buy score (lower is better)
return sorted(ranked, key=lambda x: x['buy_score'])
def ShouldBuy(self, symbol_data):
"""Determine if we should buy the cryptocurrency"""
symbol = symbol_data['symbol']
# Don't buy if we don't have enough cash
available_cash = self.Portfolio.Cash
allocation_amount = available_cash * self.allocation
if allocation_amount < 10: # Minimum order size
return False
current_price = self.Securities[symbol].Price
rsi = symbol_data['rsi']
# Buy conditions
# 1. RSI indicating oversold (RSI < 30)
# 2. Price is near recent lows
# 3. We have sufficient cash allocation
# Check if RSI indicates oversold
oversold = rsi < 30
# Check if price is near recent lows (bottom 20% of recent price range)
near_lows = False
if len(symbol_data['price_history']) > 5:
min_price = min(symbol_data['price_history'])
max_price = max(symbol_data['price_history'])
price_range = max_price - min_price
if price_range > 0:
relative_position = (current_price - min_price) / price_range
near_lows = relative_position < 0.2
return oversold or near_lows
def ShouldSell(self, symbol_data):
"""Determine if we should sell some of the cryptocurrency"""
symbol = symbol_data['symbol']
# Don't sell if we don't have a position
if not self.Portfolio[symbol].Invested:
return False
current_price = self.Securities[symbol].Price
rsi = symbol_data['rsi']
# Sell conditions
# 1. RSI indicating overbought (RSI > 70)
# 2. Price is near recent highs
# 3. We have a profitable position
# Check if RSI indicates overbought
overbought = rsi > 70
# Check if price is near recent highs (top 20% of recent price range)
near_highs = False
if len(symbol_data['price_history']) > 5:
min_price = min(symbol_data['price_history'])
max_price = max(symbol_data['price_history'])
price_range = max_price - min_price
if price_range > 0:
relative_position = (current_price - min_price) / price_range
near_highs = relative_position > 0.8
# Check if position is profitable
avg_price = self.Portfolio[symbol].AveragePrice
profitable = current_price > avg_price * 1.1 # 10% profit minimum
return (overbought or near_highs) and profitable
def ExecuteBuy(self, symbol):
"""Execute a buy order for the symbol"""
available_cash = self.Portfolio.Cash
allocation_amount = available_cash * self.allocation
# Skip if allocation is too small
if allocation_amount < 10:
self.Debug(f"Skipping buy for {symbol} - allocation too small: ${allocation_amount}")
return
# Calculate quantity based on current price
price = self.Securities[symbol].Price
quantity = allocation_amount / price
# Execute market order
self.MarketOrder(symbol, quantity)
self.Log(f"Buying {quantity} {symbol} at ${price} for ${allocation_amount}")
def ExecuteSell(self, symbol):
"""Execute a sell order for the symbol"""
if not self.Portfolio[symbol].Invested:
return
# Calculate quantity to sell (20% of holdings)
holdings = self.Portfolio[symbol].Quantity
sell_quantity = holdings * self.sell_percentage
# Execute market order
price = self.Securities[symbol].Price
self.MarketOrder(symbol, -sell_quantity)
self.Log(f"Selling {sell_quantity} {symbol} at ${price} for ${sell_quantity * price}")
def OnEndOfAlgorithm(self):
"""Event fired at the end of the algorithm"""
self.Log(f"Final Portfolio Value: ${self.Portfolio.TotalPortfolioValue}")
# Log performance of each position
for symbol in self.symbols_data:
if self.Portfolio[symbol].Invested:
qty = self.Portfolio[symbol].Quantity
avg_price = self.Portfolio[symbol].AveragePrice
current_price = self.Securities[symbol].Price
profit_loss = (current_price - avg_price) * qty
profit_loss_pct = ((current_price / avg_price) - 1) * 100
self.Log(f"{symbol}: {qty} units, avg price: ${avg_price}, current: ${current_price}")
self.Log(f"P&L: ${profit_loss} ({profit_loss_pct:.2f}%)")