geno_lewm.data.builder

builder

Training tuple builder for the RFC-0006 data pipeline.

This module owns the dependency-free boundary between prepared data sources and the eventual PyTorch trainer. It does not download gnomAD, ClinVar, or Carbon data. Instead, callers provide edit-source providers that are easy to unit-test with fixtures and later wire to real shards.

DEFAULT_EDIT_SOURCE_COUNTS module-attribute

DEFAULT_EDIT_SOURCE_COUNTS: tuple[EditSourceCount, ...] = (EditSourceCount(SOURCE_GNOMAD_COMMON, 3), EditSourceCount(SOURCE_SYNTHETIC_SNV, 3), EditSourceCount(SOURCE_SYNTHETIC_INDEL, 1), EditSourceCount(SOURCE_CLINVAR, 1))

RFC-0006 §3.3 per-window source allocation for N_edits = 8.
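
As a quick sanity check on the allocation, the sketch below rebuilds the mix with a local stand-in dataclass and confirms that the counts sum to N_edits = 8. The string values used for the SOURCE_* constants are placeholders, since their real values are not shown on this page.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EditSourceCount:
    """Local stand-in for geno_lewm.data.builder.EditSourceCount."""

    source: str
    count: int


# Placeholder source names: the real SOURCE_* values are assumptions here.
MIX = (
    EditSourceCount("gnomad_common", 3),
    EditSourceCount("synthetic_snv", 3),
    EditSourceCount("synthetic_indel", 1),
    EditSourceCount("clinvar", 1),
)

# Total edits requested per window under the default 3/3/1/1 mix.
total_edits = sum(entry.count for entry in MIX)
print(total_edits)  # 8
```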

DEFAULT_SOURCE_FALLBACKS module-attribute

DEFAULT_SOURCE_FALLBACKS: dict[str, str] = {SOURCE_CLINVAR: SOURCE_SYNTHETIC_SNV, SOURCE_GNOMAD_COMMON: SOURCE_SYNTHETIC_SNV}

Default fallback when an absolute VCF edit is unavailable for a window.

ClinVar hard-negatives and gnomAD common variants are placed (absolute) sources: they only apply to windows that carry genome coordinates. On unplaced windows (the synthetic Carbon pretraining corpus) the absolute providers yield nothing and the builder draws synthetic SNVs instead, so pretraining-corpus windows still produce full edit tuples.
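
The fallback behaviour described above can be sketched with simplified stand-ins. The internal sampling helper is not shown on this page, so `draw_with_fallback`, the provider signatures, and the string edit labels below are all hypothetical; only the idea (an absolute source yields nothing on an unplaced window and the configured fallback fills the gap) comes from the description.

```python
import random


def absolute_provider(chrom, count, rng):
    """Stand-in for a VCF-backed provider: yields nothing on unplaced windows."""
    if chrom is None:
        return ()
    return tuple(f"{chrom}:variant{i}" for i in range(count))


def synthetic_provider(chrom, count, rng):
    """Stand-in for a synthetic SNV provider: always fills the request."""
    return tuple(f"snv@{rng.randrange(100)}" for _ in range(count))


def draw_with_fallback(source, chrom, count, providers, fallbacks, rng):
    """Draw `count` edits, topping up any shortfall from the fallback source."""
    drawn = [(source, edit) for edit in providers[source](chrom, count, rng)]
    missing = count - len(drawn)
    if missing > 0:
        fallback = fallbacks.get(source)
        if fallback is None:
            # Assumption: a shortfall without a configured fallback is an error.
            raise ValueError(f"{source} produced too few edits and has no fallback")
        drawn += [(fallback, edit) for edit in providers[fallback](chrom, missing, rng)]
    return drawn


providers = {"clinvar": absolute_provider, "synthetic_snv": synthetic_provider}
fallbacks = {"clinvar": "synthetic_snv"}
rng = random.Random(0)

# Unplaced window: the ClinVar slot is filled by a synthetic SNV.
unplaced = draw_with_fallback("clinvar", None, 1, providers, fallbacks, rng)
# Placed window: the ClinVar slot keeps its real variant.
placed = draw_with_fallback("clinvar", "chr1", 1, providers, fallbacks, rng)
```

Each drawn edit is tagged with the source that actually produced it, so a downstream consumer can still tell real variants from fallback edits.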

EditSourceCount dataclass

EditSourceCount(source: str, count: int)

Number of edits to draw from one RFC-0006 source per window.

WindowContext dataclass

WindowContext(record_id: str, source: str, sequence: str, start_bp: int = 0, chrom: str | None = None)

One reference window plus source coordinates for tuple building.

Coordinates are 0-based half-open: start_bp is inclusive and end_bp is exclusive. chrom is required for absolute variant providers and chromosome/interval holdouts, but synthetic providers can operate on unplaced Carbon windows.

end_bp property

end_bp: int

Return the 0-based exclusive end coordinate.
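
A minimal sketch of the half-open convention follows. `WindowSketch` is a hypothetical stand-in, not the real class, and it assumes that the window spans exactly `len(sequence)` bases, which this page does not state explicitly.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class WindowSketch:
    """Hypothetical stand-in for WindowContext (not the real class)."""

    record_id: str
    source: str
    sequence: str
    start_bp: int = 0
    chrom: Optional[str] = None

    @property
    def end_bp(self) -> int:
        # Assumption: the window covers exactly len(sequence) bases.
        return self.start_bp + len(self.sequence)


window = WindowSketch("rec-1", "carbon", "ACGTACGT", start_bp=100, chrom="chr1")
print(window.start_bp, window.end_bp)  # 100 108
```

Under this convention the window length is simply `end_bp - start_bp`, with no off-by-one adjustment.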

window_id property

window_id: str

Return the content hash used for cache lookup.

HoldoutInterval dataclass

HoldoutInterval(chrom: str, start_bp: int, end_bp: int)

0-based half-open genomic interval excluded from training.

intersects

intersects(chrom: str | None, start_bp: int, end_bp: int) -> bool

Return whether [start_bp, end_bp) intersects this interval.

Source code in geno_lewm/data/builder.py
def intersects(self, chrom: str | None, start_bp: int, end_bp: int) -> bool:
    """Return whether ``[start_bp, end_bp)`` intersects this interval."""
    if chrom != self.chrom:
        return False
    return start_bp < self.end_bp and self.start_bp < end_bp
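
The half-open test can be exercised directly. The block below repeats the dataclass locally so that it runs without the package installed; the interval values are illustrative only.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class HoldoutInterval:
    """Local copy of the dataclass shown above, for a runnable example."""

    chrom: str
    start_bp: int
    end_bp: int

    def intersects(self, chrom: Optional[str], start_bp: int, end_bp: int) -> bool:
        if chrom != self.chrom:
            return False
        return start_bp < self.end_bp and self.start_bp < end_bp


held = HoldoutInterval("chr1", 100, 200)

overlapping = held.intersects("chr1", 150, 250)  # shares [150, 200)
adjacent = held.intersects("chr1", 200, 300)     # touches only at the exclusive end
other_chrom = held.intersects("chr2", 150, 250)  # different chromosome
unplaced = held.intersects(None, 150, 250)       # no chromosome at all

print(overlapping, adjacent, other_chrom, unplaced)  # True False False False
```

Because both ends are compared with strict inequalities, an interval that starts exactly where the holdout ends is not treated as overlapping.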

HoldoutPolicy dataclass

HoldoutPolicy(holdout_chroms: tuple[str, ...] = (), intervals: tuple[HoldoutInterval, ...] = (), edit_keys: tuple[str, ...] = (), record_ids: tuple[str, ...] = ())

Holdout exclusions enforced before a tuple reaches the trainer.

excludes_window

excludes_window(window: WindowContext) -> bool

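Return whether the source window as a whole is excluded: its record_id or chrom is held out, or it overlaps any holdout interval (a partial overlap is enough).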

Source code in geno_lewm/data/builder.py
def excludes_window(self, window: WindowContext) -> bool:
    """Return whether the entire source window is in a holdout."""
    if not isinstance(window, WindowContext):
        raise InputError(
            "window must be a WindowContext",
            details={"type": type(window).__name__},
        )
    if window.record_id in self.record_ids:
        return True
    if window.chrom in self.holdout_chroms:
        return True
    return any(
        interval.intersects(window.chrom, window.start_bp, window.end_bp)
        for interval in self.intervals
    )

excludes_edit

excludes_edit(window: WindowContext, edit: RelEdit) -> bool

Return whether one relative edit intersects an edit-level holdout. Edits on unplaced windows (chrom is None) are never excluded by this check.

Source code in geno_lewm/data/builder.py
def excludes_edit(self, window: WindowContext, edit: RelEdit) -> bool:
    """Return whether one relative edit intersects an edit-level holdout."""
    if window.chrom is None:
        return False
    if window.chrom in self.holdout_chroms:
        return True
    edit_start = window.start_bp + edit.rel_pos
    edit_end = edit_start + len(edit.ref_bases)
    if any(
        interval.intersects(window.chrom, edit_start, edit_end) for interval in self.intervals
    ):
        return True
    return _edit_key(window.chrom, edit_start + 1, edit.ref_bases, edit.alt_bases) in set(
        self.edit_keys
    )
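
The coordinate arithmetic in excludes_edit can be traced with plain integers. The helper name `edit_coordinates` is hypothetical, and reading `edit_start + 1` as a 1-based VCF-style position is an inference from the code above rather than something the page states; the format of the key built by `_edit_key` is not shown and is not reproduced here.

```python
def edit_coordinates(window_start_bp: int, rel_pos: int, ref_bases: str):
    """Map a window-relative edit to absolute coordinates."""
    edit_start = window_start_bp + rel_pos  # 0-based, inclusive
    edit_end = edit_start + len(ref_bases)  # 0-based, exclusive
    key_pos = edit_start + 1                # assumed 1-based position used for the key
    return edit_start, edit_end, key_pos


# A 2 bp reference allele at offset 5 of a window starting at 1000.
coords = edit_coordinates(1_000, 5, "AC")
print(coords)  # (1005, 1007, 1006)
```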

TrainingTuple dataclass

TrainingTuple(window_id: str, source_record_id: str, edit_source: str, rel_edits: tuple[RelEdit, ...], target_window: str, window_start_bp: int, window_end_bp: int)

One RFC-0006 (window_id, action, target_window) training item.

TrainingDatasetItem dataclass

TrainingDatasetItem(source_window: WindowContext, training_tuple: TrainingTuple)

One stream item with the source window needed for trainer encoding.

GenoLeWMDataset

GenoLeWMDataset(windows: _WindowSource, providers: Mapping[str, _EditProvider], *, seed: int, mix: Sequence[EditSourceCount] = DEFAULT_EDIT_SOURCE_COUNTS, holdouts: HoldoutPolicy | None = None, fallback_sources: Mapping[str, str] | None = DEFAULT_SOURCE_FALLBACKS, preserve_length: bool = True)

Bases: _load_iterable_dataset_base()

Deterministic iterable dataset over windows and edit-source providers.

The class subclasses torch.utils.data.IterableDataset when torch is installed, but falls back to a plain Python iterable in core/dev environments. That keeps the data contract testable without pulling in the full training extra.

Source code in geno_lewm/data/builder.py
def __init__(
    self,
    windows: _WindowSource,
    providers: Mapping[str, _EditProvider],
    *,
    seed: int,
    mix: Sequence[EditSourceCount] = DEFAULT_EDIT_SOURCE_COUNTS,
    holdouts: HoldoutPolicy | None = None,
    fallback_sources: Mapping[str, str] | None = DEFAULT_SOURCE_FALLBACKS,
    preserve_length: bool = True,
) -> None:
    _require_nonnegative_int("seed", seed)
    if not providers:
        raise InputError("providers must contain at least one edit source")
    self.windows = windows
    self.providers = dict(providers)
    self.seed = seed
    self.mix = _normalize_mix(mix)
    self.holdouts = holdouts
    self.fallback_sources = dict(fallback_sources or {})
    self.preserve_length = preserve_length
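
The optional-torch base class mentioned above can be sketched as follows. The body of `_load_iterable_dataset_base` is not shown on this page, so this is one plausible shape under that assumption; `load_iterable_dataset_base` and `CountingDataset` are hypothetical names.

```python
def load_iterable_dataset_base() -> type:
    """Return torch's IterableDataset if torch is importable, else object."""
    try:
        from torch.utils.data import IterableDataset
    except ImportError:
        # Core/dev environment without the training extra installed.
        return object
    return IterableDataset


class CountingDataset(load_iterable_dataset_base()):
    """Toy dataset that behaves the same with or without torch."""

    def __init__(self, limit: int) -> None:
        self.limit = limit

    def __iter__(self):
        return iter(range(self.limit))


items = list(CountingDataset(3))
print(items)  # [0, 1, 2]
```

Resolving the base class at import time keeps plain iteration working in test environments, while a DataLoader still sees a real IterableDataset when torch is present.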

__iter__

__iter__() -> Iterator[TrainingTuple]

Yield training tuples suitable for a PyTorch DataLoader.

Source code in geno_lewm/data/builder.py
def __iter__(self) -> Iterator[TrainingTuple]:
    """Yield training tuples suitable for a PyTorch DataLoader."""
    for item in self.iter_with_source_windows():
        yield item.training_tuple

iter_with_source_windows

iter_with_source_windows() -> Iterator[TrainingDatasetItem]

Yield tuples together with their source windows for trainer encoding.

Source code in geno_lewm/data/builder.py
def iter_with_source_windows(self) -> Iterator[TrainingDatasetItem]:
    """Yield tuples together with their source windows for trainer encoding."""
    worker = _torch_worker_info()
    rng = random.Random(self.seed + worker.id)
    for index, window in enumerate(_iter_window_source(self.windows)):
        if index % worker.num_workers != worker.id:
            continue
        if not isinstance(window, WindowContext):
            raise InputError(
                "window source must yield WindowContext values",
                details={"type": type(window).__name__},
            )
        for item in build_training_tuples(
            window,
            self.providers,
            rng=rng,
            mix=self.mix,
            holdouts=self.holdouts,
            fallback_sources=self.fallback_sources,
            preserve_length=self.preserve_length,
        ):
            yield TrainingDatasetItem(source_window=window, training_tuple=item)
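
The round-robin sharding and per-worker seeding used above can be illustrated in isolation. `WorkerInfo` is a hypothetical stand-in for whatever `_torch_worker_info()` returns; the sketch assumes only that it exposes `id` and `num_workers`, as the code above does.

```python
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerInfo:
    """Hypothetical stand-in for the value returned by _torch_worker_info()."""

    id: int
    num_workers: int


def shard(items, worker: WorkerInfo):
    """Keep every num_workers-th item, starting at the worker's own id."""
    return [
        item
        for index, item in enumerate(items)
        if index % worker.num_workers == worker.id
    ]


windows = [f"w{i}" for i in range(7)]
shards = [shard(windows, WorkerInfo(id=i, num_workers=3)) for i in range(3)]
print(shards)  # [['w0', 'w3', 'w6'], ['w1', 'w4'], ['w2', 'w5']]

# Each worker seeds its own generator with seed + worker id, so a rerun with
# the same seed and worker count reproduces the same per-worker draws.
seed = 42
first_draws = [random.Random(seed + i).random() for i in range(3)]
repeat_draws = [random.Random(seed + i).random() for i in range(3)]
```

Every window lands in exactly one shard, so workers neither duplicate nor drop windows. Changing the number of workers changes which windows share a generator, so the stream is reproducible only for a fixed seed and worker count.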

build_training_tuples

build_training_tuples(window: WindowContext, providers: Mapping[str, _EditProvider], *, rng: Random, mix: Sequence[EditSourceCount] = DEFAULT_EDIT_SOURCE_COUNTS, holdouts: HoldoutPolicy | None = None, fallback_sources: Mapping[str, str] | None = DEFAULT_SOURCE_FALLBACKS, preserve_length: bool = True) -> tuple[TrainingTuple, ...]

Build per-window training tuples with source mix and holdout checks.

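providers map source names to callables returning relative edits for that window. The default mix encodes RFC-0006's 3/3/1/1 gnomAD/synthetic-SNV/synthetic-indel/ClinVar allocation. If a source cannot produce enough edits, only explicitly configured fallbacks are used, so a source with no fallback entry fails instead of silently turning the training stream synthetic. Note that DEFAULT_SOURCE_FALLBACKS maps both gnomAD and ClinVar to synthetic SNVs; pass fallback_sources=None, or a mapping without those entries, to disable that substitution.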

Source code in geno_lewm/data/builder.py
def build_training_tuples(
    window: WindowContext,
    providers: Mapping[str, _EditProvider],
    *,
    rng: random.Random,
    mix: Sequence[EditSourceCount] = DEFAULT_EDIT_SOURCE_COUNTS,
    holdouts: HoldoutPolicy | None = None,
    fallback_sources: Mapping[str, str] | None = DEFAULT_SOURCE_FALLBACKS,
    preserve_length: bool = True,
) -> tuple[TrainingTuple, ...]:
    """Build per-window training tuples with source mix and holdout checks.

    ``providers`` map source names to callables returning relative edits
    for that window. The default mix encodes RFC-0006's 3/3/1/1
    gnomAD/synthetic-SNV/synthetic-indel/ClinVar allocation. If a source
    cannot produce enough edits, only explicitly configured fallbacks are
    used; a source with no fallback entry therefore fails instead of
    silently turning the training stream synthetic. The default
    ``DEFAULT_SOURCE_FALLBACKS`` maps gnomAD and ClinVar to synthetic SNVs.
    """
    if not isinstance(window, WindowContext):
        raise InputError("window must be a WindowContext")
    if not isinstance(rng, random.Random):
        raise InputError("rng must be a random.Random instance")
    active_holdouts = holdouts if holdouts is not None else HoldoutPolicy()
    if active_holdouts.excludes_window(window):
        return ()

    source_mix = _normalize_mix(mix)
    fallbacks = dict(fallback_sources or {})
    tuples: list[TrainingTuple] = []
    for entry in source_mix:
        if entry.count == 0:
            continue
        edits = _sample_edits(
            source=entry.source,
            count=entry.count,
            window=window,
            providers=providers,
            rng=rng,
            holdouts=active_holdouts,
            fallback_sources=fallbacks,
        )
        tuples.extend(
            _tuple_for_edit(
                window,
                edit,
                source=source,
                preserve_length=preserve_length,
            )
            for source, edit in edits
        )
    return tuple(tuples)

synthetic_snv_provider

synthetic_snv_provider(window: WindowContext, count: int, rng: Random) -> tuple[RelEdit, ...]

Provider for RFC-0006 uniform synthetic SNVs.

Source code in geno_lewm/data/builder.py
def synthetic_snv_provider(
    window: WindowContext, count: int, rng: random.Random
) -> tuple[RelEdit, ...]:
    """Provider for RFC-0006 uniform synthetic SNVs."""
    _require_nonnegative_int("count", count)
    return tuple(uniform_snv(window.sequence, count, rng=rng))
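
Providers share the call shape `(window, count, rng)`. The implementation of `uniform_snv` is not shown on this page, so the block below is only a guess at what uniform SNV sampling could look like: it works on a bare sequence, returns plain `(rel_pos, ref, alt)` tuples instead of RelEdit objects, and samples positions with replacement. All names in it are hypothetical.

```python
import random

BASES = "ACGT"


def sketch_uniform_snvs(sequence: str, count: int, rng: random.Random):
    """Draw `count` (rel_pos, ref, alt) SNVs at uniformly random positions."""
    edits = []
    for _ in range(count):
        rel_pos = rng.randrange(len(sequence))
        ref = sequence[rel_pos]
        # The alternate allele is any base other than the reference base.
        alt = rng.choice([base for base in BASES if base != ref])
        edits.append((rel_pos, ref, alt))
    return tuple(edits)


snvs = sketch_uniform_snvs("ACGTACGT", 3, random.Random(0))
again = sketch_uniform_snvs("ACGTACGT", 3, random.Random(0))
```

Passing the generator in explicitly, rather than using the module-level `random` functions, is what lets the dataset reproduce the same edits from the same seed.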

synthetic_indel_provider

synthetic_indel_provider(window: WindowContext, count: int, rng: Random) -> tuple[RelEdit, ...]

Provider for RFC-0006 synthetic indels.

Source code in geno_lewm/data/builder.py
def synthetic_indel_provider(
    window: WindowContext, count: int, rng: random.Random
) -> tuple[RelEdit, ...]:
    """Provider for RFC-0006 synthetic indels."""
    _require_nonnegative_int("count", count)
    return tuple(indel(window.sequence, count, rng=rng))

variant_provider

variant_provider(variants: Sequence[EditSpec]) -> _EditProvider

Return a provider backed by absolute VCF-style variants.

Source code in geno_lewm/data/builder.py
def variant_provider(variants: Sequence[EditSpec]) -> _EditProvider:
    """Return a provider backed by absolute VCF-style variants."""
    normalized = tuple(_require_edit_spec(value) for value in variants)
    by_chrom: dict[str, tuple[tuple[int, ...], tuple[EditSpec, ...]]] = {}
    chroms = sorted({variant.chrom for variant in normalized})
    for chrom in chroms:
        ordered = tuple(sorted((item for item in normalized if item.chrom == chrom), key=_edit_pos))
        by_chrom[chrom] = (tuple(item.pos for item in ordered), ordered)

    def _provider(window: WindowContext, count: int, rng: random.Random) -> tuple[RelEdit, ...]:
        _require_nonnegative_int("count", count)
        if count == 0:
            return ()
        if window.chrom is None:
            # Unplaced windows (e.g. the synthetic Carbon pretraining corpus)
            # carry no genome coordinates, so absolute VCF variants cannot be
            # mapped onto them. Yield nothing and let the source fallback supply
            # synthetic edits (see DEFAULT_SOURCE_FALLBACKS). Placed windows with
            # a chrom still receive their real gnomAD/ClinVar variants.
            return ()
        indexed = by_chrom.get(window.chrom)
        if indexed is None:
            return ()
        positions, chrom_variants = indexed
        start = bisect_right(positions, window.start_bp)
        stop = bisect_right(positions, window.end_bp)
        candidates = [
            variant.relative_to(window.start_bp, window.end_bp - 1)
            for variant in chrom_variants[start:stop]
            if variant.pos - 1 + len(variant.ref) <= window.end_bp
        ]
        rng.shuffle(candidates)
        return tuple(candidates[:count])

    return _provider
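
The window lookup in `_provider` can be traced on a small example. The sketch assumes that variant positions are 1-based (VCF style) while the window is 0-based half-open, which is what the `variant.pos - 1` arithmetic above suggests; variants are modelled as plain `(pos, ref)` tuples and `variants_in_window` is a hypothetical helper.

```python
from bisect import bisect_right


def variants_in_window(variants, start_bp: int, end_bp: int):
    """Select position-sorted (pos, ref) variants lying fully inside the window.

    `pos` is assumed 1-based; the window [start_bp, end_bp) is 0-based.
    """
    positions = [pos for pos, _ in variants]
    # First variant with pos > start_bp, i.e. 0-based start >= start_bp.
    start = bisect_right(positions, start_bp)
    # First variant with pos > end_bp, i.e. 0-based start >= end_bp.
    stop = bisect_right(positions, end_bp)
    return [
        (pos, ref)
        for pos, ref in variants[start:stop]
        # Drop variants whose reference allele runs past the window end.
        if pos - 1 + len(ref) <= end_bp
    ]


variants = [(100, "A"), (101, "C"), (105, "GT"), (110, "AC"), (111, "T")]
selected = variants_in_window(variants, 100, 110)
print(selected)  # [(101, 'C'), (105, 'GT')]
```

Position 100 maps to 0-based 99 and falls before the window, while the variant at 110 starts on the last base of the window but its 2 bp reference allele spills over the end, so both are skipped.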