geno_lewm.observability

observability

Structured logging for GenoLeWM.

This module is the single source of truth for the JSONL logging format defined in docs/spec/05-observability.md and RFC-0013.

What it provides:

  • EVENTS: the immutable registry of event names, severities, and summaries. Renaming an event is a MAJOR change.
  • get_logger: factory returning a GenoLeWMLogger bound to a component (subsystem). Loggers for the same run_id share one sink, so concurrent components write to a single ordered stream.
  • logged_run: context manager that opens / closes the per-run sink, optionally emits run.start / run.end style book-end events, and flushes the buffer on any exception so records survive a crash.

What it does NOT provide (deferred to follow-up issues):

  • Redaction filter (#24). The logger currently accepts whatever payload it is given; the filter will plug into GenoLeWMLogger._emit.
  • Metrics registry / Prometheus exporter (#25).
  • registered_event_name AST linter (#27).
  • OpenTelemetry sinks (RFC-0013 §"Sinks").

The interface ships first so dependent subsystems can take a hard dependency on the event registry today.

EventSpec dataclass

EventSpec(name: str, severity: Severity, summary: str, allowed_keys: frozenset[str] = frozenset())

A single row in the EVENTS registry.

allowed_keys lists the data keys that the per-event redaction allowlist permits (RFC-0013 §3.5). Standardized fields (step, epoch, phase, duration_ms, trace_id, span_id, error_code) are promoted out of data before redaction and are always allowed at the top level — they need not appear here.
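
The split between standardized fields and the per-event allowlist can be illustrated with a small stand-alone sketch. It does not import geno_lewm: the EventSpec below is a stand-in with Severity simplified to str, and split_payload and the train.step event are hypothetical names used only for illustration. Dropping non-allowlisted keys is an assumption; the real redaction filter is deferred to #24 and may behave differently (for example, by masking values instead).

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any

# Standardized fields named in the text above; always allowed at the top level.
STANDARD_FIELDS = frozenset(
    {"step", "epoch", "phase", "duration_ms", "trace_id", "span_id", "error_code"}
)


@dataclass(frozen=True)
class EventSpec:
    """Stand-in for the registry row; Severity is simplified to str here."""

    name: str
    severity: str
    summary: str
    allowed_keys: frozenset[str] = frozenset()


def split_payload(
    spec: EventSpec, payload: dict[str, Any]
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Hypothetical helper: promote standardized fields, then apply the allowlist."""
    promoted = {k: v for k, v in payload.items() if k in STANDARD_FIELDS}
    data = {
        k: v
        for k, v in payload.items()
        if k not in STANDARD_FIELDS and k in spec.allowed_keys
    }
    return promoted, data


# Hypothetical event, for illustration only.
spec = EventSpec("train.step", "info", "One optimizer step", frozenset({"loss"}))
promoted, data = split_payload(spec, {"step": 7, "loss": 0.25, "secret": "x"})
```

Here step is promoted without being listed in allowed_keys, loss survives because the spec allows it, and secret is dropped.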

LogRecord dataclass

LogRecord(ts: str, severity: Severity, event: str, run_id: str, component: str, data: dict[str, Any] = dict(), step: int | None = None, epoch: int | None = None, phase: str | None = None, duration_ms: int | None = None, trace_id: str | None = None, span_id: str | None = None, error_code: str | None = None)

One row written by the logger.

The record carries the spec-required fields directly and stashes event-specific structured fields under data. to_dict returns the exact wire shape; keys are stable across versions.
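
As a rough illustration of how such a record might become one JSONL line, the sketch below re-declares the documented fields in a stand-in dataclass (Severity simplified to str). The to_dict body is an assumption, in particular the choice to omit optional fields that are None; the actual wire shape is defined by docs/spec/05-observability.md and is not reproduced here. The event name and values are made up.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class LogRecord:
    """Stand-in mirroring the documented fields; Severity simplified to str."""

    ts: str
    severity: str
    event: str
    run_id: str
    component: str
    data: dict[str, Any] = field(default_factory=dict)
    step: int | None = None
    epoch: int | None = None
    phase: str | None = None
    duration_ms: int | None = None
    trace_id: str | None = None
    span_id: str | None = None
    error_code: str | None = None

    def to_dict(self) -> dict[str, Any]:
        # Assumption: optional fields left as None are omitted from the output.
        return {k: v for k, v in asdict(self).items() if v is not None}


record = LogRecord(
    ts="2024-01-01T00:00:00Z",
    severity="info",
    event="train.step",  # hypothetical event name
    run_id="run-abc123",
    component="trainer",
    data={"loss": 0.25},
    step=7,
)
# JSONL convention: one JSON object per line, no embedded newlines.
line = json.dumps(record.to_dict())
```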

GenoLeWMLogger

GenoLeWMLogger(component: str, *, run_id: str, log_dir: Path, sink: _Sink, level: Severity = 'info', pretty: bool = False)

Component-scoped structured logger.

Loggers are cheap to obtain (get_logger caches them by (component, run_id, log_dir)) and thread-safe: the underlying sink serializes writes.

Source code in geno_lewm/observability.py
def __init__(
    self,
    component: str,
    *,
    run_id: str,
    log_dir: Path,
    sink: _Sink,
    level: Severity = "info",
    pretty: bool = False,
) -> None:
    self.component = component
    self.run_id = run_id
    self.log_dir = log_dir
    self._sink = sink
    self._level = level
    self._pretty = pretty
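
The claim that the sink serializes writes can be pictured with a minimal lock-guarded sink. This is a sketch under assumptions, not the module's private _Sink: the class name LockedSink, its write / flush methods, and the in-memory stream are all hypothetical.

```python
import io
import json
import threading


class LockedSink:
    """Hypothetical sink: one lock guards every write to the shared stream."""

    def __init__(self, stream: io.TextIOBase) -> None:
        self._stream = stream
        self._lock = threading.Lock()

    def write(self, record: dict) -> None:
        # Serialize outside the lock; only the stream write is guarded.
        line = json.dumps(record) + "\n"
        with self._lock:
            self._stream.write(line)

    def flush(self) -> None:
        with self._lock:
            self._stream.flush()


stream = io.StringIO()
sink = LockedSink(stream)


def worker(component: str) -> None:
    for i in range(100):
        sink.write({"component": component, "i": i})


threads = [threading.Thread(target=worker, args=(name,)) for name in ("trainer", "evaluator")]
for t in threads:
    t.start()
for t in threads:
    t.join()
sink.flush()
lines = stream.getvalue().splitlines()
```

Each line is written whole under the lock, so records from the two components interleave but never tear, and each component's own records stay in the order it emitted them.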

current_trace_context

current_trace_context() -> tuple[str | None, str | None]

Return (trace_id, span_id) from the current context.

Source code in geno_lewm/observability.py
def current_trace_context() -> tuple[str | None, str | None]:
    """Return ``(trace_id, span_id)`` from the current context."""
    return _TRACE_ID.get(), _SPAN_ID.get()

set_trace_context

set_trace_context(*, trace_id: str | None, span_id: str | None) -> Iterator[None]

Push (trace_id, span_id) into the contextvar for the block.

Source code in geno_lewm/observability.py
@contextlib.contextmanager
def set_trace_context(*, trace_id: str | None, span_id: str | None) -> Iterator[None]:
    """Push ``(trace_id, span_id)`` into the contextvar for the block."""
    t_tok = _TRACE_ID.set(trace_id)
    s_tok = _SPAN_ID.set(span_id)
    try:
        yield
    finally:
        _TRACE_ID.reset(t_tok)
        _SPAN_ID.reset(s_tok)
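
The two functions above compose as a stack: nested blocks override the context and each exit restores the previous values. The sketch below copies both functions so it runs on its own. It assumes _TRACE_ID and _SPAN_ID are ContextVar objects with a default of None, which the listing implies but does not show; the identifiers t1, s1, s2 are made up.

```python
import contextlib
import contextvars
from typing import Iterator

# Assumption: module-level ContextVars defaulting to None.
_TRACE_ID = contextvars.ContextVar("trace_id", default=None)
_SPAN_ID = contextvars.ContextVar("span_id", default=None)


def current_trace_context():
    return _TRACE_ID.get(), _SPAN_ID.get()


@contextlib.contextmanager
def set_trace_context(*, trace_id, span_id) -> Iterator[None]:
    t_tok = _TRACE_ID.set(trace_id)
    s_tok = _SPAN_ID.set(span_id)
    try:
        yield
    finally:
        # reset() restores whatever value was active before set().
        _TRACE_ID.reset(t_tok)
        _SPAN_ID.reset(s_tok)


seen = []
with set_trace_context(trace_id="t1", span_id="s1"):
    seen.append(current_trace_context())      # outer span
    with set_trace_context(trace_id="t1", span_id="s2"):
        seen.append(current_trace_context())  # inner span overrides
    seen.append(current_trace_context())      # outer span restored
seen.append(current_trace_context())          # back to the defaults
```

Because the state lives in contextvars rather than a module global, each thread and each asyncio task sees its own values.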

get_logger

get_logger(component: str, *, run_id: str | None = None, log_dir: str | PathLike[str] | None = None, level: Severity | None = None, pretty: bool | None = None) -> GenoLeWMLogger

Return a logger bound to component.

Loggers are cached by (component, run_id, log_dir); calling get_logger twice with the same arguments returns the same instance, so independent subsystems share one ordered stream per run.

Defaults:

  • run_id: $GENO_LEWM_RUN_ID or a random run-<hex>.
  • log_dir: $GENO_LEWM_LOG_DIR or ~/.geno-lewm/logs.
  • level: $GENO_LEWM_LOG_LEVEL (default info).
  • pretty: TTY-detected, overridable by $GENO_LEWM_LOG_FORMAT.

Source code in geno_lewm/observability.py
def get_logger(
    component: str,
    *,
    run_id: str | None = None,
    log_dir: str | os.PathLike[str] | None = None,
    level: Severity | None = None,
    pretty: bool | None = None,
) -> GenoLeWMLogger:
    """Return a logger bound to ``component``.

    Loggers are cached by ``(component, run_id, log_dir)``; calling
    ``get_logger`` twice with the same arguments returns the same
    instance, so independent subsystems share one ordered stream per
    run.

    Defaults:

    - ``run_id``: ``$GENO_LEWM_RUN_ID`` or a random ``run-<hex>``.
    - ``log_dir``: ``$GENO_LEWM_LOG_DIR`` or ``~/.geno-lewm/logs``.
    - ``level``: ``$GENO_LEWM_LOG_LEVEL`` (default ``info``).
    - ``pretty``: TTY-detected, overridable by ``$GENO_LEWM_LOG_FORMAT``.
    """
    rid = run_id or os.environ.get("GENO_LEWM_RUN_ID") or _new_run_id()
    ldir = _resolve_log_dir(log_dir)
    sink = _open_sink(rid, ldir)
    lvl: Severity = level if level is not None else _env_level()
    pp = pretty if pretty is not None else _env_pretty()
    project = _resolve_wandb_project(None)
    if project is not None:
        _ensure_wandb_sink(run_id=rid, project=project)

    key = (component, rid, str(ldir.resolve()))
    with _LOGGERS_LOCK:
        existing = _LOGGERS.get(key)
        if existing is not None:
            return existing
        logger = GenoLeWMLogger(
            component=component,
            run_id=rid,
            log_dir=ldir,
            sink=sink,
            level=lvl,
            pretty=pp,
        )
        _LOGGERS[key] = logger
        return logger

logged_run

logged_run(component: str = 'runtime', *, run_id: str | None = None, log_dir: str | PathLike[str] | None = None, start_event: str | None = None, end_event: str | None = None, start_data: Mapping[str, Any] | None = None) -> Iterator[GenoLeWMLogger]

Open a sink for the run; flush on exit; never swallow exceptions.

The wrapper guarantees that any records emitted up to a crash are flushed to disk (INV-OBS-6: "a crash before logger init still produces a sanitized minimal record"). Optional start_event / end_event book-end the run. If the block raises and the exception is a geno_lewm.errors.GenoLeWMError, an error record is emitted before the exception propagates.

Source code in geno_lewm/observability.py
@contextlib.contextmanager
def logged_run(
    component: str = "runtime",
    *,
    run_id: str | None = None,
    log_dir: str | os.PathLike[str] | None = None,
    start_event: str | None = None,
    end_event: str | None = None,
    start_data: Mapping[str, Any] | None = None,
) -> Iterator[GenoLeWMLogger]:
    """Open a sink for the run; flush on exit; never swallow exceptions.

    The wrapper guarantees that any records emitted up to a crash are
    flushed to disk (INV-OBS-6: "a crash before logger init still
    produces a sanitized minimal record"). Optional ``start_event`` /
    ``end_event`` book-end the run. If the block raises and the
    exception is a ``geno_lewm.errors.GenoLeWMError``, an ``error``
    record is emitted before the exception propagates.
    """
    logger = get_logger(component, run_id=run_id, log_dir=log_dir)
    if start_event:
        logger.info(start_event, **(dict(start_data) if start_data else {}))
    try:
        yield logger
    except BaseException as exc:
        from geno_lewm.errors import GenoLeWMError  # local import to avoid cycle

        if isinstance(exc, GenoLeWMError):
            logger.error(
                "error",
                error_code=exc.code,
                message=exc.message,
                details=exc.details,
                remediation=exc.remediation,
            )
        # Always flush the sink before the exception unwinds the stack.
        with contextlib.suppress(Exception):
            logger._sink.flush()
        raise
    else:
        if end_event:
            logger.info(end_event)
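
The control flow of logged_run can be exercised without the library by swapping the logger for an in-memory sink. MemorySink and logged_block are hypothetical names and train.step is a made-up event; the GenoLeWMError branch is omitted, so this shows only the book-end and flush-on-exception behavior.

```python
import contextlib
from typing import Iterator


class MemorySink:
    """Hypothetical sink: records sit in a buffer until flush() moves them out."""

    def __init__(self) -> None:
        self.buffer = []
        self.flushed = []

    def write(self, record: dict) -> None:
        self.buffer.append(record)

    def flush(self) -> None:
        self.flushed.extend(self.buffer)
        self.buffer.clear()


@contextlib.contextmanager
def logged_block(sink, *, start_event=None, end_event=None) -> Iterator[MemorySink]:
    if start_event:
        sink.write({"event": start_event})
    try:
        yield sink
    except BaseException:
        # Flush before the exception unwinds; a failing flush must not mask it.
        with contextlib.suppress(Exception):
            sink.flush()
        raise
    else:
        if end_event:
            sink.write({"event": end_event})


sink = MemorySink()
try:
    with logged_block(sink, start_event="run.start", end_event="run.end") as s:
        s.write({"event": "train.step"})
        raise RuntimeError("boom")
except RuntimeError:
    pass  # the exception propagated; it was not swallowed
```

After the crash, sink.flushed holds run.start and train.step, and no run.end record exists because the else branch never ran. As in the listing above, the success path writes the end event but does not flush explicitly, so a caller that needs the records on disk at that point would presumably rely on shutdown_run or on the sink's own flushing policy.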

shutdown_run

shutdown_run(run_id: str, log_dir: str | PathLike[str] | None = None) -> None

Flush and close the sink for run_id.

Primarily used in tests; production callers can leave sinks open for the process lifetime.

Source code in geno_lewm/observability.py
def shutdown_run(run_id: str, log_dir: str | os.PathLike[str] | None = None) -> None:
    """Flush and close the sink for ``run_id``.

    Primarily used in tests; production callers can leave sinks open
    for the process lifetime.
    """
    ldir = _resolve_log_dir(log_dir)
    _close_sink(run_id, ldir)
    _close_wandb_sink(run_id)
    # Also drop any cached loggers bound to this run.
    with _LOGGERS_LOCK:
        for k in [k for k in _LOGGERS if k[1] == run_id]:
            _LOGGERS.pop(k, None)