""" |
|
|
Comprehensive benchmarking script for Helion-V2.0-Thinking |
|
|
Measures performance, throughput, latency, and memory usage |
|
|
""" |
|
|
|
|
|
import torch |
|
|
from transformers import AutoModelForCausalLM, AutoProcessor |
|
|
from typing import List, Dict, Any |
|
|
import time |
|
|
import psutil |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import json |
|
|
from tqdm import tqdm |
|
|
import gc |
|
|
|
|
|
|
|
|
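# Typical invocation (the script filename below is illustrative; substitute the
# actual path of this file in your checkout):
#
#   python benchmark.py --model DeepXR/Helion-V2.0-Thinking --output benchmark_results.json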


class HelionBenchmark:
    """Performance benchmarking for Helion-V2.0-Thinking."""

    def __init__(self, model_name: str = "DeepXR/Helion-V2.0-Thinking"):
        """Initialize benchmark suite."""
        print(f"Loading model for benchmarking: {model_name}")

        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            trust_remote_code=True
        )
        self.processor = AutoProcessor.from_pretrained(model_name)
        self.model.eval()

        print("Model loaded successfully")

        if torch.cuda.is_available():
            self.device = "cuda"
            self.device_name = torch.cuda.get_device_name(0)
            self.total_vram = torch.cuda.get_device_properties(0).total_memory / (1024**3)
        else:
            self.device = "cpu"
            self.device_name = "CPU"
            self.total_vram = 0

        print(f"Device: {self.device_name}")
        if self.device == "cuda":
            print(f"Total VRAM: {self.total_vram:.2f} GB")

    def measure_memory_usage(self) -> Dict[str, float]:
        """Measure current memory usage."""
        memory_stats = {}

        if self.device == "cuda":
            memory_stats['vram_allocated_gb'] = torch.cuda.memory_allocated() / (1024**3)
            memory_stats['vram_reserved_gb'] = torch.cuda.memory_reserved() / (1024**3)
            memory_stats['vram_peak_gb'] = torch.cuda.max_memory_allocated() / (1024**3)

        # Host RAM: resident set size of this process
        memory_stats['ram_used_gb'] = psutil.Process().memory_info().rss / (1024**3)

        return memory_stats

    def benchmark_text_generation(
        self,
        prompts: List[str],
        max_new_tokens: int = 256
    ) -> Dict[str, Any]:
        """
        Benchmark text generation performance.

        Returns:
            Dict with latency, throughput, and token metrics
        """
        print("\n=== Benchmarking Text Generation ===")

        latencies = []
        tokens_per_second = []

        # Warmup run so the first measured prompt does not pay one-time costs
        warmup_prompt = "Test prompt for warmup."
        inputs = self.processor(text=warmup_prompt, return_tensors="pt").to(self.model.device)
        with torch.no_grad():
            _ = self.model.generate(**inputs, max_new_tokens=50)

        if self.device == "cuda":
            torch.cuda.synchronize()
            torch.cuda.reset_peak_memory_stats()

        for prompt in tqdm(prompts, desc="Text Generation"):
            inputs = self.processor(text=prompt, return_tensors="pt").to(self.model.device)

            start_time = time.time()

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_new_tokens,
                    do_sample=False
                )

            if self.device == "cuda":
                torch.cuda.synchronize()

            end_time = time.time()
            latency = end_time - start_time

            # Count only newly generated tokens (exclude the prompt)
            generated_tokens = outputs.shape[1] - inputs['input_ids'].shape[1]
            tps = generated_tokens / latency

            latencies.append(latency)
            tokens_per_second.append(tps)

        memory_stats = self.measure_memory_usage()

        return {
            "text_generation": {
                "avg_latency_ms": np.mean(latencies) * 1000,
                "p50_latency_ms": np.percentile(latencies, 50) * 1000,
                "p95_latency_ms": np.percentile(latencies, 95) * 1000,
                "p99_latency_ms": np.percentile(latencies, 99) * 1000,
                "avg_tokens_per_second": np.mean(tokens_per_second),
                "total_prompts": len(prompts),
                **memory_stats
            }
        }

    def benchmark_vision(
        self,
        image_prompts: List[tuple[Image.Image, str]],
        max_new_tokens: int = 256
    ) -> Dict[str, Any]:
        """
        Benchmark vision + text generation.

        Args:
            image_prompts: List of (image, prompt) tuples

        Returns:
            Performance metrics
        """
        print("\n=== Benchmarking Vision Tasks ===")

        latencies = []
        tokens_per_second = []

        # Warmup on the first image/prompt pair
        if image_prompts:
            warmup_image, warmup_prompt = image_prompts[0]
            inputs = self.processor(
                text=warmup_prompt,
                images=warmup_image,
                return_tensors="pt"
            ).to(self.model.device)
            with torch.no_grad():
                _ = self.model.generate(**inputs, max_new_tokens=50)

        if self.device == "cuda":
            torch.cuda.synchronize()
            torch.cuda.reset_peak_memory_stats()

        for image, prompt in tqdm(image_prompts, desc="Vision Tasks"):
            inputs = self.processor(
                text=prompt,
                images=image,
                return_tensors="pt"
            ).to(self.model.device)

            start_time = time.time()

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_new_tokens,
                    do_sample=False
                )

            if self.device == "cuda":
                torch.cuda.synchronize()

            end_time = time.time()
            latency = end_time - start_time

            # Count only newly generated tokens (exclude prompt/image tokens)
            generated_tokens = outputs.shape[1] - inputs['input_ids'].shape[1]
            tps = generated_tokens / latency

            latencies.append(latency)
            tokens_per_second.append(tps)

        memory_stats = self.measure_memory_usage()

        return {
            "vision_tasks": {
                "avg_latency_ms": np.mean(latencies) * 1000,
                "p50_latency_ms": np.percentile(latencies, 50) * 1000,
                "p95_latency_ms": np.percentile(latencies, 95) * 1000,
                "p99_latency_ms": np.percentile(latencies, 99) * 1000,
                "avg_tokens_per_second": np.mean(tokens_per_second),
                "total_image_prompts": len(image_prompts),
                **memory_stats
            }
        }
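
    # Note: benchmark_vision is not called by run_full_benchmark below; it can be
    # run on its own. A minimal sketch of a direct call (the image path is
    # illustrative only):
    #
    #   bench = HelionBenchmark()
    #   pairs = [(Image.open("photo.jpg"), "Describe this image.")]
    #   bench.print_results(bench.benchmark_vision(pairs))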

    def benchmark_long_context(
        self,
        context_lengths: List[int] = [1000, 5000, 10000, 50000, 100000]
    ) -> Dict[str, Any]:
        """
        Benchmark performance with varying context lengths.

        Args:
            context_lengths: List of context lengths to test

        Returns:
            Performance metrics by context length
        """
        print("\n=== Benchmarking Long Context Performance ===")

        results = {}

        for length in tqdm(context_lengths, desc="Context Lengths"):
            # Build a synthetic context of roughly `length` tokens
            # (the repeated sentence is about 6 tokens long)
            context = "This is a test sentence. " * (length // 6)
            prompt = f"{context}\n\nQuestion: What is the main topic of this text?\nAnswer:"

            inputs = self.processor(text=prompt, return_tensors="pt").to(self.model.device)

            if inputs['input_ids'].shape[1] > length:
                print(f"Skipping {length} - generated context too long")
                continue

            if self.device == "cuda":
                torch.cuda.synchronize()
                torch.cuda.reset_peak_memory_stats()

            start_time = time.time()
            outputs = None

            try:
                with torch.no_grad():
                    outputs = self.model.generate(
                        **inputs,
                        max_new_tokens=128,
                        do_sample=False
                    )

                if self.device == "cuda":
                    torch.cuda.synchronize()

                end_time = time.time()
                latency = end_time - start_time

                memory_stats = self.measure_memory_usage()

                results[f"context_{length}"] = {
                    "latency_ms": latency * 1000,
                    "input_tokens": inputs['input_ids'].shape[1],
                    **memory_stats
                }

            except Exception as e:
                results[f"context_{length}"] = {
                    "error": str(e)
                }

            # Free memory between runs (outputs may still be None if generation failed)
            del inputs, outputs
            gc.collect()
            if self.device == "cuda":
                torch.cuda.empty_cache()

        return {"long_context": results}

    def benchmark_throughput(
        self,
        batch_sizes: List[int] = [1, 2, 4, 8],
        sequence_length: int = 512
    ) -> Dict[str, Any]:
        """
        Benchmark throughput with different batch sizes.

        Args:
            batch_sizes: List of batch sizes to test
            sequence_length: Target sequence length

        Returns:
            Throughput metrics
        """
        print("\n=== Benchmarking Throughput ===")

        results = {}
        prompt = "Explain the concept of artificial intelligence. " * 10

        for batch_size in tqdm(batch_sizes, desc="Batch Sizes"):
            inputs = outputs = None

            try:
                # Duplicate the prompt to form a batch
                prompts = [prompt] * batch_size
                inputs = self.processor(
                    text=prompts,
                    return_tensors="pt",
                    padding=True
                ).to(self.model.device)

                if self.device == "cuda":
                    torch.cuda.synchronize()
                    torch.cuda.reset_peak_memory_stats()

                start_time = time.time()

                with torch.no_grad():
                    outputs = self.model.generate(
                        **inputs,
                        max_new_tokens=256,
                        do_sample=False
                    )

                if self.device == "cuda":
                    torch.cuda.synchronize()

                end_time = time.time()
                latency = end_time - start_time

                # Throughput over newly generated tokens only (prompt tokens excluded)
                generated_per_sample = outputs.shape[1] - inputs['input_ids'].shape[1]
                total_tokens = outputs.shape[0] * generated_per_sample
                throughput = total_tokens / latency

                memory_stats = self.measure_memory_usage()

                results[f"batch_{batch_size}"] = {
                    "latency_ms": latency * 1000,
                    "throughput_tokens_per_sec": throughput,
                    "tokens_per_sample": generated_per_sample,
                    **memory_stats
                }

            except Exception as e:
                results[f"batch_{batch_size}"] = {
                    "error": str(e)
                }

            # Free memory between runs (inputs/outputs may be None on failure)
            del inputs, outputs
            gc.collect()
            if self.device == "cuda":
                torch.cuda.empty_cache()

        return {"throughput": results}

    def run_full_benchmark(self) -> Dict[str, Any]:
        """Run complete benchmark suite."""
        print("\n" + "="*60)
        print("Starting Full Benchmark Suite")
        print(f"Device: {self.device_name}")
        print("="*60)

        results = {
            "system_info": {
                "device": self.device,
                "device_name": self.device_name,
                "total_vram_gb": self.total_vram if self.device == "cuda" else None,
                "pytorch_version": torch.__version__,
                "cuda_available": torch.cuda.is_available()
            }
        }

        # Text generation benchmark
        text_prompts = [
            "Explain quantum mechanics in simple terms.",
            "Write a short story about space exploration.",
            "What are the benefits of machine learning?",
            "Describe the process of photosynthesis.",
            "How does blockchain technology work?"
        ]
        results.update(self.benchmark_text_generation(text_prompts, max_new_tokens=256))

        # Long-context benchmark
        results.update(self.benchmark_long_context([1000, 5000, 10000, 50000]))

        # Throughput benchmark
        results.update(self.benchmark_throughput([1, 2, 4]))

        print("\n" + "="*60)
        print("Benchmark Complete")
        print("="*60)

        return results

    def print_results(self, results: Dict[str, Any]):
        """Print benchmark results in a readable format."""
        print("\n" + "="*60)
        print("BENCHMARK RESULTS")
        print("="*60)

        def print_dict(d, indent=0):
            for key, value in d.items():
                if isinstance(value, dict):
                    print(" " * indent + f"{key}:")
                    print_dict(value, indent + 1)
                elif isinstance(value, float):
                    print(" " * indent + f"{key}: {value:.4f}")
                else:
                    print(" " * indent + f"{key}: {value}")

        print_dict(results)
        print("="*60 + "\n")

    def save_results(self, results: Dict[str, Any], filename: str = "benchmark_results.json"):
        """Save results to JSON file."""
        with open(filename, 'w') as f:
            # default=float is defensive: it converts any NumPy scalar types in
            # the results dict that the json module cannot serialize directly
            json.dump(results, f, indent=2, default=float)
        print(f"Results saved to {filename}")


def main():
    """Main benchmark function."""
    import argparse

    parser = argparse.ArgumentParser(description="Benchmark Helion-V2.0-Thinking")
    parser.add_argument(
        "--model",
        type=str,
        default="DeepXR/Helion-V2.0-Thinking",
        help="Model name or path"
    )
    parser.add_argument(
        "--output",
        type=str,
        default="benchmark_results.json",
        help="Output file for results"
    )

    args = parser.parse_args()

    benchmark = HelionBenchmark(args.model)
    results = benchmark.run_full_benchmark()
    benchmark.print_results(results)
    benchmark.save_results(results, args.output)


if __name__ == "__main__":
    main()