Test scripts and utilities for evaluating vision-language models on jersey number detection using llama.cpp server.
#!/usr/bin/env python3
"""
Test script for evaluating jersey detection performance with different models and prompts.

Usage:
    python test_jersey_detection.py <image_directory> <prompt_file> [options]

Arguments:
    image_directory: Path to directory containing test images
    prompt_file: Path to text file containing the prompt to use
    --model-name: Name of the model being tested (optional, auto-detected from server if not provided)
    --model-tag: Model tag for llama-swap integration (optional)
    --server-url: Optional llama.cpp server URL (default: read from scan.ini)
    --output-file: Output file for results (default: jersey_detection_results.jsonl)
    --resize: Maximum image dimension for resizing before processing

Ground Truth:
    Expected jersey numbers are parsed from filenames using a dash-separated format:
    Example: 1122-8-10-29.jpg expects jerseys 8, 10, and 29

The script calculates precision, recall, F1 score, and confidence calibration metrics
to evaluate model accuracy against known correct results.

Output Files:
    <output_file>: Summary statistics with ground truth metrics (default: jersey_detection_results.jsonl)

Example:
    # Auto-detect model name from server
    python test_jersey_detection.py ./images jersey_prompt.txt

    # Resize images to 1024px max dimension before processing
    python test_jersey_detection.py ./images jersey_prompt.txt --resize 1024

    # Use llama-swap to automatically load a specific model
    python test_jersey_detection.py ./images jersey_prompt.txt --model-tag "qwen2.5-vl-7b" --resize 1024

    # Specify custom model name (for tracking in results)
    python test_jersey_detection.py ./images jersey_prompt.txt --model-name "llama-3.2-vision"
    python test_jersey_detection.py ./images jersey_prompt_with_confidence.txt --model-name "qwen2-vl" --resize 1024

After running tests, analyze results with:
    python analyze_jersey_results.py  # Performance and accuracy analysis
"""

import argparse
import configparser
import json
import os
import re
import requests
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional
import cv2

# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from scan_utils.llama_cpp_client import LlamaCppClient


# Hallucination detection: filter out example numbers from prompts
# Using numbers > 100 as examples to avoid filtering valid jersey numbers
HALLUCINATION_NUMBERS = {'101', '102', '103', '142', '199'}
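
# Assumed model output format (inferred from the parsing in test_image(), not enforced by the server):
# the prompt should make the model return a JSON object such as
#   {"jerseys": [{"jersey_number": "8", "jersey_color": "red", "number_color": "white", "confidence": 90}]}
# where "confidence" is optional and only "jersey_number" is used for ground truth scoring.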


def parse_expected_jerseys(filename: str) -> List[str]:
    """
    Parse expected jersey numbers from filename.

    Format: prefix-number1-number2-number3.ext
    Example: 1122-8-10-29.jpg -> ['8', '10', '29']

    Args:
        filename: Image filename

    Returns:
        List of expected jersey numbers as strings
    """
    # Remove extension
    name_without_ext = Path(filename).stem

    # Split by dash
    parts = name_without_ext.split('-')

    # First part is typically a prefix/identifier, rest are jersey numbers
    # Skip the first part and collect numeric parts
    expected = []
    for part in parts[1:]:  # Skip first part
        # Check if part is numeric (jersey number)
        if part.isdigit():
            expected.append(part)

    return expected
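
# Note: a filename with no numeric parts after the prefix (illustrative examples:
# "1300.jpg" or "1300-none.jpg") yields an empty list, which the metrics in
# test_image() treat as a negative example (no jerseys expected in the image).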


def clean_response(text: str) -> str:
    """
    Clean the response by removing think tags and markdown code blocks.
    Some models use <think> tags for chain-of-thought reasoning and wrap JSON in markdown.

    Args:
        text: Raw response text

    Returns:
        Cleaned text ready for JSON parsing
    """
    # Remove <think>...</think> tags and their content (standard angle brackets)
    cleaned = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL | re.IGNORECASE)
    # Remove ◁think▷...◁/think▷ tags (unicode triangle brackets)
    cleaned = re.sub(r'◁think▷.*?◁/think▷', '', cleaned, flags=re.DOTALL | re.IGNORECASE)
    # Also remove any standalone think tags (both formats)
    cleaned = re.sub(r'</?think>', '', cleaned, flags=re.IGNORECASE)
    cleaned = re.sub(r'◁/?think▷', '', cleaned, flags=re.IGNORECASE)

    # Remove markdown code blocks (```json ... ``` or ``` ... ```)
    # First try to extract content from ```json blocks
    json_block_match = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', cleaned, flags=re.DOTALL | re.IGNORECASE)
    if json_block_match:
        # Extract just the content inside the code block
        cleaned = json_block_match.group(1)
    else:
        # If no code block, just remove any stray ``` markers
        cleaned = re.sub(r'```(?:json)?', '', cleaned, flags=re.IGNORECASE)

    return cleaned.strip()
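
# Illustrative example (not from a real model run): given
#   '<think>scanning players...</think>\n```json\n{"jerseys": []}\n```'
# clean_response() returns '{"jerseys": []}', which json.loads() can parse directly.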


def get_llama_server_url_from_config() -> Optional[str]:
    """
    Read the LLAMA_CPP_SERVER_URL from scan.ini.

    Returns:
        Server URL from config or None if not found
    """
    config_path = os.path.join(os.path.dirname(__file__), 'scan.ini')

    if not os.path.exists(config_path):
        return None

    try:
        config = configparser.ConfigParser()
        config.read(config_path)

        if 'DEFAULT' in config and 'LLAMA_CPP_SERVER_URL' in config['DEFAULT']:
            return config['DEFAULT']['LLAMA_CPP_SERVER_URL']
    except Exception as e:
        print(f"Warning: Failed to read scan.ini: {e}")

    return None
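
# The relevant scan.ini entry is expected to look like this (illustrative value):
#   [DEFAULT]
#   LLAMA_CPP_SERVER_URL = http://localhost:8080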


class JerseyDetectionTester:
    """Test runner for jersey detection evaluation."""

    def __init__(self, server_url: str, prompt: str, model_name: Optional[str] = None, resize_max: Optional[int] = None, model_tag: Optional[str] = None):
        """
        Initialize the tester.

        Args:
            server_url: Base URL for the llama.cpp server
            prompt: Prompt text to use for detection
            model_name: Name of the model being tested (optional)
            resize_max: Maximum image dimension (resize if larger, None = no resize)
            model_tag: Model tag for llama-swap integration (optional)
        """
        self.client = LlamaCppClient(base_url=server_url)
        self.prompt = prompt
        self.model_name = model_name or "unknown"
        self.resize_max = resize_max
        self.model_tag = model_tag
        self.results = []

    def test_image(self, image_path: str) -> Dict[str, Any]:
        """
        Test jersey detection on a single image.

        Args:
            image_path: Path to the image file

        Returns:
            Dictionary containing test results for this image
        """
        start_time = time.time()

        # Load image
        image = cv2.imread(image_path)
        if image is None:
            filename = Path(image_path).name
            expected_jerseys = parse_expected_jerseys(filename)
            return {
                'image_path': image_path,
                'error': 'Failed to load image',
                'jerseys': [],
                'processing_time': 0,
                'resized': False,
                'original_size': None,
                'final_size': None,
                'expected_jerseys': expected_jerseys,
                'detected_jerseys': [],
                'true_positives': [],
                'false_positives': [],
                'false_negatives': expected_jerseys,
                'precision': 0.0,
                'recall': 0.0,
                'f1_score': 0.0,
                'avg_confidence_correct': None,
                'avg_confidence_incorrect': None,
                'confidence_correct_count': 0,
                'confidence_incorrect_count': 0
            }

        # Track original size
        original_height, original_width = image.shape[:2]
        original_size = (original_width, original_height)
        resized = False

        # Resize if needed
        if self.resize_max and (original_width > self.resize_max or original_height > self.resize_max):
            # Calculate new dimensions maintaining aspect ratio
            if original_width > original_height:
                new_width = self.resize_max
                new_height = int(original_height * (self.resize_max / original_width))
            else:
                new_height = self.resize_max
                new_width = int(original_width * (self.resize_max / original_height))

            # Resize image
            image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
            resized = True

        final_height, final_width = image.shape[:2]
        final_size = (final_width, final_height)

        # Create multimodal message
        message = self.client.create_multimodal_message(
            role="user",
            content=self.prompt,
            images=[image]
        )

        # Send to LLM
        try:
            # Prepare kwargs for chat completion
            completion_kwargs = {
                'messages': [message],
                'temperature': 0.1,
                'max_tokens': 1000
            }

            # Add model parameter if model_tag is specified (for llama-swap)
            if self.model_tag:
                completion_kwargs['model'] = self.model_tag
                # Note: We don't print this for every image to avoid spam, but it's being sent

            response = self.client.chat_completion(**completion_kwargs)

            processing_time = time.time() - start_time

            # Extract response text
            if 'choices' in response and len(response['choices']) > 0:
                response_text = response['choices'][0]['message']['content']

                # Clean response (remove think tags and markdown code blocks)
                cleaned_text = clean_response(response_text)

                # Parse JSON response
                try:
                    result = json.loads(cleaned_text)
                    jerseys = result.get('jerseys', [])

                    # Apply hallucination detection
                    filtered_jerseys = []
                    hallucinated_count = 0

                    for jersey in jerseys:
                        jersey_number = jersey.get('jersey_number', '')

                        # Check for hallucination (model returning example numbers)
                        if jersey_number in HALLUCINATION_NUMBERS:
                            hallucinated_count += 1
                            continue

                        filtered_jerseys.append(jersey)

                    # Ground truth comparison
                    filename = Path(image_path).name
                    expected_jerseys = set(parse_expected_jerseys(filename))
                    detected_jerseys = set(jersey.get('jersey_number', '') for jersey in filtered_jerseys if jersey.get('jersey_number', ''))

                    # Calculate ground truth metrics
                    true_positives = expected_jerseys & detected_jerseys  # Correctly detected
                    false_positives = detected_jerseys - expected_jerseys  # Detected but not expected
                    false_negatives = expected_jerseys - detected_jerseys  # Expected but not detected

                    # Calculate precision, recall, F1
                    tp_count = len(true_positives)
                    fp_count = len(false_positives)
                    fn_count = len(false_negatives)

                    precision = tp_count / (tp_count + fp_count) if (tp_count + fp_count) > 0 else 0.0
                    recall = tp_count / (tp_count + fn_count) if (tp_count + fn_count) > 0 else 0.0
                    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
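                    # Worked example: expected {'8', '10', '29'} with detected {'8', '29', '7'}
                    # gives TP=2, FP=1, FN=1 -> precision = 2/3, recall = 2/3, F1 = 2/3.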

                    # Handle edge case: if no expected jerseys, precision is 1.0 if no detections, else 0.0
                    if len(expected_jerseys) == 0:
                        precision = 1.0 if len(detected_jerseys) == 0 else 0.0
                        recall = 1.0  # No jerseys to detect
                        f1_score = 1.0 if len(detected_jerseys) == 0 else 0.0

                    # Calculate confidence scores for correct vs incorrect detections
                    confidence_correct = []  # Confidence for true positives
                    confidence_incorrect = []  # Confidence for false positives

                    for jersey in filtered_jerseys:
                        jersey_number = jersey.get('jersey_number', '')
                        confidence = jersey.get('confidence')

                        if confidence is not None:
                            if jersey_number in true_positives:
                                confidence_correct.append(confidence)
                            elif jersey_number in false_positives:
                                confidence_incorrect.append(confidence)

                    avg_confidence_correct = sum(confidence_correct) / len(confidence_correct) if confidence_correct else None
                    avg_confidence_incorrect = sum(confidence_incorrect) / len(confidence_incorrect) if confidence_incorrect else None

                    return {
                        'image_path': image_path,
                        'jerseys': filtered_jerseys,
                        'hallucinated_count': hallucinated_count,
                        'raw_response': cleaned_text,
                        'processing_time': processing_time,
                        'error': None,
                        'resized': resized,
                        'original_size': original_size,
                        'final_size': final_size,
                        # Ground truth metrics
                        'expected_jerseys': sorted(expected_jerseys),
                        'detected_jerseys': sorted(detected_jerseys),
                        'true_positives': sorted(true_positives),
                        'false_positives': sorted(false_positives),
                        'false_negatives': sorted(false_negatives),
                        'precision': precision,
                        'recall': recall,
                        'f1_score': f1_score,
                        # Confidence calibration metrics
                        'avg_confidence_correct': avg_confidence_correct,
                        'avg_confidence_incorrect': avg_confidence_incorrect,
                        'confidence_correct_count': len(confidence_correct),
                        'confidence_incorrect_count': len(confidence_incorrect)
                    }
                except json.JSONDecodeError as e:
                    filename = Path(image_path).name
                    expected_jerseys = parse_expected_jerseys(filename)
                    return {
                        'image_path': image_path,
                        'error': f'JSON parse error: {e}',
                        'raw_response': cleaned_text,
                        'original_response': response_text if cleaned_text != response_text else None,
                        'jerseys': [],
                        'processing_time': processing_time,
                        'resized': resized,
                        'original_size': original_size,
                        'final_size': final_size,
                        'expected_jerseys': expected_jerseys,
                        'detected_jerseys': [],
                        'true_positives': [],
                        'false_positives': [],
                        'false_negatives': expected_jerseys,
                        'precision': 0.0,
                        'recall': 0.0,
                        'f1_score': 0.0
                    }
            else:
                filename = Path(image_path).name
                expected_jerseys = parse_expected_jerseys(filename)
                return {
                    'image_path': image_path,
                    'error': 'Empty response from model',
                    'jerseys': [],
                    'processing_time': processing_time,
                    'resized': resized,
                    'original_size': original_size,
                    'final_size': final_size,
                    'expected_jerseys': expected_jerseys,
                    'detected_jerseys': [],
                    'true_positives': [],
                    'false_positives': [],
                    'false_negatives': expected_jerseys,
                    'precision': 0.0,
                    'recall': 0.0,
                    'f1_score': 0.0
                }

        except Exception as e:
            processing_time = time.time() - start_time
            filename = Path(image_path).name
            expected_jerseys = parse_expected_jerseys(filename)
            return {
                'image_path': image_path,
                'error': f'Request error: {e}',
                'jerseys': [],
                'processing_time': processing_time,
                'resized': resized,
                'original_size': original_size,
                'final_size': final_size,
                'expected_jerseys': expected_jerseys,
                'detected_jerseys': [],
                'true_positives': [],
                'false_positives': [],
                'false_negatives': expected_jerseys,
                'precision': 0.0,
                'recall': 0.0,
                'f1_score': 0.0,
                'avg_confidence_correct': None,
                'avg_confidence_incorrect': None,
                'confidence_correct_count': 0,
                'confidence_incorrect_count': 0
            }

    def test_directory(self, directory_path: str) -> List[Dict[str, Any]]:
        """
        Test all images in a directory.

        Args:
            directory_path: Path to directory containing images

        Returns:
            List of results for all images
        """
        # Get all image files
        image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
        image_files = []

        for ext in image_extensions:
            image_files.extend(Path(directory_path).glob(f'*{ext}'))
            image_files.extend(Path(directory_path).glob(f'*{ext.upper()}'))

        image_files = sorted(image_files)

        if not image_files:
            print(f"No image files found in {directory_path}")
            return []

        print(f"Found {len(image_files)} images to process\n")

        # Process each image
        results = []
        for i, image_path in enumerate(image_files, 1):
            # Show model tag in progress if using llama-swap
            model_info = f" [{self.model_tag}]" if self.model_tag else ""
            print(f"[{i}/{len(image_files)}]{model_info} Processing {image_path.name}...")
            result = self.test_image(str(image_path))
            results.append(result)

            # Display result
            self._display_result(result)
            print()

        return results

    def _display_result(self, result: Dict[str, Any]):
        """Display the result for a single image."""
        if result.get('error'):
            print(f" ❌ Error: {result['error']}")
            if 'raw_response' in result:
                print(f" Cleaned response: {result['raw_response']}...")
            if result.get('original_response'):
                print(f" (Think tags and/or markdown were filtered from response)")
        else:
            jerseys = result.get('jerseys', [])
            hallucinated_count = result.get('hallucinated_count', 0)

            if jerseys:
                print(f" ✓ Found {len(jerseys)} jersey(s):")
                for jersey in jerseys:
                    number = jersey.get('jersey_number', 'N/A')
                    jersey_color = jersey.get('jersey_color', 'N/A')
                    number_color = jersey.get('number_color', 'N/A')
                    confidence = jersey.get('confidence', None)

                    conf_str = f" (confidence: {confidence})" if confidence is not None else ""
                    print(f" - #{number}: {jersey_color} jersey, {number_color} number{conf_str}")
            else:
                print(f" ○ No jerseys detected")

            if hallucinated_count > 0:
                print(f" ⚠ Filtered {hallucinated_count} hallucinated detection(s)")

            # Display ground truth comparison
            expected = result.get('expected_jerseys', [])
            detected = result.get('detected_jerseys', [])
            true_positives = result.get('true_positives', [])
            false_positives = result.get('false_positives', [])
            false_negatives = result.get('false_negatives', [])

            if expected:
                print(f" Ground truth: Expected {expected}, Detected {detected}")
                if true_positives:
                    print(f" ✓ Correct: {true_positives}")
                if false_positives:
                    print(f" ✗ False positives: {false_positives}")
                if false_negatives:
                    print(f" ✗ Missed: {false_negatives}")
                precision = result.get('precision', 0.0)
                recall = result.get('recall', 0.0)
                f1 = result.get('f1_score', 0.0)
                print(f" Precision: {precision:.2%}, Recall: {recall:.2%}, F1: {f1:.2%}")

        print(f" Processing time: {result['processing_time']:.2f}s")

    def save_results_to_file(self, results: List[Dict[str, Any]], prompt_file: str, output_file: str = "jersey_detection_results.jsonl"):
        """
        Save test results to a JSON Lines file for later analysis.

        Args:
            results: List of all test results
            prompt_file: Path to the prompt file used
            output_file: Path to output file (default: jersey_detection_results.jsonl)
        """
        # Calculate summary statistics
        total_images = len(results)
        images_with_errors = sum(1 for r in results if r.get('error'))
        images_with_jerseys = sum(1 for r in results if not r.get('error') and len(r.get('jerseys', [])) > 0)
        images_without_jerseys = sum(1 for r in results if not r.get('error') and len(r.get('jerseys', [])) == 0)
        total_jerseys = sum(len(r.get('jerseys', [])) for r in results if not r.get('error'))
        total_hallucinated = sum(r.get('hallucinated_count', 0) for r in results if not r.get('error'))
        total_raw_detections = total_jerseys + total_hallucinated
        total_processing_time = sum(r.get('processing_time', 0) for r in results)
        avg_processing_time = total_processing_time / total_images if total_images > 0 else 0

        # Collect confidence statistics if available
        confidences = [
            jersey.get('confidence')
            for r in results if not r.get('error')
            for jersey in r.get('jerseys', [])
            if 'confidence' in jersey and jersey.get('confidence') is not None
        ]

        confidence_stats = None
        if confidences:
            buckets = {
                '90-100': sum(1 for c in confidences if 90 <= c <= 100),
                '70-89': sum(1 for c in confidences if 70 <= c <= 89),
                '50-69': sum(1 for c in confidences if 50 <= c <= 69),
                '30-49': sum(1 for c in confidences if 30 <= c <= 49),
                '0-29': sum(1 for c in confidences if 0 <= c <= 29)
            }
            confidence_stats = {
                'avg': sum(confidences) / len(confidences),
                'min': min(confidences),
                'max': max(confidences),
                'count': len(confidences),
                'distribution': buckets
            }

        # Calculate resize statistics
        images_resized = sum(1 for r in results if r.get('resized', False))

        # Calculate ground truth statistics
        results_without_errors = [r for r in results if not r.get('error')]
        total_expected_jerseys = sum(len(r.get('expected_jerseys', [])) for r in results_without_errors)
        total_true_positives = sum(len(r.get('true_positives', [])) for r in results_without_errors)
        total_false_positives = sum(len(r.get('false_positives', [])) for r in results_without_errors)
        total_false_negatives = sum(len(r.get('false_negatives', [])) for r in results_without_errors)

        # Calculate overall precision, recall, F1
        overall_precision = total_true_positives / (total_true_positives + total_false_positives) if (total_true_positives + total_false_positives) > 0 else 0.0
        overall_recall = total_true_positives / (total_true_positives + total_false_negatives) if (total_true_positives + total_false_negatives) > 0 else 0.0
        overall_f1 = 2 * (overall_precision * overall_recall) / (overall_precision + overall_recall) if (overall_precision + overall_recall) > 0 else 0.0

        # Average per-image metrics
        avg_precision = sum(r.get('precision', 0.0) for r in results_without_errors) / len(results_without_errors) if results_without_errors else 0.0
        avg_recall = sum(r.get('recall', 0.0) for r in results_without_errors) / len(results_without_errors) if results_without_errors else 0.0
        avg_f1 = sum(r.get('f1_score', 0.0) for r in results_without_errors) / len(results_without_errors) if results_without_errors else 0.0

        # Calculate confidence calibration metrics (correct vs incorrect detections)
        all_confidence_correct = []
        all_confidence_incorrect = []
        for r in results_without_errors:
            if r.get('avg_confidence_correct') is not None:
                # Weight by the count of correct detections in this image
                count = r.get('confidence_correct_count', 0)
                avg_conf = r.get('avg_confidence_correct')
                all_confidence_correct.extend([avg_conf] * count)
            if r.get('avg_confidence_incorrect') is not None:
                # Weight by the count of incorrect detections in this image
                count = r.get('confidence_incorrect_count', 0)
                avg_conf = r.get('avg_confidence_incorrect')
                all_confidence_incorrect.extend([avg_conf] * count)

        overall_avg_confidence_correct = sum(all_confidence_correct) / len(all_confidence_correct) if all_confidence_correct else None
        overall_avg_confidence_incorrect = sum(all_confidence_incorrect) / len(all_confidence_incorrect) if all_confidence_incorrect else None

        # Create summary record
        summary_record = {
            'timestamp': datetime.now().isoformat(),
            'model_name': self.model_name,
            'model_tag': self.model_tag,
            'prompt_file': prompt_file,
            'prompt_length': len(self.prompt),
            'total_images': total_images,
            'images_with_jerseys': images_with_jerseys,
            'images_without_jerseys': images_without_jerseys,
            'images_with_errors': images_with_errors,
            'total_raw_detections': total_raw_detections,
            'total_valid_jerseys': total_jerseys,
            'total_hallucinated': total_hallucinated,
            'avg_processing_time': avg_processing_time,
            'total_processing_time': total_processing_time,
            'confidence_stats': confidence_stats,
            'empty_response_capable': images_without_jerseys > 0,
            'resize_enabled': self.resize_max is not None,
            'resize_max': self.resize_max,
            'images_resized': images_resized,
            # Ground truth statistics
            'ground_truth': {
                'total_expected': total_expected_jerseys,
                'total_true_positives': total_true_positives,
                'total_false_positives': total_false_positives,
                'total_false_negatives': total_false_negatives,
                'overall_precision': overall_precision,
                'overall_recall': overall_recall,
                'overall_f1': overall_f1,
                'avg_precision': avg_precision,
                'avg_recall': avg_recall,
                'avg_f1': avg_f1,
                # Confidence calibration
                'avg_confidence_correct': overall_avg_confidence_correct,
                'avg_confidence_incorrect': overall_avg_confidence_incorrect,
                'confidence_correct_count': len(all_confidence_correct),
                'confidence_incorrect_count': len(all_confidence_incorrect)
            }
        }

        # Append to file
        try:
            with open(output_file, 'a') as f:
                f.write(json.dumps(summary_record) + '\n')
            print(f"\n✓ Results saved to {output_file}")
        except Exception as e:
            print(f"\n❌ Failed to save results: {e}")

    def print_summary(self, results: List[Dict[str, Any]]):
        """
        Print summary statistics for all results.

        Args:
            results: List of all test results
        """
        print("=" * 70)
        print("SUMMARY")
        print("=" * 70)
        print(f"\nModel: {self.model_name}")
        if self.model_tag:
            print(f"Model tag: {self.model_tag}")

        # Display resize info
        if self.resize_max:
            images_resized = sum(1 for r in results if r.get('resized', False))
            print(f"Resize: Enabled (max: {self.resize_max}px, {images_resized} images resized)")
        else:
            print(f"Resize: Disabled")

        total_images = len(results)
        images_with_errors = sum(1 for r in results if r.get('error'))
        images_with_jerseys = sum(1 for r in results if not r.get('error') and len(r.get('jerseys', [])) > 0)
        images_without_jerseys = sum(1 for r in results if not r.get('error') and len(r.get('jerseys', [])) == 0)
        total_jerseys = sum(len(r.get('jerseys', [])) for r in results if not r.get('error'))
        total_hallucinated = sum(r.get('hallucinated_count', 0) for r in results if not r.get('error'))
        total_raw_detections = total_jerseys + total_hallucinated
        total_processing_time = sum(r.get('processing_time', 0) for r in results)
        avg_processing_time = total_processing_time / total_images if total_images > 0 else 0

        print(f"\nTotal images processed: {total_images}")
        print(f" - Images with jerseys: {images_with_jerseys} ({images_with_jerseys/total_images*100:.1f}%)")
        print(f" - Images without jerseys: {images_without_jerseys} ({images_without_jerseys/total_images*100:.1f}%)")
        print(f" - Images with errors: {images_with_errors} ({images_with_errors/total_images*100:.1f}%)")
        print(f"\nJersey detections:")
        print(f" - Total raw detections: {total_raw_detections}")
        print(f" - Valid jerseys (after filtering): {total_jerseys}")
        print(f" - Hallucinations filtered out: {total_hallucinated}")
        if images_with_jerseys > 0:
            print(f" - Average valid jerseys per image (when detected): {total_jerseys/images_with_jerseys:.2f}")

        # Empty response capability (important for evaluating model's ability to return empty results)
        print(f"\nEmpty response capability:")
        print(f" - Empty responses returned: {images_without_jerseys}")
        print(f" - Percentage of images: {images_without_jerseys/total_images*100:.1f}%")
        print(f" - Model can return empty results: {'✓ Yes' if images_without_jerseys > 0 else '✗ No (potential issue)'}")

        if total_hallucinated > 0:
            print(f"\nHallucination detection:")
            print(f" - Total hallucinated detections filtered: {total_hallucinated}")
            images_with_hallucinations = sum(1 for r in results if not r.get('error') and r.get('hallucinated_count', 0) > 0)
            print(f" - Images with hallucinations: {images_with_hallucinations} ({images_with_hallucinations/total_images*100:.1f}%)")

        # Ground truth statistics
        results_without_errors = [r for r in results if not r.get('error')]
        total_expected_jerseys = sum(len(r.get('expected_jerseys', [])) for r in results_without_errors)

        if total_expected_jerseys > 0:
            total_true_positives = sum(len(r.get('true_positives', [])) for r in results_without_errors)
            total_false_positives = sum(len(r.get('false_positives', [])) for r in results_without_errors)
            total_false_negatives = sum(len(r.get('false_negatives', [])) for r in results_without_errors)

            # Calculate overall metrics
            overall_precision = total_true_positives / (total_true_positives + total_false_positives) if (total_true_positives + total_false_positives) > 0 else 0.0
            overall_recall = total_true_positives / (total_true_positives + total_false_negatives) if (total_true_positives + total_false_negatives) > 0 else 0.0
            overall_f1 = 2 * (overall_precision * overall_recall) / (overall_precision + overall_recall) if (overall_precision + overall_recall) > 0 else 0.0

            # Calculate average per-image metrics
            avg_precision = sum(r.get('precision', 0.0) for r in results_without_errors) / len(results_without_errors) if results_without_errors else 0.0
            avg_recall = sum(r.get('recall', 0.0) for r in results_without_errors) / len(results_without_errors) if results_without_errors else 0.0
            avg_f1 = sum(r.get('f1_score', 0.0) for r in results_without_errors) / len(results_without_errors) if results_without_errors else 0.0

            print(f"\nGround truth performance:")
            print(f" - Total expected jerseys: {total_expected_jerseys}")
            print(f" - True positives: {total_true_positives}")
            print(f" - False positives: {total_false_positives}")
            print(f" - False negatives: {total_false_negatives}")
            print(f"\n Overall metrics (across all jerseys):")
            print(f" - Precision: {overall_precision:.2%}")
            print(f" - Recall: {overall_recall:.2%}")
            print(f" - F1 Score: {overall_f1:.2%}")
            print(f"\n Average per-image metrics:")
            print(f" - Avg Precision: {avg_precision:.2%}")
            print(f" - Avg Recall: {avg_recall:.2%}")
            print(f" - Avg F1 Score: {avg_f1:.2%}")

            # Confidence calibration metrics
            all_confidence_correct = []
            all_confidence_incorrect = []
            for r in results_without_errors:
                if r.get('avg_confidence_correct') is not None:
                    count = r.get('confidence_correct_count', 0)
                    avg_conf = r.get('avg_confidence_correct')
                    all_confidence_correct.extend([avg_conf] * count)
                if r.get('avg_confidence_incorrect') is not None:
                    count = r.get('confidence_incorrect_count', 0)
                    avg_conf = r.get('avg_confidence_incorrect')
                    all_confidence_incorrect.extend([avg_conf] * count)

            if all_confidence_correct or all_confidence_incorrect:
                print(f"\n Confidence calibration:")
                if all_confidence_correct:
                    avg_conf_correct = sum(all_confidence_correct) / len(all_confidence_correct)
                    print(f" - Avg confidence (correct detections): {avg_conf_correct:.2f} ({len(all_confidence_correct)} detections)")
                else:
                    print(f" - Avg confidence (correct detections): N/A (no correct detections with confidence)")

                if all_confidence_incorrect:
                    avg_conf_incorrect = sum(all_confidence_incorrect) / len(all_confidence_incorrect)
                    print(f" - Avg confidence (incorrect detections): {avg_conf_incorrect:.2f} ({len(all_confidence_incorrect)} detections)")

                    # Show confidence difference
                    if all_confidence_correct:
                        avg_conf_correct = sum(all_confidence_correct) / len(all_confidence_correct)
                        diff = avg_conf_correct - avg_conf_incorrect
                        if diff > 0:
                            print(f" - Confidence difference: +{diff:.2f} (correct > incorrect, good calibration)")
                        else:
                            print(f" - Confidence difference: {diff:.2f} (⚠ incorrect ≥ correct, poor calibration)")
                else:
                    print(f" - Avg confidence (incorrect detections): N/A (no incorrect detections with confidence)")

        print(f"\nProcessing time:")
        print(f" - Total: {total_processing_time:.2f}s")
        print(f" - Average per image: {avg_processing_time:.2f}s")

        # Check for confidence values
        has_confidence = any(
            any('confidence' in jersey for jersey in r.get('jerseys', []))
            for r in results if not r.get('error')
        )

        if has_confidence:
            print(f"\nConfidence statistics:")
            confidences = [
                jersey.get('confidence')
                for r in results if not r.get('error')
                for jersey in r.get('jerseys', [])
                if 'confidence' in jersey and jersey.get('confidence') is not None
            ]
            if confidences:
                avg_confidence = sum(confidences) / len(confidences)
                min_confidence = min(confidences)
                max_confidence = max(confidences)
                print(f" - Total detections with confidence: {len(confidences)}")
                print(f" - Average confidence: {avg_confidence:.2f}")
                print(f" - Min confidence: {min_confidence:.2f}")
                print(f" - Max confidence: {max_confidence:.2f}")

                # Confidence distribution by bucket
                print(f"\n Confidence distribution:")
                buckets = {
                    '90-100 (Extremely clear)': (90, 100),
                    '70-89 (Clear, minor issues)': (70, 89),
                    '50-69 (Partially visible)': (50, 69),
                    '30-49 (Difficult to read)': (30, 49),
                    '0-29 (Very uncertain)': (0, 29)
                }

                for bucket_name, (min_val, max_val) in buckets.items():
                    count = sum(1 for c in confidences if min_val <= c <= max_val)
                    percentage = (count / len(confidences) * 100) if len(confidences) > 0 else 0
                    bar_length = int(percentage / 2)  # Scale to max 50 chars
                    bar = '█' * bar_length
                    print(f" {bucket_name}: {count:3d} ({percentage:5.1f}%) {bar}")

        # List errors if any
        if images_with_errors > 0:
            print(f"\nErrors encountered:")
            for r in results:
                if r.get('error'):
                    print(f" - {Path(r['image_path']).name}: {r['error']}")

        print()


def main():
    """Main entry point for the test script."""
    # Get default server URL from config
    default_server_url = get_llama_server_url_from_config() or 'http://192.168.1.34:8080'

    parser = argparse.ArgumentParser(
        description='Test jersey detection with different models and prompts',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('image_directory', help='Path to directory containing test images')
    parser.add_argument('prompt_file', help='Path to text file containing the prompt')
    parser.add_argument('--model-name', default=None,
                        help='Name of the model being tested (auto-detected from server if not provided)')
    parser.add_argument('--server-url', default=default_server_url,
                        help=f'llama.cpp server URL (default: {default_server_url})')
    parser.add_argument('--output-file', default='jersey_detection_results.jsonl',
                        help='Output file for results (default: jersey_detection_results.jsonl)')
    parser.add_argument('--resize', type=int, default=None, metavar='MAX_SIZE',
                        help='Resize images to maximum dimension (e.g., 1024) before processing')
    parser.add_argument('--model-tag', default=None,
                        help='Model tag for llama-swap (e.g., "qwen2.5-vl-7b"). If not specified, uses whatever model is loaded.')

    args = parser.parse_args()

    # Validate inputs
    if not os.path.isdir(args.image_directory):
        print(f"Error: Directory not found: {args.image_directory}")
        sys.exit(1)

    if not os.path.isfile(args.prompt_file):
        print(f"Error: Prompt file not found: {args.prompt_file}")
        sys.exit(1)

    # Load prompt
    try:
        with open(args.prompt_file, 'r') as f:
            prompt = f.read()
    except Exception as e:
        print(f"Error reading prompt file: {e}")
        sys.exit(1)

    # Print test configuration
    print("=" * 70)
    print("JERSEY DETECTION TEST")
    print("=" * 70)
    print(f"Model name: {args.model_name if args.model_name else '(auto-detect)'}")
    print(f"Model tag: {args.model_tag if args.model_tag else 'None (use loaded model)'}")
    print(f"Server URL: {args.server_url}")
    print(f"Image directory: {args.image_directory}")
    print(f"Prompt file: {args.prompt_file}")
    print(f"Prompt length: {len(prompt)} characters")
    print(f"Output file: {args.output_file}")
    print(f"Resize images: {f'Yes (max: {args.resize}px)' if args.resize else 'No'}")
    print("=" * 70)
    print()

    # Check server health
    print("Checking server health...")
    try:
        client = LlamaCppClient(base_url=args.server_url)

        # Try health check (handle both JSON and plain text responses)
        try:
            health = client.health_check()
            print(f"✓ Server is healthy: {health}")
        except json.JSONDecodeError:
            # llama-swap returns plain text "OK" instead of JSON
            response = requests.get(f"{args.server_url}/health")
            response.raise_for_status()
            print(f"✓ Server is healthy: {response.text}")

        # Determine model name to use
        model_name = args.model_name

        # If model_tag is provided, use it as the model name (unless user explicitly provided a model_name)
        if args.model_tag and not args.model_name:
            model_name = args.model_tag
            print(f"✓ Using model tag as model name: {model_name}")
        elif not model_name:
            # Only auto-detect if neither model_tag nor model_name was provided
            detected_model_name = None
            try:
                models = client.get_models()
                if 'data' in models and len(models['data']) > 0:
                    model_id = models['data'][0].get('id', 'unknown')
                    print(f"✓ Active model: {model_id}")

                    # Extract just the model filename (without path)
                    if model_id and model_id != 'unknown':
                        # Remove path and get base filename
                        model_filename = os.path.basename(model_id)
                        # Remove common extensions (.gguf, .bin, etc.)
                        model_name_no_ext = os.path.splitext(model_filename)[0]
                        detected_model_name = model_name_no_ext
            except Exception:
                pass

            if detected_model_name:
                model_name = detected_model_name
                print(f"✓ Using auto-detected model name: {model_name}")
            else:
                model_name = "unknown"
                print(f"⚠ Could not detect model name, using 'unknown'")
        else:
            # User explicitly provided model_name
            print(f"✓ Using provided model name: {model_name}")

    except Exception as e:
        print(f"❌ Failed to connect to server: {e}")
        print(f"Make sure llama.cpp server is running at {args.server_url}")
        sys.exit(1)

    print()

    # Show model tag info if using llama-swap
    if args.model_tag:
        print(f"Requesting model from llama-swap: {args.model_tag}")

        # Check currently running models on llama-swap
        try:
            running_response = requests.get(f"{args.server_url}/running")
            if running_response.status_code == 200:
                try:
                    running_models = running_response.json()
                    if running_models:
                        print(f"Currently running models: {running_models}")
                except Exception:
                    pass
        except Exception:
            pass

    print()

    # Run tests
    tester = JerseyDetectionTester(args.server_url, prompt, model_name, args.resize, args.model_tag)
    results = tester.test_directory(args.image_directory)

    # Print summary
    if results:
        tester.print_summary(results)

        # Save results to file
        tester.save_results_to_file(results, args.prompt_file, args.output_file)


if __name__ == '__main__':
    main()