reachify-ai-service / core /anomaly_detector.py
amitbhatt6075's picture
Complete fresh start - FINAL UPLOAD
0914e96
raw
history blame
1.55 kB
# File: ai-service/core/anomaly_detector.py (NEW FILE)
import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose
def find_anomalies(influencer_id: str, historical_data: pd.DataFrame, today_stats: dict) -> list[str]:
insights = []
df = historical_data.copy()
if len(df) < 30: # Need at least 30 days of data for meaningful analysis
return ["Not enough historical data to analyze trends yet."]
df.set_index('date', inplace=True)
# Calculate 90-day averages
avg_engagement_90d = df['avg_engagement_rate'].tail(90).mean()
today_engagement = today_stats.get('avg_engagement_rate', 0)
# Anomaly 1: Performance Spikes/Dips
percentage_change = ((today_engagement - avg_engagement_90d) / avg_engagement_90d) * 100
if percentage_change > 100:
insights.append(f"πŸš€ High Performer Alert: Engagement rate spiked to {today_engagement:.2f}%, which is {percentage_change:.0f}% above the 90-day average. A recent post may be going viral.")
elif percentage_change < -50:
insights.append(f"⚠️ Performance Dip Alert: Engagement has dropped by {abs(percentage_change):.0f}%. It's worth checking in with this influencer.")
# Anomaly 2: Follower Growth
follower_change = today_stats.get('follower_count', 0) - df['follower_count'].tail(7).iloc[0]
if follower_change > 5000: # Example threshold
insights.append(f"πŸ“ˆ Follower Growth Spike: Gained {follower_change} followers this week. This is unusually high.")
return insights