Skip to content

geno_lewm.encoder.cache

cache

Parquet window-embedding cache and SQLite index.

Implements the on-disk cache contract from RFC-0002 §3.6 and docs/spec/03-data-model.md#on-disk-window-embedding-cache. Parquet support is intentionally imported lazily so the base package keeps its minimal dependency surface; install geno-lewm[train] or the development extra to use this module.

WindowCacheKey dataclass

WindowCacheKey(window_hash: bytes, encoder_hash: bytes, state_layer: int, pool_type: str, pool_radius: int, dtype: str)

Content-addressed key for a cached embedding row.

WindowCacheRecord dataclass

WindowCacheRecord(chrom: str, start_bp: int, end_bp: int, window_hash: bytes, encoder_hash: bytes, state_layer: int, pool_type: str, pool_radius: int, dtype: str, embedding: tuple[float, ...], untargeted: bool, created_at: int = 0, schema_version: str = CACHE_SCHEMA_VERSION)

One row in the window-embedding cache schema.

key property

key: WindowCacheKey

Return the content-addressed key for this row.

with_created_at

with_created_at() -> WindowCacheRecord

Return a copy with created_at set to the current UTC time in nanoseconds when it is absent (0); otherwise return the record unchanged.

Source code in geno_lewm/encoder/cache.py
def with_created_at(self) -> WindowCacheRecord:
    """Return this record with ``created_at`` stamped, if it is unset.

    A ``created_at`` of ``0`` (the field default) is treated as "absent".
    It is replaced with the current wall-clock time as nanoseconds since
    the Unix epoch, from ``time.time_ns()``. Any non-zero value is kept and
    ``self`` is returned as-is.

    ``dataclasses.replace`` copies every other field verbatim. Fields added
    to the record later are therefore carried over automatically instead
    of having to be listed here by hand.
    """
    from dataclasses import replace

    if self.created_at:
        return self
    return replace(self, created_at=time.time_ns())

CacheReindexReport dataclass

CacheReindexReport(indexed_shards: int, indexed_rows: int, index_path: Path)

Summary of a SQLite index rebuild.

CacheRepairReport dataclass

CacheRepairReport(checked_shards: int, quarantined: tuple[Path, ...], reindex: CacheReindexReport)

Summary of a repair pass over Parquet shards.

default_cache_dir

default_cache_dir() -> Path

Return $GENO_LEWM_CACHE, or the local default directory .geno-lewm-cache when it is not set.

Source code in geno_lewm/encoder/cache.py
def default_cache_dir() -> Path:
    """Return ``$GENO_LEWM_CACHE`` or the documented local default.

    An unset *or empty* ``GENO_LEWM_CACHE`` falls back to the relative
    directory ``.geno-lewm-cache``. Without the empty check, ``Path("")``
    would resolve to ``.`` and cache shards would land directly in the
    current working directory. A leading ``~`` is expanded to the user's
    home directory.
    """
    configured = os.environ.get("GENO_LEWM_CACHE")
    return Path(configured or ".geno-lewm-cache").expanduser()

shard_path_for

shard_path_for(cache_dir: Path | str, *, encoder_id: str, state_layer: int, pool_type: str, pool_radius: int, contig: str, stride_block: int) -> Path

Return the canonical Parquet shard path for a cache block.

Source code in geno_lewm/encoder/cache.py
def shard_path_for(
    cache_dir: Path | str,
    *,
    encoder_id: str,
    state_layer: int,
    pool_type: str,
    pool_radius: int,
    contig: str,
    stride_block: int,
) -> Path:
    """Return the canonical Parquet shard path for a cache block.

    The path has the shape::

        <cache_dir>/<embeddings dir>/<encoder_id part>/<state_layer>/
            <pool_type>_<pool_radius>/chr<contig part>_<stride_block>.parquet

    ``encoder_id`` and ``contig`` are passed through ``_path_part`` before
    being used as path components.

    Raises:
        InputError: If ``contig`` is empty, or ``stride_block`` is not a
            non-negative ``int`` (``bool`` is rejected explicitly). The
            state-layer and pool validators run first and may raise too.
    """
    _validate_state_layer(state_layer)
    _validate_pool(pool_type, pool_radius)
    if not contig:
        raise InputError("contig must be non-empty")
    # bool is an int subclass, so exclude it explicitly; the range check
    # is only reached once the value is known to be a plain int.
    plain_int = isinstance(stride_block, int) and not isinstance(stride_block, bool)
    if not (plain_int and stride_block >= 0):
        raise InputError(
            "stride_block must be a non-negative integer",
            details={"stride_block": stride_block},
        )

    base = Path(cache_dir)
    encoder_dir = _path_part(encoder_id)
    pool_dir = f"{pool_type}_{pool_radius}"
    shard_name = f"chr{_path_part(contig)}_{stride_block}.parquet"
    return base.joinpath(
        _EMBEDDINGS_DIR,
        encoder_dir,
        str(state_layer),
        pool_dir,
        shard_name,
    )

write_shard

write_shard(cache_dir: Path | str, *, encoder_id: str, contig: str, stride_block: int, records: Sequence[WindowCacheRecord]) -> Path

Write one immutable Parquet shard and index its rows.

If the shard already exists with the same rows, this is a no-op. If it exists and new or conflicting rows are supplied, the function raises instead of rewriting in place (INV-DATA-3 / INV-DATA-10).

Source code in geno_lewm/encoder/cache.py
def write_shard(
    cache_dir: Path | str,
    *,
    encoder_id: str,
    contig: str,
    stride_block: int,
    records: Sequence[WindowCacheRecord],
) -> Path:
    """Write one immutable Parquet shard and index its rows.

    Every record is first stamped via ``with_created_at``. All records must
    name ``contig`` as their chromosome and must agree on ``state_layer``,
    ``pool_type`` and ``pool_radius``, because those values choose the
    shard path.

    * Shard file already present: its rows are read back and checked for
      equivalence with *records* (raising on any mismatch). The on-disk
      rows are then (re)indexed; nothing is rewritten in place
      (INV-DATA-3 / INV-DATA-10).
    * Shard file missing: the index is checked for key collisions, the
      parent directory is created, the shard is written and its rows are
      indexed.

    Returns:
        The shard path.

    Raises:
        InputError: If *records* is empty or the records are not uniform as
            described above.
    """
    if not records:
        raise InputError("records must contain at least one cache row")
    stamped = tuple(rec.with_created_at() for rec in records)
    head = stamped[0]

    # Rules are checked in this order across all rows, so the first
    # violated rule decides which error is reported.
    uniformity_rules = (
        ("chrom", contig, "all records in a shard must match the contig argument"),
        ("state_layer", head.state_layer, "all records in a shard must share state_layer"),
        ("pool_type", head.pool_type, "all records in a shard must share pool_type"),
        ("pool_radius", head.pool_radius, "all records in a shard must share pool_radius"),
    )
    for attr_name, expected, message in uniformity_rules:
        if any(getattr(rec, attr_name) != expected for rec in stamped):
            raise InputError(message)

    cache_root = Path(cache_dir)
    target = shard_path_for(
        cache_root,
        encoder_id=encoder_id,
        state_layer=head.state_layer,
        pool_type=head.pool_type,
        pool_radius=head.pool_radius,
        contig=contig,
        stride_block=stride_block,
    )

    if not target.exists():
        _assert_index_keys_available(cache_root, stamped)
        target.parent.mkdir(parents=True, exist_ok=True)
        _write_records_to_parquet(target, stamped)
        _index_records(cache_root, target, stamped)
        return target

    # Shards are immutable: verify the caller's rows match what is on disk,
    # then make sure the on-disk rows are present in the index.
    on_disk = _read_records_from_shard(target)
    _assert_existing_shard_equivalent(target, on_disk, stamped)
    _index_records(cache_root, target, on_disk)
    return target

read_embedding

read_embedding(cache_dir: Path | str, key: WindowCacheKey) -> tuple[float, ...] | None

Return an embedding by content key, or None on cache miss.

Source code in geno_lewm/encoder/cache.py
def read_embedding(cache_dir: Path | str, key: WindowCacheKey) -> tuple[float, ...] | None:
    """Return an embedding by content key, or ``None`` on cache miss.

    The SQLite index maps the full content key to a stored shard path and
    a row offset inside that shard. The key covers window hash, encoder
    hash, state layer, pool type/radius and dtype. The shard path is
    resolved against *cache_dir*. The shard is then read, and the row's own
    key is re-checked against *key* before its embedding is returned.

    Args:
        cache_dir: Cache root directory.
        key: Content-addressed key of the wanted row.

    Returns:
        The cached embedding. Returns ``None`` when the index file does not
        exist or has no row for *key*.

    Raises:
        CacheCorruptError: If the shard reader reports the indexed shard as
            corrupt, the stored row offset lies outside the shard, or the
            row at that offset carries a different key (index and shard
            disagree).
    """
    root = Path(cache_dir)
    index_path = _index_path(root)
    if not index_path.exists():
        # Without an index file there is nothing to look up: report a miss.
        return None
    with closing(sqlite3.connect(index_path)) as conn:
        _ensure_index_schema(conn)
        row = conn.execute(
            """
            SELECT shard_path, row_offset
            FROM window_index
            WHERE window_hash = ?
              AND encoder_hash = ?
              AND state_layer = ?
              AND pool_type = ?
              AND pool_radius = ?
              AND dtype = ?
            """,
            _index_key_params(key),
        ).fetchone()
        # Commit in case _ensure_index_schema issued schema statements.
        # NOTE(review): presumably CREATE ... IF NOT EXISTS — confirm.
        conn.commit()
    if row is None:
        return None
    shard_path = root / str(row[0])
    row_offset = int(row[1])
    # A CacheCorruptError from the shard reader propagates unchanged; the
    # former `except CacheCorruptError: raise` wrapper added nothing.
    records = _read_records_from_shard(shard_path)
    if not 0 <= row_offset < len(records):
        raise CacheCorruptError(
            "cache index row_offset points outside shard",
            details={"shard_path": str(shard_path), "row_offset": row_offset},
        )
    record = records[row_offset]
    if record.key != key:
        raise CacheCorruptError(
            "cache index key does not match shard row",
            details={"shard_path": str(shard_path), "row_offset": row_offset},
        )
    return record.embedding

reindex_cache

reindex_cache(cache_dir: Path | str) -> CacheReindexReport

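Rebuild index.sqlite from every Parquet shard in the cache. An unreadable shard raises CacheCorruptError and aborts the rebuild; run repair_cache to quarantine unreadable shards first.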

Source code in geno_lewm/encoder/cache.py
def reindex_cache(cache_dir: Path | str) -> CacheReindexReport:
    """Rebuild ``index.sqlite`` from every Parquet shard under *cache_dir*.

    The replacement index is written to a sibling staging file. It is moved
    over the live index only after every shard has been indexed and the
    transaction committed. If reading or indexing any shard raises (an
    unreadable shard surfaces as ``CacheCorruptError``), the staging file
    is discarded and the exception propagates. In that case the previous
    index, if any, is left untouched rather than replaced by an empty or
    partial one. Use ``repair_cache`` to quarantine unreadable shards
    before reindexing.

    Args:
        cache_dir: Cache root directory.

    Returns:
        The number of shards and rows indexed, and the path of the rebuilt
        index.
    """
    root = Path(cache_dir)
    index_path = _index_path(root)
    index_path.parent.mkdir(parents=True, exist_ok=True)
    # NOTE(review): assumes _iter_shards only yields Parquet shards, so this
    # sibling staging file is never picked up as a shard — confirm.
    staging_path = index_path.with_name(index_path.name + ".rebuild")
    # Drop leftovers from an interrupted rebuild so rows are never merged
    # into a stale staging database.
    if staging_path.exists():
        staging_path.unlink()
    indexed_shards = 0
    indexed_rows = 0
    try:
        with closing(sqlite3.connect(staging_path)) as conn:
            _ensure_index_schema(conn)
            for shard in _iter_shards(root):
                records = _read_records_from_shard(shard)
                _insert_index_records(conn, root, shard, records)
                indexed_shards += 1
                indexed_rows += len(records)
            conn.commit()
    except BaseException:
        # The connection was closed when the `with` exited; remove the
        # half-built staging file and keep the live index as it was.
        if staging_path.exists():
            staging_path.unlink()
        raise
    # Same-directory rename: swaps in the new index in one step (atomic on
    # POSIX) instead of deleting the old one up front.
    staging_path.replace(index_path)
    return CacheReindexReport(
        indexed_shards=indexed_shards,
        indexed_rows=indexed_rows,
        index_path=index_path,
    )

repair_cache

repair_cache(cache_dir: Path | str) -> CacheRepairReport

Quarantine unreadable Parquet shards and rebuild the SQLite index.

Source code in geno_lewm/encoder/cache.py
def repair_cache(cache_dir: Path | str) -> CacheRepairReport:
    """Quarantine unreadable Parquet shards and rebuild the SQLite index.

    Each shard is read once. A shard whose read raises ``CacheCorruptError``
    is handed to ``_quarantine_shard``, and the path that helper returns is
    collected. Afterwards the index is rebuilt with ``reindex_cache``.

    Returns:
        How many shards were checked, the paths returned for quarantined
        shards, and the reindex summary.
    """
    cache_root = Path(cache_dir)
    # Snapshot the shard list before touching anything: quarantining
    # presumably relocates files, so the directory walk must not be lazy.
    shards = list(_iter_shards(cache_root))
    moved: list[Path] = []
    for candidate in shards:
        try:
            _read_records_from_shard(candidate)
        except CacheCorruptError:
            moved.append(_quarantine_shard(cache_root, candidate))
    return CacheRepairReport(
        checked_shards=len(shards),
        quarantined=tuple(moved),
        reindex=reindex_cache(cache_root),
    )