Skip to content

geno_lewm.deploy

deploy

Deployment runtime contract for RFC-0010.

BackendProbe dataclass

BackendProbe(backend: str, available: bool, reason: str)

Capability probe result for one runtime backend.

GenoLeWMRuntime

GenoLeWMRuntime(model_dir: str | Path, backend: str = BACKEND_AUTO, *, encoder: object | None = None, action_encoder: object | None = None, predictor: object | None = None, calibration: CalibrationTable | None = None)

Top-level runtime facade for on-device inference workflows.

Source code in geno_lewm/deploy/runtime.py
def __init__(
    self,
    model_dir: str | Path,
    backend: str = BACKEND_AUTO,
    *,
    encoder: object | None = None,
    action_encoder: object | None = None,
    predictor: object | None = None,
    calibration: CalibrationTable | None = None,
) -> None:
    """Bind the runtime to a local model directory and choose a backend.

    The directory (with ``~`` expanded) is validated first. If a manifest
    is loaded from it, the manifest's artifacts are verified before any
    backend is probed. The keyword-only component arguments are forwarded
    unchanged to scorer resolution.

    Raises:
        ModelNotFoundError: ``model_dir`` does not exist or is not a
            directory.
    """
    model_root = Path(model_dir).expanduser()
    if not (model_root.exists() and model_root.is_dir()):
        raise ModelNotFoundError(
            "model_dir must be an existing directory",
            details={"model_dir": str(model_root)},
        )
    self.model_dir = model_root

    # Manifest is optional; artifact verification only runs when one exists.
    manifest = _load_runtime_manifest(model_root)
    self.manifest = manifest
    if manifest is not None:
        _verify_manifest_artifacts(model_root, manifest)

    # Probe once and reuse the same results for backend selection.
    probes = probe_backends(model_root)
    self.probes = probes
    self.backend = select_backend(backend, probes=probes)

    component_overrides = {
        "encoder": encoder,
        "action_encoder": action_encoder,
        "predictor": predictor,
        "calibration": calibration,
    }
    self._scorer = _resolve_scorer_components(
        model_root,
        manifest,
        **component_overrides,
    )

score_variant

score_variant(variant: EditSpec, window: str | None = None, *, receipt_path: str | Path | None = None) -> Any

Score a single variant through local scorer components when available.

Source code in geno_lewm/deploy/runtime.py
def score_variant(
    self,
    variant: EditSpec,
    window: str | None = None,
    *,
    receipt_path: str | Path | None = None,
) -> Any:
    """Score one variant with the locally resolved scorer components.

    ``window`` is canonicalized up front when given. Scoring itself runs
    inside the network guard and the torch inference context. When
    ``receipt_path`` is provided, a receipt for the scored variant is
    written before the result is returned.

    Raises:
        InputError: ``variant`` is not an ``EditSpec``, or scorer
            components are present but no reference ``window`` was passed.

    When no scorer components were resolved, control is handed to
    ``_raise_backend_not_ready``.
    """
    if not isinstance(variant, EditSpec):
        raise InputError(
            "variant must be an EditSpec",
            details={"type": type(variant).__name__},
        )
    reference = None if window is None else canonicalize_dna(window)
    components = self._scorer
    with fail_closed_network_guard(), torch_inference_context():
        if components is None:
            _raise_backend_not_ready("score_variant", self.backend, self.model_dir)
        elif reference is None:
            raise InputError(
                "score_variant requires a reference window",
                remediation="pass window=... or use score_vcf with a local FASTA",
            )
        else:
            outcome = score_surprise_variant(
                variant,
                components.encoder,
                components.action_encoder,
                components.predictor,
                components.calibration,
                reference_window=reference,
            )
            if receipt_path is not None:
                _write_score_variant_receipt(
                    backend=self.backend,
                    model_dir=self.model_dir,
                    manifest=self.manifest,
                    variant=variant,
                    reference_window=reference,
                    result=outcome,
                    receipt_path=receipt_path,
                )
            return outcome

score_vcf

