Skip to content

geno_lewm.encoder

encoder

State-encoder input preparation and Carbon wrapper helpers.

The pure-Python windowing, pooling, and cache helpers import without the ML runtime. CarbonStateEncoder loads the optional Transformers stack only when callers construct it without injected model/tokenizer objects.

CacheReindexReport dataclass

CacheReindexReport(indexed_shards: int, indexed_rows: int, index_path: Path)

Summary of a SQLite index rebuild.

CacheRepairReport dataclass

CacheRepairReport(checked_shards: int, quarantined: tuple[Path, ...], reindex: CacheReindexReport)

Summary of a repair pass over Parquet shards.

WindowCacheKey dataclass

WindowCacheKey(window_hash: bytes, encoder_hash: bytes, state_layer: int, pool_type: str, pool_radius: int, dtype: str)

Content-addressed key for a cached embedding row.

WindowCacheRecord dataclass

WindowCacheRecord(chrom: str, start_bp: int, end_bp: int, window_hash: bytes, encoder_hash: bytes, state_layer: int, pool_type: str, pool_radius: int, dtype: str, embedding: tuple[float, ...], untargeted: bool, created_at: int = 0, schema_version: str = CACHE_SCHEMA_VERSION)

One row in the window-embedding cache schema.

key property

key: WindowCacheKey

Return the content-addressed key for this row.

with_created_at

with_created_at() -> WindowCacheRecord

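Fill created_at with the current time in nanoseconds (time.time_ns()) when it is unset, i.e. still at its default of 0; a record that already has a timestamp is returned unchanged.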

Source code in geno_lewm/encoder/cache.py
def with_created_at(self) -> WindowCacheRecord:
    """Return a record whose ``created_at`` is populated.

    A truthy ``created_at`` means the timestamp is already set, so the
    record itself is returned (same object). Otherwise a copy is returned
    with ``created_at`` set to ``time.time_ns()``; every other field is
    carried over unchanged.
    """
    from dataclasses import replace

    if self.created_at:
        return self
    # ``replace`` copies every field, so a field added to the dataclass later
    # cannot be silently dropped the way a hand-maintained field list could.
    return replace(self, created_at=time.time_ns())

CarbonStateEncoder

CarbonStateEncoder(model_id: str, revision: str, *, dtype: str = 'bf16', state_layer: int = -1, pool_type: str = POOL_CENTERED_MEAN, pool_radius: int = DEFAULT_POOL_RADIUS_TOKENS, normalize: bool = True, lora_config: object | None = None, model: object | None = None, tokenizer: object | None = None, encoder_hash: bytes | str | None = None, local_files_only: bool = True, trust_remote_code: bool = False, device: str | None = None)

Encode DNA windows with Carbon hidden states plus deterministic pooling.

Source code in geno_lewm/encoder/carbon.py
def __init__(
    self,
    model_id: str,
    revision: str,
    *,
    dtype: str = "bf16",
    state_layer: int = -1,
    pool_type: str = POOL_CENTERED_MEAN,
    pool_radius: int = DEFAULT_POOL_RADIUS_TOKENS,
    normalize: bool = True,
    lora_config: object | None = None,
    model: object | None = None,
    tokenizer: object | None = None,
    encoder_hash: bytes | str | None = None,
    local_files_only: bool = True,
    trust_remote_code: bool = False,
    device: str | None = None,
) -> None:
    """Validate the configuration, then attach or load model and tokenizer.

    ``model`` and ``tokenizer`` must be injected together; when both are
    omitted they are loaded through ``_load_transformers_components``.

    Raises:
        InputError: for an empty ``model_id``/``revision``, an unsupported
            ``dtype`` or ``pool_type``, a non-integer ``state_layer``, a
            negative or non-integer ``pool_radius``, a non-bool
            ``normalize``, or when only one of ``model``/``tokenizer`` is
            supplied.
        RuntimeSetupError: when ``lora_config`` is supplied.
    """
    # --- argument validation (order matters: first failure wins) ---------
    if not model_id:
        raise InputError("model_id must be non-empty")
    if not revision:
        raise InputError("revision must be non-empty")
    if dtype not in _SUPPORTED_DTYPES:
        raise InputError(
            "unsupported encoder dtype",
            details={"dtype": dtype, "supported": sorted(_SUPPORTED_DTYPES)},
        )
    # bool is a subclass of int, so it has to be rejected explicitly.
    if isinstance(state_layer, bool) or not isinstance(state_layer, int):
        raise InputError(
            "state_layer must be an integer",
            details={"state_layer": state_layer, "type": type(state_layer).__name__},
        )
    known_pools = [POOL_CENTERED_MEAN, POOL_GLOBAL_MEAN]
    if pool_type not in set(known_pools):
        raise InputError(
            "unsupported pool_type",
            details={
                "pool_type": pool_type,
                "supported": known_pools,
            },
        )
    radius_is_plain_int = isinstance(pool_radius, int) and not isinstance(pool_radius, bool)
    if not radius_is_plain_int or pool_radius < 0:
        raise InputError(
            "pool_radius must be a non-negative integer",
            details={"pool_radius": pool_radius, "type": type(pool_radius).__name__},
        )
    if not isinstance(normalize, bool):
        raise InputError(
            "normalize must be bool",
            details={"type": type(normalize).__name__},
        )
    if lora_config is not None:
        raise RuntimeSetupError(
            "Carbon LoRA adapters are not supported by CarbonStateEncoder yet",
            remediation="merge LoRA adapters before loading or track the Phase 2 adapter issue",
        )
    has_model = model is not None
    has_tokenizer = tokenizer is not None
    if has_model != has_tokenizer:
        raise InputError(
            "model and tokenizer must be supplied together",
            details={"model": has_model, "tokenizer": has_tokenizer},
        )

    # --- plain configuration attributes ----------------------------------
    self.model_id = model_id
    self.revision = revision
    self.dtype = dtype
    self.state_layer = state_layer
    self.pool_type = cast(_PoolType, pool_type)
    self.pool_radius = pool_radius
    self.normalize = normalize
    self.local_files_only = local_files_only
    self.trust_remote_code = trust_remote_code
    self.device = _resolve_device(device)
    self._encoder_hash = _coerce_encoder_hash(encoder_hash)
    self._d_state: int | None = None

    # --- model / tokenizer: injected objects win, otherwise load ---------
    if not (has_model and has_tokenizer):
        tokenizer, model = _load_transformers_components(
            model_id=model_id,
            revision=revision,
            dtype=dtype,
            local_files_only=local_files_only,
            trust_remote_code=trust_remote_code,
        )
    self.tokenizer = tokenizer
    self.model = model
    _eval_if_available(self.model)
    _move_module_to_device(self.model, self.device)

    # Pre-fill the state width from the model config when it advertises a
    # positive integer ``hidden_size``; otherwise it stays ``None`` for now.
    width = getattr(getattr(self.model, "config", None), "hidden_size", None)
    if isinstance(width, int) and not isinstance(width, bool) and width > 0:
        self._d_state = width

encoder_hash property

encoder_hash: bytes

Return the configured encoder hash bytes.

d_state property

d_state: int

Return the pooled state width when known.

encode

encode(window: str, edit_locus: int | None = None) -> tuple[float, ...]

Encode and pool one DNA window.

Source code in geno_lewm/encoder/carbon.py
def encode(self, window: str, edit_locus: int | None = None) -> tuple[float, ...]:
    """Encode one DNA window and return its pooled state vector.

    Thin convenience wrapper: delegates to ``encode_batch`` with a batch of
    one and returns the first element of its result.
    """
    single_item_batch = self.encode_batch([window], [edit_locus])
    return single_item_batch[0]

encode_batch

encode_batch(windows: Sequence[str], edit_loci: Sequence[int | None]) -> tuple[tuple[float, ...], ...]

Encode and pool a batch of DNA windows.

Source code in geno_lewm/encoder/carbon.py
def encode_batch(
    self,
    windows: Sequence[str],
    edit_loci: Sequence[int | None],
) -> tuple[tuple[float, ...], ...]:
    """Encode a batch of DNA windows and pool each one into a state vector.

    ``windows`` and ``edit_loci`` are parallel, equally long, non-empty
    sequences. Each window is canonicalized, wrapped for the tokenizer,
    run through the model, and pooled with this encoder's ``pool_type`` and
    ``pool_radius``. The width of the first pooled vector is stored in
    ``self._d_state``.

    Raises:
        InputError: when either argument is not a (non-string) sequence,
            the lengths differ, ``windows`` is empty, or the model returns
            a different number of items than were submitted.
    """
    # str/bytes are Sequences too, but are never a valid batch here.
    argument_checks = (
        (windows, "windows must be a sequence of DNA strings"),
        (edit_loci, "edit_loci must be a sequence of int or None values"),
    )
    for candidate, message in argument_checks:
        if isinstance(candidate, str | bytes) or not isinstance(candidate, Sequence):
            raise InputError(
                message,
                details={"type": type(candidate).__name__},
            )

    window_count = len(windows)
    locus_count = len(edit_loci)
    if window_count != locus_count:
        raise InputError(
            "windows and edit_loci must have the same length",
            details={"windows": window_count, "edit_loci": locus_count},
        )
    if not windows:
        raise InputError("windows must contain at least one sequence")

    # Canonicalize the whole batch before any wrapping happens.
    canonical_windows = tuple(canonicalize_dna(item) for item in windows)
    tokenizer_texts = [wrap_dna_for_tokenizer(item) for item in canonical_windows]
    model_inputs = _move_inputs_to_device(
        _tokenize(self.tokenizer, tokenizer_texts),
        self.device,
    )
    with torch_inference_context():
        model_output = _call_model(self.model, model_inputs)

    rows_by_item = _hidden_rows_by_item(model_output, state_layer=self.state_layer)
    if len(rows_by_item) != len(windows):
        raise InputError(
            "encoder output batch size does not match input windows",
            details={"expected": len(windows), "observed": len(rows_by_item)},
        )

    pooled_vectors = []
    for item_rows, locus in zip(rows_by_item, edit_loci, strict=True):
        pooled = pool_hidden_states(
            item_rows,
            edit_locus=locus,
            pool_type=self.pool_type,
            pool_radius=self.pool_radius,
        )
        pooled_vectors.append(pooled.vector)
    encoded = tuple(pooled_vectors)

    if encoded:
        # Record the observed state width from the first pooled vector.
        self._d_state = len(encoded[0])
    return encoded

PoolingResult dataclass

PoolingResult(vector: tuple[float, ...], pool_type: Literal['centered_mean', 'global_mean'], pool_radius: int, untargeted: bool, center_token: int | None, token_count: int)

Pooled state vector plus cache-key metadata.

d_state property

d_state: int

Return the pooled vector width.

as_cache_fields

as_cache_fields() -> Mapping[str, object]

Return fields shared with the window-cache schema.

Source code in geno_lewm/encoder/pooling.py
def as_cache_fields(self) -> Mapping[str, object]:
    """Return the pooling fields this result shares with the window-cache schema.

    The mapping holds ``pool_type``, ``pool_radius`` and ``untargeted``,
    in that order, read straight from this instance.
    """
    shared_names = ("pool_type", "pool_radius", "untargeted")
    return {name: getattr(self, name) for name in shared_names}

ExtractedWindow dataclass

ExtractedWindow(sequence: str, start_bp: int, end_bp: int, window_bp: int, edit_locus: int | None = None, relative_edit_locus: int | None = None, pad_right_bp: int = 0)

A fixed-size DNA window plus its source-coordinate metadata.

start_bp and end_bp are 0-based half-open coordinates in the caller's source coordinate system. end_bp - start_bp always equals window_bp even when the sequence had to be right-padded past the available source bases; pad_right_bp records how many trailing A bases were introduced.

untargeted property

untargeted: bool

Return true when the window was not centered on an edit.

sha256 property

sha256: bytes

SHA-256 digest of the canonical window sequence.

as_tokenizer_input

as_tokenizer_input() -> str

Return the Carbon tokenizer input string for this window.

Source code in geno_lewm/encoder/windowing.py
def as_tokenizer_input(self) -> str:
    """Return this window's sequence formatted as Carbon tokenizer input.

    Delegates to ``wrap_dna_for_tokenizer`` with ``self.sequence``.
    """
    tokenizer_text = wrap_dna_for_tokenizer(self.sequence)
    return tokenizer_text

default_cache_dir

default_cache_dir() -> Path

Return $GENO_LEWM_CACHE or the documented local default.

Source code in geno_lewm/encoder/cache.py
def default_cache_dir() -> Path:
    """Return ``$GENO_LEWM_CACHE`` or the documented local default.

    The default ``.geno-lewm-cache`` is used when the variable is unset or
    set to the empty string. A leading ``~`` is expanded in either case.
    """
    configured = os.environ.get("GENO_LEWM_CACHE")
    # An empty value would otherwise become Path(""), i.e. the current
    # working directory, which is not a sensible cache location.
    return Path(configured or ".geno-lewm-cache").expanduser()

read_embedding

read_embedding(cache_dir: Path | str, key: WindowCacheKey) -> tuple[float, ...] | None

Return an embedding by content key, or None on cache miss.

Source code in geno_lewm/encoder/cache.py
def read_embedding(cache_dir: Path | str, key: WindowCacheKey) -> tuple[float, ...] | None:
    """Return an embedding by content key, or ``None`` on cache miss.

    A miss is either a missing index file or no index row matching all six
    key columns. On a hit, the shard named by the index row is loaded and
    the row at the indexed offset is returned once its key has been
    confirmed to equal ``key``.

    Raises:
        CacheCorruptError: when the indexed ``row_offset`` lies outside the
            shard, or the shard row's key differs from ``key``. Errors
            raised while reading the shard propagate unchanged.
    """
    root = Path(cache_dir)
    index_path = _index_path(root)
    if not index_path.exists():
        return None
    with closing(sqlite3.connect(index_path)) as conn:
        _ensure_index_schema(conn)
        row = conn.execute(
            """
            SELECT shard_path, row_offset
            FROM window_index
            WHERE window_hash = ?
              AND encoder_hash = ?
              AND state_layer = ?
              AND pool_type = ?
              AND pool_radius = ?
              AND dtype = ?
            """,
            _index_key_params(key),
        ).fetchone()
        # NOTE(review): presumably persists whatever _ensure_index_schema
        # changed; the SELECT itself needs no commit — confirm.
        conn.commit()
    if row is None:
        return None
    shard_path = root / str(row[0])
    row_offset = int(row[1])
    # Shard-read errors are meant to propagate as-is, so no try/except is
    # needed (the former ``except CacheCorruptError: raise`` was a no-op).
    records = _read_records_from_shard(shard_path)
    if not 0 <= row_offset < len(records):
        raise CacheCorruptError(
            "cache index row_offset points outside shard",
            details={"shard_path": str(shard_path), "row_offset": row_offset},
        )
    record = records[row_offset]
    # The index is only a pointer; the shard row is the source of truth.
    if record.key != key:
        raise CacheCorruptError(
            "cache index key does not match shard row",
            details={"shard_path": str(shard_path), "row_offset": row_offset},
        )
    return record.embedding

reindex_cache

reindex_cache(cache_dir: Path | str) -> CacheReindexReport

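Rebuild index.sqlite from scratch by reading every Parquet shard under the cache directory. Unreadable shards are not skipped: an error raised while reading a shard propagates to the caller, so run repair_cache first if shards may be corrupt.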

Source code in geno_lewm/encoder/cache.py
def reindex_cache(cache_dir: Path | str) -> CacheReindexReport:
    """Rebuild ``index.sqlite`` from the Parquet shards under ``cache_dir``.

    Any existing index file is deleted first. Every shard yielded by
    ``_iter_shards`` is then read and its rows inserted into a fresh index,
    which is committed once at the end. Shard-read errors are not caught
    here, so an unreadable shard aborts the rebuild.

    Returns:
        A ``CacheReindexReport`` with the number of shards and rows indexed
        and the path of the rebuilt index.
    """
    root = Path(cache_dir)
    index_path = _index_path(root)
    index_path.parent.mkdir(parents=True, exist_ok=True)
    # Delete without a prior exists() check: avoids the check-then-act race
    # if something else removes the file in between.
    index_path.unlink(missing_ok=True)
    indexed_shards = 0
    indexed_rows = 0
    with closing(sqlite3.connect(index_path)) as conn:
        _ensure_index_schema(conn)
        for shard in _iter_shards(root):
            records = _read_records_from_shard(shard)
            _insert_index_records(conn, root, shard, records)
            indexed_shards += 1
            indexed_rows += len(records)
        conn.commit()
    return CacheReindexReport(
        indexed_shards=indexed_shards,
        indexed_rows=indexed_rows,
        index_path=index_path,
    )

repair_cache

repair_cache(cache_dir: Path | str) -> CacheRepairReport

Quarantine unreadable Parquet shards and rebuild the SQLite index.

Source code in geno_lewm/encoder/cache.py
def repair_cache(cache_dir: Path | str) -> CacheRepairReport:
    """Quarantine shards that fail to load, then rebuild the SQLite index.

    Every shard is test-read once; those raising ``CacheCorruptError`` are
    handed to ``_quarantine_shard``. The index is then rebuilt with
    ``reindex_cache``.

    Returns:
        A ``CacheRepairReport`` with the number of shards checked, the
        values returned by ``_quarantine_shard``, and the reindex report.
    """
    root = Path(cache_dir)
    # Snapshot the shard listing up front, before any shard is quarantined.
    shards = list(_iter_shards(root))
    moved_aside: list[Path] = []
    for candidate in shards:
        try:
            _read_records_from_shard(candidate)
        except CacheCorruptError:
            moved_aside.append(_quarantine_shard(root, candidate))
    reindex_report = reindex_cache(root)
    return CacheRepairReport(
        checked_shards=len(shards),
        quarantined=tuple(moved_aside),
        reindex=reindex_report,
    )

shard_path_for

shard_path_for(cache_dir: Path | str, *, encoder_id: str, state_layer: int, pool_type: str, pool_radius: int, contig: str, stride_block: int) -> Path

Return the canonical Parquet shard path for a cache block.

Source code in geno_lewm/encoder/cache.py
def shard_path_for(
    cache_dir: Path | str,
    *,
    encoder_id: str,
    state_layer: int,
    pool_type: str,
    pool_radius: int,
    contig: str,
    stride_block: int,
) -> Path:
    """Build the canonical Parquet shard path for one cache block.

    Layout below ``cache_dir``::

        <embeddings dir>/<encoder>/<state_layer>/<pool_type>_<pool_radius>/chr<contig>_<stride_block>.parquet

    ``encoder_id`` and ``contig`` are passed through ``_path_part`` before
    being used as path components.

    Raises:
        InputError: for an empty ``contig`` or a ``stride_block`` that is
            not a non-negative integer. ``state_layer`` and the pooling
            settings are checked by their own validators.
    """
    _validate_state_layer(state_layer)
    _validate_pool(pool_type, pool_radius)
    if not contig:
        raise InputError("contig must be non-empty")
    # bool is an int subclass, so it has to be excluded explicitly.
    block_is_plain_int = isinstance(stride_block, int) and not isinstance(stride_block, bool)
    if not block_is_plain_int or stride_block < 0:
        raise InputError(
            "stride_block must be a non-negative integer",
            details={"stride_block": stride_block},
        )

    root = Path(cache_dir)
    encoder_dir = _path_part(encoder_id)
    layer_dir = str(state_layer)
    pool_dir = f"{pool_type}_{pool_radius}"
    shard_name = f"chr{_path_part(contig)}_{stride_block}.parquet"
    return root.joinpath(_EMBEDDINGS_DIR, encoder_dir, layer_dir, pool_dir, shard_name)

write_shard

write_shard(cache_dir: Path | str, *, encoder_id: str, contig: str, stride_block: int, records: Sequence[WindowCacheRecord]) -> Path

Write one immutable Parquet shard and index its rows.

If the shard already exists with the same rows, this is a no-op. If it exists and new or conflicting rows are supplied, the function raises instead of rewriting in place (INV-DATA-3 / INV-DATA-10).

Source code in geno_lewm/encoder/cache.py
def write_shard(
    cache_dir: Path | str,
    *,
    encoder_id: str,
    contig: str,
    stride_block: int,
    records: Sequence[WindowCacheRecord],
) -> Path:
    """Write one immutable Parquet shard and index its rows.

    Records are timestamped via ``with_created_at`` and must all belong to
    ``contig`` and share ``state_layer``, ``pool_type`` and ``pool_radius``;
    those shared values, taken from the first record, determine the shard
    path.

    If the shard already exists with the same rows, this is a no-op apart
    from re-indexing the existing rows. If it exists and new or conflicting
    rows are supplied, the function raises instead of rewriting in place
    (INV-DATA-3 / INV-DATA-10).

    Raises:
        InputError: when ``records`` is empty or the records are not
            homogeneous as described above.
    """
    if not records:
        raise InputError("records must contain at least one cache row")

    stamped = tuple(row.with_created_at() for row in records)
    head = stamped[0]

    for row in stamped:
        if row.chrom != contig:
            raise InputError("all records in a shard must match the contig argument")

    # Fields that must be identical across the shard, checked in this order.
    shared_fields = (
        ("state_layer", "all records in a shard must share state_layer"),
        ("pool_type", "all records in a shard must share pool_type"),
        ("pool_radius", "all records in a shard must share pool_radius"),
    )
    for field_name, message in shared_fields:
        for row in stamped:
            if getattr(row, field_name) != getattr(head, field_name):
                raise InputError(message)

    root = Path(cache_dir)
    path = shard_path_for(
        root,
        encoder_id=encoder_id,
        state_layer=head.state_layer,
        pool_type=head.pool_type,
        pool_radius=head.pool_radius,
        contig=contig,
        stride_block=stride_block,
    )

    if not path.exists():
        # Fresh shard: check the index first, then write and index the rows.
        _assert_index_keys_available(root, stamped)
        path.parent.mkdir(parents=True, exist_ok=True)
        _write_records_to_parquet(path, stamped)
        _index_records(root, path, stamped)
        return path

    # Existing shard: never rewrite; verify equivalence and re-index the
    # rows already stored on disk.
    on_disk = _read_records_from_shard(path)
    _assert_existing_shard_equivalent(path, on_disk, stamped)
    _index_records(root, path, on_disk)
    return path

centered_mean

centered_mean(hidden_states: Sequence[Sequence[float]], *, center_token: int, pool_radius: int = DEFAULT_POOL_RADIUS_TOKENS) -> tuple[float, ...]

Mean-pool the inclusive token span center_token ± pool_radius.

Source code in geno_lewm/encoder/pooling.py
def centered_mean(
    hidden_states: Sequence[Sequence[float]],
    *,
    center_token: int,
    pool_radius: int = DEFAULT_POOL_RADIUS_TOKENS,
) -> tuple[float, ...]:
    """Mean-pool the inclusive token span ``center_token ± pool_radius``.

    The span is clipped to the available tokens, so a center near either
    end averages over fewer rows.
    """
    rows = _coerce_hidden_states(hidden_states)
    center = _validate_center_token(center_token, len(rows))
    radius = _validate_pool_radius(pool_radius)

    # Clip the inclusive span [center - radius, center + radius] to the rows.
    first = center - radius
    if first < 0:
        first = 0
    stop = center + radius + 1
    if stop > len(rows):
        stop = len(rows)
    return _mean_rows(rows[first:stop])

global_mean

global_mean(hidden_states: Sequence[Sequence[float]]) -> tuple[float, ...]

Mean-pool every token vector in hidden_states.

Source code in geno_lewm/encoder/pooling.py
def global_mean(hidden_states: Sequence[Sequence[float]]) -> tuple[float, ...]:
    """Mean-pool every token vector in ``hidden_states``.

    The input is first passed through ``_coerce_hidden_states``.
    """
    return _mean_rows(_coerce_hidden_states(hidden_states))

pool_hidden_states

pool_hidden_states(hidden_states: Sequence[Sequence[float]], *, edit_locus: int | None = None, pool_type: Literal['centered_mean', 'global_mean'] = POOL_CENTERED_MEAN, pool_radius: int = DEFAULT_POOL_RADIUS_TOKENS, token_bp: int = CARBON_TOKEN_BP) -> PoolingResult

Pool token-level hidden states into a state vector.

edit_locus is a 0-based base-pair offset within the encoder window. When it is absent, RFC-0002 requires a global-mean fallback tagged as untargeted=True so cache consumers do not mix arbitrary reference-window embeddings with edit-local embeddings.

Source code in geno_lewm/encoder/pooling.py
def pool_hidden_states(
    hidden_states: Sequence[Sequence[float]],
    *,
    edit_locus: int | None = None,
    pool_type: Literal["centered_mean", "global_mean"] = POOL_CENTERED_MEAN,
    pool_radius: int = DEFAULT_POOL_RADIUS_TOKENS,
    token_bp: int = CARBON_TOKEN_BP,
) -> PoolingResult:
    """Pool token-level hidden states into a single state vector.

    ``edit_locus`` is a 0-based base-pair offset within the encoder
    window. Without it, RFC-0002 requires a global-mean fallback tagged
    ``untargeted=True`` so cache consumers do not mix arbitrary
    reference-window embeddings with edit-local embeddings.

    With an ``edit_locus``, the locus is always converted to a token index
    (which also validates it). ``global_mean`` then still averages all
    tokens and reports ``pool_radius=0`` and ``center_token=None``, while
    ``centered_mean`` averages the span around that token.
    """
    rows = _coerce_hidden_states(hidden_states)
    requested_type = _validate_pool_type(pool_type)
    radius = _validate_pool_radius(pool_radius)

    untargeted = edit_locus is None
    center_token = None
    if not untargeted:
        center_token = _edit_locus_to_token(edit_locus, token_count=len(rows), token_bp=token_bp)

    if untargeted or requested_type == POOL_GLOBAL_MEAN:
        # Both global-mean outcomes differ only in the ``untargeted`` flag.
        return PoolingResult(
            vector=_mean_rows(rows),
            pool_type=POOL_GLOBAL_MEAN,
            pool_radius=0,
            untargeted=untargeted,
            center_token=None,
            token_count=len(rows),
        )

    pooled = centered_mean(rows, center_token=center_token, pool_radius=radius)
    return PoolingResult(
        vector=pooled,
        pool_type=POOL_CENTERED_MEAN,
        pool_radius=radius,
        untargeted=False,
        center_token=center_token,
        token_count=len(rows),
    )

canonicalize_dna

canonicalize_dna(sequence: str) -> str

Return uppercase DNA after validating the supported alphabet.

The cache hash invariant is based on uppercased window content, so callers can hash raw source slices and already-canonical windows interchangeably. N is accepted because reference FASTA and edited windows may contain masked bases.

Source code in geno_lewm/encoder/windowing.py
def canonicalize_dna(sequence: str) -> str:
    """Uppercase ``sequence`` and verify it uses only supported bases.

    The cache hash invariant is defined on uppercased window content, so
    raw source slices and already-canonical windows hash identically.
    ``N`` is accepted because reference FASTA and edited windows may
    contain masked bases.

    Raises:
        InputError: when ``sequence`` is not a ``str`` or contains a
            character outside the supported alphabet.
    """
    if not isinstance(sequence, str):
        raise InputError(
            "DNA sequence must be a string",
            details={"type": type(sequence).__name__},
        )

    uppercased = sequence.upper()
    unsupported = set(uppercased).difference(_VALID_DNA_BASES)
    if not unsupported:
        return uppercased
    raise InputError(
        "DNA sequence contains unsupported base(s)",
        details={"bad_chars": sorted(unsupported)},
        remediation="provide only A, C, G, T, or N bases",
    )

extract_window

extract_window(source_sequence: str, *, edit_locus: int | None = None, window_bp: int = DEFAULT_WINDOW_BP, assume_canonical: bool = False) -> ExtractedWindow

Extract a supported-width DNA window from source_sequence.

edit_locus is a 0-based offset in source_sequence. When it is supplied the window is centered on that locus unless clamped by source boundaries. When omitted, the source midpoint is used. If the source is shorter than the requested window or the selected interval extends past the right edge, trailing A bases are appended per Carbon's tokenizer convention.

Set assume_canonical when source_sequence is already uppercase, validated DNA (e.g. a contig from a loaded reference FASTA) to skip the O(len) re-validation. Re-validating a whole chromosome once per variant otherwise dominates VCF scoring wall-clock.

Source code in geno_lewm/encoder/windowing.py
def extract_window(
    source_sequence: str,
    *,
    edit_locus: int | None = None,
    window_bp: int = DEFAULT_WINDOW_BP,
    assume_canonical: bool = False,
) -> ExtractedWindow:
    """Cut a ``window_bp``-wide DNA window out of ``source_sequence``.

    ``edit_locus`` is a 0-based offset in ``source_sequence``. When given,
    the window is centered on it unless clamped by the source boundaries;
    when omitted, the source midpoint is used. If the source is shorter
    than the window, or the chosen interval runs past the right edge, the
    window is right-padded with ``A`` bases per Carbon's tokenizer
    convention and the pad length is reported in ``pad_right_bp``.

    Pass ``assume_canonical=True`` when ``source_sequence`` is already
    uppercase, validated DNA (e.g. a contig from a loaded reference FASTA)
    to skip the O(len) re-validation, which otherwise dominates VCF scoring
    wall-clock when a whole chromosome is re-checked once per variant.

    Raises:
        InputError: when the (canonicalized) source is empty.
    """
    _validate_window_bp(window_bp)
    if assume_canonical:
        source = source_sequence
    else:
        source = canonicalize_dna(source_sequence)
    if not source:
        raise InputError("source_sequence must be non-empty")

    source_len = len(source)
    start_bp = _centered_start(source_len, _center_for(source_len, edit_locus), window_bp)
    end_bp = start_bp + window_bp

    # Only bases that exist in the source can be copied; the rest is padding.
    slice_stop = end_bp if end_bp < source_len else source_len
    observed = source[start_bp:slice_stop]
    pad_right_bp = window_bp - len(observed)

    return ExtractedWindow(
        sequence=observed + _PAD_BASE * pad_right_bp,
        start_bp=start_bp,
        end_bp=end_bp,
        window_bp=window_bp,
        edit_locus=edit_locus,
        relative_edit_locus=None if edit_locus is None else edit_locus - start_bp,
        pad_right_bp=pad_right_bp,
    )

pad_for_carbon_tokenizer

pad_for_carbon_tokenizer(sequence: str, *, token_bp: int = CARBON_TOKEN_BP) -> str

Right-pad canonical DNA to Carbon's token multiple.

Source code in geno_lewm/encoder/windowing.py
def pad_for_carbon_tokenizer(sequence: str, *, token_bp: int = CARBON_TOKEN_BP) -> str:
    """Right-pad canonical DNA so its length is a multiple of ``token_bp``.

    The sequence is canonicalized first; a sequence whose length is already
    a multiple of ``token_bp`` is returned without padding.

    Raises:
        InputError: when ``token_bp`` is not a positive integer.
    """
    token_bp_is_plain_int = isinstance(token_bp, int) and not isinstance(token_bp, bool)
    if not token_bp_is_plain_int or token_bp <= 0:
        raise InputError(
            "token_bp must be a positive integer",
            details={"token_bp": token_bp, "type": type(token_bp).__name__},
        )
    canonical = canonicalize_dna(sequence)
    # Number of bases missing to reach the next multiple of token_bp.
    shortfall = -len(canonical) % token_bp
    return canonical + _PAD_BASE * shortfall if shortfall else canonical

window_sha256

window_sha256(sequence: str) -> bytes

Return SHA-256 bytes for the canonicalized DNA sequence.

Source code in geno_lewm/encoder/windowing.py
def window_sha256(sequence: str) -> bytes:
    """Return the raw SHA-256 digest of the canonicalized DNA sequence.

    The sequence is canonicalized and ASCII-encoded before hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(canonicalize_dna(sequence).encode("ascii"))
    return hasher.digest()

wrap_dna_for_tokenizer

wrap_dna_for_tokenizer(sequence: str) -> str

Return <dna>...</dna> input with Carbon-compatible padding.

Source code in geno_lewm/encoder/windowing.py
def wrap_dna_for_tokenizer(sequence: str) -> str:
    """Return ``<dna>...</dna>`` input with Carbon-compatible padding.

    The sequence is padded by ``pad_for_carbon_tokenizer`` and then placed
    between the Carbon open and close tags.
    """
    body = pad_for_carbon_tokenizer(sequence)
    return "{}{}{}".format(CARBON_DNA_OPEN_TAG, body, CARBON_DNA_CLOSE_TAG)