|
|
|
|
|
|
|
|
|
|
|
import pandas as pd |
|
|
from sqlalchemy import create_engine, text |
|
|
from datetime import date, timedelta |
|
|
import os, sys |
|
|
|
|
|
|
|
|
|
|
|
def take_daily_snapshot(): |
|
|
print("--- πΈ Starting Daily Influencer Stat Snapshot Process ---") |
|
|
db_url = os.getenv("DATABASE_URL") |
|
|
if not db_url: return |
|
|
|
|
|
engine = create_engine(db_url) |
|
|
today = date.today() |
|
|
yesterday = today - timedelta(days=1) |
|
|
|
|
|
|
|
|
sql = text(""" |
|
|
WITH daily_agg AS ( |
|
|
SELECT |
|
|
cs.influencer_id, |
|
|
AVG(cs.likes) as avg_likes, |
|
|
AVG(cs.comments) as avg_comments, |
|
|
AVG(cs.engagement_rate) as avg_engagement_rate, |
|
|
-- Get the latest follower count for the influencer |
|
|
(SELECT ip.follower_count FROM public.influencer_profiles ip WHERE ip.profile_id = cs.influencer_id LIMIT 1) as follower_count |
|
|
FROM public.campaign_submissions cs |
|
|
WHERE DATE(cs.created_at) = :yesterday_date |
|
|
GROUP BY cs.influencer_id |
|
|
) |
|
|
INSERT INTO public.daily_influencer_stats (profile_id, date, avg_likes, avg_comments, avg_engagement_rate, follower_count) |
|
|
SELECT influencer_id, :yesterday_date, avg_likes, avg_comments, avg_engagement_rate, follower_count |
|
|
FROM daily_agg |
|
|
ON CONFLICT (profile_id, date) DO NOTHING; -- Avoid duplicates |
|
|
""") |
|
|
|
|
|
try: |
|
|
with engine.connect() as connection: |
|
|
result = connection.execute(sql, {'yesterday_date': yesterday}) |
|
|
print(f"β
Snapshot complete. {result.rowcount} influencers' stats were updated for {yesterday}.") |
|
|
except Exception as e: |
|
|
print(f"π΄ ERROR during daily snapshot: {e}") |
|
|
|
|
|
if __name__ == '__main__': |
|
|
take_daily_snapshot() |