# Source code for rowvoi.core
"""Core types and data structures for rowvoi.
This module defines the fundamental building blocks used throughout the rowvoi package:
- CandidateState: Represents current uncertainty over which row is "the one"
- FeatureSuggestion: A recommendation for which column to query next
"""
from collections.abc import Hashable, Mapping, Sequence
from dataclasses import dataclass
from typing import Any
import numpy as np
import pandas as pd
# Type aliases
# Identifier of a row; annotated as any hashable value.
RowIndex = Hashable
# Identifier of a column; annotated as any hashable value.
ColName = Hashable
[docs]
@dataclass
class CandidateState:
"""Represents the current uncertainty over which row is "the one".
Attributes
----------
candidate_rows : Sequence[RowIndex]
List of row indices under consideration
posterior : np.ndarray
Probabilities over candidate_rows, shape (n_candidates,)
Use uniform if deterministic / no model
observed_cols : set[ColName]
Set of columns that have been queried
observed_values : Mapping[ColName, Any]
Mapping col -> observed value (may be empty in planning mode)
"""
candidate_rows: Sequence[RowIndex]
posterior: np.ndarray
observed_cols: set[ColName]
observed_values: Mapping[ColName, Any]
[docs]
def __post_init__(self):
"""Validate state consistency."""
if len(self.candidate_rows) != len(self.posterior):
raise ValueError(
f"Length mismatch: {len(self.candidate_rows)} candidates "
f"but posterior has shape {self.posterior.shape}"
)
# Allow empty states (sum=0) or properly normalized states (sum=1)
if len(self.posterior) > 0 and not np.allclose(self.posterior.sum(), 1.0):
raise ValueError(f"Posterior must sum to 1, got {self.posterior.sum()}")
@property
def entropy(self) -> float:
"""Shannon entropy H(posterior) in bits."""
# Avoid log(0) by filtering out zero probabilities
nonzero = self.posterior[self.posterior > 0]
if len(nonzero) == 0:
return 0.0
return -np.sum(nonzero * np.log2(nonzero))
@property
def max_posterior(self) -> float:
"""max_r p(r | E)."""
return np.max(self.posterior) if len(self.posterior) > 0 else 0.0
@property
def residual_uncertainty(self) -> float:
"""1 - max_posterior."""
return 1.0 - self.max_posterior
@property
def is_unique(self) -> bool:
"""True if there is a single candidate with posterior ~1."""
return len(self.candidate_rows) == 1 or (
len(self.candidate_rows) > 1 and self.max_posterior > 0.99999
)
@property
def unique_row(self) -> RowIndex | None:
"""Return the most probable row if unique, else None."""
if self.is_unique and len(self.candidate_rows) > 0:
idx = np.argmax(self.posterior)
return self.candidate_rows[idx]
return None
[docs]
def filter_candidates(
self, df: pd.DataFrame, col: ColName, value: Any
) -> "CandidateState":
"""Filter candidates to those matching the observed value.
Parameters
----------
df : pd.DataFrame
The data frame containing candidate rows
col : ColName
The column that was observed
value : Any
The observed value
Returns
-------
CandidateState
New state with filtered candidates and renormalized posterior
"""
# Find which candidates match the observed value
matching_mask = []
new_candidates = []
new_posterior_values = []
for i, row_idx in enumerate(self.candidate_rows):
if df.iloc[row_idx][col] == value:
matching_mask.append(True)
new_candidates.append(row_idx)
new_posterior_values.append(self.posterior[i])
else:
matching_mask.append(False)
# Renormalize posterior
if new_posterior_values:
new_posterior = np.array(new_posterior_values)
new_posterior /= new_posterior.sum()
else:
new_posterior = np.array([])
# Update observed columns and values
new_observed_cols = self.observed_cols | {col}
new_observed_values = dict(self.observed_values)
new_observed_values[col] = value
return CandidateState(
candidate_rows=new_candidates,
posterior=new_posterior,
observed_cols=new_observed_cols,
observed_values=new_observed_values,
)
[docs]
@dataclass
class FeatureSuggestion:
"""A recommendation of which column to query next.
Attributes
----------
col : ColName
The column name suggested to query next
score : float
Raw score used to rank columns (e.g., MI, coverage gain)
expected_voi : float, optional
Expected value of information in bits
marginal_cost : float, optional
Cost of querying this column
debug : dict[str, Any], optional
Additional debugging information
"""
col: ColName
score: float
expected_voi: float | None = None
marginal_cost: float | None = None
debug: dict[str, Any] | None = None
@property
def cost_adjusted_score(self) -> float:
"""Score divided by cost (if cost is available)."""
if self.marginal_cost is not None and self.marginal_cost > 0:
return self.score / self.marginal_cost
return self.score