score_vcf(vcf_path: str | Path, fasta_path: str | Path, output_path: str | Path, batch_size: int = 64, progress: bool = True, *, receipt_path: str | Path | None = None) -> None

Score a VCF through local scorer components when available.

When receipt_path is provided, the runtime writes JSONL with one canonical v1 receipt per scored alternate. The v1 schema commits a single output, so this is intentionally not a batch aggregate receipt.

Source code in geno_lewm/deploy/runtime.py
def score_vcf(
    self,
    vcf_path: str | Path,
    fasta_path: str | Path,
    output_path: str | Path,
    batch_size: int = 64,
    progress: bool = True,
    *,
    receipt_path: str | Path | None = None,
) -> None:
    """Score a VCF through local scorer components when available.

    When ``receipt_path`` is provided, the runtime writes JSONL with
    one canonical v1 receipt per scored alternate. The v1 schema
    commits a single output, so this is intentionally not a batch
    aggregate receipt.

    Args:
        vcf_path: Input VCF to score.
        fasta_path: Local reference FASTA.
        output_path: Destination for the scores.
        batch_size: Positive integer batch size (``bool`` is rejected).
        progress: Whether to show progress; forwarded as ``show_progress``
            on the path that does not write receipts.
        receipt_path: Optional receipt destination. It must not refer to
            the same file as ``output_path``.

    Raises:
        InputError: ``batch_size`` is not a positive integer, ``progress``
            is not a bool, or ``receipt_path`` and ``output_path`` resolve
            to the same file.
    """
    if not isinstance(batch_size, int) or isinstance(batch_size, bool) or batch_size <= 0:
        raise InputError(
            "batch_size must be a positive integer",
            details={"batch_size": batch_size, "type": type(batch_size).__name__},
        )
    if not isinstance(progress, bool):
        raise InputError(
            "progress must be bool",
            details={"type": type(progress).__name__},
        )
    # Normalize path-like values now so type errors surface at the API boundary.
    Path(vcf_path)
    Path(fasta_path)
    normalized_output = Path(output_path)
    normalized_receipt = None if receipt_path is None else Path(receipt_path)
    # Compare resolved targets rather than the raw spellings: "out.tsv",
    # "sub/../out.tsv", an absolute path, or a symlink can all name the same
    # file, and writing scores and receipts to one file would clobber it.
    # The unresolved paths are still what gets passed on below.
    if (
        normalized_receipt is not None
        and normalized_receipt.resolve() == normalized_output.resolve()
    ):
        raise InputError("--receipt must differ from --output for VCF scoring")
    scorer = self._scorer
    with fail_closed_network_guard(), torch_inference_context():
        if scorer is not None:
            if normalized_receipt is None:
                score_surprise_vcf(
                    vcf_path,
                    scorer.encoder,
                    scorer.action_encoder,
                    scorer.predictor,
                    scorer.calibration,
                    normalized_output,
                    reference_fasta=fasta_path,
                    batch_size=batch_size,
                    show_progress=progress,
                )
            else:
                _write_vcf_scores_and_receipts(
                    backend=self.backend,
                    model_dir=self.model_dir,
                    manifest=self.manifest,
                    scorer=scorer,
                    vcf_path=vcf_path,
                    fasta_path=fasta_path,
                    output_path=normalized_output,
                    receipt_path=normalized_receipt,
                    batch_size=batch_size,
                )
            return
        # No scorer components were resolved for this runtime.
        _raise_backend_not_ready("score_vcf", self.backend, self.model_dir)

encode_window

encode_window(window: str, edit_locus: int | None = None) -> Any

Encode a DNA window once the encoder backend is installed.

