Skip to content

geno_lewm.data.corpus

corpus

Carbon pretraining corpus records and RFC-0006 window sampling.

CarbonSourceMix dataclass

CarbonSourceMix(source: str, fraction: float)

One source bucket in the RFC-0006 Carbon sub-mix.

CarbonCorpusConfig dataclass

CarbonCorpusConfig(dataset_id: str = DEFAULT_CARBON_DATASET_ID, dataset_config: str | None = None, revision: str | None = None, default_source: str | None = None, skip_invalid: bool = False, split: str = 'train', streaming: bool = True, subset_fraction: float = DEFAULT_PHASE1_SUBSET_FRACTION, subset_seed: int = 0, sequence_field: str = DEFAULT_SEQUENCE_FIELD, source_field: str = DEFAULT_SOURCE_FIELD, source_id_field: str = DEFAULT_SOURCE_ID_FIELD, window_bp: int = DEFAULT_WINDOW_BP, margin_bp: int = DEFAULT_CORPUS_MARGIN_BP, stride_bp: int = DEFAULT_CORPUS_STRIDE_BP)

Configuration for reading and windowing the Carbon pretraining corpus.

CarbonRecord dataclass

CarbonRecord(record_id: str, source: str, sequence: str)

Canonicalized source sequence record from the Carbon corpus.

length_bp property

length_bp: int

Return the canonical DNA sequence length in base pairs.

CarbonWindow dataclass

CarbonWindow(record_id: str, source: str, start_bp: int, end_bp: int, sequence: str)

A fixed-width training window sampled from a Carbon corpus record.

window_bp property

window_bp: int

Return the window length in base pairs.

window_id property

window_id: str

Return the content-addressed window hash as lowercase hex.

normalize_source_label

normalize_source_label(value: object) -> str

Normalize a Carbon corpus source label to the RFC-0006 source key.

Source code in geno_lewm/data/corpus.py
def normalize_source_label(value: object) -> str:
    """Map a raw Carbon corpus source label onto its RFC-0006 source key.

    The label is stripped, lower-cased, has ``-`` and ``/`` turned into
    spaces, and has runs of whitespace collapsed to single spaces before it is
    looked up in the module's alias table.

    Raises:
        InputError: If ``value`` is not a string, is blank, or does not match
            any known alias after cleaning.
    """
    if not (isinstance(value, str) and value.strip()):
        raise InputError(
            "source label must be a non-empty string",
            details={"value": value, "type": type(value).__name__},
        )

    # Treat hyphens and slashes as word separators, then squeeze whitespace.
    cleaned = value.strip().lower()
    for separator in ("-", "/"):
        cleaned = cleaned.replace(separator, " ")
    lookup_key = " ".join(cleaned.split())

    resolved = _SOURCE_ALIASES.get(lookup_key)
    if resolved is not None:
        return resolved
    raise InputError(
        "unsupported Carbon corpus source label",
        details={"source": value, "known_sources": [entry.source for entry in CARBON_SUBMIX]},
    )

sample_source

sample_source(rng: Random, *, mix: Sequence[CarbonSourceMix] = CARBON_SUBMIX) -> str

Sample one source key from the configured RFC-0006 sub-mix.

Source code in geno_lewm/data/corpus.py
def sample_source(
    rng: random.Random,
    *,
    mix: Sequence[CarbonSourceMix] = CARBON_SUBMIX,
) -> str:
    """Draw a single source key from ``mix`` using ``rng``.

    The mix is validated on every call before the draw is made.
    """
    validated_entries = _validate_mix(mix)
    return _sample_source_from_entries(rng, validated_entries)

draw_source_counts

draw_source_counts(n: int, *, rng: Random, mix: Sequence[CarbonSourceMix] = CARBON_SUBMIX) -> dict[str, int]

Draw n source samples and return counts by normalized source key.

