Skip to content

geno_lewm.encoder.carbon

carbon

Carbon state encoder wrapper.

The heavy runtime dependencies remain optional. Callers can inject a tokenizer/model pair (for tests, or when a runtime is already loaded); otherwise the wrapper loads both through Hugging Face Transformers, with local_files_only=True by default, so that constructing the encoder never hides a network download.

CarbonStateEncoder

CarbonStateEncoder(model_id: str, revision: str, *, dtype: str = 'bf16', state_layer: int = -1, pool_type: str = POOL_CENTERED_MEAN, pool_radius: int = DEFAULT_POOL_RADIUS_TOKENS, normalize: bool = True, lora_config: object | None = None, model: object | None = None, tokenizer: object | None = None, encoder_hash: bytes | str | None = None, local_files_only: bool = True, trust_remote_code: bool = False, device: str | None = None)

Encode DNA windows with Carbon hidden states plus deterministic pooling.

Source code in geno_lewm/encoder/carbon.py
def __init__(
    self,
    model_id: str,
    revision: str,
    *,
    dtype: str = "bf16",
    state_layer: int = -1,
    pool_type: str = POOL_CENTERED_MEAN,
    pool_radius: int = DEFAULT_POOL_RADIUS_TOKENS,
    normalize: bool = True,
    lora_config: object | None = None,
    model: object | None = None,
    tokenizer: object | None = None,
    encoder_hash: bytes | str | None = None,
    local_files_only: bool = True,
    trust_remote_code: bool = False,
    device: str | None = None,
) -> None:
    """Validate the configuration and attach a tokenizer/model pair.

    The checks below run in order and the first failing one raises, before
    any attribute is assigned. When neither ``model`` nor ``tokenizer`` is
    supplied, both are obtained from ``_load_transformers_components``;
    an injected pair is used as given.

    Raises:
        InputError: for an empty ``model_id`` or ``revision``, a ``dtype``
            outside ``_SUPPORTED_DTYPES``, a non-integer ``state_layer``,
            an unknown ``pool_type``, a ``pool_radius`` that is not a
            non-negative integer, a non-bool ``normalize``, or when only
            one of ``model`` / ``tokenizer`` is given.
        RuntimeSetupError: whenever ``lora_config`` is not ``None``.
    """
    # --- argument validation (order determines which error wins) ---
    if not model_id:
        raise InputError("model_id must be non-empty")
    if not revision:
        raise InputError("revision must be non-empty")

    if dtype not in _SUPPORTED_DTYPES:
        dtype_details = {"dtype": dtype, "supported": sorted(_SUPPORTED_DTYPES)}
        raise InputError("unsupported encoder dtype", details=dtype_details)

    # bool is a subclass of int, so it has to be rejected explicitly.
    if isinstance(state_layer, bool) or not isinstance(state_layer, int):
        layer_details = {
            "state_layer": state_layer,
            "type": type(state_layer).__name__,
        }
        raise InputError("state_layer must be an integer", details=layer_details)

    supported_pools = [POOL_CENTERED_MEAN, POOL_GLOBAL_MEAN]
    if pool_type not in set(supported_pools):
        raise InputError(
            "unsupported pool_type",
            details={"pool_type": pool_type, "supported": supported_pools},
        )

    radius_is_int = isinstance(pool_radius, int) and not isinstance(pool_radius, bool)
    # The comparison only runs once pool_radius is known to be a real int.
    if not radius_is_int or pool_radius < 0:
        radius_details = {
            "pool_radius": pool_radius,
            "type": type(pool_radius).__name__,
        }
        raise InputError(
            "pool_radius must be a non-negative integer",
            details=radius_details,
        )

    if not isinstance(normalize, bool):
        raise InputError(
            "normalize must be bool",
            details={"type": type(normalize).__name__},
        )

    if lora_config is not None:
        raise RuntimeSetupError(
            "Carbon LoRA adapters are not supported by CarbonStateEncoder yet",
            remediation="merge LoRA adapters before loading or track the Phase 2 adapter issue",
        )

    has_model = model is not None
    has_tokenizer = tokenizer is not None
    if has_model != has_tokenizer:
        raise InputError(
            "model and tokenizer must be supplied together",
            details={"model": has_model, "tokenizer": has_tokenizer},
        )

    # --- store the validated configuration ---
    self.model_id = model_id
    self.revision = revision
    self.dtype = dtype
    self.state_layer = state_layer
    self.pool_type = cast(_PoolType, pool_type)
    self.pool_radius = pool_radius
    self.normalize = normalize
    self.local_files_only = local_files_only
    self.trust_remote_code = trust_remote_code
    self.device = _resolve_device(device)
    self._encoder_hash = _coerce_encoder_hash(encoder_hash)
    self._d_state: int | None = None

    # --- obtain the tokenizer/model pair ---
    if not (has_model and has_tokenizer):
        tokenizer, model = _load_transformers_components(
            model_id=model_id,
            revision=revision,
            dtype=dtype,
            local_files_only=local_files_only,
            trust_remote_code=trust_remote_code,
        )
    self.tokenizer = tokenizer
    self.model = model
    _eval_if_available(self.model)
    _move_module_to_device(self.model, self.device)

    # Record the state width up front when the model config exposes a
    # positive integer hidden_size; otherwise it stays None for now.
    width = getattr(getattr(self.model, "config", None), "hidden_size", None)
    width_is_positive_int = (
        isinstance(width, int) and not isinstance(width, bool) and width > 0
    )
    if width_is_positive_int:
        self._d_state = width

