| Overall Statistics |
|
Total Orders 13 Average Win 0.15% Average Loss 0% Compounding Annual Return -4.160% Drawdown 24.600% Expectancy 0 Start Equity 2000.00 End Equity 1542.90 Net Profit -22.855% Sharpe Ratio 0.178 Sortino Ratio 0.771 Probabilistic Sharpe Ratio 0.027% Loss Rate 0% Win Rate 100% Profit-Loss Ratio 0 Alpha 0.063 Beta -0.022 Annual Standard Deviation 0.341 Annual Variance 0.116 Information Ratio -0.114 Tracking Error 0.38 Treynor Ratio -2.807 Total Fees $0.00 Estimated Strategy Capacity $9000000.00 Lowest Capacity Asset ENAUSDT 18N Portfolio Turnover 0.02% |
from datetime import datetime, timedelta
import numpy as np
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from QuantConnect.Orders import OrderStatus
class DCACryptoStrategy(QCAlgorithm):
def Initialize(self):
# Strategy configuration parameters
self.mode = self.GetParameter("mode", "training")
# Use SetStartDate and SetEndDate methods instead of direct assignment
self.SetStartDate(2019, 1, 1) # Year, Month, Day
self.SetEndDate(2025, 3, 31) # Year, Month, Day
self.initial_cash = float(self.GetParameter("initial_cash", "1000"))
self.allocation = float(self.GetParameter("allocation", "0.1")) # 10% of capital per purchase
self.sell_percentage = float(self.GetParameter("sell_percentage", "0.2")) # 20% max sell
self.trading_symbols = self.GetParameter("trading_symbols", "TAOUSDT,ENAUSDT").split(',')
self.exchange = self.GetParameter("exchange", "binance")
# Track our buying activities to limit to one buy per day
self.last_buy_date = None
# Set initial cash
self.SetCash(self.initial_cash)
# Add USDT as a base currency with conversion rate of 1.0
self.SetCash("USDT", self.initial_cash, 1.0)
# Set up data resolution
self.resolution = Resolution.Daily
if self.mode == "live":
self.resolution = Resolution.Hour
# Dictionary to store symbols and their indicators
self.symbols_data = {}
# Add each crypto symbol
for symbol_str in self.trading_symbols:
symbol = self.AddCrypto(symbol_str.strip(), self.resolution, self.exchange).Symbol
# Create indicators for price analysis using QCAlgorithm helper methods
sma20 = self.SMA(symbol, 20, Resolution.Daily)
sma50 = self.SMA(symbol, 50, Resolution.Daily)
rsi = self.RSI(symbol, 14, MovingAverageType.Simple, Resolution.Daily)
# Store symbol data
self.symbols_data[symbol] = {
'symbol': symbol,
'sma20': sma20,
'sma50': sma50,
'rsi': rsi,
'price_history': []
}
# Set the warmup period to ensure indicators are ready
self.SetWarmUp(50)
# Schedule daily screening at market open
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(10, 0), self.DailyScreening)
def OnData(self, data):
"""Event fired each time new data arrives"""
if self.IsWarmingUp:
return
# Update price history for each symbol
for symbol, symbol_data in self.symbols_data.items():
if data.ContainsKey(symbol) and data[symbol] is not None:
current_price = data[symbol].Close
symbol_data['price_history'].append(current_price)
# Keep only the last 30 days of price data
if len(symbol_data['price_history']) > 30:
symbol_data['price_history'] = symbol_data['price_history'][-30:]
def DailyScreening(self):
"""Daily screening to find buying opportunities"""
if self.IsWarmingUp:
return
# Check if we've already made a trade today
current_date = self.Time.date()
if self.last_buy_date == current_date:
return
# Sort symbols by buying opportunity score
ranked_symbols = self.RankSymbols()
# First check if we should sell any positions
for symbol_data in ranked_symbols:
symbol = symbol_data['symbol']
original_data = self.symbols_data[symbol]
# Skip if indicators aren't ready
if not self.IndicatorsReady(original_data):
continue
# Check if we should sell some position
if self.ShouldSell(symbol_data):
if self.ExecuteSell(symbol):
self.last_buy_date = current_date
return # Only one trade per day
# If we didn't sell anything, check for buying opportunities
for symbol_data in ranked_symbols:
symbol = symbol_data['symbol']
original_data = self.symbols_data[symbol]
# Skip if indicators aren't ready
if not self.IndicatorsReady(original_data):
continue
# Check if we should buy
if self.ShouldBuy(symbol_data):
if self.ExecuteBuy(symbol):
self.last_buy_date = current_date
return # Only one trade per day
def IndicatorsReady(self, symbol_data):
"""Check if all indicators for a symbol are ready"""
return (symbol_data['sma20'].IsReady and
symbol_data['sma50'].IsReady and
symbol_data['rsi'].IsReady)
def RankSymbols(self):
"""Rank symbols based on buying opportunity"""
ranked = []
for symbol, data in self.symbols_data.items():
if not self.IndicatorsReady(data):
continue
# Calculate buying score (lower is better)
price = self.Securities[symbol].Price
# RSI score (lower RSI = better buying opportunity)
rsi_score = data['rsi'].Current.Value / 100
# Price relative to recent lows
if len(data['price_history']) > 0:
min_price = min(data['price_history'])
price_to_min_ratio = price / min_price if min_price > 0 else 1
else:
price_to_min_ratio = 1
# SMA signals (price below SMA = better buying opportunity)
sma_signal = 0
if price < data['sma20'].Current.Value:
sma_signal -= 0.3
if price < data['sma50'].Current.Value:
sma_signal -= 0.3
# Combined score (lower is better for buying)
buy_score = rsi_score + (price_to_min_ratio - 1) + sma_signal
ranked.append({
'symbol': symbol,
'buy_score': buy_score,
'rsi': data['rsi'].Current.Value,
'sma20': data['sma20'].Current.Value,
'sma50': data['sma50'].Current.Value,
'price_history': data['price_history'],
'current_price': price
})
# Sort by buy score (lower is better)
return sorted(ranked, key=lambda x: x['buy_score'])
def ShouldBuy(self, symbol_data):
"""Determine if we should buy the cryptocurrency"""
symbol = symbol_data['symbol']
# Always want to buy - this is a DCA strategy
return True
def ShouldSell(self, symbol_data):
"""Determine if we should sell some of the cryptocurrency"""
symbol = symbol_data['symbol']
# Don't sell if we don't have a position
if not self.Portfolio[symbol].Invested:
return False
current_price = symbol_data['current_price']
rsi = symbol_data['rsi']
# Sell conditions
# 1. RSI indicating overbought (RSI > 70)
# 2. Price is near recent highs
# 3. We have a profitable position
# Check if RSI indicates overbought
overbought = rsi > 70
# Check if price is near recent highs (top 20% of recent price range)
near_highs = False
if len(symbol_data['price_history']) > 5:
min_price = min(symbol_data['price_history'])
max_price = max(symbol_data['price_history'])
price_range = max_price - min_price
if price_range > 0:
relative_position = (current_price - min_price) / price_range
near_highs = relative_position > 0.8
# Check if position is profitable
avg_price = self.Portfolio[symbol].AveragePrice
profitable = current_price > avg_price * 1.1 # 10% profit minimum
return (overbought or near_highs) and profitable
def GetMinimumOrderSize(self, symbol):
"""Calculate minimum order size based on exchange rules"""
# Default minimum in dollars
return 10.0
def CalculateOrderQuantity(self, symbol, dollar_amount):
"""Calculate the appropriate order quantity respecting lot sizes"""
price = self.Securities[symbol].Price
if price <= 0:
return 0
# Default quantity calculation
raw_quantity = dollar_amount / price
# Get the lot size for this symbol
lot_size = self.GetLotSize(symbol)
# Round down to the nearest lot size
quantity = np.floor(raw_quantity / lot_size) * lot_size
# Check for minimum quantity
if quantity * price < self.GetMinimumOrderSize(symbol):
return 0
return quantity
def GetLotSize(self, symbol):
"""Get the lot size for a symbol"""
# This would ideally come from exchange information
symbol_str = str(symbol)
if "ETH" in symbol_str:
return 0.0001 # Typical ETH lot size
else:
return 1.0 # Typical for many other cryptos like XRP
def ExecuteBuy(self, symbol):
"""Execute a buy order for the symbol"""
# Check if we have USDT available
usdt_available = self.Portfolio.CashBook["USDT"].Amount
# Use a fixed amount of $100 for each purchase
allocation_amount = 100
# Make sure we have enough USDT
if usdt_available < allocation_amount:
self.Debug(f"Not enough USDT for {symbol} purchase: have ${usdt_available}, need ${allocation_amount}")
return False
# Calculate quantity based on current price and lot size
quantity = self.CalculateOrderQuantity(symbol, allocation_amount)
# Skip if quantity is too small
if quantity <= 0:
self.Debug(f"Skipping buy for {symbol} - calculated quantity too small: {quantity}")
return False
# Calculate actual cost
price = self.Securities[symbol].Price
actual_cost = quantity * price
# Execute market order
order_ticket = self.MarketOrder(symbol, quantity)
# Log the order regardless of status
self.Log(f"Buying {quantity} {symbol} at ${price} for ${actual_cost}")
# Return true to indicate we attempted an order
return True
def ExecuteSell(self, symbol):
"""Execute a sell order for the symbol"""
if not self.Portfolio[symbol].Invested:
return False
# Calculate quantity to sell (20% of holdings)
holdings = self.Portfolio[symbol].Quantity
sell_quantity = holdings * self.sell_percentage
# Ensure we're respecting lot sizes
lot_size = self.GetLotSize(symbol)
sell_quantity = np.floor(sell_quantity / lot_size) * lot_size
# Skip if quantity is too small
if sell_quantity <= 0:
self.Debug(f"Skipping sell for {symbol} - calculated quantity too small: {sell_quantity}")
return False
# Execute market order
price = self.Securities[symbol].Price
order_ticket = self.MarketOrder(symbol, -sell_quantity)
# Log the order regardless of status
self.Log(f"Selling {sell_quantity} {symbol} at ${price} for ${sell_quantity * price}")
# Return true to indicate we attempted an order
return True
def OnEndOfAlgorithm(self):
"""Event fired at the end of the algorithm"""
self.Log(f"Final Portfolio Value: ${self.Portfolio.TotalPortfolioValue}")
# Log performance of each position
for symbol in self.symbols_data:
if self.Portfolio[symbol].Invested:
qty = self.Portfolio[symbol].Quantity
avg_price = self.Portfolio[symbol].AveragePrice
current_price = self.Securities[symbol].Price
profit_loss = (current_price - avg_price) * qty
profit_loss_pct = ((current_price / avg_price) - 1) * 100
self.Log(f"{symbol}: {qty} units, avg price: ${avg_price}, current: ${current_price}")
self.Log(f"P&L: ${profit_loss} ({profit_loss_pct:.2f}%)")