Source code in geno_lewm/data/corpus.py
def draw_source_counts(
    n: int,
    *,
    rng: random.Random,
    mix: Sequence[CarbonSourceMix] = CARBON_SUBMIX,
) -> dict[str, int]:
    """Sample ``n`` sources from ``mix`` and tally how often each was drawn.

    Every source in the validated mix appears in the result; sources that were
    never drawn keep a count of ``0``.
    """
    _require_nonnegative_int("n", n)
    entries = _validate_mix(mix)

    # Seed the tally so undrawn sources are still reported.
    tally = dict.fromkeys((entry.source for entry in entries), 0)
    drawn = (_sample_source_from_entries(rng, entries) for _ in range(n))
    for source_key in drawn:
        tally[source_key] += 1
    return tally

stable_subset_includes

stable_subset_includes(record_id: str, *, fraction: float, seed: int = 0) -> bool

Return whether record_id belongs to a deterministic corpus subset.

Source code in geno_lewm/data/corpus.py
def stable_subset_includes(record_id: str, *, fraction: float, seed: int = 0) -> bool:
    """Return whether ``record_id`` belongs to a deterministic corpus subset.

    Membership is decided by hashing ``f"{seed}:{record_id}"`` with SHA-256,
    reading the first 8 digest bytes as a big-endian integer, scaling that to
    the unit interval, and comparing it against ``fraction``. The same
    ``(record_id, seed)`` pair therefore always gives the same answer.

    Args:
        record_id: Identifier of the record being tested.
        fraction: Target share of records to keep.
        seed: Value mixed into the hash to select a different subset.

    Returns:
        ``True`` when the record falls inside the subset. A ``fraction`` of
        ``1.0`` (or more) always includes the record.
    """
    _require_nonempty_str("record_id", record_id)
    _validate_fraction("fraction", fraction)
    _require_nonnegative_int("seed", seed)

    # A full-corpus subset must keep every record. The 64-bit hash is rounded
    # to a double before dividing, so values just below 2**64 can scale to
    # exactly 1.0 and would fail the strict ``< fraction`` test. Answering
    # directly also avoids hashing when no subsetting is requested.
    if fraction >= 1.0:
        return True

    digest = hashlib.sha256(f"{seed}:{record_id}".encode()).digest()
    value = int.from_bytes(digest[:8], byteorder="big") / float(1 << 64)
    return value < fraction

iter_window_starts

iter_window_starts(sequence_length: int, *, window_bp: int = DEFAULT_WINDOW_BP, margin_bp: int = DEFAULT_CORPUS_MARGIN_BP, stride_bp: int = DEFAULT_CORPUS_STRIDE_BP, rng: Random | None = None) -> Iterator[int]

Yield RFC-0006 window starts respecting margin and stride constraints.

Source code in geno_lewm/data/corpus.py
def iter_window_starts(
    sequence_length: int,
    *,
    window_bp: int = DEFAULT_WINDOW_BP,
    margin_bp: int = DEFAULT_CORPUS_MARGIN_BP,
    stride_bp: int = DEFAULT_CORPUS_STRIDE_BP,
    rng: random.Random | None = None,
) -> Iterator[int]:
    """Generate window start offsets that honor the margin and stride.

    Valid starts lie in ``[margin_bp, sequence_length - window_bp - margin_bp]``
    and consecutive starts are ``stride_bp`` apart. Nothing is yielded when the
    sequence is shorter than ``window_bp + 2 * margin_bp``.

    When ``rng`` is given and more than one phase is possible, the first start
    is shifted by a random offset below ``min(stride_bp, number of valid
    starts)``; otherwise the first start is exactly ``margin_bp``.
    """
    _require_nonnegative_int("sequence_length", sequence_length)
    _require_positive_int("window_bp", window_bp)
    _require_nonnegative_int("margin_bp", margin_bp)
    _require_positive_int("stride_bp", stride_bp)

    first_valid = margin_bp
    last_valid = sequence_length - window_bp - margin_bp
    if last_valid < first_valid:
        # Not enough room for one window plus a margin on each side.
        return

    phase_choices = min(stride_bp, last_valid - first_valid + 1)
    phase = 0
    if rng is not None and phase_choices > 1:
        phase = rng.randrange(phase_choices)

    yield from range(first_valid + phase, last_valid + 1, stride_bp)

