Expert Bot Question - News Trading Strategy Review

Strategy Overview & Goals

I'm developing a news-based trading strategy for QuantConnect with the following objectives:

Primary Goal: News Arbitrage Trading

•Monitor 800+ ultra low float stocks (≤10M shares outstanding)

•Send immediate Telegram alerts when news breaks for any monitored stock

•Focus on speculative trading where small volume can move prices significantly

•Price range: $1-$50, Market cap: $2M-$200M

Key Requirements:

1.Fast startup using ObjectStore caching

2.Weekly universe updates (every Monday)

3.Real-time news monitoring with BenzingaNews

4.Instant alerts with stock details and news links

5.Enhanced alerts with live stock data (price, volume, change %)

Current Implementation Questions

1. Architecture & Performance:

•Is my current approach optimal for 800+ symbols with news subscriptions?

•Are there any performance bottlenecks or QuantConnect limits I should be aware of?

•Is the ObjectStore implementation correct for weekly universe caching?

2. News Processing:

•Am I handling BenzingaNews correctly for real-time processing?

•How can I add news URLs and detailed content to alerts?

•Is the deduplication logic (deque with 10K limit) sufficient?

3. Stock Data Integration:

•How can I fetch live stock data (price, volume, % change) when news breaks?

•What's the best way to get real-time fundamental data for alert enhancement?

•Should I use TradeBar data or SecurityHolding for current prices?

4. Alert Enhancement:

•Can I improve the Telegram alert format with more stock details?

•How to add clickable news links and stock information?

•Is there a way to include pre-market/after-hours data in alerts?

5. Error Handling & Reliability:

•Are there any obvious bugs or issues in my current implementation?

•Is the timezone handling (UTC to Saudi Arabia) implemented correctly?

•Are the Telegram notifications using the correct API parameters?

Current Code Implementation

Python

 

 

