Skip to content

geno_lewm.data.gnomad

gnomad

gnomAD local VCF preparation and shard loading.

GnomadVariant dataclass

GnomadVariant(chrom: str, pos: int, ref: str, alt: str, af_global: float, af_afr: float | None, af_ami: float | None, af_amr: float | None, af_asj: float | None, af_eas: float | None, af_fin: float | None, af_nfe: float | None, af_oth: float | None, af_sas: float | None, filter: str, schema_version: str = GNOMAD_SCHEMA_VERSION)

One normalized common-variant row for the gnomAD shard.

GnomadPrepareReport dataclass

GnomadPrepareReport(output_path: Path, release: str, records_read: int, allele_records_seen: int, records_written: int, skipped_filter: int, skipped_af: int, skipped_allele: int, size_bytes: int, already_exists: bool = False)

Summary emitted by geno-lewm-prepare-gnomad.

prepare_gnomad_shard

prepare_gnomad_shard(input_vcf: str | Path, output_dir: str | Path, *, release: str = 'v4.1', min_af: float = 0.01, max_allele_len: int = 16, overwrite: bool = False) -> GnomadPrepareReport

Filter a local gnomAD VCF/VCF.gz into the release shard schema.

Source code in geno_lewm/data/gnomad.py
def prepare_gnomad_shard(
    input_vcf: str | Path,
    output_dir: str | Path,
    *,
    release: str = "v4.1",
    min_af: float = 0.01,
    max_allele_len: int = 16,
    overwrite: bool = False,
) -> GnomadPrepareReport:
    """Filter a local gnomAD VCF/VCF.gz into the release shard schema.

    The shard is written to ``<output_dir>/gnomad/<release>/variants.parquet``.
    When that file already exists and ``overwrite`` is false, the input is not
    read: the returned report carries the existing file's row count and size,
    zeroed read/skip counters, and ``already_exists=True``.

    Otherwise every (record, ALT allele) pair from ``iter_vcf_rows`` is
    checked, in this order, and counted under the first check it fails:

    1. the record's FILTER must equal ``"PASS"`` (``skipped_filter``);
    2. REF and ALT must both satisfy ``is_supported_allele`` with
       ``max_len=max_allele_len`` (``skipped_allele``);
    3. the global allele frequency must be present and at least ``min_af``
       (``skipped_af``).

    Args:
        input_vcf: Path of the local VCF to read.
        output_dir: Root directory under which the shard path is built.
        release: Release label; checked by ``_require_release`` and used as a
            path component.
        min_af: Minimum global allele frequency; checked by
            ``_require_probability``.
        max_allele_len: Allele length limit handed to ``is_supported_allele``;
            checked by ``_require_positive_int``.
        overwrite: When true, regenerate the shard even if the file exists.

    Returns:
        A ``GnomadPrepareReport`` summarizing what was read, skipped, written.
    """
    _require_release(release)
    _require_probability("min_af", min_af)
    _require_positive_int("max_allele_len", max_allele_len)

    shard_path = Path(output_dir) / "gnomad" / release / "variants.parquet"

    if shard_path.exists() and not overwrite:
        # Reuse the existing shard: nothing is read, so all counters are zero.
        existing_rows = _parquet_num_rows(shard_path)
        existing_size = shard_path.stat().st_size
        return GnomadPrepareReport(
            output_path=shard_path,
            release=release,
            records_read=0,
            allele_records_seen=0,
            records_written=existing_rows,
            skipped_filter=0,
            skipped_af=0,
            skipped_allele=0,
            size_bytes=existing_size,
            already_exists=True,
        )

    # Counters are mutated by the generator below while the writer drains it,
    # so they are only final once ``_write_parquet`` has returned.
    counts = {
        "records_read": 0,
        "allele_records_seen": 0,
        "skipped_filter": 0,
        "skipped_af": 0,
        "skipped_allele": 0,
    }

    def _population_afs(info, alt_index: int) -> dict:
        """Collect the per-population AF keyword arguments for one ALT."""
        return {
            "af_afr": _af_for(info, ("AF_afr", "AF_AFR"), alt_index),
            "af_ami": _af_for(info, ("AF_ami", "AF_AMI"), alt_index),
            "af_amr": _af_for(info, ("AF_amr", "AF_AMR"), alt_index),
            "af_asj": _af_for(info, ("AF_asj", "AF_ASJ"), alt_index),
            "af_eas": _af_for(info, ("AF_eas", "AF_EAS"), alt_index),
            "af_fin": _af_for(info, ("AF_fin", "AF_FIN"), alt_index),
            "af_nfe": _af_for(info, ("AF_nfe", "AF_NFE"), alt_index),
            "af_oth": _af_for(info, ("AF_oth", "AF_OTH"), alt_index),
            "af_sas": _af_for(info, ("AF_sas", "AF_SAS"), alt_index),
        }

    def _passing_variants() -> Iterator[GnomadVariant]:
        """Yield the rows that survive all three checks, tallying the rest."""
        for record in iter_vcf_rows(input_vcf):
            counts["records_read"] += 1
            for alt_index, alt_allele in enumerate(record.alts):
                counts["allele_records_seen"] += 1

                if record.filter != "PASS":
                    counts["skipped_filter"] += 1
                    continue

                alleles_ok = is_supported_allele(
                    record.ref, max_len=max_allele_len
                ) and is_supported_allele(alt_allele, max_len=max_allele_len)
                if not alleles_ok:
                    counts["skipped_allele"] += 1
                    continue

                global_af = _af_for(
                    record.info, ("AF", "AF_global", "AF_GLOBAL"), alt_index
                )
                if global_af is None or global_af < min_af:
                    counts["skipped_af"] += 1
                    continue

                yield GnomadVariant(
                    chrom=record.chrom,
                    pos=record.pos,
                    ref=record.ref,
                    alt=alt_allele,
                    af_global=global_af,
                    filter=record.filter,
                    **_population_afs(record.info, alt_index),
                )

    written = _write_parquet(_passing_variants(), shard_path)
    return GnomadPrepareReport(
        output_path=shard_path,
        release=release,
        records_written=written,
        size_bytes=shard_path.stat().st_size,
        **counts,
    )

