Source code for geoprior.utils.shapes

# License: Apache-2.0
# Copyright (c) 2026-present
# Author: LKouadio <etanoyau@gmail.com>
r"""Shape utility helpers for arrays and tensors."""

from __future__ import annotations

from collections.abc import Callable, Sequence
from typing import Any

import numpy as np

__all__ = [
    "canonicalize_BHQO",
    "canonicalize_BHQO_quantiles_np",
]


[docs] def canonicalize_BHQO( y_pred: Any, *, y_true: Any | None = None, q_values: Sequence[float] = (0.1, 0.5, 0.9), n_q: int | None = None, layout: str | None = None, enforce_monotone: bool = True, return_layout: bool = False, verbose: int = 0, log_fn: Callable[[str], None] = print, ) -> Any: """ Canonicalize quantile outputs to (B, H, Q, O). Supported layouts (rank-4): - BHQO: (B, H, Q, O) -> unchanged - BQHO: (B, Q, H, O) -> transpose(0, 2, 1, 3) - BHOQ: (B, H, O, Q) -> transpose(0, 1, 3, 2) If ambiguous (e.g., H == Q), and y_true is given, pick the transform with smallest MAE for q50. If y_true is not given, fallback is: 1) use `layout` if provided 2) else prefer BHQO if plausible 3) else pick by min crossing score Parameters ---------- y_pred: Quantile tensor, NumPy array or TF tensor. y_true: Target tensor (B, H, O) or (B, H, 1). Used only to resolve ambiguity robustly. q_values: Quantiles in order, e.g. (0.1, 0.5, 0.9). n_q: Number of quantiles. Defaults to len(q_values). layout: Force interpretation: "BHQO", "BQHO", "BHOQ". Use "auto" (or None) to infer. enforce_monotone: Sort along Q axis after canonicalization. return_layout: If True, return (arr, chosen_layout). verbose, log_fn: Logging controls. Returns ------- arr or (arr, layout) Canonical (B, H, Q, O) and optionally the layout. """ # Accept explicit "auto" to mean "infer". if layout is not None: lay = str(layout).strip().lower() if lay in {"auto", "infer"}: layout = None tf = _maybe_tf() if tf is not None and tf.is_tensor(y_pred): out = _canonicalize_tf( y_pred, y_true=y_true, q_values=q_values, n_q=n_q, layout=layout, enforce_monotone=enforce_monotone, verbose=verbose, log_fn=log_fn, ) if return_layout: return out return out[0] out = _canonicalize_np( y_pred, y_true=y_true, q_values=q_values, n_q=n_q, layout=layout, enforce_monotone=enforce_monotone, verbose=verbose, log_fn=log_fn, ) if return_layout: return out return out[0]
# --------------------------------------------------------------------- # NumPy backend # --------------------------------------------------------------------- def _canonicalize_np( y_pred: Any, *, y_true: Any | None, q_values: Sequence[float], n_q: int | None, layout: str | None, enforce_monotone: bool, verbose: int, log_fn: Callable[[str], None], ) -> tuple[np.ndarray, str]: y = np.asarray(y_pred) y = _ensure_rank4_np( y, verbose=verbose, log_fn=log_fn, ) if y.ndim != 4: return y, "UNCHANGED" n_q = int(n_q or len(q_values)) if n_q <= 0: raise ValueError("n_q must be > 0.") if layout is not None: arr = _apply_layout_np(y, layout) arr = _post_np(arr, enforce_monotone) return arr, layout opts = _build_options_np(y, n_q) if not opts: if verbose: log_fn( "canonicalize_BHQO: no options; " "return unchanged." ) return y, "UNCHANGED" if len(opts) == 1: name, arr = opts[0] arr = _post_np(arr, enforce_monotone) return arr, name yt = None if y_true is not None: yt = np.asarray(y_true) yt = _ensure_ytrue_np(yt) med = _median_q_index(q_values) best = _pick_best_np( opts, y_true=yt, med=med, verbose=verbose, log_fn=log_fn, ) name, arr = best arr = _post_np(arr, enforce_monotone) return arr, name def _build_options_np( y: np.ndarray, n_q: int, ) -> list[tuple[str, np.ndarray]]: opts: list[tuple[str, np.ndarray]] = [] if y.shape[2] == n_q: opts.append(("BHQO", y)) if y.shape[1] == n_q: opts.append( ( "BQHO", np.transpose(y, (0, 2, 1, 3)), ) ) if y.shape[3] == n_q: opts.append( ( "BHOQ", np.transpose(y, (0, 1, 3, 2)), ) ) return opts def _pick_best_np( opts: list[tuple[str, np.ndarray]], *, y_true: np.ndarray | None, med: int, verbose: int, log_fn: Callable[[str], None], ) -> tuple[str, np.ndarray]: best_name = opts[0][0] best_arr = opts[0][1] best_score = float("inf") for name, arr in opts: mae = None if y_true is not None: mae = _mae_q50_np(arr, y_true, med) cross = _cross_score_np(arr) if y_true is not None: score = float(mae) else: score = float(cross) if verbose >= 2: log_fn( "canonicalize_BHQO: " f"cand={name} " f"mae={_fmt(mae)} " f"cross={cross:.6f}" ) if score < best_score: best_score = score best_name = name best_arr = arr if verbose: log_fn( "canonicalize_BHQO: " f"chose={best_name} " f"score={best_score:.6f}" ) return best_name, best_arr def _cross_score_np(arr: np.ndarray) -> float: q10 = arr[:, :, 0, :] q50 = arr[:, :, 1, :] q90 = arr[:, :, 2, :] if q10.shape[-1] == 1: q10 = q10[..., 0] q50 = q50[..., 0] q90 = q90[..., 0] c1 = float(np.mean(q10 > q50)) c2 = float(np.mean(q50 > q90)) c3 = float(np.mean(q10 > q90)) return c1 + c2 + c3 def _mae_q50_np( arr: np.ndarray, y_true: np.ndarray, med: int, ) -> float: q50 = arr[:, :, med, :] yt = y_true if q50.shape != yt.shape: return float("inf") return float(np.mean(np.abs(q50 - yt))) def _ensure_rank4_np( y: np.ndarray, *, verbose: int, log_fn: Callable[[str], None], ) -> np.ndarray: if y.ndim == 3: return y[..., None] if y.ndim != 4: if verbose: log_fn( f"canonicalize_BHQO: rank={y.ndim}, skipping." ) return y def _ensure_ytrue_np(y: np.ndarray) -> np.ndarray: if y.ndim == 2: return y[..., None] return y def _apply_layout_np( y: np.ndarray, layout: str, ) -> np.ndarray: lay = layout.upper().strip() if lay == "BHQO": return y if lay == "BQHO": return np.transpose(y, (0, 2, 1, 3)) if lay == "BHOQ": return np.transpose(y, (0, 1, 3, 2)) raise ValueError( "layout must be one of: BHQO, BQHO, BHOQ." ) def _post_np( arr: np.ndarray, enforce_monotone: bool, ) -> np.ndarray: if enforce_monotone: return np.sort(arr, axis=2) return arr # --------------------------------------------------------------------- # TensorFlow backend (lazy import; eager-first) # --------------------------------------------------------------------- def _canonicalize_tf( y_pred: Any, *, y_true: Any | None, q_values: Sequence[float], n_q: int | None, layout: str | None, enforce_monotone: bool, verbose: int, log_fn: Callable[[str], None], ) -> tuple[Any, str]: tf = _maybe_tf() if tf is None: arr = np.asarray(y_pred) return arr, "UNCHANGED" y = y_pred if y.shape.rank == 3: y = tf.expand_dims(y, axis=-1) if y.shape.rank != 4: if verbose: log_fn( "canonicalize_BHQO: " f"rank={y.shape.rank}, skipping." ) return y, "UNCHANGED" n_q = int(n_q or len(q_values)) if n_q <= 0: raise ValueError("n_q must be > 0.") if layout is not None: arr = _apply_layout_tf(y, layout) arr = _post_tf(arr, enforce_monotone) return arr, layout opts = _build_options_tf(y, n_q) if not opts: if verbose: log_fn( "canonicalize_BHQO: no options; unchanged." ) return y, "UNCHANGED" if len(opts) == 1: name, arr = opts[0] arr = _post_tf(arr, enforce_monotone) return arr, name if not tf.executing_eagerly(): name, arr = _prefer_bhqo(opts) arr = _post_tf(arr, enforce_monotone) return arr, name yt = None if y_true is not None: yt = y_true if not tf.is_tensor(yt): yt = tf.convert_to_tensor(yt) if yt.shape.rank == 2: yt = tf.expand_dims(yt, axis=-1) med = _median_q_index(q_values) best_name, best_arr = _pick_best_tf( opts, y_true=yt, med=med, verbose=verbose, log_fn=log_fn, ) best_arr = _post_tf(best_arr, enforce_monotone) return best_arr, best_name def _build_options_tf( y: Any, n_q: int, ) -> list[tuple[str, Any]]: opts: list[tuple[str, Any]] = [] shp = y.shape if shp[2] == n_q: opts.append(("BHQO", y)) if shp[1] == n_q: opts.append( ( "BQHO", _maybe_tf().transpose(y, [0, 2, 1, 3]), ) ) if shp[3] == n_q: opts.append( ( "BHOQ", _maybe_tf().transpose(y, [0, 1, 3, 2]), ) ) return opts def _pick_best_tf( opts: list[tuple[str, Any]], *, y_true: Any | None, med: int, verbose: int, log_fn: Callable[[str], None], ) -> tuple[str, Any]: # tf = _maybe_tf() best_name = opts[0][0] best_arr = opts[0][1] best_score = float("inf") for name, arr in opts: mae = None if y_true is not None: mae = _mae_q50_tf(arr, y_true, med) mae = float(mae.numpy()) cross = float(_cross_score_tf(arr).numpy()) score = float(mae) if y_true is not None else cross if verbose >= 2: log_fn( "canonicalize_BHQO: " f"cand={name} " f"mae={_fmt(mae)} " f"cross={cross:.6f}" ) if score < best_score: best_score = score best_name = name best_arr = arr if verbose: log_fn( "canonicalize_BHQO: " f"chose={best_name} " f"score={best_score:.6f}" ) return best_name, best_arr def _cross_score_tf(arr: Any) -> Any: tf = _maybe_tf() q10 = arr[:, :, 0, :] q50 = arr[:, :, 1, :] q90 = arr[:, :, 2, :] if q10.shape.rank is not None and q10.shape[-1] == 1: q10 = q10[..., 0] q50 = q50[..., 0] q90 = q90[..., 0] c1 = tf.reduce_mean(tf.cast(q10 > q50, tf.float32)) c2 = tf.reduce_mean(tf.cast(q50 > q90, tf.float32)) c3 = tf.reduce_mean(tf.cast(q10 > q90, tf.float32)) return c1 + c2 + c3 def _mae_q50_tf( arr: Any, y_true: Any, med: int, ) -> Any: tf = _maybe_tf() q50 = arr[:, :, med, :] if q50.shape.rank != y_true.shape.rank: return tf.constant(np.inf, dtype=tf.float32) if ( q50.shape.rank is not None and y_true.shape.rank is not None and q50.shape.rank == 3 ): if q50.shape[1] != y_true.shape[1]: return tf.constant(np.inf, dtype=tf.float32) return tf.reduce_mean(tf.abs(q50 - y_true)) def _apply_layout_tf( y: Any, layout: str, ) -> Any: tf = _maybe_tf() lay = layout.upper().strip() if lay == "BHQO": return y if lay == "BQHO": return tf.transpose(y, [0, 2, 1, 3]) if lay == "BHOQ": return tf.transpose(y, [0, 1, 3, 2]) raise ValueError( "layout must be one of: BHQO, BQHO, BHOQ." ) def _post_tf( arr: Any, enforce_monotone: bool, ) -> Any: tf = _maybe_tf() if enforce_monotone: return tf.sort(arr, axis=2) return arr def _prefer_bhqo( opts: list[tuple[str, Any]], ) -> tuple[str, Any]: for name, arr in opts: if name == "BHQO": return name, arr return opts[0] def canonicalize_to_BHQO_using_contract( s_pred, *, q_values=(0.1, 0.5, 0.9), enforce_monotone=True, verbose=0, log_fn=print, ): """ Canonicalize quantile tensor to (B,H,Q,O). Accepts common layouts: - (B,H,Q,O) : unchanged - (B,Q,H,O) : transpose -> (B,H,Q,O) - (B,H,O,Q) : transpose -> (B,H,Q,O) - rank-3 (B,H,Q) / (B,Q,H) / (B,H,O): expanded with O=1, then canonicalized If enforce_monotone=True, quantiles are sorted along axis=2 (Q axis). Notes ----- - TF backend returns a tf.Tensor. - NumPy backend returns a np.ndarray. """ def _log(level: int, msg: str) -> None: if verbose >= level: log_fn(msg) tf = _maybe_tf() n_q = len(q_values) if q_values else 0 # ================================================== # NumPy backend # ================================================== if tf is None: s = np.asarray(s_pred) _log(1, f"canon_BHQO[np]: in={s.shape}") if n_q <= 0: _log(1, "canon_BHQO[np]: n_q<=0, keep.") return s # Rank-3 -> add O axis (O=1). if s.ndim == 3: s = np.expand_dims(s, axis=-1) _log( 2, "canon_BHQO[np]: " f"expanded rank-3 -> {s.shape}", ) if s.ndim != 4: _log( 1, f"canon_BHQO[np]: rank={s.ndim}, unchanged.", ) return s b, d1, d2, d3 = s.shape _log( 2, "canon_BHQO[np]: " f"d1={d1} d2={d2} d3={d3} n_q={n_q}", ) # Accept BHQO if d2 == n_q: out = s _log(1, "canon_BHQO[np]: BHQO keep.") # Accept BQHO -> transpose to BHQO elif d1 == n_q: out = np.transpose(s, (0, 2, 1, 3)) _log(1, "canon_BHQO[np]: BQHO swap.") # Accept BHOQ -> transpose to BHQO elif d3 == n_q: out = np.transpose(s, (0, 1, 3, 2)) _log(1, "canon_BHQO[np]: BHOQ move.") else: raise ValueError( "Cannot locate quantile axis. " f"shape={s.shape}, n_q={n_q}" ) if enforce_monotone: out = np.sort(out, axis=2) _log(2, "canon_BHQO[np]: sorted axis=2.") _log(1, f"canon_BHQO[np]: out={out.shape}") return out # TensorFlow backend (safe conversion + verbose) if n_q <= 0: _log(1, "canon_BHQO[tf]: n_q<=0, keep.") return s_pred if not tf.is_tensor(s_pred): s_pred = tf.convert_to_tensor(s_pred) rank = int(s_pred.shape.rank or 0) _log( 1, "canon_BHQO[tf]: " f"in rank={rank} " f"shape={tuple(s_pred.shape.as_list())}", ) # Rank-3 -> add O axis (O=1). if s_pred.shape.rank == 3: s_pred = tf.expand_dims(s_pred, axis=-1) _log( 2, "canon_BHQO[tf]: " f"expanded rank-3 -> " f"{tuple(s_pred.shape.as_list())}", ) if s_pred.shape.rank != 4: return s_pred # Accept BHQO if int(s_pred.shape[2]) == n_q: out = s_pred _log(1, "canon_BHQO[tf]: BHQO keep.") # Accept BQHO -> transpose to BHQO elif int(s_pred.shape[1]) == n_q: out = tf.transpose(s_pred, [0, 2, 1, 3]) _log(1, "canon_BHQO[tf]: BQHO swap.") # Accept BHOQ -> transpose to BHQO elif int(s_pred.shape[3]) == n_q: out = tf.transpose(s_pred, [0, 1, 3, 2]) _log(1, "canon_BHQO[tf]: BHOQ move.") else: raise ValueError( "Cannot locate quantile axis. " f"shape={s_pred.shape}, n_q={n_q}" ) if enforce_monotone: out = tf.sort(out, axis=2) _log(2, "canon_BHQO[tf]: sorted axis=2.") return out def ensure_subs_bhq( s_pred_b, *, y_true_b, q_values, enforce_monotone=True, verbose=0, log_fn=print, ): """ Ensure subsidence quantile predictions are in (B,H,Q,O). Expected quantile mode shape is rank-4, typically one of: - (B,H,Q,O) : canonical - (B,Q,H,O) : swap axes 1<->2 Ambiguous case (often H == Q == n_q): - choose orientation by q50 MAE vs y_true_b - if MAE cannot be computed (shape mismatch), fallback to a quantile-crossing score (smaller is better) Notes ----- - If enforce_monotone=True, we sort along Q axis (axis=2). - NumPy backend returns np.ndarray. """ def _log(level: int, msg: str) -> None: if verbose >= level: log_fn(msg) def _mae_safe_np(a: np.ndarray, b: np.ndarray) -> float: if a.shape != b.shape: return float("inf") mask = np.isfinite(a) & np.isfinite(b) if int(mask.sum()) == 0: return float("inf") return float(np.mean(np.abs(a[mask] - b[mask]))) def _cross_score_np(arr: np.ndarray) -> float: # arr is expected (B,H,Q,O) if arr.ndim != 4 or arr.shape[2] < 2: return float("inf") # Adjacent crossings + end-to-end crossing. a = arr[:, :, :-1, :] b = arr[:, :, 1:, :] c_adj = float(np.mean(a > b)) c_end = float( np.mean(arr[:, :, 0, :] > arr[:, :, -1, :]) ) return c_adj + c_end def _ensure_ytrue_bho_np( y_true, *, n_q: int, med: int, ) -> np.ndarray | None: if y_true is None: return None try: yt = np.asarray(y_true) except Exception: return None # (B,H) -> (B,H,1) if yt.ndim == 2: return yt[..., None] # (B,H,O) OK if yt.ndim == 3: return yt # If y_true was tiled to quantile shape, drop to med. if yt.ndim == 4: if yt.shape[2] == n_q: return yt[:, :, med, :] if yt.shape[1] == n_q: yt2 = np.transpose(yt, (0, 2, 1, 3)) return yt2[:, :, med, :] if yt.shape[3] == n_q: yt2 = np.transpose(yt, (0, 1, 3, 2)) return yt2[:, :, med, :] # Last-resort squeeze trailing singleton. if yt.shape[-1] == 1: return yt[..., 0] return None tf = _maybe_tf() # NumPy backend if tf is None: s = np.asarray(s_pred_b) _log(1, f"ensure_subs_bhq[np]: in={s.shape}") # Expect rank-4 in quantile mode. if s.ndim != 4: _log( 1, "ensure_subs_bhq[np]: " f"rank={s.ndim}, unchanged.", ) return s n_q = len(q_values) if q_values else 0 if n_q <= 0: _log( 1, "ensure_subs_bhq[np]: n_q<=0, unchanged.", ) return s d1 = int(s.shape[1]) d2 = int(s.shape[2]) _log( 2, f"ensure_subs_bhq[np]: d1={d1} d2={d2} n_q={n_q}", ) if (d2 == n_q) and (d1 != n_q): out = s _log(1, "ensure_subs_bhq[np]: chose keep.") elif (d1 == n_q) and (d2 != n_q): out = np.transpose(s, (0, 2, 1, 3)) _log(1, "ensure_subs_bhq[np]: chose swap.") else: # Ambiguous: decide by q50 MAE vs y_true. q = np.asarray(q_values, dtype=float) med = int(np.argmin(np.abs(q - 0.5))) _log( 2, "ensure_subs_bhq[np]: " f"ambiguous, med={med} q={_fmt(q[med])}", ) keep = s swap = np.transpose(s, (0, 2, 1, 3)) yt = _ensure_ytrue_bho_np( y_true_b, n_q=n_q, med=med, ) if yt is None: mae_keep = float("inf") mae_swap = float("inf") else: mae_keep = _mae_safe_np( keep[:, :, med, :], yt, ) mae_swap = _mae_safe_np( swap[:, :, med, :], yt, ) _log( 2, "ensure_subs_bhq[np]: " f"mae_keep={_fmt(mae_keep)} " f"mae_swap={_fmt(mae_swap)}", ) if np.isfinite(mae_keep) or np.isfinite(mae_swap): if mae_swap < mae_keep: out = swap _log( 1, "ensure_subs_bhq[np]: swap (mae)." ) else: out = keep _log( 1, "ensure_subs_bhq[np]: keep (mae)." ) else: # Fallback: use quantile crossing score. cs_keep = _cross_score_np(keep) cs_swap = _cross_score_np(swap) _log( 2, "ensure_subs_bhq[np]: " f"cross_keep={cs_keep:.6f} " f"cross_swap={cs_swap:.6f}", ) if cs_swap < cs_keep: out = swap _log( 1, "ensure_subs_bhq[np]: swap (cross).", ) else: out = keep _log( 1, "ensure_subs_bhq[np]: keep (cross).", ) if enforce_monotone: out = np.sort(out, axis=2) _log( 2, "ensure_subs_bhq[np]: sorted along axis=2.", ) _log(1, f"ensure_subs_bhq[np]: out={out.shape}") return out # TensorFlow backend (original logic + safe conversion) if not tf.is_tensor(s_pred_b): s_pred_b = tf.convert_to_tensor(s_pred_b) _log( 1, "ensure_subs_bhq[tf]: " f"in rank={int(s_pred_b.shape.rank or 0)} " f"shape={tuple(s_pred_b.shape.as_list())}", ) # Expect rank-4 in quantile mode. if s_pred_b.shape.rank != 4: return s_pred_b n_q = len(q_values) if q_values else 0 if n_q <= 0: return s_pred_b # Easy cases when dims differ. d1 = int(s_pred_b.shape[1]) d2 = int(s_pred_b.shape[2]) if (d2 == n_q) and (d1 != n_q): out = s_pred_b _log(1, "ensure_subs_bhq[tf]: chose keep.") elif (d1 == n_q) and (d2 != n_q): out = tf.transpose(s_pred_b, [0, 2, 1, 3]) _log(1, "ensure_subs_bhq[tf]: chose swap.") else: # Ambiguous (often H==Q): choose by median MAE vs y_true. q = np.asarray(q_values, dtype=float) med = int(np.argmin(np.abs(q - 0.5))) keep = s_pred_b swap = tf.transpose(s_pred_b, [0, 2, 1, 3]) if not tf.is_tensor(y_true_b): y_true_b = tf.convert_to_tensor(y_true_b) mae_keep = tf.reduce_mean( tf.abs(keep[:, :, med, :] - y_true_b) ) mae_swap = tf.reduce_mean( tf.abs(swap[:, :, med, :] - y_true_b) ) if verbose >= 2: _log( 2, "ensure_subs_bhq[tf]: " f"mae_keep={_fmt(float(mae_keep.numpy()))} " f"mae_swap={_fmt(float(mae_swap.numpy()))}", ) out = tf.cond( mae_swap < mae_keep, lambda: swap, lambda: keep, ) if enforce_monotone: out = tf.sort(out, axis=2) return out # --------------------------------------------------------------------- # Small helpers # --------------------------------------------------------------------- def _maybe_tf() -> Any | None: try: import tensorflow as tf # type: ignore except Exception: return None return tf def _median_q_index( q_values: Sequence[float], ) -> int: q = np.asarray(q_values, dtype=float) return int(np.argmin(np.abs(q - 0.5))) def _fmt(v: Any) -> str: if v is None: return "None" try: return f"{float(v):.6f}" except Exception: return str(v)
[docs] def canonicalize_BHQO_quantiles_np( y: Any, n_q: int = 3, *, verbose: int = 0, log_fn: Callable[[str], None] = print, ) -> Any: """ Return y in canonical (B,H,Q,O). Accepts common layouts: - (B,H,Q,O) -> unchanged - (B,Q,H,O) -> transpose(0,2,1,3) - (B,H,O,Q) -> transpose(0,1,3,2) If ambiguous (multiple axes match n_q), choose the transform with minimal quantile crossing score. """ y_np = np.asarray(y) # Not quantile tensor (we only canonicalize rank-4). if y_np.ndim != 4: if verbose: log_fn( "canonicalize_BHQO_quantiles_np: " f"rank={y_np.ndim}, skipping." ) return y n_q = int(n_q) if n_q <= 0: raise ValueError("n_q must be a positive integer.") # candidates: interpret which axis is Q among {1,2,3} cand = [ax for ax in (1, 2, 3) if y_np.shape[ax] == n_q] # No axis matches n_q => not quantile mode, return unchanged. if not cand: if verbose: log_fn( "canonicalize_BHQO_quantiles_np: " f"no axis matches n_q={n_q}, " "return unchanged." ) return y_np options: list[tuple[str, np.ndarray]] = [] # already (B,H,Q,O): q_axis=2 and O axis=3 if y_np.shape[2] == n_q: # Keep as-is. This is the canonical layout. options.append(("BHQO", y_np)) # (B,Q,H,O) -> (B,H,Q,O) if y_np.shape[1] == n_q: # Swap axes 1 and 2. options.append( ( "BQHO->BHQO", _safe_transpose(y_np, (0, 2, 1, 3)), ) ) # (B,H,O,Q) -> (B,H,Q,O) if y_np.shape[3] == n_q: # Swap axes 2 and 3. options.append( ( "BHOQ->BHQO", _safe_transpose(y_np, (0, 1, 3, 2)), ) ) # If nothing matched the supported transforms, keep unchanged. if not options: if verbose: log_fn( "canonicalize_BHQO_quantiles_np: " "no supported transform matched; " "return unchanged." ) return y_np # If only one option, pick it directly. if len(options) == 1: name, arr = options[0] if verbose: log_fn( "canonicalize_BHQO_quantiles_np: " f"chose={name} (only option)." ) return arr # Multiple candidates (e.g., H==Q==3): # pick best by minimal quantile crossing score. best_name: str | None = None best_arr: np.ndarray | None = None best_score = float("inf") for name, arr in options: # score in canonical BHQO along axis=2 sc = _mean_crossing_score(arr, q_axis=2) if verbose >= 2: log_fn( "canonicalize_BHQO_quantiles_np: " f"candidate={name} score={sc:.6f}" ) if sc < best_score: best_score = sc best_name = name best_arr = arr if best_arr is None: # Defensive fallback; should not happen. if verbose: log_fn( "canonicalize_BHQO_quantiles_np: " "no best candidate found; " "return unchanged." ) return y_np if verbose: log_fn( "canonicalize_BHQO_quantiles_np: " f"chose={best_name} score={best_score:.6f}" ) return best_arr
def _mean_crossing_score( arr: np.ndarray, q_axis: int, q_idx: Sequence[int] = (0, 1, 2), ) -> float: # Compute quantile crossing score: # mean(q10>q50) + mean(q50>q90) + mean(q10>q90) q10 = np.take(arr, int(q_idx[0]), axis=q_axis) q50 = np.take(arr, int(q_idx[1]), axis=q_axis) q90 = np.take(arr, int(q_idx[2]), axis=q_axis) # squeeze last dim if O=1 if q10.ndim >= 1 and q10.shape[-1] == 1: q10 = q10[..., 0] q50 = q50[..., 0] q90 = q90[..., 0] c1 = float(np.mean(q10 > q50)) c2 = float(np.mean(q50 > q90)) c3 = float(np.mean(q10 > q90)) return c1 + c2 + c3 def _safe_transpose( y: np.ndarray, axes: tuple[int, int, int, int], ) -> np.ndarray: return np.transpose(y, axes)