Source code in geno_lewm/deploy/runtime.py
def encode_window(self, window: str, edit_locus: int | None = None) -> Any:
    """Validate encoder inputs, then defer to the backend-not-ready path.

    ``window`` is passed through ``canonicalize_dna`` purely for validation
    (its result is discarded). ``edit_locus`` must be ``None`` or a
    non-negative ``int``; ``bool`` values are rejected.

    Raises:
        InputError: ``edit_locus`` is neither ``None`` nor a non-negative
            integer.
    """
    canonicalize_dna(window)
    if edit_locus is not None:
        is_plain_int = isinstance(edit_locus, int) and not isinstance(edit_locus, bool)
        # The sign is only inspected once the value is known to be an int.
        if not is_plain_int or edit_locus < 0:
            raise InputError(
                "edit_locus must be a non-negative integer or None",
                details={"edit_locus": edit_locus, "type": type(edit_locus).__name__},
            )
    with fail_closed_network_guard():
        _raise_backend_not_ready("encode_window", self.backend, self.model_dir)

predict

predict(state: Any, edits: Sequence[RelEdit]) -> Any

Run the predictor once a predictor backend is installed.

Source code in geno_lewm/deploy/runtime.py
def predict(self, state: Any, edits: Sequence[RelEdit]) -> Any:
    """Validate predictor inputs, then defer to the backend-not-ready path.

    Raises:
        InputError: ``state`` is ``None``, ``edits`` is not a ``Sequence``,
            or some element of ``edits`` is not a ``RelEdit`` (the first
            offending index is reported).
    """
    if state is None:
        raise InputError("state must be non-None")
    if not isinstance(edits, Sequence):
        raise InputError(
            "edits must be a sequence of RelEdit values",
            details={"type": type(edits).__name__},
        )
    # Locate the first element of the wrong type, if any.
    first_invalid = next(
        (
            (position, item)
            for position, item in enumerate(edits)
            if not isinstance(item, RelEdit)
        ),
        None,
    )
    if first_invalid is not None:
        position, item = first_invalid
        raise InputError(
            "edits must contain RelEdit values",
            details={"index": position, "type": type(item).__name__},
        )
    with fail_closed_network_guard():
        _raise_backend_not_ready("predict", self.backend, self.model_dir)

fail_closed_network_guard

fail_closed_network_guard() -> Iterator[None]

Block common network entry points inside an inference path.

Source code in geno_lewm/deploy/runtime.py
@contextlib.contextmanager
def fail_closed_network_guard() -> Iterator[None]:
    """Temporarily replace common network entry points with a raising stub.

    While the context is active, each listed socket/urllib/http.client
    entry point is patched so that calling it raises
    ``NetworkCallProhibitedError``. The patches are undone when the context
    exits.
    """
    guarded_entry_points = (
        "socket.create_connection",
        "socket.socket.connect",
        "urllib.request.urlopen",
        "http.client.HTTPConnection.connect",
        "http.client.HTTPSConnection.connect",
    )

    def _refuse(*_args: Any, **_kwargs: Any) -> NoReturn:
        raise NetworkCallProhibitedError(
            "runtime network call attempted after setup",
            remediation="perform downloads only through explicit setup/update commands",
        )

    # ExitStack unwinds every patch together, even if the guarded body raises.
    with contextlib.ExitStack() as active_patches:
        for dotted_name in guarded_entry_points:
            active_patches.enter_context(patch(dotted_name, _refuse))
        yield

load_scorer_modules

load_scorer_modules(model_dir: Path | str) -> tuple[object, object, object]

Load (encoder, action_encoder, predictor) from a model dir without calibration.

Used by calibration-table generation, which must run the model over a background set before calibration.parquet exists and therefore cannot use GenoLeWMRuntime (whose scorer requires a calibration artifact). Requires geno-lewm[train] (torch + transformers + safetensors) and a manifest.json plus the exported predictor/action_encoder safetensors artifacts.