iter_record_windows

iter_record_windows(record: CarbonRecord, *, window_bp: int = DEFAULT_WINDOW_BP, margin_bp: int = DEFAULT_CORPUS_MARGIN_BP, stride_bp: int = DEFAULT_CORPUS_STRIDE_BP, rng: Random | None = None) -> Iterator[CarbonWindow]

Yield canonical windows for one Carbon corpus record.

Source code in geno_lewm/data/corpus.py
def iter_record_windows(
    record: CarbonRecord,
    *,
    window_bp: int = DEFAULT_WINDOW_BP,
    margin_bp: int = DEFAULT_CORPUS_MARGIN_BP,
    stride_bp: int = DEFAULT_CORPUS_STRIDE_BP,
    rng: random.Random | None = None,
) -> Iterator[CarbonWindow]:
    """Slice one Carbon record into ``CarbonWindow`` objects.

    Start offsets come from ``iter_window_starts`` applied to the record's
    ``length_bp``; each window spans ``[start, start + window_bp)`` of the
    record's sequence and carries the record's id and source.
    """
    owner_id = record.record_id
    owner_source = record.source
    bases = record.sequence

    starts = iter_window_starts(
        record.length_bp,
        window_bp=window_bp,
        margin_bp=margin_bp,
        stride_bp=stride_bp,
        rng=rng,
    )
    for window_start in starts:
        window_end = window_start + window_bp
        yield CarbonWindow(
            record_id=owner_id,
            source=owner_source,
            start_bp=window_start,
            end_bp=window_end,
            sequence=bases[window_start:window_end],
        )

iter_carbon_records

iter_carbon_records(rows: Iterable[Mapping[str, Any]], *, sequence_field: str = DEFAULT_SEQUENCE_FIELD, source_field: str = DEFAULT_SOURCE_FIELD, source_id_field: str = DEFAULT_SOURCE_ID_FIELD, subset_fraction: float = 1.0, subset_seed: int = 0, default_source: str | None = None, skip_invalid: bool = False) -> Iterator[CarbonRecord]

Yield canonical Carbon records from HF-style row mappings.

Single-source corpus configs (e.g. eukaryote_generator_10B_subset) do not carry a per-row source_field; pass default_source to label records whose source value is missing or blank (it must still be a recognized source key). With skip_invalid, rows that fail validation are skipped rather than raising: rows whose sequence carries unsupported (non-ACGTN) bases — corpus shards occasionally contain IUPAC ambiguity codes — as well as rows whose sequence is missing or not a string, or whose source label is not recognized.

