| Overall Statistics |
|
Total Orders 49 Average Win 0.09% Average Loss -0.01% Compounding Annual Return 0.090% Drawdown 16.100% Expectancy 9.104 Start Equity 10000.0 End Equity 10247.54 Net Profit 2.475% Sharpe Ratio -1.153 Sortino Ratio -1.086 Probabilistic Sharpe Ratio 0.000% Loss Rate 12% Win Rate 88% Profit-Loss Ratio 10.55 Alpha 0 Beta 0 Annual Standard Deviation 0.019 Annual Variance 0 Information Ratio 0.042 Tracking Error 0.019 Treynor Ratio 0 Total Fees $49.00 Estimated Strategy Capacity $0 Lowest Capacity Asset HTZ XTDCDCDRI6HX Portfolio Turnover 0.01% |
from datetime import datetime, timedelta
import numpy as np
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Brokerages import *
from QuantConnect.Data import *
from QuantConnect.Indicators import *
from QuantConnect.Orders import OrderStatus
class DCAStockStrategy(QCAlgorithm):
def Initialize(self):
# Strategy configuration parameters
self.mode = self.GetParameter("mode", "training")
# Set brokerage model for Interactive Brokers
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Cash)
self.initial_cash = float(self.GetParameter("initial_cash", "10000"))
self.allocation = float(self.GetParameter("allocation", "0.05")) # 5% of capital
self.sell_percentage = float(self.GetParameter("sell_percentage", "0.2"))
self.trading_symbols = self.GetParameter("trading_symbols", "SIRI,HTZ").split(',')
self.exchange = "SMART" # Interactive Brokers exchange
# Minimum USD required for purchases
self.min_purchase_amount = 100.0
self.min_usd_threshold = 150.0
# Track buying activities and P&L
self.last_buy_date = None
self.last_log_date = {}
self.realized_pnl = 0.0
# Set cash only for backtesting
if self.mode != "live":
self.SetCash("USD", self.initial_cash, 1.0)
# Set data resolution
self.resolution = Resolution.Hour if self.mode == "live" else Resolution.Daily
# Dictionary to store symbols and indicators
self.symbols_data = {}
# Add each stock symbol
for symbol_str in self.trading_symbols:
symbol_str = symbol_str.strip()
try:
symbol = self.AddEquity(symbol_str, self.resolution).Symbol
sma20 = self.SMA(symbol, 20, Resolution.Daily)
sma50 = self.SMA(symbol, 50, Resolution.Daily)
rsi = self.RSI(symbol, 14, MovingAverageType.Simple, Resolution.Daily)
self.symbols_data[symbol] = {
'symbol': symbol,
'sma20': sma20,
'sma50': sma50,
'rsi': rsi,
'price_history': []
}
except Exception as e:
self.Log(f"Error adding symbol {symbol_str}: {str(e)}")
# Set warmup period
self.SetWarmUp(50, Resolution.Daily)
# Schedule daily screening (UTC time adjusted for US markets)
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(10, 0), self.DailyScreening)
def OnData(self, data):
if self.IsWarmingUp:
return
for symbol, symbol_data in self.symbols_data.items():
if data.ContainsKey(symbol) and data[symbol] is not None:
current_price = data[symbol].Close
symbol_data['price_history'].append(current_price)
if len(symbol_data['price_history']) > 30:
symbol_data['price_history'] = symbol_data['price_history'][-30:]
else:
self.Debug(f"No data for {symbol} at {self.Time}")
def DailyScreening(self):
if self.IsWarmingUp:
return
current_date = self.Time.date()
if self.last_buy_date == current_date:
return
usd_available = self.Portfolio.CashBook["USD"].Amount
if usd_available < self.min_usd_threshold:
if self.SellForCash():
self.last_buy_date = current_date
self.Log(f"USD balance after selling: ${usd_available:.2f}")
return
else:
self.Log(f"Skipping DCA: insufficient USD (${usd_available:.2f}) and no profitable positions")
return
ranked_symbols = self.RankSymbols()
for symbol_data in ranked_symbols:
symbol = symbol_data['symbol']
original_data = self.symbols_data[symbol]
if not self.IndicatorsReady(original_data):
continue
if self.ShouldSell(symbol_data):
if self.ExecuteSell(symbol):
self.last_buy_date = current_date
return
for symbol_data in ranked_symbols:
symbol = symbol_data['symbol']
original_data = self.symbols_data[symbol]
if not self.IndicatorsReady(original_data):
continue
if self.ShouldBuy(symbol_data):
if self.ExecuteBuy(symbol):
self.last_buy_date = current_date
return
def SellForCash(self):
usd_available = self.Portfolio.CashBook["USD"].Amount
target_amount = self.min_usd_threshold
if usd_available >= target_amount:
return True
profitable_positions = []
for symbol, data in self.symbols_data.items():
if self.Portfolio[symbol].Invested:
avg_price = self.Portfolio[symbol].AveragePrice
current_price = self.Securities[symbol].Price
profit_pct = (current_price / avg_price) - 1
if profit_pct >= 0.02:
profitable_positions.append({
'symbol': symbol,
'profit_pct': profit_pct,
'quantity': self.Portfolio[symbol].Quantity,
'current_price': current_price
})
profitable_positions.sort(key=lambda x: x['profit_pct'], reverse=True)
for position in profitable_positions:
symbol = position['symbol']
sell_quantity = min(position['quantity'] * self.sell_percentage, position['quantity'])
if self.ExecuteSell(symbol, sell_quantity):
usd_available = self.Portfolio.CashBook["USD"].Amount
self.Log(f"Raised cash by selling {sell_quantity:.4f} {symbol}. USD: ${usd_available:.2f}")
if usd_available >= target_amount:
return True
self.Log(f"Failed to raise USD. Available: ${usd_available:.2f}, Needed: ${target_amount:.2f}")
return False
def IndicatorsReady(self, symbol_data):
return (symbol_data['sma20'].IsReady and
symbol_data['sma50'].IsReady and
symbol_data['rsi'].IsReady)
def RankSymbols(self):
ranked = []
for symbol, data in self.symbols_data.items():
if not self.IndicatorsReady(data):
continue
price = self.Securities[symbol].Price
rsi_score = data['rsi'].Current.Value / 100
if len(data['price_history']) > 0:
min_price = min(data['price_history'])
price_to_min_ratio = price / min_price if min_price > 0 else 1
else:
price_to_min_ratio = 1
sma_signal = 0
if price < data['sma20'].Current.Value:
sma_signal -= 0.3
if price < data['sma50'].Current.Value:
sma_signal -= 0.3
buy_score = rsi_score + (price_to_min_ratio - 1) + sma_signal
ranked.append({
'symbol': symbol,
'buy_score': buy_score,
'rsi': data['rsi'].Current.Value,
'sma20': data['sma20'].Current.Value,
'sma50': data['sma50'].Current.Value,
'price_history': data['price_history'],
'current_price': price
})
return sorted(ranked, key=lambda x: x['buy_score'])
def ShouldBuy(self, symbol_data):
current_price = symbol_data['current_price']
# Check if price is at multiple of 5 (rounded down)
rounded_price = np.floor(current_price / 5) * 5
if abs(current_price - rounded_price) > 0.01: # Allow small price deviation
return False
# Avoid friction stocks (assuming friction means high volatility or penny stocks)
if len(symbol_data['price_history']) > 5:
price_std = np.std(symbol_data['price_history'])
price_mean = np.mean(symbol_data['price_history'])
volatility = price_std / price_mean if price_mean > 0 else 0
if volatility > 0.2 or current_price < 1.0: # High volatility or penny stock
return False
return True
def ShouldSell(self, symbol_data):
symbol = symbol_data['symbol']
if not self.Portfolio[symbol].Invested:
return False
current_price = symbol_data['current_price']
rsi = symbol_data['rsi']
avg_price = self.Portfolio[symbol].AveragePrice
profitable = current_price > avg_price * 1.02
if not profitable:
return False
overbought = rsi > 70
near_highs = False
if len(symbol_data['price_history']) > 5:
min_price = min(symbol_data['price_history'])
max_price = max(symbol_data['price_history'])
price_range = max_price - min_price
if price_range > 0:
relative_position = (current_price - min_price) / price_range
near_highs = relative_position > 0.8
return overbought or near_highs
def GetMinimumOrderSize(self, symbol):
return 10.0 # Minimum order size for stocks
def GetLotSize(self, symbol):
return 1.0 # Stocks trade in whole shares
def CalculateOrderQuantity(self, symbol, dollar_amount):
price = self.Securities[symbol].Price
if price <= 0:
return 0
# Round down to multiple of 5
rounded_price = np.floor(price / 5) * 5
if abs(price - rounded_price) > 0.01:
return 0
raw_quantity = dollar_amount / rounded_price
lot_size = self.GetLotSize(symbol)
quantity = np.floor(raw_quantity / lot_size) * lot_size
if quantity * rounded_price < self.GetMinimumOrderSize(symbol):
return 0
return quantity
def ExecuteBuy(self, symbol):
usd_available = self.Portfolio.CashBook["USD"].Amount
allocation_amount = usd_available * self.allocation
if usd_available < self.min_usd_threshold:
current_date = self.Time.date()
if symbol not in self.last_log_date or self.last_log_date[symbol] != current_date:
self.Debug(f"Not enough USD for {symbol}: ${usd_available:.2f}, need ${self.min_usd_threshold:.2f}")
self.last_log_date[symbol] = current_date
return False
if allocation_amount < self.min_purchase_amount:
allocation_amount = min(usd_available * 0.99, self.min_purchase_amount)
quantity = self.CalculateOrderQuantity(symbol, allocation_amount)
if quantity <= 0:
current_date = self.Time.date()
if symbol not in self.last_log_date or self.last_log_date[symbol] != current_date:
self.Debug(f"Skipping buy for {symbol} - quantity too small: {quantity:.4f}")
self.last_log_date[symbol] = current_date
return False
price = self.Securities[symbol].Price
actual_cost = quantity * price
order_ticket = self.MarketOrder(symbol, quantity)
if order_ticket.Status == OrderStatus.Invalid:
self.Log(f"Buy order for {symbol} failed: {order_ticket.OrderErrorMessage}")
return False
self.Log(f"Buying {quantity:.0f} {symbol} at ${price:.2f} for ${actual_cost:.2f}")
return True
def ExecuteSell(self, symbol, custom_quantity=None, force=False):
if not self.Portfolio[symbol].Invested:
return False
if custom_quantity is not None:
sell_quantity = min(custom_quantity, self.Portfolio[symbol].Quantity)
else:
holdings = self.Portfolio[symbol].Quantity
sell_quantity = holdings * self.sell_percentage
lot_size = self.GetLotSize(symbol)
sell_quantity = np.floor(sell_quantity / lot_size) * lot_size
if sell_quantity <= 0:
self.Debug(f"Skipping sell for {symbol} - quantity too small: {sell_quantity:.4f}")
return False
avg_price = self.Portfolio[symbol].AveragePrice
current_price = self.Securities[symbol].Price
is_profitable = current_price > avg_price * 1.02
if not is_profitable and not force:
self.Debug(f"Skipping sell for {symbol} - not profitable: ${current_price:.2f}, avg ${avg_price:.2f}")
return False
order_ticket = self.MarketOrder(symbol, -sell_quantity)
if order_ticket.Status == OrderStatus.Invalid:
self.Log(f"Sell order for {symbol} failed: {order_ticket.OrderErrorMessage}")
return False
self.Log(f"Selling {sell_quantity:.0f} {symbol} at ${current_price:.2f} for ${(sell_quantity * current_price):.2f}")
profit_loss = (current_price - avg_price) * sell_quantity
self.realized_pnl += profit_loss
return True
def OnEndOfAlgorithm(self):
self.Log(f"Final Portfolio Value: ${self.Portfolio.TotalPortfolioValue:.2f}")
total_unrealized_pnl = 0.0
for symbol in self.symbols_data:
if self.Portfolio[symbol].Invested:
qty = self.Portfolio[symbol].Quantity
avg_price = self.Portfolio[symbol].AveragePrice
current_price = self.Securities[symbol].Price
unrealized_pnl = (current_price - avg_price) * qty
total_unrealized_pnl += unrealized_pnl
profit_loss_pct = ((current_price / avg_price) - 1) * 100
self.Log(f"{symbol}: {qty:.0f} units, avg price: ${avg_price:.2f}, current: ${current_price:.2f}")
self.Log(f"Unrealized P&L: ${unrealized_pnl:.2f} ({profit_loss_pct:.2f}%)")
total_pnl = self.realized_pnl + total_unrealized_pnl
initial_value = self.initial_cash
total_pnl_pct = ((self.Portfolio.TotalPortfolioValue / initial_value) - 1) * 100
self.Log(f"Realized P&L: ${self.realized_pnl:.2f}")
self.Log(f"Total Unrealized P&L: ${total_unrealized_pnl:.2f}")
self.Log(f"Total P&L: ${total_pnl:.2f} ({total_pnl_pct:.2f}%)")
self.Log("Note: Unrealized P&L reflects current market prices and may fluctuate.")