Skip to content

geno_lewm.planning.cem

cem

Pure-Python CEM solver core for RFC-0008 planning.

The predictor-backed planner and CLI still need runtime integration and benchmark evidence. This module provides the deterministic search loop over existing RelEdit samplers so downstream integrations can bind a real rollout/evaluation function without importing optional ML runtimes.

PlanningConfig dataclass

PlanningConfig(horizon: int = 5, n_iterations: int = 5, n_samples: int = 1024, n_elite: int = 64, cost_weight: float = 0.0, stopping_eps: float = 0.05, patience: int = 2, seed: int | None = None, smoothing: float = _DEFAULT_SMOOTHING)

CEM search configuration for a fixed-horizon edit sequence.

CandidateEvaluation dataclass

CandidateEvaluation(distance: float, predicted_state: Any | None = None)

Evaluation returned by a caller-provided rollout/scoring function.

CEMIterationLog dataclass

CEMIterationLog(iteration: int, best_distance: float, best_cost: float, best_objective: float, elite_mean_distance: float, elite_mean_objective: float, n_candidates: int)

Summary statistics for one CEM iteration.

PlanningResult dataclass

PlanningResult(best_edits: tuple[RelEdit, ...], best_distance: float, best_cost: float, best_objective: float, best_predicted_state: Any | None, n_evaluations: int, iterations: tuple[CEMIterationLog, ...], elapsed_seconds: float, stopped_reason: str)

Best edit sequence and reproducibility trace from a CEM run.

n_predictor_calls property

n_predictor_calls: int

Compatibility alias for the final predictor-backed API shape.

plan

plan(evaluate: Callable[[Sequence[RelEdit]], float | CandidateEvaluation], sampler: ActionSampler, *, config: PlanningConfig | None = None, cost_fn: Callable[[Sequence[RelEdit]], float] | None = None) -> PlanningResult

Run CEM over valid RelEdit sequences from sampler.

evaluate is the integration boundary: it receives a candidate edit sequence and returns either a non-negative distance or a :class:CandidateEvaluation with optional predicted-state payload. The objective minimized by CEM is distance + cost_weight * cost.

Source code in geno_lewm/planning/cem.py
def plan(
    evaluate: Callable[[Sequence[RelEdit]], float | CandidateEvaluation],
    sampler: ActionSampler,
    *,
    config: PlanningConfig | None = None,
    cost_fn: Callable[[Sequence[RelEdit]], float] | None = None,
) -> PlanningResult:
    """Run CEM over valid ``RelEdit`` sequences from ``sampler``.

    ``evaluate`` is the integration boundary: it receives a candidate edit
    sequence and returns either a non-negative distance or a
    :class:`CandidateEvaluation` with optional predicted-state payload.
    The objective minimized by CEM is ``distance + cost_weight * cost``.
    """

    if not callable(evaluate):
        raise InputError("evaluate must be callable")
    if not isinstance(sampler, ActionSampler):
        raise InputError(
            "sampler must be an ActionSampler",
            details={"type": type(sampler).__name__},
        )
    resolved_cost_fn = count_cost if cost_fn is None else cost_fn
    if not callable(resolved_cost_fn):
        raise InputError("cost_fn must be callable")

    resolved_config = config if config is not None else PlanningConfig()
    rng = random.Random(resolved_config.seed)
    current_sampler = _clone_sampler(sampler, rng=rng)
    best: _ScoredCandidate | None = None
    logs: list[CEMIterationLog] = []
    n_evaluations = 0
    stale_iterations = 0
    stopped_reason = "max_iterations"
    started = time.perf_counter()

    for iteration in range(1, resolved_config.n_iterations + 1):
        candidates = current_sampler.sample_sequences(
            resolved_config.n_samples,
            resolved_config.horizon,
        )
        scored = tuple(
            _score_candidate(
                edits,
                index=index,
                evaluate=evaluate,
                cost_fn=resolved_cost_fn,
                cost_weight=resolved_config.cost_weight,
            )
            for index, edits in enumerate(candidates)
        )
        n_evaluations += len(scored)
        ranked = sorted(scored, key=_candidate_sort_key)
        iteration_best = ranked[0]
        elites = tuple(candidate.edits for candidate in ranked[: resolved_config.n_elite])

        if best is None or _candidate_value_key(iteration_best) < _candidate_value_key(best):
            best = iteration_best
            stale_iterations = 0
        else:
            stale_iterations += 1

        logs.append(
            _iteration_log(
                iteration,
                ranked[: resolved_config.n_elite],
                iteration_best,
                n_candidates=len(scored),
            )
        )

        if best.evaluation.distance < resolved_config.stopping_eps:
            stopped_reason = "distance_threshold"
            break
        if stale_iterations >= resolved_config.patience:
            stopped_reason = "patience"
            break

        current_sampler = _refit_sampler(
            current_sampler,
            elites=elites,
            rng=rng,
            smoothing=resolved_config.smoothing,
        )

    if best is None:  # pragma: no cover - PlanningConfig validation prevents this.
        raise InputError("CEM produced no candidates")

    return PlanningResult(
        best_edits=best.edits,
        best_distance=best.evaluation.distance,
        best_cost=best.cost,
        best_objective=best.objective,
        best_predicted_state=best.evaluation.predicted_state,
        n_evaluations=n_evaluations,
        iterations=tuple(logs),
        elapsed_seconds=time.perf_counter() - started,
        stopped_reason=stopped_reason,
    )