Source code in geno_lewm/deploy/runtime.py
def load_scorer_modules(model_dir: Path | str) -> tuple[object, object, object]:
    """Load (encoder, action_encoder, predictor) from a model dir without calibration.

    Used by calibration-table generation, which must run the model over a
    background set *before* ``calibration.parquet`` exists and therefore cannot
    use :class:`GenoLeWMRuntime` (whose scorer requires a calibration artifact).
    Requires ``geno-lewm[train]`` (torch + transformers + safetensors) and a
    ``manifest.json`` plus the exported ``predictor``/``action_encoder``
    safetensors artifacts.

    Args:
        model_dir: Model directory; a leading ``~`` is expanded, matching
            how the runtime constructor and backend probing treat it.

    Raises:
        ModelNotFoundError: No manifest could be loaded from ``model_dir``.
        RuntimeSetupError: The native scorer dependencies are unavailable,
            or building the modules failed (the original exception is
            chained as the cause).
    """
    # Expand "~" so the same model_dir spelling works here and in the runtime.
    model_dir = Path(model_dir).expanduser()
    manifest = _load_runtime_manifest(model_dir)
    if manifest is None:
        raise ModelNotFoundError(
            "model_dir must contain manifest.json",
            details={"model_dir": str(model_dir)},
        )
    if not _native_scorer_runtime_available():
        raise RuntimeSetupError(
            "scoring requires torch, transformers, and safetensors",
            remediation="install geno-lewm[train]",
        )
    try:
        return _build_loaded_scorer_modules(model_dir, manifest)
    except RuntimeSetupError:
        # Already a structured setup error; pass it through untouched.
        raise
    except Exception as exc:
        # Boundary wrapper: surface any loader failure as a setup error with
        # remediation, keeping the original exception as the cause.
        raise RuntimeSetupError(
            "could not load scorer modules from model_dir",
            details={"model_dir": str(model_dir), "error": str(exc)},
            remediation=(
                "verify that the checkpoint was exported for this geno-lewm version "
                "and install geno-lewm[train]"
            ),
        ) from exc

probe_backends

probe_backends(model_dir: str | Path | None = None) -> tuple[BackendProbe, ...]

Probe runtime backends in RFC-0010 auto-selection order.

Source code in geno_lewm/deploy/runtime.py
def probe_backends(model_dir: str | Path | None = None) -> tuple[BackendProbe, ...]:
    """Return one probe result per backend: CoreML, CUDA, ONNX, then CPU.

    ``model_dir`` (with ``~`` expanded) is handed to each hardware-specific
    probe; ``None`` is passed through as-is. The CPU entry is always
    reported as available.
    """
    root = Path(model_dir).expanduser() if model_dir is not None else None
    results = [probe(root) for probe in (_probe_coreml, _probe_cuda, _probe_onnx)]
    results.append(
        BackendProbe(BACKEND_CPU, True, "portable CPU fallback is always available")
    )
    return tuple(results)

select_backend

select_backend(backend: str = BACKEND_AUTO, *, probes: Sequence[BackendProbe] | None = None) -> str

Select a backend from probe results, or raise if the requested one is unavailable.

Source code in geno_lewm/deploy/runtime.py
def select_backend(
    backend: str = BACKEND_AUTO,
    *,
    probes: Sequence[BackendProbe] | None = None,
) -> str:
    """Resolve the backend name to use from a set of probe results.

    With ``BACKEND_AUTO`` the first entry of ``BACKEND_PRIORITY`` whose
    probe is available wins. Any other request must name a backend that
    was probed and is available. When ``probes`` is ``None`` the backends
    are probed here.

    Raises:
        BackendUnsupportedError: No backend is available in auto mode, or
            the requested backend was not probed or is unavailable.
    """
    requested = _normalize_backend(backend)
    results = tuple(probes) if probes is not None else tuple(probe_backends())

    # Later probes for the same backend name override earlier ones.
    lookup = {}
    for result in results:
        lookup[result.backend] = result

    if requested != BACKEND_AUTO:
        chosen = lookup.get(requested)
        if chosen is None:
            raise BackendUnsupportedError(
                "requested runtime backend was not probed",
                details={"backend": requested, "probed": sorted(lookup)},
            )
        if not chosen.available:
            raise BackendUnsupportedError(
                "requested runtime backend is unavailable",
                details={"backend": requested, "reason": chosen.reason},
            )
        return requested

    for candidate in BACKEND_PRIORITY:
        entry = lookup.get(candidate)
        if entry is not None and entry.available:
            return candidate
    raise BackendUnsupportedError(
        "no runtime backend is available",
        details={"probes": [_probe_details(result) for result in results]},
    )