"""
Comprehensive benchmarking script for Helion-V2.0-Thinking
Measures performance, throughput, latency, and memory usage
"""
import torch
from transformers import AutoModelForCausalLM, AutoProcessor
from typing import List, Dict, Any
import time
import psutil
import numpy as np
from PIL import Image
import json
from tqdm import tqdm
import gc
class HelionBenchmark:
    """Performance benchmarking for Helion-V2.0-Thinking"""
    def __init__(self, model_name: str = "DeepXR/Helion-V2.0-Thinking"):
        """Initialize benchmark suite"""
        print(f"Loading model for benchmarking: {model_name}")
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            trust_remote_code=True
        )
        self.processor = AutoProcessor.from_pretrained(model_name)
        self.model.eval()
        print("Model loaded successfully")
        # Get device info
        if torch.cuda.is_available():
            self.device = "cuda"
            self.device_name = torch.cuda.get_device_name(0)
            self.total_vram = torch.cuda.get_device_properties(0).total_memory / (1024**3)
        else:
            self.device = "cpu"
            self.device_name = "CPU"
            self.total_vram = 0
        print(f"Device: {self.device_name}")
        if self.device == "cuda":
            print(f"Total VRAM: {self.total_vram:.2f} GB")
    def measure_memory_usage(self) -> Dict[str, float]:
        """Measure current memory usage"""
        memory_stats = {}
        if self.device == "cuda":
            memory_stats['vram_allocated_gb'] = torch.cuda.memory_allocated() / (1024**3)
            memory_stats['vram_reserved_gb'] = torch.cuda.memory_reserved() / (1024**3)
            memory_stats['vram_peak_gb'] = torch.cuda.max_memory_allocated() / (1024**3)
        # System RAM
        memory_stats['ram_used_gb'] = psutil.Process().memory_info().rss / (1024**3)
        return memory_stats
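    # The generation benchmarks below follow the same pattern: one warmup call
    # (excluded from timing), then greedy decoding (do_sample=False) per prompt,
    # timing the full generate() call with wall-clock time and reporting
    # tokens/sec over the newly generated tokens only.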
    def benchmark_text_generation(
        self,
        prompts: List[str],
        max_new_tokens: int = 256
    ) -> Dict[str, Any]:
        """
        Benchmark text generation performance

        Returns:
            Dict with latency, throughput, and token metrics
        """
        print("\n=== Benchmarking Text Generation ===")
        latencies = []
        tokens_per_second = []
        # Warmup
        warmup_prompt = "Test prompt for warmup."
        inputs = self.processor(text=warmup_prompt, return_tensors="pt").to(self.model.device)
        with torch.no_grad():
            _ = self.model.generate(**inputs, max_new_tokens=50)
        if self.device == "cuda":
            torch.cuda.synchronize()
            torch.cuda.reset_peak_memory_stats()
        # Benchmark
        for prompt in tqdm(prompts, desc="Text Generation"):
            inputs = self.processor(text=prompt, return_tensors="pt").to(self.model.device)
            start_time = time.time()
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_new_tokens,
                    do_sample=False
                )
            if self.device == "cuda":
                torch.cuda.synchronize()
            end_time = time.time()
            latency = end_time - start_time
            # Calculate tokens generated
            generated_tokens = outputs.shape[1] - inputs['input_ids'].shape[1]
            tps = generated_tokens / latency
            latencies.append(latency)
            tokens_per_second.append(tps)
        # Memory stats
        memory_stats = self.measure_memory_usage()
        return {
            "text_generation": {
                "avg_latency_ms": np.mean(latencies) * 1000,
                "p50_latency_ms": np.percentile(latencies, 50) * 1000,
                "p95_latency_ms": np.percentile(latencies, 95) * 1000,
                "p99_latency_ms": np.percentile(latencies, 99) * 1000,
                "avg_tokens_per_second": np.mean(tokens_per_second),
                "total_prompts": len(prompts),
                **memory_stats
            }
        }
    def benchmark_vision(
        self,
        image_prompts: List[tuple[Image.Image, str]],
        max_new_tokens: int = 256
    ) -> Dict[str, Any]:
        """
        Benchmark vision + text generation

        Args:
            image_prompts: List of (image, prompt) tuples

        Returns:
            Performance metrics
        """
        print("\n=== Benchmarking Vision Tasks ===")
        latencies = []
        tokens_per_second = []
        # Warmup
        if image_prompts:
            warmup_image, warmup_prompt = image_prompts[0]
            inputs = self.processor(
                text=warmup_prompt,
                images=warmup_image,
                return_tensors="pt"
            ).to(self.model.device)
            with torch.no_grad():
                _ = self.model.generate(**inputs, max_new_tokens=50)
        if self.device == "cuda":
            torch.cuda.synchronize()
            torch.cuda.reset_peak_memory_stats()
        # Benchmark
        for image, prompt in tqdm(image_prompts, desc="Vision Tasks"):
            inputs = self.processor(
                text=prompt,
                images=image,
                return_tensors="pt"
            ).to(self.model.device)
            start_time = time.time()
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_new_tokens,
                    do_sample=False
                )
            if self.device == "cuda":
                torch.cuda.synchronize()
            end_time = time.time()
            latency = end_time - start_time
            generated_tokens = outputs.shape[1] - inputs['input_ids'].shape[1]
            tps = generated_tokens / latency
            latencies.append(latency)
            tokens_per_second.append(tps)
        memory_stats = self.measure_memory_usage()
        return {
            "vision_tasks": {
                "avg_latency_ms": np.mean(latencies) * 1000,
                "p50_latency_ms": np.percentile(latencies, 50) * 1000,
                "p95_latency_ms": np.percentile(latencies, 95) * 1000,
                "p99_latency_ms": np.percentile(latencies, 99) * 1000,
                "avg_tokens_per_second": np.mean(tokens_per_second),
                "total_image_prompts": len(image_prompts),
                **memory_stats
            }
        }
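    # The long-context benchmark builds a synthetic prompt by repeating a short
    # sentence until it roughly reaches the target token count, then measures
    # end-to-end latency and peak memory for a 128-token answer at each length.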
    def benchmark_long_context(
        self,
        context_lengths: List[int] = [1000, 5000, 10000, 50000, 100000]
    ) -> Dict[str, Any]:
        """
        Benchmark performance with varying context lengths

        Args:
            context_lengths: List of context lengths to test

        Returns:
            Performance metrics by context length
        """
        print("\n=== Benchmarking Long Context Performance ===")
        results = {}
        for length in tqdm(context_lengths, desc="Context Lengths"):
            # Generate synthetic context (~6 tokens per repeated sentence)
            context = "This is a test sentence. " * (length // 6)
            prompt = f"{context}\n\nQuestion: What is the main topic of this text?\nAnswer:"
            inputs = self.processor(text=prompt, return_tensors="pt").to(self.model.device)
            # Skip if the synthetic prompt overshoots the target length
            if inputs['input_ids'].shape[1] > length:
                print(f"Skipping {length} - generated context too long")
                continue
            if self.device == "cuda":
                torch.cuda.synchronize()
                torch.cuda.reset_peak_memory_stats()
            outputs = None
            start_time = time.time()
            try:
                with torch.no_grad():
                    outputs = self.model.generate(
                        **inputs,
                        max_new_tokens=128,
                        do_sample=False
                    )
                if self.device == "cuda":
                    torch.cuda.synchronize()
                end_time = time.time()
                latency = end_time - start_time
                memory_stats = self.measure_memory_usage()
                results[f"context_{length}"] = {
                    "latency_ms": latency * 1000,
                    "input_tokens": inputs['input_ids'].shape[1],
                    **memory_stats
                }
            except Exception as e:
                results[f"context_{length}"] = {
                    "error": str(e)
                }
            # Cleanup (outputs is pre-set to None so deletion is safe if generate failed)
            del inputs, outputs
            gc.collect()
            if self.device == "cuda":
                torch.cuda.empty_cache()
        return {"long_context": results}
    def benchmark_throughput(
        self,
        batch_sizes: List[int] = [1, 2, 4, 8],
        sequence_length: int = 512
    ) -> Dict[str, Any]:
        """
        Benchmark throughput with different batch sizes

        Args:
            batch_sizes: List of batch sizes to test
            sequence_length: Target sequence length (currently unused)

        Returns:
            Throughput metrics
        """
        print("\n=== Benchmarking Throughput ===")
        results = {}
        prompt = "Explain the concept of artificial intelligence. " * 10
        for batch_size in tqdm(batch_sizes, desc="Batch Sizes"):
            inputs = None
            outputs = None
            try:
                # Create batch
                prompts = [prompt] * batch_size
                inputs = self.processor(
                    text=prompts,
                    return_tensors="pt",
                    padding=True
                ).to(self.model.device)
                if self.device == "cuda":
                    torch.cuda.synchronize()
                    torch.cuda.reset_peak_memory_stats()
                start_time = time.time()
                with torch.no_grad():
                    outputs = self.model.generate(
                        **inputs,
                        max_new_tokens=256,
                        do_sample=False
                    )
                if self.device == "cuda":
                    torch.cuda.synchronize()
                end_time = time.time()
                latency = end_time - start_time
                # Calculate throughput over newly generated tokens only,
                # consistent with the other benchmarks in this file
                new_tokens_per_sample = outputs.shape[1] - inputs['input_ids'].shape[1]
                total_tokens = outputs.shape[0] * new_tokens_per_sample
                throughput = total_tokens / latency
                memory_stats = self.measure_memory_usage()
                results[f"batch_{batch_size}"] = {
                    "latency_ms": latency * 1000,
                    "throughput_tokens_per_sec": throughput,
                    "tokens_per_sample": outputs.shape[1],
                    **memory_stats
                }
            except Exception as e:
                results[f"batch_{batch_size}"] = {
                    "error": str(e)
                }
            # Cleanup (inputs/outputs are pre-set to None so deletion is safe on failure)
            del inputs, outputs
            gc.collect()
            if self.device == "cuda":
                torch.cuda.empty_cache()
        return {"throughput": results}
    def run_full_benchmark(self) -> Dict[str, Any]:
        """Run complete benchmark suite"""
        print("\n" + "="*60)
        print("Starting Full Benchmark Suite")
        print(f"Device: {self.device_name}")
        print("="*60)
        results = {
            "system_info": {
                "device": self.device,
                "device_name": self.device_name,
                "total_vram_gb": self.total_vram if self.device == "cuda" else None,
                "pytorch_version": torch.__version__,
                "cuda_available": torch.cuda.is_available()
            }
        }
        # Text generation benchmark
        text_prompts = [
            "Explain quantum mechanics in simple terms.",
            "Write a short story about space exploration.",
            "What are the benefits of machine learning?",
            "Describe the process of photosynthesis.",
            "How does blockchain technology work?"
        ]
        results.update(self.benchmark_text_generation(text_prompts, max_new_tokens=256))
        # Long context benchmark
        results.update(self.benchmark_long_context([1000, 5000, 10000, 50000]))
        # Throughput benchmark
        results.update(self.benchmark_throughput([1, 2, 4]))
        print("\n" + "="*60)
        print("Benchmark Complete")
        print("="*60)
        return results
    def print_results(self, results: Dict[str, Any]):
        """Print benchmark results in a readable format"""
        print("\n" + "="*60)
        print("BENCHMARK RESULTS")
        print("="*60)

        def print_dict(d, indent=0):
            for key, value in d.items():
                if isinstance(value, dict):
                    print(" " * indent + f"{key}:")
                    print_dict(value, indent + 1)
                elif isinstance(value, float):
                    print(" " * indent + f"{key}: {value:.4f}")
                else:
                    print(" " * indent + f"{key}: {value}")

        print_dict(results)
        print("="*60 + "\n")
    def save_results(self, results: Dict[str, Any], filename: str = "benchmark_results.json"):
        """Save results to JSON file"""
        with open(filename, 'w') as f:
            json.dump(results, f, indent=2)
        print(f"Results saved to {filename}")
def main():
    """Main benchmark function"""
    import argparse
    parser = argparse.ArgumentParser(description="Benchmark Helion-V2.0-Thinking")
    parser.add_argument(
        "--model",
        type=str,
        default="DeepXR/Helion-V2.0-Thinking",
        help="Model name or path"
    )
    parser.add_argument(
        "--output",
        type=str,
        default="benchmark_results.json",
        help="Output file for results"
    )
    args = parser.parse_args()
    # Run benchmark
    benchmark = HelionBenchmark(args.model)
    results = benchmark.run_full_benchmark()
    benchmark.print_results(results)
    benchmark.save_results(results, args.output)
if __name__ == "__main__":
    main()