# news_monitor_optimized.py # مراقب الأخبار المحسّن - بناءً على أفضل الممارسات من QuantConnect from AlgorithmImports import * import json import pytz from datetime import timedelta from collections import deque class OptimizedNewsMonitor(QCAlgorithm): """ مراقب أخبار محسّن يطبق أفضل الممارسات: 1. تحميل سريع من ObjectStore 2. تحديث أسبوعي للكون 3. معالجة فعالة للأخبار 4. تنبيهات فورية عبر Telegram """ def initialize(self): """تهيئة الخوارزمية""" self.set_start_date(2023, 1, 1) self.set_cash(100_000) # ==================== الإعدادات ==================== self.UNIVERSE_SIZE = 800 self.STORE_KEY = "news_monitor_universe_v1" self.NEWS_MAX_AGE_MINUTES = 5 self.MAX_NEWS_CACHE = 10_000 # معايير الفلترة - محسّنة للمضاربة self.MIN_PRICE = 1.0 self.MAX_PRICE = 50.0 self.MAX_SHARES_OUTSTANDING = 10_000_000 # 10 مليون فقط - Ultra Low Float self.MIN_MARKET_CAP = 2_000_000 # $2M - أصغر للمضاربة self.MAX_MARKET_CAP = 200_000_000 # $200M - نطاق أوسع # إعدادات Telegram self.TELEGRAM_BOT_TOKEN = "7944989941:AAHzGOHL1rMNP8fz_mt_KSeaOTebxx3MYzQ" self.TELEGRAM_CHAT_ID = "-1002989440680" self.TELEGRAM_SEND_INTERVAL = 3 # إعدادات الأخبار self.NEWS_MAX_AGE_MINUTES = 5 # تجاهل الأخبار الأقدم من 5 دقائق # ==================== المتغيرات الداخلية ==================== self.universe_symbols = [] self.universe_tickers = set() self.symbols_from_store = False self.refresh_flag = False self.news_subscriptions = set() self.processed_news_ids = deque(maxlen=self.MAX_NEWS_CACHE) self.last_telegram_send = None self.news_count_today = 0 # المناطق الزمنية والإيموجي self.utc_tz = pytz.UTC self.saudi_tz = pytz.timezone("Asia/Riyadh") self.setup_emojis() # ==================== التهيئة ==================== # 1. تحميل سريع من ObjectStore self.load_universe_from_store() # 2. إعداد Universe Selection self.universe_settings.resolution = Resolution.MINUTE self.add_universe(self.coarse_selection, self.fine_selection) # 3. جدولة التحديث الأسبوعي self.schedule.on( self.date_rules.every(DayOfWeek.MONDAY), self.time_rules.before_market_open("SPY", 30), self.flag_universe_refresh ) self.log("=" * 60) self.log("[Initialize] Optimized News Monitor started") self.log(f"[Initialize] Target universe size: {self.UNIVERSE_SIZE}") self.log(f"[Initialize] Loaded from store: {self.symbols_from_store}") self.log("=" * 60) def setup_emojis(self): """إعداد الإيموجي باستخدام Unicode escape sequences""" self.EMOJI_NEWS = "\U0001F4F0" # News self.EMOJI_ROCKET = "\U0001F680" # Rocket self.EMOJI_CHECK = "\u2705" # Check self.EMOJI_CHART = "\U0001F4CA" # Chart self.EMOJI_CLOCK = "\U0001F552" # Clock self.EMOJI_MONEY = "\U0001F4B0" # Money self.EMOJI_LINK = "\U0001F517" # Link self.EMOJI_WARNING = "\u26A0\uFE0F" # Warning self.EMOJI_INFO = "\u2139\uFE0F" # Info # ==================== ObjectStore Management ==================== def load_universe_from_store(self): """تحميل سريع للكون من ObjectStore""" if not self.object_store.contains_key(self.STORE_KEY): self.log("[ObjectStore] No cached universe found") return try: raw_data = self.object_store.read(self.STORE_KEY) data = json.loads(raw_data) tickers = data.get('tickers', []) last_update = data.get('last_update', 'unknown') if not tickers: self.log("[ObjectStore] Empty universe in cache") return self.log(f"[ObjectStore] Loaded {len(tickers)} symbols from cache") self.log(f"[ObjectStore] Last update: {last_update}") self.symbols_from_store = True self.universe_tickers = set(tickers) # إضافة الأسهم فوراً للتحميل السريع for ticker in tickers: try: symbol = self.add_equity(ticker, Resolution.MINUTE).symbol self.universe_symbols.append(symbol) self.subscribe_news(symbol) except Exception as e: self.debug(f"[ObjectStore] Failed to add {ticker}: {e}") except Exception as e: self.error(f"[ObjectStore] Load failed: {e}") def save_universe_to_store(self, tickers): """حفظ الكون في ObjectStore""" try: data = { 'tickers': tickers, 'last_update': self.time.strftime("%Y-%m-%d %H:%M:%S"), 'count': len(tickers) } self.object_store.save(self.STORE_KEY, json.dumps(data)) self.log(f"[ObjectStore] Saved {len(tickers)} symbols to cache") except Exception as e: self.error(f"[ObjectStore] Save failed: {e}") # ==================== Universe Selection ==================== def flag_universe_refresh(self): """تفعيل علامة التحديث الأسبوعي""" self.refresh_flag = True self.log("[Schedule] Universe refresh flagged for next selection") def coarse_selection(self, coarse): """الفلترة الأولية""" # تخطي العمل إلا إذا كان التحديث مطلوب if not self.refresh_flag and self.universe_symbols: return self.universe_symbols # فلترة أساسية liquid = [c for c in coarse if c.has_fundamental_data and self.MIN_PRICE <= c.price <= self.MAX_PRICE and c.dollar_volume > 500_000] # ترتيب حسب السيولة liquid.sort(key=lambda c: c.dollar_volume, reverse=True) # إرسال أفضل 2000 للمرحلة التالية selected = [c.symbol for c in liquid[:2000]] self.log(f"[CoarseSelection] Filtered {len(selected)} from {len(coarse)} symbols") return selected def fine_selection(self, fine): """الفلترة النهائية""" if not self.refresh_flag and self.universe_symbols: return self.universe_symbols filtered = [] for f in fine: try: # الحصول على البيانات الأساسية shares = f.earning_reports.basic_average_shares.three_months mcap = f.market_cap # تطبيق المعايير if (shares and shares <= self.MAX_SHARES_OUTSTANDING and self.MIN_MARKET_CAP <= mcap <= self.MAX_MARKET_CAP): filtered.append((f.symbol, shares)) except Exception: continue # ترتيب حسب عدد الأسهم (الأقل أولاً - low float) filtered.sort(key=lambda x: x[1]) # أخذ أفضل الأسهم symbols = [item[0] for item in filtered[:self.UNIVERSE_SIZE]] tickers = [s.value for s in symbols] # تحديث الكاش self.universe_symbols = symbols self.universe_tickers = set(tickers) self.save_universe_to_store(tickers) self.refresh_flag = False self.log(f"[FineSelection] Selected {len(symbols)} final symbols") return symbols # ==================== News Management ==================== def subscribe_news(self, symbol): """اشتراك في أخبار السهم""" if symbol in self.news_subscriptions: return try: self.add_data(BenzingaNews, symbol, Resolution.MINUTE) self.news_subscriptions.add(symbol) except Exception as e: self.debug(f"[News] Subscription failed for {symbol}: {e}") def unsubscribe_news(self, symbol): """إلغاء اشتراك أخبار السهم""" if symbol not in self.news_subscriptions: return try: self.remove_security(symbol) self.news_subscriptions.remove(symbol) except Exception as e: self.debug(f"[News] Unsubscribe failed for {symbol}: {e}") def on_securities_changed(self, changes): """معالجة تغييرات الأسهم""" # إضافة اشتراكات الأخبار للأسهم الجديدة for sec in changes.added_securities: self.subscribe_news(sec.symbol) self.log(f"[Monitor] Added news subscription for {sec.symbol.value}") # إزالة اشتراكات الأخبار للأسهم المحذوفة for sec in changes.removed_securities: self.unsubscribe_news(sec.symbol) self.log(f"[Monitor] Removed news subscription for {sec.symbol.value}") # إرسال رسالة البدء عند اكتمال التحميل if len(changes.added_securities) > 0 and len(self.news_subscriptions) >= 100: self.send_startup_message() # ==================== News Processing ==================== def on_data(self, slice): """معالجة البيانات الواردة""" news_items = slice.get(BenzingaNews) if not news_items: return for article in news_items.values(): # 1. تجنب المعالجة المكررة if article.id in self.processed_news_ids: continue # 2. تجاهل الأخبار القديمة if (self.time - article.end_time) > timedelta(minutes=self.NEWS_MAX_AGE_MINUTES): continue # 3. التأكد أن الخبر يخص سهم من الكون article_tickers = getattr(article, 'tickers', []) if not any(ticker in self.universe_tickers for ticker in article_tickers): continue # 4. معالجة الخبر self.process_news_article(article) # 5. إضافة للكاش self.processed_news_ids.append(article.id) self.news_count_today += 1 def process_news_article(self, article): """معالجة مقال إخباري""" try: # العثور على الأسهم المرتبطة related_tickers = [] article_tickers = getattr(article, 'tickers', []) for ticker in article_tickers: if ticker in self.universe_tickers: related_tickers.append(ticker) if not related_tickers: return # إرسال التنبيه self.send_news_alert(related_tickers, article) # تسجيل الخبر tickers_str = ', '.join(related_tickers) self.log(f"[News] Processed: {tickers_str} | {article.title[:50]}...") except Exception as e: self.error(f"[News] Processing error: {e}") # ==================== Telegram Alerts ==================== def send_news_alert(self, tickers, article): """إرسال تنبيه الخبر""" try: # فحص Rate limiting if not self.can_send_telegram(): self.log(f"[Alert] Rate limit - skipping alert for {', '.join(tickers)}") return False # بناء الرسالة message = self.build_news_message(tickers, article) # إرسال التنبيه success = self.notify.telegram( self.TELEGRAM_BOT_TOKEN, self.TELEGRAM_CHAT_ID, message ) if success: self.last_telegram_send = self.time self.log(f"[Alert] News alert sent for {', '.join(tickers)}") else: self.error(f"[Alert] Failed to send alert for {', '.join(tickers)}") return success except Exception as e: self.error(f"[Alert] Exception sending alert: {e}") return False def build_news_message(self, tickers, article): """بناء رسالة التنبيه""" # الوقت السعودي saudi_time = self.get_saudi_time_string() # معلومات الأسهم tickers_str = ', '.join(tickers) tickers_count = len(tickers) # خط فاصل separator = "\u2501" * 20 # separator line # بناء الرسالة message = f"""{self.EMOJI_NEWS} خبر جديد {separator} {self.EMOJI_CHART} الأسهم: {tickers_str} {self.EMOJI_INFO} عدد الأسهم: {tickers_count} {self.EMOJI_CLOCK} الوقت: {saudi_time} {self.EMOJI_NEWS} الخبر: {article.title}""" # إضافة الرابط إذا كان متوفراً if hasattr(article, 'source_url') and article.source_url: message += f"\n\n{self.EMOJI_LINK} اقرأ المزيد: {article.source_url}" # التوقيع message += f"\n\n{separator}\n{self.EMOJI_NEWS} News Monitor Pro" return message def send_startup_message(self): """إرسال رسالة بدء التشغيل""" try: saudi_time = self.get_saudi_time_string() separator = "\u2501" * 20 # separator line message = f"""{self.EMOJI_ROCKET} مراقب الأخبار المحسّن {separator} {self.EMOJI_CHECK} الحالة: يعمل بنجاح {self.EMOJI_CHART} الإعدادات: - عدد الأسهم: {len(self.universe_tickers)} سهم - نطاق السعر: ${self.MIN_PRICE} - ${self.MAX_PRICE} - الحد الأقصى للأسهم الحرة: {self.MAX_SHARES_OUTSTANDING:,} (Ultra Low Float) - تحديث الكون: أسبوعي {self.EMOJI_INFO} الميزات: - تحميل سريع من الكاش - معالجة فعالة للأخبار - تجنب الأخبار المكررة - تنبيهات فورية {self.EMOJI_CLOCK} وقت البدء: {saudi_time} {self.EMOJI_NEWS} النظام الآن يراقب الأخبار... {separator} {self.EMOJI_NEWS} News Monitor Pro""" success = self.notify.telegram( self.TELEGRAM_BOT_TOKEN, self.TELEGRAM_CHAT_ID, message ) if success: self.last_telegram_send = self.time self.log("[Startup] Startup message sent successfully") else: self.error("[Startup] Failed to send startup message") except Exception as e: self.error(f"[Startup] Exception sending startup message: {e}") # ==================== Helper Functions ==================== def can_send_telegram(self): """فحص إمكانية الإرسال""" if not self.last_telegram_send: return True elapsed = (self.time - self.last_telegram_send).total_seconds() return elapsed >= self.TELEGRAM_SEND_INTERVAL def get_saudi_time_string(self): """تحويل الوقت إلى توقيت مكة المكرمة مع تنسيق 12 ساعة""" try: utc_time = self.utc_time if utc_time.tzinfo is None: utc_time = utc_time.replace(tzinfo=self.utc_tz) saudi_time = utc_time.astimezone(self.saudi_tz) # تنسيق 12 ساعة hour = saudi_time.hour minute = saudi_time.minute second = saudi_time.second if hour == 0: hour_12 = 12 period = "AM" elif hour < 12: hour_12 = hour period = "AM" elif hour == 12: hour_12 = 12 period = "PM" else: hour_12 = hour - 12 period = "PM" return f"{saudi_time.year}-{saudi_time.month:02d}-{saudi_time.day:02d} {hour_12:02d}:{minute:02d}:{second:02d} {period}" except Exception as e: self.error(f"Error getting Saudi time: {e}") return "N/A" def on_end_of_day(self): """نهاية اليوم""" self.log("=" * 60) self.log(f"[EndOfDay] Daily Summary:") self.log(f"[EndOfDay] Universe size: {len(self.universe_tickers)}") self.log(f"[EndOfDay] News subscriptions: {len(self.news_subscriptions)}") self.log(f"[EndOfDay] News processed today: {self.news_count_today}") self.log(f"[EndOfDay] News cache size: {len(self.processed_news_ids)}") self.log(f"[EndOfDay] Loaded from store: {self.symbols_from_store}") self.log("=" * 60) # إعادة تعيين العدادات اليومية self.news_count_today = 0

