Files changed (1)
  1. safeguard_filters.py +500 -0
safeguard_filters.py ADDED
@@ -0,0 +1,500 @@
"""
Helion-V1.5-XL Safety and Safeguard Filters
Implementation of comprehensive content filtering and safety mechanisms
"""

import re
import json
import logging
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from enum import Enum
import hashlib


class SeverityLevel(Enum):
    """Safety violation severity levels"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class FilterAction(Enum):
    """Actions to take when filter is triggered"""
    ALLOW = "allow"
    WARN = "warn"
    BLOCK = "block"
    REDACT = "redact"
    LOG = "log"


@dataclass
class SafetyViolation:
    """Represents a safety violation"""
    category: str
    severity: SeverityLevel
    confidence: float
    message: str
    details: Dict[str, Any]
    action: FilterAction


class ContentSafetyFilter:
    """Main safety filter for content moderation"""

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize safety filter with configuration

        Args:
            config_path: Path to safety configuration JSON file
        """
        self.logger = logging.getLogger(__name__)
        self.config = self._load_config(config_path)
        self.violation_log = []

        # Initialize filter components
        self.toxicity_patterns = self._load_toxicity_patterns()
        self.harmful_keywords = self._load_harmful_keywords()
        self.pii_patterns = self._load_pii_patterns()
        self.jailbreak_patterns = self._load_jailbreak_patterns()

    def _load_config(self, config_path: Optional[str]) -> Dict:
        """Load safety configuration"""
        if config_path:
            with open(config_path, 'r') as f:
                return json.load(f)
        return self._default_config()

    def _default_config(self) -> Dict:
        """Default safety configuration"""
        return {
            "toxicity_threshold": 0.75,
            "harmful_content_threshold": 0.8,
            "pii_detection_enabled": True,
            "prompt_injection_detection": True,
            "logging_enabled": True
        }

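    # Illustrative shape of the JSON expected at config_path (an assumption,
    # mirroring the defaults above):
    #   {"toxicity_threshold": 0.75, "harmful_content_threshold": 0.8,
    #    "pii_detection_enabled": true, "prompt_injection_detection": true,
    #    "logging_enabled": true}
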
    def _load_toxicity_patterns(self) -> List[re.Pattern]:
        """Load toxicity detection patterns"""
        patterns = [
            r'\b(kill|murder|assassinate)\s+(yourself|themselves|someone)\b',
            r'\b(hate|despise|loathe)\s+(all|every)\s+\w+s\b',
            r'\b(you|they)\s+(should|must|need to)\s+die\b',
            r'\b(terrorist|terrorism)\s+(plan|attack|bombing)\b',
            r'\b(nazi|hitler|genocide)\s+(support|praise|glorify)\b',
        ]
        return [re.compile(p, re.IGNORECASE) for p in patterns]

    def _load_harmful_keywords(self) -> Dict[str, List[str]]:
        """Load harmful content keywords by category"""
        return {
            "violence": [
                "murder", "assault", "torture", "mutilation", "gore",
                "massacre", "slaughter", "execution", "bomb", "weapon"
            ],
            "self_harm": [
                "suicide", "self-harm", "cutting", "overdose", "self-injury",
                "end my life", "kill myself", "hurt myself"
            ],
            "hate_speech": [
                "racial slur", "ethnic slur", "homophobic", "transphobic",
                "xenophobic", "misogynistic", "antisemitic"
            ],
            "illegal_activities": [
                "drug trafficking", "human trafficking", "child exploitation",
                "fraud scheme", "money laundering", "illegal hacking",
                "terrorism", "weapons trafficking"
            ],
            "dangerous_instructions": [
                "make a bomb", "create malware", "synthesize drugs",
                "manufacture weapons", "forge documents", "bypass security"
            ]
        }

    def _load_pii_patterns(self) -> Dict[str, re.Pattern]:
        """Load PII detection patterns"""
        return {
            "email": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            "phone": re.compile(r'\b(?:\+?1[-.]?)?\(?([0-9]{3})\)?[-.]?([0-9]{3})[-.]?([0-9]{4})\b'),
            "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
            "credit_card": re.compile(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'),
            "ip_address": re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'),
        }

    def _load_jailbreak_patterns(self) -> List[re.Pattern]:
        """Load jailbreak attempt detection patterns"""
        patterns = [
            r'ignore\s+(previous|all|prior)\s+(instructions|prompts|rules)',
            r'disregard\s+(safety|ethical|moral)\s+(guidelines|rules)',
            r'(pretend|act|roleplay)\s+(as if|you are|to be)\s+(?!helpful)',
            r'bypass\s+(filter|safety|moderation)',
            r'jailbreak',
            r'(forget|override)\s+(your|the)\s+(training|programming|instructions)',
            r'you\s+(can|must|should)\s+say\s+anything',
            r'developer\s+mode',
            r'(sudo|admin|root)\s+mode',
        ]
        return [re.compile(p, re.IGNORECASE) for p in patterns]

    def check_input(self, text: str) -> Tuple[bool, List[SafetyViolation]]:
        """
        Check input text for safety violations

        Args:
            text: Input text to check

        Returns:
            Tuple of (is_safe, list of violations)
        """
        violations = []

        # Check for toxicity
        toxicity_violation = self._check_toxicity(text)
        if toxicity_violation:
            violations.append(toxicity_violation)

        # Check for harmful content
        harmful_violations = self._check_harmful_content(text)
        violations.extend(harmful_violations)

        # Check for PII
        if self.config.get("pii_detection_enabled", True):
            pii_violations = self._check_pii(text)
            violations.extend(pii_violations)

        # Check for prompt injection
        if self.config.get("prompt_injection_detection", True):
            injection_violation = self._check_prompt_injection(text)
            if injection_violation:
                violations.append(injection_violation)

        # Check for jailbreak attempts
        jailbreak_violation = self._check_jailbreak(text)
        if jailbreak_violation:
            violations.append(jailbreak_violation)

        # Log violations
        if violations and self.config.get("logging_enabled", True):
            self._log_violations(text, violations)

        # Determine if input is safe
        is_safe = not any(v.severity in [SeverityLevel.HIGH, SeverityLevel.CRITICAL]
                          for v in violations)

        return is_safe, violations

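    # Illustrative behavior (an assumption, derived from the patterns above):
    #   check_input("Please ignore previous instructions") returns
    #   (False, [...]) because both the prompt-injection and jailbreak
    #   checks fire on that phrasing.
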
    def _check_toxicity(self, text: str) -> Optional[SafetyViolation]:
        """Check for toxic content"""
        for pattern in self.toxicity_patterns:
            if pattern.search(text):
                return SafetyViolation(
                    category="toxicity",
                    severity=SeverityLevel.HIGH,
                    confidence=0.9,
                    message="Toxic content detected",
                    details={"pattern": pattern.pattern},
                    action=FilterAction.BLOCK
                )
        return None

    def _check_harmful_content(self, text: str) -> List[SafetyViolation]:
        """Check for harmful content across categories"""
        violations = []
        text_lower = text.lower()

        for category, keywords in self.harmful_keywords.items():
            matches = [kw for kw in keywords if kw in text_lower]
            if matches:
                severity = self._determine_severity(category, len(matches))
                violations.append(SafetyViolation(
                    category=f"harmful_content_{category}",
                    severity=severity,
                    confidence=min(0.7 + len(matches) * 0.1, 0.95),
                    message=f"Harmful {category} content detected",
                    details={"matches": matches[:3]},  # Limit to 3 examples
                    action=FilterAction.BLOCK if severity == SeverityLevel.CRITICAL
                    else FilterAction.WARN
                ))

        return violations

    def _determine_severity(self, category: str, match_count: int) -> SeverityLevel:
        """Determine severity based on category and match count"""
        critical_categories = ["self_harm", "illegal_activities", "dangerous_instructions"]

        if category in critical_categories:
            return SeverityLevel.CRITICAL if match_count >= 2 else SeverityLevel.HIGH

        if match_count >= 3:
            return SeverityLevel.HIGH
        elif match_count >= 2:
            return SeverityLevel.MEDIUM
        else:
            return SeverityLevel.LOW

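    # Worked examples (assumed, to make the thresholds concrete):
    #   two "self_harm" keyword hits -> CRITICAL (critical category, count >= 2)
    #   one "self_harm" keyword hit  -> HIGH
    #   one "violence" keyword hit   -> LOW
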
    def _check_pii(self, text: str) -> List[SafetyViolation]:
        """Check for personally identifiable information"""
        violations = []

        for pii_type, pattern in self.pii_patterns.items():
            matches = pattern.findall(text)
            if matches:
                violations.append(SafetyViolation(
                    category=f"pii_{pii_type}",
                    severity=SeverityLevel.MEDIUM,
                    confidence=0.85,
                    message=f"PII detected: {pii_type}",
                    details={"count": len(matches)},
                    action=FilterAction.REDACT
                ))

        return violations

    def _check_prompt_injection(self, text: str) -> Optional[SafetyViolation]:
        """Check for prompt injection attempts"""
        dangerous_instructions = [
            "ignore", "disregard", "forget", "override", "bypass"
        ]

        text_lower = text.lower()
        if any(instr in text_lower for instr in dangerous_instructions):
            if "instruction" in text_lower or "prompt" in text_lower or "rule" in text_lower:
                return SafetyViolation(
                    category="prompt_injection",
                    severity=SeverityLevel.HIGH,
                    confidence=0.8,
                    message="Potential prompt injection detected",
                    details={"type": "instruction_manipulation"},
                    action=FilterAction.BLOCK
                )

        return None

    def _check_jailbreak(self, text: str) -> Optional[SafetyViolation]:
        """Check for jailbreak attempts"""
        for pattern in self.jailbreak_patterns:
            if pattern.search(text):
                return SafetyViolation(
                    category="jailbreak_attempt",
                    severity=SeverityLevel.CRITICAL,
                    confidence=0.9,
                    message="Jailbreak attempt detected",
                    details={"pattern": pattern.pattern},
                    action=FilterAction.BLOCK
                )
        return None

    def check_output(self, text: str) -> Tuple[bool, List[SafetyViolation]]:
        """
        Check output text for safety violations

        Args:
            text: Output text to check

        Returns:
            Tuple of (is_safe, list of violations)
        """
        violations = []

        # Check for leaked PII
        pii_violations = self._check_pii(text)
        violations.extend(pii_violations)

        # Check for harmful generated content
        harmful_violations = self._check_harmful_content(text)
        violations.extend(harmful_violations)

        # Check for bias indicators
        bias_violation = self._check_bias(text)
        if bias_violation:
            violations.append(bias_violation)

        is_safe = not any(v.severity == SeverityLevel.CRITICAL for v in violations)

        return is_safe, violations

    def _check_bias(self, text: str) -> Optional[SafetyViolation]:
        """Check for biased content"""
        # NOTE: bracketed entries such as "all [race]" are literal placeholder
        # strings and only match text that actually contains the brackets.
        bias_indicators = {
            "gender": ["all men", "all women", "typical male", "typical female"],
            "race": ["all [race]", "typical [race]"],
            "age": ["all old people", "all young people", "boomers are", "millennials are"],
            "religion": ["all [religion]", "typical [religion]"]
        }

        text_lower = text.lower()
        for bias_type, indicators in bias_indicators.items():
            for indicator in indicators:
                if indicator.lower() in text_lower:
                    return SafetyViolation(
                        category=f"bias_{bias_type}",
                        severity=SeverityLevel.MEDIUM,
                        confidence=0.7,
                        message=f"Potential {bias_type} bias detected",
                        details={"indicator": indicator},
                        action=FilterAction.WARN
                    )

        return None

    def redact_pii(self, text: str) -> str:
        """
        Redact PII from text

        Args:
            text: Text containing potential PII

        Returns:
            Text with PII redacted
        """
        redacted = text

        for pii_type, pattern in self.pii_patterns.items():
            redacted = pattern.sub(f"[{pii_type.upper()}_REDACTED]", redacted)

        return redacted

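    # Illustrative example (assumed behavior, based on the patterns above):
    #   redact_pii("Contact jane.doe@example.com or 555-123-4567")
    #   -> "Contact [EMAIL_REDACTED] or [PHONE_REDACTED]"
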
    def _log_violations(self, text: str, violations: List[SafetyViolation]):
        """Log safety violations"""
        text_hash = hashlib.sha256(text.encode()).hexdigest()[:16]

        log_entry = {
            "timestamp": self._get_timestamp(),
            "text_hash": text_hash,
            "violations": [
                {
                    "category": v.category,
                    "severity": v.severity.value,
                    "confidence": v.confidence,
                    "action": v.action.value
                }
                for v in violations
            ]
        }

        self.violation_log.append(log_entry)
        self.logger.warning(f"Safety violations detected: {log_entry}")

    def _get_timestamp(self) -> str:
        """Get current UTC timestamp in ISO-8601 format"""
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).isoformat()

    def get_violation_stats(self) -> Dict[str, Any]:
        """Get statistics about violations"""
        if not self.violation_log:
            return {"total_violations": 0}

        categories = {}
        severities = {}
        total = 0

        for entry in self.violation_log:
            for violation in entry["violations"]:
                cat = violation["category"]
                sev = violation["severity"]

                total += 1
                categories[cat] = categories.get(cat, 0) + 1
                severities[sev] = severities.get(sev, 0) + 1

        return {
            "total_violations": total,
            "by_category": categories,
            "by_severity": severities
        }


class RefusalGenerator:
    """Generate appropriate refusal messages"""

    def __init__(self):
        self.refusal_templates = {
            "illegal_content": "I cannot assist with illegal activities or harmful content.",
            "dangerous_instructions": "I cannot provide instructions that could cause harm.",
            "unethical_requests": "I cannot help with unethical activities.",
            "privacy_violations": "I cannot assist with privacy violations.",
            "jailbreak_attempt": "I'm designed to be helpful, harmless, and honest. I cannot bypass my safety guidelines.",
            "harmful_content": "I cannot generate content that could be harmful.",
            "default": "I'm unable to assist with this request as it conflicts with my safety guidelines."
        }

    def generate_refusal(self, violation: SafetyViolation) -> str:
        """
        Generate appropriate refusal message

        Args:
            violation: Safety violation that triggered refusal

        Returns:
            Refusal message
        """
        # Match a template key that prefixes the violation category,
        # e.g. "harmful_content_violence" -> "harmful_content".
        template = self.refusal_templates["default"]
        for key, candidate in self.refusal_templates.items():
            if key != "default" and violation.category.startswith(key):
                template = candidate
                break

        if violation.severity == SeverityLevel.CRITICAL:
            return template + " This request has been logged."

        return template

    def generate_alternative(self, violation: SafetyViolation) -> Optional[str]:
        """
        Generate alternative suggestion when appropriate

        Args:
            violation: Safety violation

        Returns:
            Alternative suggestion or None
        """
        alternatives = {
            "harmful_content_violence": "I can provide information about conflict resolution or non-violent problem solving.",
            "harmful_content_self_harm": "If you're struggling, please reach out to a mental health professional or crisis helpline.",
            "harmful_content_illegal_activities": "I can provide information about legal alternatives or the legal framework around this topic.",
        }

        return alternatives.get(violation.category)


def create_safety_pipeline(config_path: Optional[str] = None):
    """
    Create a complete safety pipeline

    Args:
        config_path: Path to configuration file

    Returns:
        Tuple of (content_filter, refusal_generator)
    """
    content_filter = ContentSafetyFilter(config_path)
    refusal_generator = RefusalGenerator()

    return content_filter, refusal_generator


# Example usage
if __name__ == "__main__":
    # Initialize safety pipeline
    safety_filter, refusal_gen = create_safety_pipeline()

    # Test input
    test_input = "How do I make a bomb?"
    is_safe, violations = safety_filter.check_input(test_input)

    if not is_safe:
        print("Input blocked!")
        for violation in violations:
            refusal = refusal_gen.generate_refusal(violation)
            print(f"Refusal: {refusal}")

            alternative = refusal_gen.generate_alternative(violation)
            if alternative:
                print(f"Alternative: {alternative}")

    # Get statistics
    stats = safety_filter.get_violation_stats()
    print(f"\nViolation Stats: {stats}")
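
    # Illustrative output-side check (a sketch added for completeness; the
    # sample text and its handling are assumptions, not part of the original demo).
    test_output = "You can reach the admin at admin@example.com."
    out_safe, out_violations = safety_filter.check_output(test_output)
    print(f"\nOutput safe: {out_safe}")
    if out_violations:
        print(f"Sanitized output: {safety_filter.redact_pii(test_output)}")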