from AlgorithmImports import *
from QuantConnect.DataSource import *
import json
from datetime import datetime, timedelta
from io import StringIO
class IBMTiingoNewsCollector(QCAlgorithm):
    """Collect Tiingo news articles for IBM over the backtest window.

    Each incoming article is enriched with a naive lexicon-based sentiment
    score and the signed distance (in days) to the nearest IBM earnings
    date, then the whole data set is persisted to the ObjectStore as CSV:
    interim snapshots roughly every 30 days and a final dump when the
    algorithm ends.
    """

    def Initialize(self) -> None:
        """Configure the backtest window, data subscriptions, and collection state."""
        # Set date range to match your sentiment analysis period
        self.SetStartDate(2005, 1, 1)
        self.SetEndDate(2024, 2, 29)
        self.SetCash(100000)

        # Add IBM equity and the linked Tiingo news feed
        self.ibm = self.AddEquity("IBM", Resolution.Daily).Symbol
        self.tiingo_symbol = self.AddData(TiingoNews, self.ibm).Symbol

        # Collected articles: one dict per news item (see OnData for the schema)
        self.news_data = []

        # Earnings dates used to tag each article's proximity to earnings
        self.earnings_dates = self.LoadEarningsData()

        # Article counts keyed by 'YYYY-MM-DD' publication date (for logging)
        self.daily_news_count = {}

        # Timestamp of the last interim save; drives the ~30-day save cadence
        self.last_save_time = datetime.min

        # Naive sentiment lexicon: word -> score contribution
        self.word_scores = {
            # Positive words
            'good': 1, 'great': 1, 'best': 1, 'growth': 1, 'positive': 1,
            'increase': 1, 'up': 0.5, 'higher': 1, 'beat': 1, 'exceed': 1,
            'profit': 1, 'gain': 1, 'improved': 1, 'strong': 1, 'success': 1,
            # Negative words
            'bad': -1, 'terrible': -1, 'worst': -1, 'loss': -1, 'negative': -1,
            'decrease': -1, 'down': -0.5, 'lower': -1, 'miss': -1, 'below': -1,
            'weak': -1, 'poor': -1, 'decline': -1, 'disappoint': -1, 'fail': -1,
            # Earnings related words
            'earnings': 0.2, 'revenue': 0.2, 'quarter': 0.2, 'report': 0.2,
            'guidance': 0.2, 'outlook': 0.2, 'forecast': 0.2
        }

        self.Log("IBM Tiingo News Collector initialized")

    def LoadEarningsData(self):
        """Load IBM earnings dates from the ObjectStore CSV, or fall back to
        quarterly approximations (the 15th of Jan/Apr/Jul/Oct).

        Returns:
            list[datetime]: naive datetimes of actual (or approximated)
            earnings report dates inside the backtest window.
        """
        earnings_dates = []

        # Try to load the real earnings history first
        if self.ObjectStore.ContainsKey("team6/ibm_earnings_history_final.csv"):
            try:
                csv_data = self.ObjectStore.Read("team6/ibm_earnings_history_final.csv")
                lines = csv_data.strip().split('\n')
                headers = lines[0].split(',')
                report_date_index = headers.index('report_date')
                for line in lines[1:]:
                    values = line.split(',')
                    if len(values) > report_date_index:
                        date_str = values[report_date_index]
                        try:
                            earnings_dates.append(datetime.strptime(date_str, '%Y-%m-%d'))
                        except ValueError:
                            self.Log(f"Error parsing date: {date_str}")
                self.Log(f"Loaded {len(earnings_dates)} earnings dates from ObjectStore")
                return earnings_dates
            except Exception as e:
                self.Log(f"Error loading earnings dates: {str(e)}")
                # BUGFIX: discard any partially parsed dates so the fallback
                # below does not append approximations onto real entries.
                earnings_dates = []

        # If loading fails (or no file exists), use quarterly approximations
        self.Log("Using default quarterly earnings dates")
        start_date = self.StartDate
        end_date = self.EndDate
        current_date = start_date
        while current_date < end_date:
            for month in [1, 4, 7, 10]:
                earnings_date = datetime(current_date.year, month, 15)
                if start_date <= earnings_date <= end_date:
                    earnings_dates.append(earnings_date)
            # Advance to January 1st of the next year
            current_date = datetime(current_date.year + 1, 1, 1)
        return earnings_dates

    def OnData(self, slice: Slice) -> None:
        """Capture each incoming Tiingo news item, enrich it, and store it."""
        if slice.ContainsKey(self.tiingo_symbol):
            news = slice[self.tiingo_symbol]

            # Skip items missing the fields the enrichment depends on
            if not news.description or not news.published_date:
                return

            # Distance to the nearest earnings date and a naive sentiment score
            days_to_earnings = self.GetDaysToEarnings(news.published_date)
            sentiment_score = self.CalculateSentiment(news.description)

            # Store the news data with enriched metadata
            news_item = {
                'id': news.article_id,
                'source': news.source,
                'title': news.title,
                'description': news.description,
                'published': news.published_date.strftime('%Y-%m-%d %H:%M:%S'),
                'symbols': ','.join([str(s) for s in news.symbols]),
                'url': news.url,
                'word_count': len(news.description.split()) if news.description else 0,
                'sentiment_score': sentiment_score,
                'days_to_earnings': days_to_earnings,
                'collected_at': self.Time.strftime('%Y-%m-%d %H:%M:%S')
            }
            self.news_data.append(news_item)

            # Track news counts by publication date for logging
            date_key = news.published_date.strftime('%Y-%m-%d')
            self.daily_news_count[date_key] = self.daily_news_count.get(date_key, 0) + 1

            # Log periodic progress updates
            if len(self.news_data) % 100 == 0:
                self.Log(f"Collected {len(self.news_data)} news articles so far")

            # Persist an interim snapshot roughly every 30 calendar days
            if (self.Time - self.last_save_time).days >= 30:
                self.SaveNewsData("interim")
                self.last_save_time = self.Time

    def CalculateSentiment(self, text):
        """Score *text* against the naive lexicon in ``self.word_scores``.

        NOTE: matching is by substring count, so word stems match inflected
        forms ('fail' matches 'failure') but false positives are possible
        ('up' matches 'supply'). The raw score is divided by the square root
        of the word count to dampen the length bias of long articles.

        Returns:
            float: normalized sentiment score (0 for empty text).
        """
        if not text:
            return 0
        text = text.lower()
        score = 0
        total_words = len(text.split())

        # Accumulate lexicon hits (substring occurrences, see docstring)
        for word, word_score in self.word_scores.items():
            count = text.count(word)
            if count > 0:
                score += word_score * count

        # Normalize by sqrt(length) to avoid bias towards longer articles
        if total_words > 0:
            return score / (total_words ** 0.5)
        return score

    def GetDaysToEarnings(self, news_date):
        """Return signed days from *news_date* to the nearest earnings date.

        Positive means the earnings date lies ahead of the article; returns
        None when no earnings dates are known.
        """
        if not self.earnings_dates:
            return None
        # Drop tz info so arithmetic against the naive earnings dates works
        news_date = news_date.replace(tzinfo=None)
        closest_date = min(self.earnings_dates, key=lambda d: abs((d - news_date).days))
        return (closest_date - news_date).days

    def OnEndOfAlgorithm(self):
        """Save the final data set and log summary statistics."""
        self.SaveNewsData("final")
        self.LogSummaryStatistics()

    def SaveNewsData(self, version="final"):
        """Serialize the collected articles to CSV in the ObjectStore.

        Args:
            version: suffix for the stored file name ('interim' or 'final').
        """
        if not self.news_data:
            self.Log("No news data to save")
            return
        try:
            output = StringIO()

            # Header row from the first item's keys (all items share a schema)
            headers = list(self.news_data[0].keys())
            output.write(','.join([f'"{h}"' for h in headers]) + '\n')

            for item in self.news_data:
                row = []
                for key in headers:
                    value = str(item.get(key, ''))
                    # Escape embedded quotes per CSV convention
                    value = value.replace('"', '""')
                    # BUGFIX: flatten newlines so multi-line descriptions
                    # cannot break the one-row-per-line format
                    value = value.replace('\r', ' ').replace('\n', ' ')
                    row.append(f'"{value}"')
                output.write(','.join(row) + '\n')

            filename = f"team6/ibm_news_data_{version}.csv"
            self.ObjectStore.Save(filename, output.getvalue())
            # BUGFIX: log the actual file name (was a hard-coded placeholder)
            self.Log(f"Saved {len(self.news_data)} news items to {filename}")
        except Exception as e:
            self.Error(f"Error saving news data: {str(e)}")

    def LogSummaryStatistics(self):
        """Log article counts by month, around earnings, and by sentiment bucket."""
        if not self.news_data:
            return
        self.Log(f"\nNews Collection Complete - {len(self.news_data)} articles")

        # Count articles by year-month of publication
        year_month_counts = {}
        for item in self.news_data:
            try:
                date = datetime.strptime(item['published'], '%Y-%m-%d %H:%M:%S')
            except (KeyError, ValueError):
                continue
            key = f"{date.year}-{date.month:02d}"
            year_month_counts[key] = year_month_counts.get(key, 0) + 1

        self.Log("\nArticles by Year-Month:")
        for key in sorted(year_month_counts.keys()):
            self.Log(f"{key}: {year_month_counts[key]} articles")

        # Articles published within 10 days of an earnings date
        earnings_news = [item for item in self.news_data
                         if item.get('days_to_earnings') is not None
                         and abs(item['days_to_earnings']) <= 10]
        self.Log(f"\nArticles within 10 days of earnings: {len(earnings_news)}")

        # Sentiment distribution with a +/-0.2 neutral band
        pos_news = [item for item in self.news_data if item.get('sentiment_score', 0) > 0.2]
        neg_news = [item for item in self.news_data if item.get('sentiment_score', 0) < -0.2]
        neutral_news = [item for item in self.news_data if -0.2 <= item.get('sentiment_score', 0) <= 0.2]
        self.Log("\nSentiment Distribution:")
        self.Log(f"Positive: {len(pos_news)} ({len(pos_news)/len(self.news_data)*100:.1f}%)")
        self.Log(f"Neutral: {len(neutral_news)} ({len(neutral_news)/len(self.news_data)*100:.1f}%)")
        self.Log(f"Negative: {len(neg_news)} ({len(neg_news)/len(self.news_data)*100:.1f}%)")