iter_gnomad_vcf_variants

iter_gnomad_vcf_variants(input_vcf: str | Path, *, min_af: float = 0.01, max_allele_len: int = 16) -> Iterator[GnomadVariant]

Yield normalized rows from a local gnomAD VCF without writing a shard.

Source code in geno_lewm/data/gnomad.py
def iter_gnomad_vcf_variants(
    input_vcf: str | Path,
    *,
    min_af: float = 0.01,
    max_allele_len: int = 16,
) -> Iterator[GnomadVariant]:
    """Yield normalized rows from a local gnomAD VCF without writing a shard.

    One ``GnomadVariant`` is produced per (record, ALT allele) pair that passes
    all of the following:

    * the record's FILTER equals ``"PASS"``;
    * REF and ALT both satisfy ``is_supported_allele`` with
      ``max_len=max_allele_len``;
    * the global allele frequency is present and at least ``min_af``.

    Rows that fail a check are dropped silently; no counters are kept.

    This is a generator function, so the argument checks below run when
    iteration starts, not when the function is called.

    Args:
        input_vcf: Path of the local VCF to read via ``iter_vcf_rows``.
        min_af: Minimum global allele frequency; checked by
            ``_require_probability``.
        max_allele_len: Allele length limit handed to ``is_supported_allele``;
            checked by ``_require_positive_int``.

    Yields:
        ``GnomadVariant`` rows in input order.
    """
    _require_probability("min_af", min_af)
    _require_positive_int("max_allele_len", max_allele_len)
    for row in iter_vcf_rows(input_vcf):
        # FILTER belongs to the whole record, so a non-PASS record rejects
        # every one of its ALT alleles; decide once instead of per allele.
        if row.filter != "PASS":
            continue
        for alt_index, alt in enumerate(row.alts):
            if not is_supported_allele(row.ref, max_len=max_allele_len) or not is_supported_allele(
                alt, max_len=max_allele_len
            ):
                continue
            # alt_index selects this ALT's entry in the per-allele INFO values.
            af_global = _af_for(row.info, ("AF", "AF_global", "AF_GLOBAL"), alt_index)
            if af_global is None or af_global < min_af:
                continue
            yield GnomadVariant(
                chrom=row.chrom,
                pos=row.pos,
                ref=row.ref,
                alt=alt,
                af_global=af_global,
                af_afr=_af_for(row.info, ("AF_afr", "AF_AFR"), alt_index),
                af_ami=_af_for(row.info, ("AF_ami", "AF_AMI"), alt_index),
                af_amr=_af_for(row.info, ("AF_amr", "AF_AMR"), alt_index),
                af_asj=_af_for(row.info, ("AF_asj", "AF_ASJ"), alt_index),
                af_eas=_af_for(row.info, ("AF_eas", "AF_EAS"), alt_index),
                af_fin=_af_for(row.info, ("AF_fin", "AF_FIN"), alt_index),
                af_nfe=_af_for(row.info, ("AF_nfe", "AF_NFE"), alt_index),
                af_oth=_af_for(row.info, ("AF_oth", "AF_OTH"), alt_index),
                af_sas=_af_for(row.info, ("AF_sas", "AF_SAS"), alt_index),
                filter=row.filter,
            )

iter_gnomad_shard

iter_gnomad_shard(path: str | Path) -> Iterator[GnomadVariant]

Yield normalized gnomAD rows from a Parquet shard.

Source code in geno_lewm/data/gnomad.py
def iter_gnomad_shard(path: str | Path) -> Iterator[GnomadVariant]:
    """Yield normalized gnomAD rows from a Parquet shard.

    The shard is read with ``pyarrow.parquet.read_table`` and then converted
    to Python objects in bounded batches, so only one batch of row
    dictionaries is alive at a time instead of one dictionary per row of the
    whole shard.

    ``chrom``, ``pos``, ``ref``, ``alt``, ``af_global``, ``filter`` and
    ``schema_version`` are required columns (a missing one raises
    ``KeyError``). The per-population ``af_*`` columns are optional and are
    passed through ``_optional_float``.

    Args:
        path: Location of the Parquet shard.

    Yields:
        One ``GnomadVariant`` per stored row, in table order.
    """
    # Upper bound on rows converted to Python dicts at once.
    rows_per_batch = 65_536

    _pa, pq = _require_pyarrow()
    table = pq.read_table(Path(path))
    for batch in table.to_batches(max_chunksize=rows_per_batch):
        for row in batch.to_pylist():
            yield GnomadVariant(
                chrom=str(row["chrom"]),
                pos=int(row["pos"]),
                ref=str(row["ref"]),
                alt=str(row["alt"]),
                af_global=float(row["af_global"]),
                af_afr=_optional_float(row.get("af_afr")),
                af_ami=_optional_float(row.get("af_ami")),
                af_amr=_optional_float(row.get("af_amr")),
                af_asj=_optional_float(row.get("af_asj")),
                af_eas=_optional_float(row.get("af_eas")),
                af_fin=_optional_float(row.get("af_fin")),
                af_nfe=_optional_float(row.get("af_nfe")),
                af_oth=_optional_float(row.get("af_oth")),
                af_sas=_optional_float(row.get("af_sas")),
                filter=str(row["filter"]),
                schema_version=str(row["schema_version"]),
            )