# reachify-ai-service/scripts/export_thunderbird_training_data.py
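"""Export training data for the Thunderbird market intelligence model.

Pulls monthly counts of completed campaigns from Supabase, fetches Google
Trends interest scores for a fixed set of niches, merges the two on
(month, niche), and writes the result to data/thunderbird_market_trends.csv.
"""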
import os
import random
import time
from datetime import datetime, timedelta

import pandas as pd
from dotenv import load_dotenv
from pytrends.request import TrendReq
from supabase import create_client, Client
# --- CONFIGURATION ---
load_dotenv()
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_KEY")
if not SUPABASE_URL or not SUPABASE_KEY:
    raise ValueError("SUPABASE_URL and SUPABASE_SERVICE_KEY environment variables must be set.")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

NICHES_TO_TRACK = ["fashion", "gaming", "fitness", "skincare", "finance"]
MONTHS_TO_FETCH = 12
OUTPUT_FILE = os.path.join(os.path.dirname(__file__), '..', 'data', 'thunderbird_market_trends.csv')
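# Example .env for the Supabase credentials loaded above
# (illustrative placeholder values, not real credentials):
#   SUPABASE_URL=https://your-project.supabase.co
#   SUPABASE_SERVICE_KEY=your-service-role-key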
# --- get_successful_campaign_counts() ---
def get_successful_campaign_counts() -> pd.DataFrame:
    """Return monthly counts of completed campaigns per niche from Supabase."""
    print("πŸš€ Fetching successful campaign data from Supabase...")
    end_date = datetime.now()
    start_date = end_date - timedelta(days=MONTHS_TO_FETCH * 30)
    try:
        response = supabase.table('campaigns').select('id, title, description, created_at') \
            .eq('status', 'completed') \
            .gte('created_at', start_date.isoformat()) \
            .lte('created_at', end_date.isoformat()) \
            .execute()
        if not response.data:
            print("⚠️ No campaign data found in the specified date range.")
            return pd.DataFrame()
        df = pd.DataFrame(response.data)
        df['created_at'] = pd.to_datetime(df['created_at'])
        df['month'] = df['created_at'].dt.to_period('M')

        def assign_niche(row):
            # Tag each campaign with the first tracked niche mentioned in its title or description.
            text_to_search = f"{row.get('title', '')} {row.get('description', '')}".lower()
            for niche in NICHES_TO_TRACK:
                if niche in text_to_search:
                    return niche
            return "general"

        df['niche'] = df.apply(assign_niche, axis=1)
        monthly_counts = df.groupby(['month', 'niche']).size().reset_index(name='successful_campaigns')
        print(f"βœ… Found and processed {len(df)} successful campaigns.")
        return monthly_counts
    except Exception as e:
        print(f"❌ Error fetching data from Supabase: {e}")
        return pd.DataFrame()
# --- get_google_trends_data() ---
def get_google_trends_data() -> pd.DataFrame:
    """Return average monthly Google Trends interest scores for each tracked niche."""
    print("\nπŸš€ Fetching historical market interest from Google Trends (robust mode)...")
    # Increase retries and backoff for more resilience against rate limiting.
    pytrends = TrendReq(hl='en-US', tz=360, retries=5, backoff_factor=1)
    end_date = datetime.now()
    start_date = end_date - timedelta(days=MONTHS_TO_FETCH * 30)
    timeframe = f"{start_date.strftime('%Y-%m-%d')} {end_date.strftime('%Y-%m-%d')}"
    all_trends_df = pd.DataFrame()
    for niche in NICHES_TO_TRACK:
        print(f" - Fetching trend data for '{niche}'...")
        try:
            pytrends.build_payload([niche], cat=0, timeframe=timeframe, geo='', gprop='')
            interest_over_time_df = pytrends.interest_over_time()
            if not interest_over_time_df.empty and niche in interest_over_time_df:
                interest_over_time_df = interest_over_time_df.rename(columns={niche: 'trend_score'})
                interest_over_time_df['niche'] = niche
                all_trends_df = pd.concat([all_trends_df, interest_over_time_df[['trend_score', 'niche']]])
            else:
                print(f" - ℹ️ No trend data returned for '{niche}'.")
            # Sleep for a long, randomised interval between requests to avoid rate limiting.
            sleep_time = random.uniform(5, 12)  # wait 5 to 12 seconds
            print(f" - 😴 Sleeping for {sleep_time:.2f} seconds...")
            time.sleep(sleep_time)
        except Exception as e:
            # Detect rate-limit (HTTP 429) errors by their message text.
            if "response with code 429" in str(e) or "too many 429 error responses" in str(e):
                print(f" - πŸ›‘ Hit rate limit hard for '{niche}'. Taking a long 2-minute break...")
                time.sleep(120)  # take a long break if we still get blocked
            else:
                print(f" - ⚠️ A non-rate-limit error occurred for '{niche}'. Error: {e}")
            continue
    if all_trends_df.empty:
        print("⚠️ Warning: Could not fetch any data from Google Trends. Proceeding without trend scores.")
        return pd.DataFrame()
    all_trends_df['month'] = all_trends_df.index.to_period('M')
    monthly_trends = all_trends_df.groupby(['month', 'niche'])['trend_score'].mean().reset_index()
    print("βœ… Successfully fetched and processed Google Trends data.")
    return monthly_trends
# --- main() ---
def main():
    """Run the full export: fetch, merge, and save the training data."""
    print("--- Starting Project Thunderbird Data Export ---")
    campaign_df = get_successful_campaign_counts()
    if campaign_df.empty:
        print("\n❌ No campaign data found. Aborting training file creation.")
        return
    trends_df = get_google_trends_data()
    if not trends_df.empty:
        print("\nπŸš€ Merging campaign success data with market trend data...")
        # Left merge keeps every campaign row even when a trend score is missing.
        training_df = pd.merge(campaign_df, trends_df, on=['month', 'niche'], how='left')
        # Fill any missing trend scores with 0.
        training_df['trend_score'] = training_df['trend_score'].fillna(0)
    else:
        print("\n⚠️ No trends data was fetched. Creating training file with only campaign data.")
        training_df = campaign_df
        training_df['trend_score'] = 0  # add the column so our model doesn't break
    # Convert Period to string for CSV.
    training_df['month'] = training_df['month'].astype(str)
    # Save the final dataframe to a CSV file, creating the data directory if needed.
    try:
        os.makedirs(os.path.dirname(OUTPUT_FILE), exist_ok=True)
        training_df.to_csv(OUTPUT_FILE, index=False)
        print("\nβœ… Success! Training data has been saved to:")
        print(f" {OUTPUT_FILE}")
    except Exception as e:
        print(f"\n❌ Error saving training data to CSV: {e}")


if __name__ == "__main__":
    main()
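# Example invocation (from the repo root, assuming the .env file described above is in place):
#   python scripts/export_thunderbird_training_data.py
#
# The resulting CSV has one row per (month, niche) with the columns:
#   month, niche, successful_campaigns, trend_score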