Specific Enhancement Requests

1. Enhanced Alert Format:

I want to enhance alerts to include live stock data when news breaks:

Current Alert:

Plain Text

 

 

📰 خبر جديد ━━━━━━━━━━━━━━━━━━━━ 📊 الأسهم: AAPL ℹ️ عدد الأسهم: 1 🕐 الوقت: 2025-10-05 09:30:45 PM 📰 الخبر: Apple announces new product 🔗 اقرأ المزيد: https://...

Desired Enhanced Alert:

Plain Text

 

 

📰 خبر جديد - AAPL ━━━━━━━━━━━━━━━━━━━━ 💰 السعر: $150.25 (+2.5%) 📊 الحجم: 1.2M 🔥 الأسهم الحرة: 8.5M 📈 التغيير: +$3.75 🕐 الوقت: 2025-10-05 09:30:45 PM 📰 الخبر: Apple announces new product 🔗 اقرأ المزيد: https://... ━━━━━━━━━━━━━━━━━━━━ 📰 News Monitor Pro

2. Questions for Expert Review:

1.How can I fetch real-time stock data (price, volume, % change) in the alert function?

2.What's the best way to get current market data for symbols when news breaks?

3.Should I use slice.bars, securities[symbol].price, or another method?

4.How can I calculate percentage change from previous close?

5.Can I access pre-market/after-hours data for enhanced alerts?

6.Are there any performance implications of fetching live data for 800+ symbols?

7.Is my current universe selection and filtering approach optimal?

8.Any obvious bugs or improvements you can suggest?

Expected Expert Feedback

Please review the code and provide recommendations on:

•Architecture improvements

•Performance optimizations

•Bug fixes

•Enhanced alert implementation

•Best practices for news trading strategies

•Real-time data integration methods

Thank you for your expert analysis