Data Curation and Filtering
Raw data collected from the web or sensors is noisy and inconsistent. Data curation transforms this raw data into high-quality training sets for world models.
The Curation Pipeline
Raw Data → Filtering → Annotation → Deduplication → Organization → Training Data
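Concretely, these stages can be wired together as a simple driver loop. The sketch below is illustrative only: every function name is a placeholder for a stage covered in the rest of this page, not an existing API.

```python
def curate(raw_videos):
    """Illustrative driver; each helper stands in for a stage described below."""
    filtered = [v for v in raw_videos if passes_quality_filters(v)]   # Filtering
    annotated = [annotate(v) for v in filtered]                       # Annotation
    unique = deduplicate(annotated)                                   # Deduplication
    return organize_into_shards(unique)                               # Organization -> training data
```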
Filtering Strategies
1. Technical Quality Filtering
```python
class TechnicalQualityFilter:
    """Rejects videos that fail basic technical quality checks."""

    def __init__(self):
        self.min_resolution = (720, 480)  # minimum (width, height)
        self.min_fps = 24
        self.min_quality_score = 0.3      # below this, compression artifacts dominate

    def filter(self, video_metadata):
        # Compare width and height separately rather than relying on tuple ordering.
        width, height = video_metadata.resolution
        min_w, min_h = self.min_resolution
        checks = [
            width >= min_w and height >= min_h,
            video_metadata.fps >= self.min_fps,
            video_metadata.quality_score >= self.min_quality_score,
            not video_metadata.has_watermark,
            not video_metadata.is_corrupted,
        ]
        return all(checks)
```
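As a rough usage sketch, the filter can be applied over a batch of candidate clips. The `candidate_videos` list and the metadata fields are assumptions here; in practice the metadata would come from a probe tool or an existing catalog.

```python
# Hypothetical list of metadata records for candidate clips.
quality_filter = TechnicalQualityFilter()
accepted = [meta for meta in candidate_videos if quality_filter.filter(meta)]
print(f"Kept {len(accepted)} of {len(candidate_videos)} videos")
```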
2. Content Quality Filtering
Using vision-language models to assess content:
```python
import torch
from transformers import CLIPModel, CLIPProcessor

class ContentQualityFilter:
    def __init__(self):
        model_name = "openai/clip-vit-large-patch14"
        self.clip = CLIPModel.from_pretrained(model_name)
        self.processor = CLIPProcessor.from_pretrained(model_name)

        # Prompts describing desirable vs. undesirable footage
        self.positive_prompts = [
            "a clear, high-quality video",
            "professional footage",
            "well-lit scene",
        ]
        self.negative_prompts = [
            "blurry, low-quality video",
            "amateur footage",
            "dark, poorly lit scene",
        ]

    def score(self, frame):
        inputs = self.processor(
            images=frame,
            text=self.positive_prompts + self.negative_prompts,
            return_tensors="pt",
            padding=True,
        )
        with torch.no_grad():
            outputs = self.clip(**inputs)
        # logits_per_image has shape (1, num_prompts); higher means closer match
        n_pos = len(self.positive_prompts)
        pos_score = outputs.logits_per_image[:, :n_pos].mean()
        neg_score = outputs.logits_per_image[:, n_pos:].mean()
        return (pos_score - neg_score).item()
```
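A quick usage sketch on a single sampled frame; the file path and the zero threshold below are illustrative choices, not fixed values.

```python
from PIL import Image

content_filter = ContentQualityFilter()
frame = Image.open("sample_frame.jpg")   # a frame sampled from a candidate video (path assumed)
if content_filter.score(frame) > 0:      # positive margin favours the "high quality" prompts
    print("Frame passes the content quality check")
```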
3. Motion and Dynamics Filtering
Filter for videos with meaningful motion:
```python
import cv2
import numpy as np

def compute_optical_flow_magnitude(video_path):
    """Average per-frame optical-flow magnitude, a cheap proxy for scene motion."""
    cap = cv2.VideoCapture(video_path)
    prev_frame = None
    flow_magnitudes = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if prev_frame is not None:
            # Dense Farneback flow between consecutive grayscale frames
            flow = cv2.calcOpticalFlowFarneback(
                prev_frame, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
            )
            magnitude = np.sqrt(flow[..., 0]**2 + flow[..., 1]**2)
            flow_magnitudes.append(magnitude.mean())
        prev_frame = gray
    cap.release()
    return float(np.mean(flow_magnitudes)) if flow_magnitudes else 0.0

def filter_by_motion(video_path, min_motion=2.0, max_motion=50.0):
    """Reject videos with too little motion (static scenes) or too much (shake, rapid cuts)."""
    motion = compute_optical_flow_magnitude(video_path)
    return min_motion <= motion <= max_motion
```
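Applied over a directory of candidate clips, this might look like the sketch below; the directory name is an assumption, and in practice you would likely downsample frames first to keep the flow computation cheap.

```python
from pathlib import Path

# Keep only clips whose average flow magnitude falls in the target band.
video_dir = Path("raw_clips")            # assumed directory of candidate videos
kept = [p for p in video_dir.glob("*.mp4") if filter_by_motion(str(p))]
```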
Annotation
Automatic Annotation with VLMs
```python
from transformers import Blip2Processor, Blip2ForConditionalGeneration

class VideoAnnotator:
    def __init__(self):
        model_name = "Salesforce/blip2-opt-2.7b"
        self.processor = Blip2Processor.from_pretrained(model_name)
        self.model = Blip2ForConditionalGeneration.from_pretrained(model_name)

    def annotate_frame(self, frame):
        inputs = self.processor(images=frame, return_tensors="pt")
        generated_ids = self.model.generate(**inputs, max_new_tokens=50)
        caption = self.processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
        return caption.strip()

    def summarize(self, captions):
        # Simple placeholder: join the distinct frame captions in order of appearance.
        unique = list(dict.fromkeys(captions))
        return "; ".join(unique)

    def annotate_video(self, video_path, sample_rate=1):
        """Annotate a video by captioning frames sampled at `sample_rate` frames per second."""
        frames = extract_frames(video_path, sample_rate)  # frame-sampling helper, sketched below
        annotations = [self.annotate_frame(f) for f in frames]
        return {
            "frame_captions": annotations,
            "video_summary": self.summarize(annotations),
        }
```
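The `extract_frames` helper used above is not defined on this page; a minimal OpenCV-based sketch (its exact behaviour is an assumption) could be:

```python
import cv2

def extract_frames(video_path, sample_rate=1):
    """Return roughly `sample_rate` frames per second of video as RGB numpy arrays."""
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    step = max(int(round(fps / sample_rate)), 1)
    frames, idx = [], 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if idx % step == 0:
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        idx += 1
    cap.release()
    return frames
```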
Deduplication
Remove redundant data to improve training efficiency:
```python
from sentence_transformers import SentenceTransformer
import faiss

class SemanticDeduplicator:
    def __init__(self):
        self.encoder = SentenceTransformer("all-MiniLM-L6-v2")
        self.index = None
        self.threshold = 0.95  # cosine similarity above which two items count as duplicates

    def build_index(self, embeddings):
        dimension = embeddings.shape[1]
        # Inner product on L2-normalized vectors equals cosine similarity.
        self.index = faiss.IndexFlatIP(dimension)
        faiss.normalize_L2(embeddings)
        self.index.add(embeddings)

    def find_duplicates(self, embeddings):
        faiss.normalize_L2(embeddings)
        # With IndexFlatIP, "distances" are inner products (cosine similarities here).
        distances, indices = self.index.search(embeddings, k=10)
        duplicates = set()
        for i, (dists, idxs) in enumerate(zip(distances, indices)):
            for d, j in zip(dists, idxs):
                if j != -1 and i != j and d > self.threshold:
                    duplicates.add(max(i, j))  # mark the higher index, keeping the lower one
        return duplicates
```
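A hedged end-to-end sketch of applying the deduplicator to captions produced earlier (for example by the annotator above); the `annotated_videos` list and the removal step are illustrative.

```python
import numpy as np

dedup = SemanticDeduplicator()

# One summary string per video, e.g. from VideoAnnotator.annotate_video.
captions = [record["video_summary"] for record in annotated_videos]
embeddings = dedup.encoder.encode(captions).astype(np.float32)

dedup.build_index(embeddings)
duplicate_ids = dedup.find_duplicates(embeddings)
clean_dataset = [rec for i, rec in enumerate(annotated_videos) if i not in duplicate_ids]
```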
Summary
Data curation is essential for training high-quality world models. Effective curation combines technical filtering, content assessment, automatic annotation, and deduplication to create clean, diverse training datasets.