Skip to content

geno_lewm.data.clinvar

clinvar

ClinVar local VCF preparation and shard loading.

ClinvarVariant dataclass

ClinvarVariant(chrom: str, pos: int, ref: str, alt: str, clinical_significance: str, review_status: str, gene_symbol: str | None, clinvar_id: int, schema_version: str = CLINVAR_SCHEMA_VERSION)

One normalized ClinVar row.

ClinvarPrepareReport dataclass

ClinvarPrepareReport(output_path: Path, release: str, records_read: int, allele_records_seen: int, records_written: int, skipped_allele: int, size_bytes: int, already_exists: bool = False)

Summary emitted by geno-lewm-prepare-clinvar.

prepare_clinvar_shard

prepare_clinvar_shard(input_vcf: str | Path, output_dir: str | Path, *, release: str, max_allele_len: int = 16, overwrite: bool = False) -> ClinvarPrepareReport

Normalize a local ClinVar VCF/VCF.gz into the release shard schema.

Source code in geno_lewm/data/clinvar.py
def prepare_clinvar_shard(
    input_vcf: str | Path,
    output_dir: str | Path,
    *,
    release: str,
    max_allele_len: int = 16,
    overwrite: bool = False,
) -> ClinvarPrepareReport:
    """Convert a local ClinVar VCF (plain or gzipped) into a release shard.

    The shard is written to ``<output_dir>/clinvar/<release>/variants.parquet``.
    Each VCF record is expanded into one row per ALT allele; an allele pair is
    dropped (and counted as skipped) when either REF or ALT is rejected by
    ``is_supported_allele`` for the given ``max_allele_len``.

    Args:
        input_vcf: Path to the source VCF, forwarded to ``iter_vcf_rows``.
        output_dir: Root directory under which the shard path is built.
        release: Release label; checked by ``_require_release`` and used as a
            path component.
        max_allele_len: Length limit handed to ``is_supported_allele``;
            checked by ``_require_positive_int``.
        overwrite: When false and the shard file already exists, nothing is
            read or written and a report with ``already_exists=True`` is
            returned instead.

    Returns:
        A ``ClinvarPrepareReport`` with the read/seen/written/skipped counters
        and the on-disk size of the shard.
    """
    _require_release(release)
    _require_positive_int("max_allele_len", max_allele_len)
    shard_path = Path(output_dir) / "clinvar" / release / "variants.parquet"

    # Short-circuit: reuse an existing shard unless the caller asked to rebuild.
    if shard_path.exists() and not overwrite:
        existing_rows = _parquet_num_rows(shard_path)
        existing_size = shard_path.stat().st_size
        return ClinvarPrepareReport(
            output_path=shard_path,
            release=release,
            records_read=0,
            allele_records_seen=0,
            records_written=existing_rows,
            skipped_allele=0,
            size_bytes=existing_size,
            already_exists=True,
        )

    # Mutable tally shared with the generator below; it is only complete once
    # the writer has fully consumed the generator.
    tally = {"records": 0, "alleles": 0, "skipped": 0}

    def _selected_rows() -> Iterator[ClinvarVariant]:
        for record in iter_vcf_rows(input_vcf):
            tally["records"] += 1
            for allele_idx, allele in enumerate(record.alts):
                tally["alleles"] += 1
                usable = is_supported_allele(
                    record.ref, max_len=max_allele_len
                ) and is_supported_allele(allele, max_len=max_allele_len)
                if not usable:
                    tally["skipped"] += 1
                    continue
                yield ClinvarVariant(
                    chrom=record.chrom,
                    pos=record.pos,
                    ref=record.ref,
                    alt=allele,
                    clinical_significance=_clinical_significance(record.info, allele_idx),
                    review_status=_review_status(record.info),
                    gene_symbol=_gene_symbol(record.info),
                    clinvar_id=_clinvar_id(record.info, record.variant_id, allele_idx),
                )

    written = _write_parquet(_selected_rows(), shard_path)
    # Stat the file only after the write so the size reflects the new shard.
    final_size = shard_path.stat().st_size
    return ClinvarPrepareReport(
        output_path=shard_path,
        release=release,
        records_read=tally["records"],
        allele_records_seen=tally["alleles"],
        records_written=written,
        skipped_allele=tally["skipped"],
        size_bytes=final_size,
    )

iter_clinvar_vcf_variants

iter_clinvar_vcf_variants(input_vcf: str | Path, *, max_allele_len: int = 16) -> Iterator[ClinvarVariant]

