Data Curation and Filtering

Raw data collected from the web or sensors is noisy and inconsistent. Data curation transforms this raw data into high-quality training sets for world models.

The Curation Pipeline

Raw Data → Filtering → Annotation → Deduplication → Organization → Training Data
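
Each stage can be wired together as a single pass over the raw collection. A minimal orchestration sketch, assuming every stage is a callable that maps a list of items to a smaller or enriched list (the stage names in the trailing comment are illustrative):

python
def run_curation_pipeline(raw_items, stages):
    """Apply each curation stage in order over the dataset."""
    items = raw_items
    for stage in stages:
        items = stage(items)
    return items

# Illustrative wiring with stages built from the components in this section:
# stages = [technical_filter_stage, content_filter_stage, annotation_stage,
#           dedup_stage, organize_stage]
# training_data = run_curation_pipeline(raw_videos, stages)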

Filtering Strategies

1. Technical Quality Filtering

python
class TechnicalQualityFilter:
    def __init__(self):
        self.min_resolution = (720, 480)      # minimum (width, height) in pixels
        self.min_fps = 24
        self.max_compression_artifacts = 0.3  # 0.0 = clean, 1.0 = heavily degraded
    
    def filter(self, video_metadata):
        # Compare width and height separately rather than as tuples,
        # which would only compare lexicographically
        width, height = video_metadata.resolution
        min_width, min_height = self.min_resolution
        checks = [
            width >= min_width and height >= min_height,
            video_metadata.fps >= self.min_fps,
            video_metadata.compression_artifact_score <= self.max_compression_artifacts,
            not video_metadata.has_watermark,
            not video_metadata.is_corrupted
        ]
        return all(checks)
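
The filter above assumes a metadata object exposing a handful of fields; a minimal sketch of that interface, matching the attribute names the filter reads:

python
from dataclasses import dataclass

@dataclass
class VideoMetadata:
    """Illustrative container for the fields TechnicalQualityFilter reads."""
    resolution: tuple                  # (width, height) in pixels
    fps: float                         # frames per second
    compression_artifact_score: float  # 0.0 = clean, 1.0 = heavily degraded
    has_watermark: bool
    is_corrupted: bool

# Usage: keep only clips that pass every technical check
# kept = [m for m in all_metadata if TechnicalQualityFilter().filter(m)]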

2. Content Quality Filtering

Using vision-language models to assess content:

python
import torch
from transformers import CLIPModel, CLIPProcessor

class ContentQualityFilter:
    def __init__(self):
        self.clip = CLIPModel.from_pretrained("openai/clip-vit-large-patch14")
        self.processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")
        
        # Text prompts describing desirable and undesirable footage
        self.positive_prompts = [
            "a clear, high-quality video",
            "professional footage",
            "well-lit scene"
        ]
        self.negative_prompts = [
            "blurry, low-quality video",
            "amateur footage",
            "dark, poorly lit scene"
        ]
    
    def score(self, frame):
        inputs = self.processor(
            images=frame,
            text=self.positive_prompts + self.negative_prompts,
            return_tensors="pt",
            padding=True
        )
        with torch.no_grad():
            outputs = self.clip(**inputs)
        
        # Higher score = frame aligns more with the positive prompts than the negative ones
        pos_score = outputs.logits_per_image[:, :len(self.positive_prompts)].mean()
        neg_score = outputs.logits_per_image[:, len(self.positive_prompts):].mean()
        
        return pos_score - neg_score
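
One way to apply this at the clip level is to score a few sampled frames and keep the clip only if the average score clears a threshold; the helper and threshold below are illustrative, not part of the filter above:

python
def passes_content_filter(content_filter, frames, threshold=0.0):
    """Keep a clip if its sampled frames align more with the positive prompts
    than the negative ones on average."""
    scores = [content_filter.score(frame).item() for frame in frames]
    return sum(scores) / len(scores) > threshold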

3. Motion and Dynamics Filtering

Filter for videos with meaningful motion:

python
import cv2
import numpy as np

def compute_optical_flow_magnitude(video_path):
    """Average dense optical-flow magnitude across all consecutive frame pairs."""
    cap = cv2.VideoCapture(video_path)
    prev_frame = None
    flow_magnitudes = []
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        if prev_frame is not None:
            # Farneback dense optical flow between consecutive grayscale frames
            flow = cv2.calcOpticalFlowFarneback(
                prev_frame, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
            )
            magnitude = np.sqrt(flow[..., 0]**2 + flow[..., 1]**2)
            flow_magnitudes.append(magnitude.mean())
        
        prev_frame = gray
    
    cap.release()
    if not flow_magnitudes:
        return 0.0  # fewer than two readable frames
    return float(np.mean(flow_magnitudes))

def filter_by_motion(video_path, min_motion=2.0, max_motion=50.0):
    """Filter out videos with too little (static) or too much (chaotic) motion"""
    motion = compute_optical_flow_magnitude(video_path)
    return min_motion <= motion <= max_motion
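
Applied over a batch of candidate clips (the paths here are placeholders):

python
candidate_clips = ["clip_0001.mp4", "clip_0002.mp4", "clip_0003.mp4"]
kept_clips = [path for path in candidate_clips if filter_by_motion(path)]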

Annotation

Automatic Annotation with VLMs

python
from transformers import Blip2Processor, Blip2ForConditionalGeneration

class VideoAnnotator:
    def __init__(self):
        self.processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
        self.model = Blip2ForConditionalGeneration.from_pretrained("Salesforce/blip2-opt-2.7b")
    
    def annotate_frame(self, frame):
        # Generate a short caption for a single frame
        inputs = self.processor(images=frame, return_tensors="pt")
        generated_ids = self.model.generate(**inputs, max_length=50)
        caption = self.processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
        return caption
    
    def annotate_video(self, video_path, sample_rate=1):
        """Annotate a video by captioning sampled frames.
        `extract_frames` is a frame-sampling helper (sketched below)."""
        frames = extract_frames(video_path, sample_rate)
        annotations = [self.annotate_frame(f) for f in frames]
        return {
            "frame_captions": annotations,
            "video_summary": self.summarize(annotations)
        }
    
    def summarize(self, captions):
        # Simple placeholder summary: deduplicated captions joined in order;
        # in practice an LLM could condense these into a single description
        seen, unique = set(), []
        for caption in captions:
            if caption not in seen:
                seen.add(caption)
                unique.append(caption)
        return ". ".join(unique)
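
The annotator assumes a frame-sampling helper; a minimal sketch with OpenCV, where taking roughly one frame every `sample_rate` seconds is an assumption about the intended behavior:

python
import cv2
from PIL import Image

def extract_frames(video_path, sample_rate=1):
    """Sample roughly one frame every `sample_rate` seconds (assumed behavior)."""
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS) or 30  # fall back if FPS metadata is missing
    step = max(1, int(round(fps * sample_rate)))
    
    frames, index = [], 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if index % step == 0:
            # BLIP-2's processor expects RGB images (PIL or numpy)
            frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
        index += 1
    
    cap.release()
    return frames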

Deduplication

Remove redundant data to improve training efficiency:

python
from sentence_transformers import SentenceTransformer
import faiss

class SemanticDeduplicator:
    def __init__(self):
        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
        self.index = None
        self.threshold = 0.95  # cosine similarity above which two items count as duplicates
    
    def encode(self, texts):
        # Embed captions or other descriptions; FAISS expects contiguous float32 arrays
        return self.encoder.encode(texts).astype("float32")
    
    def build_index(self, embeddings):
        dimension = embeddings.shape[1]
        self.index = faiss.IndexFlatIP(dimension)  # inner-product index
        faiss.normalize_L2(embeddings)             # unit vectors, so inner product == cosine similarity
        self.index.add(embeddings)
    
    def find_duplicates(self, embeddings, k=10):
        # Vectors added in build_index are already normalized; re-normalizing is a no-op
        faiss.normalize_L2(embeddings)
        similarities, indices = self.index.search(embeddings, k)
        
        duplicates = set()
        for i, (sims, idxs) in enumerate(zip(similarities, indices)):
            for sim, j in zip(sims, idxs):
                if i != j and sim > self.threshold:
                    duplicates.add(max(i, j))  # mark the higher index for removal, keep the lower one
        
        return duplicates
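
Downstream, deduplication can run on the captions produced during annotation. A small usage sketch; the caption strings are made up, and which pairs actually exceed the 0.95 threshold depends on the encoder:

python
dedup = SemanticDeduplicator()

captions = [
    "a robot arm picks up a red cube",
    "a robotic arm grasping a red cube",
    "a car driving down a rainy street",
]

embeddings = dedup.encode(captions)
dedup.build_index(embeddings)
duplicate_ids = dedup.find_duplicates(embeddings)

# Keep one representative per near-duplicate group
kept = [c for i, c in enumerate(captions) if i not in duplicate_ids]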

Summary

Data curation is essential for training high-quality world models. Effective curation combines technical filtering, content assessment, automatic annotation, and deduplication to create clean, diverse training datasets.