Source code in geno_lewm/data/corpus.py
def iter_carbon_records(
    rows: Iterable[Mapping[str, Any]],
    *,
    sequence_field: str = DEFAULT_SEQUENCE_FIELD,
    source_field: str = DEFAULT_SOURCE_FIELD,
    source_id_field: str = DEFAULT_SOURCE_ID_FIELD,
    subset_fraction: float = 1.0,
    subset_seed: int = 0,
    default_source: str | None = None,
    skip_invalid: bool = False,
) -> Iterator[CarbonRecord]:
    """Turn HF-style row mappings into canonical ``CarbonRecord`` objects.

    For each row the sequence is read from ``sequence_field``, the source
    label from ``source_field`` and the record id from ``source_id_field``.
    When ``default_source`` is given it replaces a source value that is
    ``None`` or a blank string, which is how single-source corpus configs
    (e.g. ``eukaryote_generator_10B_subset``) get labelled; it still has to
    normalize to a recognized source key. A row whose id is ``None`` or ``""``
    gets a fallback id derived from its canonical sequence.

    Rows are dropped silently when they fall outside the deterministic subset
    selected by ``subset_fraction`` and ``subset_seed``.

    With ``skip_invalid`` set, a row is skipped instead of raising when its
    sequence is not a string, or when normalizing its source label or
    canonicalizing its sequence raises ``InputError`` (for example because of
    non-ACGTN bases such as IUPAC ambiguity codes).

    Raises:
        InputError: For an invalid row when ``skip_invalid`` is false.
    """
    for field_name, field_value in (
        ("sequence_field", sequence_field),
        ("source_field", source_field),
        ("source_id_field", source_id_field),
    ):
        _require_nonempty_str(field_name, field_value)
    _validate_fraction("subset_fraction", subset_fraction)
    _require_nonnegative_int("subset_seed", subset_seed)

    for row_idx, row in enumerate(rows):
        sequence_value = row.get(sequence_field)
        if not isinstance(sequence_value, str):
            if not skip_invalid:
                raise InputError(
                    "Carbon corpus row is missing a DNA sequence string",
                    details={"row": row_idx, "sequence_field": sequence_field},
                )
            continue

        label = row.get(source_field)
        if default_source is not None:
            # Only fill in the default when the row has no usable label.
            label_is_blank = isinstance(label, str) and not label.strip()
            if label is None or label_is_blank:
                label = default_source

        try:
            source = normalize_source_label(label)
            sequence = canonicalize_dna(sequence_value)
        except InputError:
            if not skip_invalid:
                raise
            continue

        declared_id = row.get(source_id_field)
        if declared_id in (None, ""):
            record_id = _fallback_id(sequence)
        else:
            record_id = str(declared_id)

        if stable_subset_includes(record_id, fraction=subset_fraction, seed=subset_seed):
            yield CarbonRecord(record_id=record_id, source=source, sequence=sequence)

load_hf_carbon_records

load_hf_carbon_records(config: CarbonCorpusConfig | None = None) -> Iterator[CarbonRecord]

Load Carbon corpus records through Hugging Face datasets lazily.

Source code in geno_lewm/data/corpus.py
def load_hf_carbon_records(
    config: CarbonCorpusConfig | None = None,
) -> Iterator[CarbonRecord]:
    """Open the Carbon corpus via Hugging Face ``datasets`` and iterate records.

    A default ``CarbonCorpusConfig`` is used when ``config`` is omitted. The
    ``datasets`` package is imported at call time, the dataset is loaded with
    the configured split, streaming flag and revision, and the result is
    handed to ``iter_carbon_records`` together with the configured field
    names, subset settings, default source and ``skip_invalid`` flag.

    Raises:
        RuntimeSetupError: If the ``datasets`` package cannot be imported.
    """
    cfg = CarbonCorpusConfig() if config is None else config

    try:
        datasets = importlib.import_module("datasets")
    except ImportError as exc:
        raise RuntimeSetupError(
            "Carbon corpus loading requires Hugging Face datasets",
            remediation="install geno-lewm[train] or install datasets",
        ) from exc

    # The dataset config name is only passed positionally when one is set.
    positional: tuple[str, ...] = (
        (cfg.dataset_id,)
        if cfg.dataset_config is None
        else (cfg.dataset_id, cfg.dataset_config)
    )
    dataset = datasets.load_dataset(
        *positional,
        split=cfg.split,
        streaming=cfg.streaming,
        revision=cfg.revision,
    )

    return iter_carbon_records(
        dataset,
        sequence_field=cfg.sequence_field,
        source_field=cfg.source_field,
        source_id_field=cfg.source_id_field,
        subset_fraction=cfg.subset_fraction,
        subset_seed=cfg.subset_seed,
        default_source=cfg.default_source,
        skip_invalid=cfg.skip_invalid,
    )