Yield normalized ClinVar rows from a local VCF without writing a shard.

Source code in geno_lewm/data/clinvar.py
def iter_clinvar_vcf_variants(
    input_vcf: str | Path,
    *,
    max_allele_len: int = 16,
) -> Iterator[ClinvarVariant]:
    """Stream normalized ClinVar rows straight from a local VCF.

    Nothing is written to disk. Every VCF record is expanded into one
    ``ClinvarVariant`` per ALT allele, and any REF/ALT pair in which either
    allele is rejected by ``is_supported_allele`` is silently omitted.

    Because this is a generator function, the ``max_allele_len`` check (and
    any error it raises) only runs once iteration starts.

    Args:
        input_vcf: Path to the source VCF, forwarded to ``iter_vcf_rows``.
        max_allele_len: Length limit handed to ``is_supported_allele``.

    Yields:
        One ``ClinvarVariant`` per supported REF/ALT pair, in file order.
    """
    _require_positive_int("max_allele_len", max_allele_len)

    def _allele_ok(allele: str) -> bool:
        return is_supported_allele(allele, max_len=max_allele_len)

    for record in iter_vcf_rows(input_vcf):
        for allele_idx, allele in enumerate(record.alts):
            # REF is tested first; ALT is only examined when REF passes.
            if _allele_ok(record.ref) and _allele_ok(allele):
                yield ClinvarVariant(
                    chrom=record.chrom,
                    pos=record.pos,
                    ref=record.ref,
                    alt=allele,
                    clinical_significance=_clinical_significance(record.info, allele_idx),
                    review_status=_review_status(record.info),
                    gene_symbol=_gene_symbol(record.info),
                    clinvar_id=_clinvar_id(record.info, record.variant_id, allele_idx),
                )

iter_clinvar_shard

iter_clinvar_shard(path: str | Path) -> Iterator[ClinvarVariant]

Yield normalized ClinVar rows from a Parquet shard.

Source code in geno_lewm/data/clinvar.py
def iter_clinvar_shard(path: str | Path) -> Iterator[ClinvarVariant]:
    """Read a Parquet shard and yield its rows as ``ClinvarVariant`` objects.

    The whole table is loaded via ``pq.read_table`` and converted to Python
    dicts before the first row is yielded. Each field is coerced to the type
    declared on ``ClinvarVariant``; a missing or null ``gene_symbol`` becomes
    ``None``.

    Args:
        path: Location of the Parquet shard.

    Yields:
        One ``ClinvarVariant`` per table row, in table order.
    """
    _unused_pa, parquet = _require_pyarrow()
    records = parquet.read_table(Path(path)).to_pylist()
    for record in records:
        chrom = str(record["chrom"])
        pos = int(record["pos"])
        ref = str(record["ref"])
        alt = str(record["alt"])
        significance = str(record["clinical_significance"])
        review = str(record["review_status"])
        # gene_symbol is the only optional column: tolerate absence and nulls.
        if record.get("gene_symbol") is None:
            gene = None
        else:
            gene = str(record["gene_symbol"])
        variant_id = int(record["clinvar_id"])
        schema = str(record["schema_version"])
        yield ClinvarVariant(
            chrom=chrom,
            pos=pos,
            ref=ref,
            alt=alt,
            clinical_significance=significance,
            review_status=review,
            gene_symbol=gene,
            clinvar_id=variant_id,
            schema_version=schema,
        )

label_set

label_set(variants: Iterable[ClinvarVariant]) -> tuple[ClinvarVariant, ...]

Return ClinVar rows usable for labelled eval, excluding VUS/OTHER.

Source code in geno_lewm/data/clinvar.py
def label_set(variants: Iterable[ClinvarVariant]) -> tuple[ClinvarVariant, ...]:
    """Select the ClinVar rows that carry a usable label for evaluation.

    A row is kept when its ``clinical_significance`` is a member of
    ``CLINVAR_LABELLED_CLASSES`` (i.e. VUS/OTHER rows are dropped). Input
    order is preserved.

    Args:
        variants: Any iterable of ``ClinvarVariant`` rows; consumed once.

    Returns:
        The retained rows as an immutable tuple.
    """
    labelled: list[ClinvarVariant] = []
    for variant in variants:
        if variant.clinical_significance in CLINVAR_LABELLED_CLASSES:
            labelled.append(variant)
    return tuple(labelled)