The multi-ref matching method was missing a margin check against other logos, causing excessive false positives. This fix adds: - margin parameter to find_best_match_multi_ref() that requires the best logo's score to exceed the second-best by a minimum margin - Test script now passes --margin to both matching methods - Updated documentation to reflect margin applies to both methods Also adds run_comparison_tests.sh to run all three matching methods and compare results.
570 lines
21 KiB
Python
570 lines
21 KiB
Python
"""
|
|
Logo detection using DETR for object detection and CLIP for feature matching.
|
|
|
|
This module provides a class for detecting logos in images using:
|
|
1. DETR (DEtection TRansformer) for initial logo region detection
|
|
2. CLIP (Contrastive Language-Image Pre-training) for feature extraction and matching
|
|
|
|
The class supports caching of embeddings for efficient reprocessing.
|
|
The class automatically uses local models if available, otherwise falls back to HuggingFace.
|
|
"""
|
|
|
|
import os
|
|
import torch
|
|
import torch.nn.functional as F
|
|
from transformers import pipeline, CLIPProcessor, CLIPModel
|
|
from PIL import Image
|
|
import cv2
|
|
import numpy as np
|
|
from pathlib import Path
|
|
from typing import List, Tuple, Dict, Optional, Any
|
|
|
|
|
|
class DetectLogosDETR:
|
|
"""
|
|
Logo detection class using DETR and CLIP models.
|
|
|
|
This class detects logos in images by:
|
|
1. Using DETR to find potential logo regions (bounding boxes)
|
|
2. Extracting CLIP embeddings for each detected region
|
|
3. Comparing embeddings with reference logos for identification
|
|
|
|
The class automatically checks for local models before downloading from HuggingFace.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
logger,
|
|
detr_model: str = "Pravallika6/detr-finetuned-logo-detection_v2",
|
|
#clip_model: str = "openai/clip-vit-base-patch32",
|
|
clip_model: str = "openai/clip-vit-large-patch14",
|
|
detr_threshold: float = 0.5,
|
|
min_box_size: int = 20,
|
|
nms_iou_threshold: float = 0.5,
|
|
):
|
|
"""
|
|
Initialize DETR and CLIP models.
|
|
|
|
The class will automatically check for local models in the default directories
|
|
before downloading from HuggingFace. You can override this by providing absolute
|
|
paths to local models.
|
|
|
|
Args:
|
|
logger: Logger instance for logging
|
|
detr_model: HuggingFace model name or local path for DETR object detection
|
|
clip_model: HuggingFace model name or local path for CLIP embeddings
|
|
detr_threshold: Confidence threshold for DETR detections (0-1)
|
|
min_box_size: Minimum width/height in pixels for detected boxes (filters noise)
|
|
nms_iou_threshold: IoU threshold for Non-Maximum Suppression
|
|
"""
|
|
self.logger = logger
|
|
self.detr_threshold = detr_threshold
|
|
self.min_box_size = min_box_size
|
|
self.nms_iou_threshold = nms_iou_threshold
|
|
|
|
# Set device
|
|
self.device_str = "cuda:0" if torch.cuda.is_available() else "cpu"
|
|
self.device_index = 0 if torch.cuda.is_available() else -1
|
|
self.device = torch.device(self.device_str)
|
|
|
|
self.logger.info(f"Initializing DetectLogosDETR on device: {self.device_str}")
|
|
|
|
# Get default model directories from environment variables
|
|
default_detr_dir = os.environ.get('LOGO_DETR_MODEL_DIR', 'models/logo_detection/detr')
|
|
default_clip_dir = os.environ.get('LOGO_CLIP_MODEL_DIR', 'models/logo_detection/clip')
|
|
|
|
# Resolve DETR model path (check local first, then use HuggingFace name)
|
|
detr_model_path = self._resolve_model_path(
|
|
detr_model, default_detr_dir, "DETR"
|
|
)
|
|
|
|
# Initialize DETR pipeline for logo detection
|
|
self.logger.info(f"Loading DETR model: {detr_model_path}")
|
|
self.detr_pipe = pipeline(
|
|
task="object-detection",
|
|
model=detr_model_path,
|
|
device=self.device_index,
|
|
use_fast=True,
|
|
)
|
|
|
|
# Resolve CLIP model path (check local first, then use HuggingFace name)
|
|
clip_model_path = self._resolve_model_path(
|
|
clip_model, default_clip_dir, "CLIP"
|
|
)
|
|
|
|
# Initialize CLIP model for feature extraction
|
|
self.logger.info(f"Loading CLIP model: {clip_model_path}")
|
|
self.clip_model = CLIPModel.from_pretrained(clip_model_path).to(self.device)
|
|
self.clip_processor = CLIPProcessor.from_pretrained(clip_model_path)
|
|
|
|
self.logger.info("DetectLogosDETR initialization complete")
|
|
|
|
def _resolve_model_path(
|
|
self, model_name_or_path: str, default_local_dir: str, model_type: str
|
|
) -> str:
|
|
"""
|
|
Resolve model path, checking for local models before using HuggingFace.
|
|
|
|
Args:
|
|
model_name_or_path: HuggingFace model name or absolute path
|
|
default_local_dir: Default local directory to check
|
|
model_type: Type of model (for logging, e.g., "DETR" or "CLIP")
|
|
|
|
Returns:
|
|
Resolved model path (local path or HuggingFace model name)
|
|
"""
|
|
# If it's an absolute path, use it directly
|
|
if os.path.isabs(model_name_or_path):
|
|
if os.path.exists(model_name_or_path):
|
|
self.logger.info(
|
|
f"{model_type} model: Using local model at {model_name_or_path}"
|
|
)
|
|
return model_name_or_path
|
|
else:
|
|
self.logger.warning(
|
|
f"{model_type} model: Local path {model_name_or_path} does not exist, "
|
|
f"falling back to HuggingFace"
|
|
)
|
|
return model_name_or_path
|
|
|
|
# Check if default local directory exists
|
|
if os.path.exists(default_local_dir):
|
|
# Verify it's a valid model directory (has config.json)
|
|
config_file = os.path.join(default_local_dir, "config.json")
|
|
if os.path.exists(config_file):
|
|
abs_path = os.path.abspath(default_local_dir)
|
|
self.logger.info(
|
|
f"{model_type} model: Found local model at {abs_path}"
|
|
)
|
|
return abs_path
|
|
else:
|
|
self.logger.warning(
|
|
f"{model_type} model: Local directory {default_local_dir} exists but "
|
|
f"is not a valid model (missing config.json)"
|
|
)
|
|
|
|
# Use HuggingFace model name
|
|
self.logger.info(
|
|
f"{model_type} model: No local model found, will download from HuggingFace: "
|
|
f"{model_name_or_path}"
|
|
)
|
|
return model_name_or_path
|
|
|
|
def detect(self, image: np.ndarray) -> List[Dict[str, Any]]:
|
|
"""
|
|
Detect logos in an image and return bounding boxes with CLIP embeddings.
|
|
|
|
Args:
|
|
image: OpenCV image (BGR format, numpy array)
|
|
|
|
Returns:
|
|
List of dictionaries, each containing:
|
|
- 'box': dict with 'xmin', 'ymin', 'xmax', 'ymax' (pixel coordinates)
|
|
- 'score': DETR confidence score (float 0-1)
|
|
- 'embedding': CLIP feature embedding (torch.Tensor)
|
|
- 'label': DETR predicted label (string)
|
|
"""
|
|
# Convert OpenCV BGR to RGB PIL Image
|
|
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
|
pil_image = Image.fromarray(image_rgb)
|
|
|
|
# Run DETR detection
|
|
predictions = self.detr_pipe(pil_image)
|
|
|
|
# Filter by threshold and size, then add CLIP embeddings
|
|
detections = []
|
|
for pred in predictions:
|
|
score = pred.get("score", 0.0)
|
|
if score < self.detr_threshold:
|
|
continue
|
|
|
|
box = pred.get("box", {})
|
|
xmin = box.get("xmin", 0)
|
|
ymin = box.get("ymin", 0)
|
|
xmax = box.get("xmax", 0)
|
|
ymax = box.get("ymax", 0)
|
|
|
|
# Filter by minimum box size
|
|
box_width = xmax - xmin
|
|
box_height = ymax - ymin
|
|
if box_width < self.min_box_size or box_height < self.min_box_size:
|
|
continue
|
|
|
|
# Extract bounding box region
|
|
bbox_crop = pil_image.crop((xmin, ymin, xmax, ymax))
|
|
|
|
# Get CLIP embedding for this region
|
|
embedding = self._get_clip_embedding_pil(bbox_crop)
|
|
|
|
detections.append(
|
|
{
|
|
"box": {"xmin": xmin, "ymin": ymin, "xmax": xmax, "ymax": ymax},
|
|
"score": score,
|
|
"embedding": embedding,
|
|
"label": pred.get("label", "logo"),
|
|
}
|
|
)
|
|
|
|
# Apply Non-Maximum Suppression to remove overlapping detections
|
|
detections = self._apply_nms(detections, self.nms_iou_threshold)
|
|
|
|
self.logger.debug(f"Detected {len(detections)} logos (threshold: {self.detr_threshold})")
|
|
return detections
|
|
|
|
def _apply_nms(self, predictions: List[Dict], iou_threshold: float) -> List[Dict]:
|
|
"""
|
|
Apply Non-Maximum Suppression to remove overlapping detections.
|
|
|
|
Args:
|
|
predictions: List of prediction dictionaries with 'box' and 'score'
|
|
iou_threshold: IoU threshold for considering boxes as overlapping
|
|
|
|
Returns:
|
|
Filtered list of predictions after NMS
|
|
"""
|
|
if len(predictions) == 0:
|
|
return []
|
|
|
|
# Extract boxes and scores
|
|
boxes = []
|
|
scores = []
|
|
for pred in predictions:
|
|
box = pred.get("box", {})
|
|
boxes.append([
|
|
box.get("xmin", 0),
|
|
box.get("ymin", 0),
|
|
box.get("xmax", 0),
|
|
box.get("ymax", 0)
|
|
])
|
|
scores.append(pred.get("score", 0.0))
|
|
|
|
# Convert to numpy arrays
|
|
boxes = np.array(boxes, dtype=np.float32)
|
|
scores = np.array(scores, dtype=np.float32)
|
|
|
|
# Sort by scores (descending)
|
|
sorted_indices = np.argsort(scores)[::-1]
|
|
|
|
keep_indices = []
|
|
while len(sorted_indices) > 0:
|
|
# Keep the box with highest score
|
|
current_idx = sorted_indices[0]
|
|
keep_indices.append(current_idx)
|
|
|
|
if len(sorted_indices) == 1:
|
|
break
|
|
|
|
# Calculate IoU with remaining boxes
|
|
current_box = boxes[current_idx]
|
|
remaining_boxes = boxes[sorted_indices[1:]]
|
|
|
|
ious = self._calculate_iou_batch(current_box, remaining_boxes)
|
|
|
|
# Keep only boxes with IoU below threshold
|
|
mask = ious < iou_threshold
|
|
sorted_indices = sorted_indices[1:][mask]
|
|
|
|
# Return predictions for kept indices
|
|
return [predictions[i] for i in keep_indices]
|
|
|
|
def _calculate_iou_batch(self, box: np.ndarray, boxes: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Calculate IoU between one box and multiple boxes.
|
|
|
|
Args:
|
|
box: Single box [xmin, ymin, xmax, ymax]
|
|
boxes: Multiple boxes [[xmin, ymin, xmax, ymax], ...]
|
|
|
|
Returns:
|
|
Array of IoU values
|
|
"""
|
|
# Calculate intersection coordinates
|
|
x1 = np.maximum(box[0], boxes[:, 0])
|
|
y1 = np.maximum(box[1], boxes[:, 1])
|
|
x2 = np.minimum(box[2], boxes[:, 2])
|
|
y2 = np.minimum(box[3], boxes[:, 3])
|
|
|
|
# Calculate intersection area
|
|
intersection = np.maximum(0, x2 - x1) * np.maximum(0, y2 - y1)
|
|
|
|
# Calculate union area
|
|
box_area = (box[2] - box[0]) * (box[3] - box[1])
|
|
boxes_area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
|
|
union = box_area + boxes_area - intersection
|
|
|
|
# Calculate IoU
|
|
iou = intersection / (union + 1e-6) # Add small epsilon to avoid division by zero
|
|
|
|
return iou
|
|
|
|
def get_embedding(self, image: np.ndarray) -> torch.Tensor:
|
|
"""
|
|
Get CLIP embedding for a reference logo image.
|
|
|
|
This method is used to compute embeddings for reference logos
|
|
that will be compared against detected regions.
|
|
|
|
Args:
|
|
image: OpenCV image (BGR format, numpy array)
|
|
|
|
Returns:
|
|
Normalized CLIP feature embedding (torch.Tensor, shape: [1, 512])
|
|
"""
|
|
# Convert OpenCV BGR to RGB PIL Image
|
|
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
|
pil_image = Image.fromarray(image_rgb)
|
|
|
|
return self._get_clip_embedding_pil(pil_image)
|
|
|
|
def _get_clip_embedding_pil(self, pil_image: Image.Image) -> torch.Tensor:
|
|
"""
|
|
Internal method to get CLIP embedding from PIL image.
|
|
|
|
Args:
|
|
pil_image: PIL Image (RGB format)
|
|
|
|
Returns:
|
|
Normalized CLIP feature embedding (torch.Tensor)
|
|
"""
|
|
# Process image through CLIP
|
|
inputs = self.clip_processor(images=pil_image, return_tensors="pt").to(self.device)
|
|
|
|
with torch.no_grad():
|
|
features = self.clip_model.get_image_features(**inputs)
|
|
# Normalize for cosine similarity
|
|
features = F.normalize(features, dim=-1)
|
|
|
|
return features
|
|
|
|
def compare_embeddings(
|
|
self, embedding1: torch.Tensor, embedding2: torch.Tensor
|
|
) -> float:
|
|
"""
|
|
Compute cosine similarity between two CLIP embeddings.
|
|
|
|
Args:
|
|
embedding1: First CLIP embedding (torch.Tensor)
|
|
embedding2: Second CLIP embedding (torch.Tensor)
|
|
|
|
Returns:
|
|
Cosine similarity score (float, range: -1 to 1, typically 0 to 1)
|
|
"""
|
|
# Ensure tensors are on the same device
|
|
if embedding1.device != embedding2.device:
|
|
embedding2 = embedding2.to(embedding1.device)
|
|
|
|
# Compute cosine similarity
|
|
similarity = F.cosine_similarity(embedding1, embedding2, dim=-1)
|
|
|
|
# Return as Python float
|
|
return similarity.item()
|
|
|
|
def find_best_match(
|
|
self,
|
|
detected_embedding: torch.Tensor,
|
|
reference_embeddings: List[Tuple[str, torch.Tensor]],
|
|
similarity_threshold: float = 0.7,
|
|
) -> Optional[Tuple[str, float]]:
|
|
"""
|
|
Find the best matching reference logo for a detected embedding.
|
|
|
|
Args:
|
|
detected_embedding: CLIP embedding from detected logo region
|
|
reference_embeddings: List of (label, embedding) tuples for reference logos
|
|
similarity_threshold: Minimum similarity to consider a match (0-1)
|
|
|
|
Returns:
|
|
Tuple of (label, similarity) for best match, or None if no match above threshold
|
|
"""
|
|
if not reference_embeddings:
|
|
return None
|
|
|
|
best_similarity = -1.0
|
|
best_label = None
|
|
|
|
for label, ref_embedding in reference_embeddings:
|
|
similarity = self.compare_embeddings(detected_embedding, ref_embedding)
|
|
|
|
if similarity > best_similarity:
|
|
best_similarity = similarity
|
|
best_label = label
|
|
|
|
if best_similarity >= similarity_threshold:
|
|
return (best_label, best_similarity)
|
|
else:
|
|
return None
|
|
|
|
def find_best_match_multi_ref(
|
|
self,
|
|
detected_embedding: torch.Tensor,
|
|
reference_embeddings: Dict[str, List[torch.Tensor]],
|
|
similarity_threshold: float = 0.85,
|
|
min_matching_refs: int = 1,
|
|
use_mean_similarity: bool = True,
|
|
margin: float = 0.0,
|
|
) -> Optional[Tuple[str, float, int]]:
|
|
"""
|
|
Find the best matching reference logo using multiple reference embeddings per logo.
|
|
|
|
This method improves accuracy by using multiple reference images for each logo
|
|
and requiring consistency across references.
|
|
|
|
Args:
|
|
detected_embedding: CLIP embedding from detected logo region
|
|
reference_embeddings: Dict mapping logo name to list of embeddings
|
|
similarity_threshold: Minimum similarity to consider a match (0-1)
|
|
min_matching_refs: Minimum number of references that must match above threshold
|
|
use_mean_similarity: If True, use mean similarity across all refs; if False, use max
|
|
margin: Required margin between best and second-best logo scores (0-1)
|
|
|
|
Returns:
|
|
Tuple of (label, similarity, num_matching_refs) for best match,
|
|
or None if no match meets criteria
|
|
"""
|
|
if not reference_embeddings:
|
|
return None
|
|
|
|
# Calculate scores for all logos that meet the min_matching_refs requirement
|
|
logo_scores = []
|
|
|
|
for label, ref_embedding_list in reference_embeddings.items():
|
|
if not ref_embedding_list:
|
|
continue
|
|
|
|
# Calculate similarity to each reference embedding
|
|
similarities = []
|
|
for ref_embedding in ref_embedding_list:
|
|
sim = self.compare_embeddings(detected_embedding, ref_embedding)
|
|
similarities.append(sim)
|
|
|
|
# Count how many references match above threshold
|
|
num_matches = sum(1 for s in similarities if s >= similarity_threshold)
|
|
|
|
# Calculate aggregate score
|
|
if use_mean_similarity:
|
|
score = sum(similarities) / len(similarities)
|
|
else:
|
|
score = max(similarities)
|
|
|
|
# Only consider logos that meet the minimum matching refs requirement
|
|
if num_matches >= min_matching_refs:
|
|
logo_scores.append((label, score, num_matches))
|
|
|
|
if not logo_scores:
|
|
return None
|
|
|
|
# Sort by score descending
|
|
logo_scores.sort(key=lambda x: x[1], reverse=True)
|
|
|
|
best_label, best_score, best_num_matches = logo_scores[0]
|
|
|
|
# Check if best score meets threshold
|
|
if best_score < similarity_threshold:
|
|
return None
|
|
|
|
# Check margin against second-best logo (if exists)
|
|
if margin > 0 and len(logo_scores) > 1:
|
|
second_best_score = logo_scores[1][1]
|
|
if best_score - second_best_score < margin:
|
|
return None # Not confident enough
|
|
|
|
return (best_label, best_score, best_num_matches)
|
|
|
|
def find_best_match_with_margin(
|
|
self,
|
|
detected_embedding: torch.Tensor,
|
|
reference_embeddings: List[Tuple[str, torch.Tensor]],
|
|
similarity_threshold: float = 0.85,
|
|
margin: float = 0.05,
|
|
) -> Optional[Tuple[str, float]]:
|
|
"""
|
|
Find best match with a confidence margin over the second-best match.
|
|
|
|
This reduces false positives by requiring the best match to be
|
|
significantly better than alternatives.
|
|
|
|
Args:
|
|
detected_embedding: CLIP embedding from detected logo region
|
|
reference_embeddings: List of (label, embedding) tuples for reference logos
|
|
similarity_threshold: Minimum similarity to consider a match (0-1)
|
|
margin: Required margin between best and second-best match
|
|
|
|
Returns:
|
|
Tuple of (label, similarity) for best match, or None if no confident match
|
|
"""
|
|
if not reference_embeddings:
|
|
return None
|
|
|
|
# Calculate all similarities
|
|
similarities = []
|
|
for label, ref_embedding in reference_embeddings:
|
|
sim = self.compare_embeddings(detected_embedding, ref_embedding)
|
|
similarities.append((label, sim))
|
|
|
|
# Sort by similarity descending
|
|
similarities.sort(key=lambda x: x[1], reverse=True)
|
|
|
|
best_label, best_sim = similarities[0]
|
|
|
|
# Check if best is above threshold
|
|
if best_sim < similarity_threshold:
|
|
return None
|
|
|
|
# Check margin against second best (if exists)
|
|
if len(similarities) > 1:
|
|
second_best_sim = similarities[1][1]
|
|
if best_sim - second_best_sim < margin:
|
|
return None # Not confident enough
|
|
|
|
return (best_label, best_sim)
|
|
|
|
def detect_and_match(
|
|
self,
|
|
image: np.ndarray,
|
|
reference_embeddings: List[Tuple[str, torch.Tensor]],
|
|
similarity_threshold: float = 0.7,
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Detect logos and match them against reference embeddings in one step.
|
|
|
|
This is a convenience method that combines detection and matching.
|
|
|
|
Args:
|
|
image: OpenCV image (BGR format, numpy array)
|
|
reference_embeddings: List of (label, embedding) tuples for reference logos
|
|
similarity_threshold: Minimum similarity to consider a match (0-1)
|
|
|
|
Returns:
|
|
List of matched detections, each containing:
|
|
- 'box': bounding box coordinates
|
|
- 'detr_score': DETR confidence score
|
|
- 'clip_similarity': CLIP similarity score
|
|
- 'label': matched reference logo label
|
|
"""
|
|
# Detect all logos
|
|
detections = self.detect(image)
|
|
|
|
# Match each detection against references
|
|
matched_detections = []
|
|
for detection in detections:
|
|
match_result = self.find_best_match(
|
|
detection["embedding"], reference_embeddings, similarity_threshold
|
|
)
|
|
|
|
if match_result is not None:
|
|
label, similarity = match_result
|
|
matched_detections.append(
|
|
{
|
|
"box": detection["box"],
|
|
"detr_score": detection["score"],
|
|
"clip_similarity": similarity,
|
|
"label": label,
|
|
}
|
|
)
|
|
|
|
self.logger.debug(
|
|
f"Matched {len(matched_detections)}/{len(detections)} detections "
|
|
f"(threshold: {similarity_threshold})"
|
|
)
|
|
|
|
return matched_detections |