l2_distance

l2_distance(predicted: Iterable[float], target: Iterable[float]) -> float

Return Euclidean distance between two finite numeric vectors.

Source code in geno_lewm/planning/cem.py
def l2_distance(predicted: Iterable[float], target: Iterable[float]) -> float:
    """Return Euclidean distance between two finite numeric vectors."""

    left, right = _paired_vectors(predicted, target)
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(left, right, strict=True)))

cosine_distance

cosine_distance(predicted: Iterable[float], target: Iterable[float]) -> float

Return 1 - cosine_similarity for two non-zero vectors.

Source code in geno_lewm/planning/cem.py
def cosine_distance(predicted: Iterable[float], target: Iterable[float]) -> float:
    """Return ``1 - cosine_similarity`` for two non-zero vectors."""

    left, right = _paired_vectors(predicted, target)
    left_norm = math.sqrt(sum(value * value for value in left))
    right_norm = math.sqrt(sum(value * value for value in right))
    if left_norm <= 0.0 or right_norm <= 0.0:
        raise InputError("cosine distance requires non-zero vectors")
    cosine = sum(a * b for a, b in zip(left, right, strict=True)) / (left_norm * right_norm)
    return max(0.0, 1.0 - max(-1.0, min(1.0, cosine)))

region_distance

region_distance(predicted: Iterable[float], target: Iterable[float], indices: Iterable[int]) -> float

Return L2 distance restricted to explicit vector indices.

Source code in geno_lewm/planning/cem.py
def region_distance(
    predicted: Iterable[float],
    target: Iterable[float],
    indices: Iterable[int],
) -> float:
    """Return L2 distance restricted to explicit vector indices."""

    left, right = _paired_vectors(predicted, target)
    selected = _normalize_indices(indices, len(left))
    return l2_distance((left[idx] for idx in selected), (right[idx] for idx in selected))

projection_distance

projection_distance(predicted: Iterable[float], target: Iterable[float], projection: Iterable[Iterable[float]]) -> float

Return L2 distance after applying a row-major linear projection.

Source code in geno_lewm/planning/cem.py
def projection_distance(
    predicted: Iterable[float],
    target: Iterable[float],
    projection: Iterable[Iterable[float]],
) -> float:
    """Return L2 distance after applying a row-major linear projection."""

    left, right = _paired_vectors(predicted, target)
    rows = tuple(_normalize_projection_row(row, width=len(left)) for row in projection)
    if not rows:
        raise InputError("projection must contain at least one row")
    projected_left = tuple(
        sum(weight * value for weight, value in zip(row, left, strict=True)) for row in rows
    )
    projected_right = tuple(
        sum(weight * value for weight, value in zip(row, right, strict=True)) for row in rows
    )
    return l2_distance(projected_left, projected_right)