geno_lewm.surprise

surprise

Surprise-scoring helpers for RFC-0009.

CALIBRATION_SCHEMA_VERSION module-attribute

CALIBRATION_SCHEMA_VERSION = '1.0.0'

On-disk calibration table schema version.

DEFAULT_CDF_POINTS module-attribute

DEFAULT_CDF_POINTS = 1001

Number of points in each empirical CDF grid.

DEFAULT_REFERENCE_PER_BUCKET module-attribute

DEFAULT_REFERENCE_PER_BUCKET = 10000

Default maximum number of reference variants sampled per bucket.

LOW_CONFIDENCE_BUCKET_SIZE module-attribute

LOW_CONFIDENCE_BUCKET_SIZE = 100

Buckets below this size are marked low-confidence by RFC-0009.

DEFAULT_GC_HIGH_CUTOFF module-attribute

DEFAULT_GC_HIGH_CUTOFF: float = 2.0 / 3.0

Inclusive upper-tercile GC cutoff used when no fitted cutpoints are supplied.

DEFAULT_GC_LOW_CUTOFF module-attribute

DEFAULT_GC_LOW_CUTOFF: float = 1.0 / 3.0

Inclusive lower-tercile GC cutoff used when no fitted cutpoints are supplied.

DEFAULT_MIN_BUCKET_SIZE module-attribute

DEFAULT_MIN_BUCKET_SIZE = 1000

RFC-0009 default threshold for a well-populated calibration bucket.

GC_BINS module-attribute

GC_BINS: tuple[str, ...] = ('low', 'mid', 'high')

Canonical RFC-0009 gc_bin values.

REGION_CLASSES module-attribute

REGION_CLASSES: tuple[str, ...] = ('coding_synonymous', 'coding_missense', 'coding_nonsense', 'splice', 'utr5', 'utr3', 'intron', 'promoter', 'enhancer', 'intergenic', 'other')

Canonical RFC-0009 region_class values.

REPEAT_CLASSES module-attribute

REPEAT_CLASSES: tuple[str, ...] = ('none', 'simple', 'low_complexity', 'transposon', 'segmental_dup')

Canonical RFC-0009 repeat_class values.

UNKNOWN_BUCKET_ID module-attribute

UNKNOWN_BUCKET_ID = '*'

Catch-all calibration bucket reached after every parent bucket is sparse.

Aggregation module-attribute

Aggregation: TypeAlias = Literal['mean', 'max', 'median']

Supported aggregation modes for multi-step predictor outputs.

CalibrationBucket dataclass

CalibrationBucket(bucket_id: str, n_calibration: int, cdf: tuple[float, ...], sigma_grid: tuple[float, ...], back_off_to: str | None = None, schema_version: str = CALIBRATION_SCHEMA_VERSION)

One row in calibration.parquet.

confidence property

confidence: float

Return RFC-0009 confidence from this bucket's row count.

low_confidence property

low_confidence: bool

Return true when this bucket is below the low-confidence floor.

CalibrationExample dataclass

CalibrationExample(bucket_id: str, sigma_raw: float)

One pre-scored reference variant used to build calibration CDFs.

CalibrationTable dataclass

CalibrationTable(buckets: tuple[CalibrationBucket, ...], warnings: tuple[CalibrationWarning, ...] = (), schema_version: str = CALIBRATION_SCHEMA_VERSION)

In-memory representation of calibration.parquet.

get

get(bucket_id: str) -> CalibrationBucket | None

Return a bucket by ID, or None if absent.

Source code in geno_lewm/surprise/calibration.py
def get(self, bucket_id: str) -> CalibrationBucket | None:
    """Return a bucket by ID, or ``None`` if absent."""
    _require_bucket_id(bucket_id)
    by_id = {bucket.bucket_id: bucket for bucket in self.buckets}
    return by_id.get(bucket_id)

require

require(bucket_id: str) -> CalibrationBucket

Return a bucket by ID, raising InputError when absent.

Source code in geno_lewm/surprise/calibration.py
def require(self, bucket_id: str) -> CalibrationBucket:
    """Return a bucket by ID, raising ``InputError`` when absent."""
    bucket = self.get(bucket_id)
    if bucket is None:
        raise InputError(
            "calibration bucket is not present",
            details={"bucket_id": bucket_id},
        )
    return bucket

resolve

resolve(label_or_bucket: str, *, min_bucket_size: int = DEFAULT_MIN_BUCKET_SIZE) -> CalibrationBucket

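Resolve a label or bucket ID to the first bucket in its fixed backoff chain that holds at least min_bucket_size calibration rows, falling back to the catch-all * bucket. Raises InputError when the resolved bucket is not present in the table.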

Source code in geno_lewm/surprise/calibration.py
def resolve(
    self,
    label_or_bucket: str,
    *,
    min_bucket_size: int = DEFAULT_MIN_BUCKET_SIZE,
) -> CalibrationBucket:
    """Resolve a sparse bucket through the table's fixed backoff chain."""
    threshold = _require_positive_int("min_bucket_size", min_bucket_size)
    counts = {bucket.bucket_id: bucket.n_calibration for bucket in self.buckets}
    resolved = select_backoff_bucket(label_or_bucket, counts, min_count=threshold)
    return self.require(resolved)

CalibrationWarning dataclass

CalibrationWarning(bucket_id: str, resolved_bucket_id: str, n_calibration: int, min_bucket_size: int, low_confidence: bool)

Sparse-bucket warning emitted while building a calibration table.

ContextLabel dataclass

ContextLabel(region_class: str, gc_bin: str, repeat_class: str)

Canonical RFC-0009 context tuple for a single variant locus.

bucket_id property

bucket_id: str

Return {region_class}|{gc_bin}|{repeat_class}.

as_tuple

as_tuple() -> tuple[str, str, str]

Return the canonical (region_class, gc_bin, repeat_class) tuple.

Source code in geno_lewm/surprise/context.py
def as_tuple(self) -> tuple[str, str, str]:
    """Return the canonical ``(region_class, gc_bin, repeat_class)`` tuple."""
    return (self.region_class, self.gc_bin, self.repeat_class)

backoff_chain

backoff_chain() -> tuple[str, ...]

Return bucket IDs from most specific to catch-all.

Source code in geno_lewm/surprise/context.py
def backoff_chain(self) -> tuple[str, ...]:
    """Return bucket IDs from most specific to catch-all."""
    return backoff_chain(self)

SurpriseResult dataclass

SurpriseResult(sigma_raw: float, sigma_calibrated: float, bucket_id: str, confidence: float, low_confidence: bool)

Calibrated surprise score for one edit.

to_dict

to_dict() -> dict[str, float | str | bool]

Return a JSON-native payload for CLI and JSONL outputs.

Source code in geno_lewm/surprise/score.py
def to_dict(self) -> dict[str, float | str | bool]:
    """Return a JSON-native payload for CLI and JSONL outputs."""
    return {
        "sigma_raw": self.sigma_raw,
        "sigma_calibrated": self.sigma_calibrated,
        "bucket_id": self.bucket_id,
        "confidence": self.confidence,
        "low_confidence": self.low_confidence,
    }
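
As an illustration of how this payload ends up in a JSONL line, the sketch below merges a to_dict()-shaped dictionary with variant fields and serializes it with sorted keys, as the score_vcf source on this page does. The schema version, generator string, and all field values are hypothetical placeholders, since the real constants are not shown here.

```python
import json

# Hypothetical placeholders: the real SCORE_JSONL_SCHEMA_VERSION and
# SCORE_JSONL_GENERATED_BY values are not shown on this page.
SKETCH_SCHEMA_VERSION = "0.0.0-example"
SKETCH_GENERATED_BY = "example-generator"

# The five keys documented for SurpriseResult.to_dict(), with made-up values.
payload = {
    "sigma_raw": 1.25,
    "sigma_calibrated": 0.875,
    "bucket_id": "intron|mid|none",
    "confidence": 0.5,
    "low_confidence": False,
}

# Variant fields come first, then the result payload is merged in.
record = {
    "schema_version": SKETCH_SCHEMA_VERSION,
    "generated_by": SKETCH_GENERATED_BY,
    "chrom": "chr1",
    "pos": 12345,
    "ref": "A",
    "alt": "G",
    **payload,
}

# One JSON object per line, keys sorted for stable diffs.
line = json.dumps(record, sort_keys=True) + "\n"
print(line, end="")
```

Sorting keys keeps the output byte-stable across runs, which makes JSONL files easier to compare.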

build_calibration_table

build_calibration_table(examples: Iterable[CalibrationExample], *, seed: int = 0, per_bucket_sample: int = DEFAULT_REFERENCE_PER_BUCKET, grid_size: int = DEFAULT_CDF_POINTS, min_bucket_size: int = DEFAULT_MIN_BUCKET_SIZE, low_confidence_size: int = LOW_CONFIDENCE_BUCKET_SIZE, warn_sparse: bool = True) -> CalibrationTable

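Build deterministic empirical CDF buckets from pre-scored examples. Each example contributes its sigma_raw to its own bucket and to every parent bucket in its backoff chain, including the catch-all *. Each bucket is then sampled to at most per_bucket_sample values (seeded by seed) before its CDF is computed on a grid_size-point grid. Raises InputError when examples is empty, grid_size is below 2, or low_confidence_size exceeds min_bucket_size.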

Source code in geno_lewm/surprise/calibration.py
def build_calibration_table(
    examples: Iterable[CalibrationExample],
    *,
    seed: int = 0,
    per_bucket_sample: int = DEFAULT_REFERENCE_PER_BUCKET,
    grid_size: int = DEFAULT_CDF_POINTS,
    min_bucket_size: int = DEFAULT_MIN_BUCKET_SIZE,
    low_confidence_size: int = LOW_CONFIDENCE_BUCKET_SIZE,
    warn_sparse: bool = True,
) -> CalibrationTable:
    """Build deterministic empirical CDF buckets from pre-scored examples."""
    _require_seed(seed)
    sample_limit = _require_positive_int("per_bucket_sample", per_bucket_sample)
    points = _require_positive_int("grid_size", grid_size)
    if points < 2:
        raise InputError("grid_size must be at least 2", details={"grid_size": points})
    min_size = _require_positive_int("min_bucket_size", min_bucket_size)
    low_size = _require_positive_int("low_confidence_size", low_confidence_size)
    if low_size > min_size:
        raise InputError(
            "low_confidence_size must be <= min_bucket_size",
            details={"low_confidence_size": low_size, "min_bucket_size": min_size},
        )

    aggregated: dict[str, list[float]] = {}
    source_buckets: set[str] = set()
    for example in examples:
        if not isinstance(example, CalibrationExample):
            raise InputError(
                "examples must contain CalibrationExample instances",
                details={"type": type(example).__name__},
            )
        source_buckets.add(example.bucket_id)
        for bucket_id in backoff_chain(example.bucket_id):
            aggregated.setdefault(bucket_id, []).append(float(example.sigma_raw))

    if not source_buckets:
        raise InputError("calibration examples must contain at least one row")

    sampled: dict[str, tuple[float, ...]] = {}
    for bucket_id, values in sorted(aggregated.items()):
        sampled[bucket_id] = _sample_bucket_values(
            values,
            seed=seed,
            bucket_id=bucket_id,
            sample_limit=sample_limit,
        )

    counts = {bucket_id: len(values) for bucket_id, values in sampled.items()}
    buckets: list[CalibrationBucket] = []
    for bucket_id, sampled_values in sorted(sampled.items()):
        cdf, sigma_grid = _empirical_cdf(sampled_values, grid_size=points)
        resolved = select_backoff_bucket(bucket_id, counts, min_count=min_size)
        buckets.append(
            CalibrationBucket(
                bucket_id=bucket_id,
                n_calibration=len(sampled_values),
                cdf=cdf,
                sigma_grid=sigma_grid,
                back_off_to=None if resolved == bucket_id else resolved,
            )
        )

    sparse_warnings = _sparse_warnings(
        sorted(source_buckets),
        counts,
        min_bucket_size=min_size,
        low_confidence_size=low_size,
    )
    if warn_sparse:
        for warning in sparse_warnings:
            warnings.warn(
                "calibration bucket remains sparse after backoff: "
                f"{warning.bucket_id} -> {warning.resolved_bucket_id} "
                f"n={warning.n_calibration} min={warning.min_bucket_size}",
                RuntimeWarning,
                stacklevel=2,
            )

    return CalibrationTable(buckets=tuple(buckets), warnings=sparse_warnings)
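
The aggregation loop above is what makes parent buckets denser than their children: every example is appended to each bucket in its backoff chain. A minimal standalone sketch of that step follows. It uses plain (bucket_id, sigma_raw) pairs in place of CalibrationExample and assumes '|'-separated bucket IDs; sketch_aggregate is a hypothetical name, not part of the module.

```python
from collections import defaultdict


def sketch_aggregate(examples):
    """Append each raw score to its own bucket, every parent, and '*'."""
    aggregated = defaultdict(list)
    for bucket_id, sigma_raw in examples:
        parts = bucket_id.split("|")
        # Most specific first: region|gc|repeat, region|gc, region.
        for i in range(len(parts), 0, -1):
            aggregated["|".join(parts[:i])].append(float(sigma_raw))
        # The catch-all bucket receives every example.
        aggregated["*"].append(float(sigma_raw))
    return dict(aggregated)


examples = [
    ("intron|low|none", 0.2),
    ("intron|low|simple", 0.9),
    ("utr3|mid|none", 0.4),
]
aggregated = sketch_aggregate(examples)
counts = {bucket_id: len(values) for bucket_id, values in sorted(aggregated.items())}
print(counts)
```

Here the two intron examples share the intron|low and intron parents, so those parents hold two values each while the full buckets hold one.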

read_calibration_table

read_calibration_table(path: str | Path) -> CalibrationTable

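Read a calibration Parquet file and validate its columns. Raises SchemaCompatError when the file cannot be read or when its column names do not match the documented schema exactly, including order.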

Source code in geno_lewm/surprise/calibration.py
def read_calibration_table(path: str | Path) -> CalibrationTable:
    """Read and validate a calibration Parquet file."""
    _pa, pq = _require_pyarrow()
    source = Path(path)
    try:
        arrow_table = pq.read_table(source)
    except Exception as exc:
        raise SchemaCompatError(
            "calibration table could not be read",
            details={"path": str(source), "error": str(exc)},
        ) from exc

    observed = tuple(arrow_table.column_names)
    expected = _column_names()
    if observed != expected:
        raise SchemaCompatError(
            "calibration table columns do not match the documented schema",
            details={"observed": list(observed), "expected": list(expected)},
        )

    return CalibrationTable(
        buckets=tuple(
            CalibrationBucket(
                bucket_id=str(row["bucket_id"]),
                n_calibration=int(row["n_calibration"]),
                cdf=tuple(float(value) for value in row["cdf"]),
                sigma_grid=tuple(float(value) for value in row["sigma_grid"]),
                back_off_to=None if row["back_off_to"] is None else str(row["back_off_to"]),
                schema_version=str(row["schema_version"]),
            )
            for row in arrow_table.to_pylist()
        )
    )

write_calibration_table

write_calibration_table(table: CalibrationTable, path: str | Path) -> Path

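Write a calibration table to path as zstd-compressed Parquet (conventionally calibration.parquet), creating parent directories as needed, and return the destination path.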

Source code in geno_lewm/surprise/calibration.py
def write_calibration_table(table: CalibrationTable, path: str | Path) -> Path:
    """Write a calibration table to ``calibration.parquet``."""
    pa, pq = _require_pyarrow()
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    arrow_table = pa.Table.from_pydict(
        {
            "bucket_id": [bucket.bucket_id for bucket in table.buckets],
            "n_calibration": [bucket.n_calibration for bucket in table.buckets],
            "cdf": [list(bucket.cdf) for bucket in table.buckets],
            "sigma_grid": [list(bucket.sigma_grid) for bucket in table.buckets],
            "back_off_to": [bucket.back_off_to for bucket in table.buckets],
            "schema_version": [bucket.schema_version for bucket in table.buckets],
        },
        schema=_arrow_schema(pa),
    )
    pq.write_table(arrow_table, destination, compression="zstd", compression_level=9)
    return destination

backoff_chain

backoff_chain(label_or_bucket: ContextLabel | str) -> tuple[str, ...]

Return fixed parent-bucket IDs ending in *.

Full buckets back off as region|gc|repeat -> region|gc -> region -> *. Parent buckets can also be passed directly.

Source code in geno_lewm/surprise/context.py
def backoff_chain(label_or_bucket: ContextLabel | str) -> tuple[str, ...]:
    """Return fixed parent-bucket IDs ending in ``*``.

    Full buckets back off as ``region|gc|repeat`` -> ``region|gc`` ->
    ``region`` -> ``*``. Parent buckets can also be passed directly.
    """
    parts = _bucket_parts(label_or_bucket)
    if not parts:
        return (UNKNOWN_BUCKET_ID,)
    parents = tuple(_join_bucket_parts(parts[:i]) for i in range(len(parts), 0, -1))
    return (*parents, UNKNOWN_BUCKET_ID)
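
To make the chain concrete, here is a standalone sketch that reproduces the documented ordering for string bucket IDs. It assumes IDs are '|'-separated and that passing '*' yields only the catch-all; sketch_backoff_chain is a hypothetical stand-in, and the private helpers _bucket_parts and _join_bucket_parts may validate more than this does.

```python
UNKNOWN_BUCKET_ID = "*"  # mirrors the documented catch-all constant


def sketch_backoff_chain(bucket_id: str) -> tuple[str, ...]:
    """Return bucket IDs from most specific to the catch-all '*'."""
    # Assumption: empty parts and the catch-all itself carry no context.
    parts = tuple(
        part for part in bucket_id.split("|") if part and part != UNKNOWN_BUCKET_ID
    )
    # Drop one trailing component at a time: 3 parts, then 2, then 1.
    parents = tuple("|".join(parts[:i]) for i in range(len(parts), 0, -1))
    return (*parents, UNKNOWN_BUCKET_ID)


print(sketch_backoff_chain("intron|mid|none"))
print(sketch_backoff_chain("intron|mid"))
print(sketch_backoff_chain("*"))
```

A full bucket therefore yields four entries, a two-part parent yields three, and the catch-all yields only itself.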

classify_context

classify_context(*, region: str | Sequence[str] | None, gc_window: str, repeat: str | Sequence[str] | None = None, low_gc_cutoff: float = DEFAULT_GC_LOW_CUTOFF, high_gc_cutoff: float = DEFAULT_GC_HIGH_CUTOFF) -> ContextLabel

Build a canonical context label from annotation terms and a DNA window.

region and repeat accept upstream annotation labels such as VEP/SnpEff consequences or repeat-masker class strings. gc_window is the sequence window around the variant locus.

Source code in geno_lewm/surprise/context.py
def classify_context(
    *,
    region: str | Sequence[str] | None,
    gc_window: str,
    repeat: str | Sequence[str] | None = None,
    low_gc_cutoff: float = DEFAULT_GC_LOW_CUTOFF,
    high_gc_cutoff: float = DEFAULT_GC_HIGH_CUTOFF,
) -> ContextLabel:
    """Build a canonical context label from annotation terms and a DNA window.

    ``region`` and ``repeat`` accept upstream annotation labels such as
    VEP/SnpEff consequences or repeat-masker class strings. ``gc_window``
    is the sequence window around the variant locus.
    """
    return ContextLabel(
        region_class=classify_region(region),
        gc_bin=classify_gc_bin(
            gc_window,
            low_cutoff=low_gc_cutoff,
            high_cutoff=high_gc_cutoff,
        ),
        repeat_class=classify_repeat(repeat),
    )

classify_gc_bin

classify_gc_bin(sequence: str, *, low_cutoff: float = DEFAULT_GC_LOW_CUTOFF, high_cutoff: float = DEFAULT_GC_HIGH_CUTOFF) -> str

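Return low, mid, or high for a DNA window's GC fraction. Both cutoffs are inclusive: a fraction at or below low_cutoff is low, and a fraction at or above high_cutoff is high. Raises InputError when low_cutoff is not less than high_cutoff.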

Source code in geno_lewm/surprise/context.py
def classify_gc_bin(
    sequence: str,
    *,
    low_cutoff: float = DEFAULT_GC_LOW_CUTOFF,
    high_cutoff: float = DEFAULT_GC_HIGH_CUTOFF,
) -> str:
    """Return ``low``, ``mid``, or ``high`` for a DNA window's GC fraction."""
    low = _validate_cutoff("low_cutoff", low_cutoff)
    high = _validate_cutoff("high_cutoff", high_cutoff)
    if low >= high:
        raise InputError(
            "low_cutoff must be less than high_cutoff",
            details={"low_cutoff": low, "high_cutoff": high},
        )

    fraction = gc_fraction(sequence)
    if fraction <= low:
        return "low"
    if fraction >= high:
        return "high"
    return "mid"
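
The inclusive comparisons matter at the boundaries. The following sketch applies the same comparisons to a precomputed GC fraction, using the documented default cutoffs of 1/3 and 2/3. sketch_gc_bin is a hypothetical name, and it raises ValueError where the library raises InputError.

```python
def sketch_gc_bin(
    fraction: float,
    low_cutoff: float = 1.0 / 3.0,
    high_cutoff: float = 2.0 / 3.0,
) -> str:
    """Bin a GC fraction; both cutoffs are inclusive."""
    if low_cutoff >= high_cutoff:
        raise ValueError("low_cutoff must be less than high_cutoff")
    if fraction <= low_cutoff:
        return "low"
    if fraction >= high_cutoff:
        return "high"
    return "mid"


fractions = (0.25, 1.0 / 3.0, 0.5, 2.0 / 3.0, 0.9)
bins = [sketch_gc_bin(fraction) for fraction in fractions]
print(bins)
```

A fraction exactly equal to a cutoff lands in the outer bin, so mid covers only the open interval between the two cutoffs.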

classify_region

classify_region(annotation: str | Sequence[str] | None) -> str

Return the canonical region_class for annotation term(s). An empty annotation, or terms that match no known class or alias, map to other.

Source code in geno_lewm/surprise/context.py
def classify_region(annotation: str | Sequence[str] | None) -> str:
    """Return the canonical ``region_class`` for annotation term(s)."""
    terms = _annotation_terms(annotation, field="region")
    if not terms:
        return "other"

    for region_class, aliases in _REGION_ALIAS_GROUPS:
        if any(term == region_class or term in aliases for term in terms):
            return region_class
    return "other"
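
One consequence of the loop above is that the order of the alias groups, not the order of the input terms, decides which class wins when several terms match. The sketch below illustrates this with a made-up two-group table. The alias contents and the name sketch_classify_region are assumptions for illustration only; the real _REGION_ALIAS_GROUPS is not shown on this page.

```python
# Hypothetical alias table: (canonical class, aliases), in priority order.
SKETCH_ALIAS_GROUPS = (
    ("coding_missense", frozenset({"missense_variant"})),
    ("intron", frozenset({"intron_variant"})),
)


def sketch_classify_region(terms):
    """Return the first group, in table order, matched by any term."""
    for region_class, aliases in SKETCH_ALIAS_GROUPS:
        if any(term == region_class or term in aliases for term in terms):
            return region_class
    return "other"


# The intron term is listed first, but the missense group has priority.
print(sketch_classify_region(["intron_variant", "missense_variant"]))
print(sketch_classify_region(["unmapped_term"]))
print(sketch_classify_region([]))
```

Under this scheme a locus annotated with several consequences resolves to whichever matching class appears earliest in the table.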

classify_repeat

classify_repeat(annotation: str | Sequence[str] | None) -> str

Return the canonical repeat_class for repeat annotation term(s). An empty annotation maps to none; terms that match no known class or alias raise InputError.

Source code in geno_lewm/surprise/context.py
def classify_repeat(annotation: str | Sequence[str] | None) -> str:
    """Return the canonical ``repeat_class`` for repeat annotation term(s)."""
    terms = _annotation_terms(annotation, field="repeat")
    if not terms:
        return "none"

    for repeat_class, aliases in _REPEAT_ALIAS_GROUPS:
        if any(term == repeat_class or term in aliases for term in terms):
            return repeat_class

    raise InputError(
        "repeat annotation does not map to a known repeat_class",
        details={"annotation": list(terms), "allowed": list(REPEAT_CLASSES)},
        remediation="normalize the repeat track to none/simple/low_complexity/transposon/segmental_dup",
    )

gc_fraction

gc_fraction(sequence: str) -> float

Return GC fraction over called A/C/G/T bases in sequence.

N bases are valid in reference windows but are excluded from the denominator because their GC status is unknown. A window containing no called bases is rejected.

Source code in geno_lewm/surprise/context.py
def gc_fraction(sequence: str) -> float:
    """Return GC fraction over called A/C/G/T bases in ``sequence``.

    ``N`` bases are valid in reference windows but are excluded from the
    denominator because their GC status is unknown. A window containing
    no called bases is rejected.
    """
    canonical = canonicalize_dna(sequence)
    called_count = sum(base in _CALLED_BASES for base in canonical)
    if called_count == 0:
        raise InputError(
            "GC window contains no called A/C/G/T bases",
            details={"length": len(canonical)},
        )
    gc_count = sum(base in _GC_BASES for base in canonical)
    return gc_count / called_count
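
A small worked case shows the effect of excluding N from the denominator. This sketch assumes that uppercasing is an adequate stand-in for canonicalize_dna, whose behavior is not shown here, and it raises ValueError in place of InputError. sketch_gc_fraction is a hypothetical name.

```python
def sketch_gc_fraction(sequence: str) -> float:
    """GC fraction over called A/C/G/T bases; N is ignored."""
    canonical = sequence.upper()  # assumption: stand-in for canonicalize_dna
    called = sum(base in "ACGT" for base in canonical)
    if called == 0:
        raise ValueError("GC window contains no called A/C/G/T bases")
    gc_count = sum(base in "GC" for base in canonical)
    return gc_count / called


# Two of the four called bases are G or C; the two N bases are not counted.
fraction = sketch_gc_fraction("GCNNAT")
print(fraction)
```

Counting the N bases would give 2/6 instead of 2/4, which would push a partly masked window toward the low bin.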

make_bucket_id

make_bucket_id(region_class: str, gc_bin: str, repeat_class: str) -> str

Return the stable full calibration bucket ID for a context tuple.

Source code in geno_lewm/surprise/context.py
def make_bucket_id(region_class: str, gc_bin: str, repeat_class: str) -> str:
    """Return the stable full calibration bucket ID for a context tuple."""
    return _join_bucket_parts(
        (
            _require_member("region_class", region_class, REGION_CLASSES),
            _require_member("gc_bin", gc_bin, GC_BINS),
            _require_member("repeat_class", repeat_class, REPEAT_CLASSES),
        )
    )
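
The ID format can be sketched without the private helpers. The example below validates each component against the canonical tuples listed earlier on this page and joins them with '|'. sketch_make_bucket_id is a hypothetical name, and ValueError stands in for the library's InputError.

```python
REGION_CLASSES = (
    "coding_synonymous", "coding_missense", "coding_nonsense", "splice",
    "utr5", "utr3", "intron", "promoter", "enhancer", "intergenic", "other",
)
GC_BINS = ("low", "mid", "high")
REPEAT_CLASSES = ("none", "simple", "low_complexity", "transposon", "segmental_dup")


def sketch_make_bucket_id(region_class: str, gc_bin: str, repeat_class: str) -> str:
    """Validate each component, then join as region|gc|repeat."""
    checks = (
        ("region_class", region_class, REGION_CLASSES),
        ("gc_bin", gc_bin, GC_BINS),
        ("repeat_class", repeat_class, REPEAT_CLASSES),
    )
    for field, value, allowed in checks:
        if value not in allowed:
            raise ValueError(f"{field}={value!r} is not one of {allowed}")
    return "|".join((region_class, gc_bin, repeat_class))


bucket_id = sketch_make_bucket_id("coding_missense", "high", "none")
print(bucket_id)
```

Validating before joining keeps misspelled components from silently creating new, permanently sparse buckets.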

select_backoff_bucket

select_backoff_bucket(label_or_bucket: ContextLabel | str, bucket_sizes: Mapping[str, int], *, min_count: int = DEFAULT_MIN_BUCKET_SIZE) -> str

Return the first bucket in the backoff chain with enough calibration rows.

If every specific parent is sparse, the catch-all * bucket is returned. Downstream calibration code can still report low confidence based on that bucket's own count.

Source code in geno_lewm/surprise/context.py
def select_backoff_bucket(
    label_or_bucket: ContextLabel | str,
    bucket_sizes: Mapping[str, int],
    *,
    min_count: int = DEFAULT_MIN_BUCKET_SIZE,
) -> str:
    """Return the first bucket in the backoff chain with enough calibration rows.

    If every specific parent is sparse, the catch-all ``*`` bucket is
    returned. Downstream calibration code can still report low
    confidence based on that bucket's own count.
    """
    threshold = _validate_positive_int("min_count", min_count)
    counts = _validate_bucket_sizes(bucket_sizes)
    chain = backoff_chain(label_or_bucket)
    for bucket_id in chain[:-1]:
        if counts.get(bucket_id, 0) >= threshold:
            return bucket_id
    return chain[-1]
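
The selection rule can be traced on a small table of counts. The sketch below reimplements the walk for string bucket IDs only, assuming '|'-separated IDs. sketch_chain and sketch_select are hypothetical names, and the counts are made up.

```python
from collections.abc import Mapping


def sketch_chain(bucket_id: str) -> tuple[str, ...]:
    """Most-specific-first chain ending in the catch-all '*'."""
    parts = [part for part in bucket_id.split("|") if part and part != "*"]
    return (*("|".join(parts[:i]) for i in range(len(parts), 0, -1)), "*")


def sketch_select(bucket_id: str, sizes: Mapping[str, int], min_count: int = 1000) -> str:
    """Return the first bucket with at least min_count rows, else '*'."""
    chain = sketch_chain(bucket_id)
    for candidate in chain[:-1]:
        if sizes.get(candidate, 0) >= min_count:
            return candidate
    # The catch-all is returned without checking its own count.
    return chain[-1]


sizes = {"splice|high|simple": 40, "splice|high": 650, "splice": 4200, "*": 90000}
selected = sketch_select("splice|high|simple", sizes)
print(selected)
```

With the default threshold of 1000 the walk skips the 40-row and 650-row buckets and stops at splice. Lowering min_count to 500 would stop one level earlier, at splice|high.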

score_variant

score_variant(variant: EditSpec, encoder: object, action_encoder: object, predictor: object, calibration: CalibrationTable, *, reference_window: str, window_start_bp: int = 0, region: str | Sequence[str] | None = None, repeat: str | Sequence[str] | None = None, aggregation: str = 'mean', min_bucket_size: int = DEFAULT_MIN_BUCKET_SIZE) -> SurpriseResult

Score one edit against a caller-supplied reference window.

The scorer is intentionally model-object agnostic: callers can pass the concrete training-time modules or small deterministic fakes. FASTA-backed window extraction is available through score_vcf; checkpoint loading is owned by higher runtime layers.

Source code in geno_lewm/surprise/score.py
def score_variant(
    variant: EditSpec,
    encoder: object,
    action_encoder: object,
    predictor: object,
    calibration: CalibrationTable,
    *,
    reference_window: str,
    window_start_bp: int = 0,
    region: str | Sequence[str] | None = None,
    repeat: str | Sequence[str] | None = None,
    aggregation: str = "mean",
    min_bucket_size: int = DEFAULT_MIN_BUCKET_SIZE,
) -> SurpriseResult:
    """Score one edit against a caller-supplied reference window.

    The scorer is intentionally model-object agnostic: callers can pass
    the concrete training-time modules or small deterministic fakes.
    FASTA-backed window extraction is available through :func:`score_vcf`;
    checkpoint loading is owned by higher runtime layers.
    """
    _require_calibration_table(calibration)
    min_size = _require_positive_int("min_bucket_size", min_bucket_size)
    bucket_id, sigma_raw = _raw_surprise(
        variant,
        encoder,
        action_encoder,
        predictor,
        reference_window=reference_window,
        window_start_bp=window_start_bp,
        region=region,
        repeat=repeat,
        aggregation=aggregation,
    )
    bucket = calibration.resolve(bucket_id, min_bucket_size=min_size)
    return SurpriseResult(
        sigma_raw=sigma_raw,
        sigma_calibrated=_cdf_percentile(bucket, sigma_raw),
        bucket_id=bucket.bucket_id,
        confidence=bucket.confidence,
        low_confidence=bucket.low_confidence,
    )
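
The private _cdf_percentile helper is not shown on this page. As one plausible reading of how a raw score could be mapped to a calibrated value from a bucket's sigma_grid and cdf, the sketch below uses linear interpolation and clamps outside the grid. This is an assumption, not the library's confirmed method. sketch_percentile and the grid values are hypothetical.

```python
from bisect import bisect_right


def sketch_percentile(sigma_grid, cdf, sigma_raw):
    """Interpolate the CDF value at sigma_raw; clamp outside the grid."""
    if sigma_raw <= sigma_grid[0]:
        return cdf[0]
    if sigma_raw >= sigma_grid[-1]:
        return cdf[-1]
    # First grid index strictly greater than sigma_raw.
    upper = bisect_right(sigma_grid, sigma_raw)
    lower = upper - 1
    span = sigma_grid[upper] - sigma_grid[lower]
    weight = (sigma_raw - sigma_grid[lower]) / span
    return cdf[lower] + weight * (cdf[upper] - cdf[lower])


# Made-up four-point grid; real buckets default to 1001 points.
sigma_grid = (0.0, 1.0, 2.0, 4.0)
cdf = (0.0, 0.5, 0.75, 1.0)
calibrated = sketch_percentile(sigma_grid, cdf, 3.0)
print(calibrated)
```

A raw score of 3.0 sits halfway between the grid points 2.0 and 4.0, so the interpolated value is halfway between 0.75 and 1.0.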

score_vcf

score_vcf(vcf_path: str | Path, encoder: object, action_encoder: object, predictor: object, calibration: CalibrationTable, output_path: str | Path, *, reference_windows: Mapping[str, str] | None = None, reference_fasta: str | Path | None = None, window_bp: int = DEFAULT_WINDOW_BP, window_start_bp: int = 0, region: str | Sequence[str] | None = None, repeat: str | Sequence[str] | None = None, aggregation: str = 'mean', show_progress: bool = True, batch_size: int = 64, min_bucket_size: int = DEFAULT_MIN_BUCKET_SIZE) -> Path

Score VCF rows and write one JSON object per scored alternate.

Pass reference_fasta for local FASTA-backed window extraction. reference_windows remains useful for tests and already-extracted windows. Mapping keys are tried in this order: chrom:pos:ref:alt, chrom:pos, then chrom. show_progress is validated as a bool but is otherwise unused by the code shown here.

Source code in geno_lewm/surprise/score.py
def score_vcf(
    vcf_path: str | Path,
    encoder: object,
    action_encoder: object,
    predictor: object,
    calibration: CalibrationTable,
    output_path: str | Path,
    *,
    reference_windows: Mapping[str, str] | None = None,
    reference_fasta: str | Path | None = None,
    window_bp: int = DEFAULT_WINDOW_BP,
    window_start_bp: int = 0,
    region: str | Sequence[str] | None = None,
    repeat: str | Sequence[str] | None = None,
    aggregation: str = "mean",
    show_progress: bool = True,
    batch_size: int = 64,
    min_bucket_size: int = DEFAULT_MIN_BUCKET_SIZE,
) -> Path:
    """Score VCF rows and write one JSON object per scored alternate.

    Pass ``reference_fasta`` for local FASTA-backed window extraction.
    ``reference_windows`` remains useful for tests and already-extracted
    windows. Mapping keys are tried in this order:
    ``chrom:pos:ref:alt``, ``chrom:pos``, then ``chrom``.
    """
    if not isinstance(show_progress, bool):
        raise InputError(
            "show_progress must be a bool",
            details={"type": type(show_progress).__name__},
        )
    del show_progress

    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)
    with output.open("w", encoding="utf-8") as handle:
        for record in _iter_vcf_scores(
            vcf_path,
            encoder,
            action_encoder,
            predictor,
            calibration,
            reference_windows=reference_windows,
            reference_fasta=reference_fasta,
            window_bp=window_bp,
            window_start_bp=window_start_bp,
            region=region,
            repeat=repeat,
            aggregation=aggregation,
            batch_size=batch_size,
            min_bucket_size=min_bucket_size,
        ):
            variant = record.variant
            handle.write(
                json.dumps(
                    {
                        "schema_version": SCORE_JSONL_SCHEMA_VERSION,
                        "generated_by": SCORE_JSONL_GENERATED_BY,
                        "chrom": variant.chrom,
                        "pos": variant.pos,
                        "ref": variant.ref,
                        "alt": variant.alt,
                        **record.result.to_dict(),
                    },
                    sort_keys=True,
                )
                + "\n"
            )
    return output