encoder_hash property

encoder_hash: bytes

Return the configured encoder hash bytes.

d_state property

d_state: int

Return the pooled state width when known.

encode

encode(window: str, edit_locus: int | None = None) -> tuple[float, ...]

Encode and pool one DNA window.

Source code in geno_lewm/encoder/carbon.py
def encode(self, window: str, edit_locus: int | None = None) -> tuple[float, ...]:
    """Encode and pool one DNA window."""
    return self.encode_batch([window], [edit_locus])[0]

encode_batch

encode_batch(windows: Sequence[str], edit_loci: Sequence[int | None]) -> tuple[tuple[float, ...], ...]

Encode and pool a batch of DNA windows.

Source code in geno_lewm/encoder/carbon.py
def encode_batch(
    self,
    windows: Sequence[str],
    edit_loci: Sequence[int | None],
) -> tuple[tuple[float, ...], ...]:
    """Encode a batch of DNA windows and pool each one into a vector.

    Each window is canonicalized, wrapped for the tokenizer, tokenized and
    run through the model inside ``torch_inference_context``. The hidden
    rows selected by ``self.state_layer`` are then pooled per item with
    ``pool_hidden_states``, pairing item ``i`` with ``edit_loci[i]``.

    Side effect: ``self._d_state`` is updated to the length of the first
    pooled vector.

    Raises:
        InputError: if either argument is not a non-string sequence, the
            two lengths differ, ``windows`` is empty, or the model output
            does not contain one item per input window.
    """
    # str/bytes are Sequences too, but a bare string is never a valid batch.
    argument_checks = (
        (windows, "windows must be a sequence of DNA strings"),
        (edit_loci, "edit_loci must be a sequence of int or None values"),
    )
    for candidate, message in argument_checks:
        if isinstance(candidate, str | bytes) or not isinstance(candidate, Sequence):
            raise InputError(message, details={"type": type(candidate).__name__})

    window_count = len(windows)
    locus_count = len(edit_loci)
    if window_count != locus_count:
        raise InputError(
            "windows and edit_loci must have the same length",
            details={"windows": window_count, "edit_loci": locus_count},
        )
    if not windows:
        raise InputError("windows must contain at least one sequence")

    # Two separate passes: every window is canonicalized before any wrapping.
    canonical = tuple(map(canonicalize_dna, windows))
    tokenizer_ready = list(map(wrap_dna_for_tokenizer, canonical))

    model_inputs = _move_inputs_to_device(
        _tokenize(self.tokenizer, tokenizer_ready),
        self.device,
    )
    with torch_inference_context():
        model_output = _call_model(self.model, model_inputs)

    per_item_rows = _hidden_rows_by_item(model_output, state_layer=self.state_layer)
    if len(per_item_rows) != len(windows):
        raise InputError(
            "encoder output batch size does not match input windows",
            details={"expected": len(windows), "observed": len(per_item_rows)},
        )

    # NOTE(review): self.normalize is not read anywhere in this method —
    # confirm whether normalization is expected to happen here or elsewhere.
    pooled_vectors = []
    for rows, locus in zip(per_item_rows, edit_loci, strict=True):
        pooled = pool_hidden_states(
            rows,
            edit_locus=locus,
            pool_type=self.pool_type,
            pool_radius=self.pool_radius,
        )
        pooled_vectors.append(pooled.vector)

    encoded = tuple(pooled_vectors)
    if encoded:
        # Remember the observed width of the pooled state.
        self._d_state = len(encoded[0])
    return encoded