Core

The mathematical kernel of Versor.

Algebra

CliffordAlgebra

Bases: Module

Differentiable Clifford algebra kernel with memory-optimized blocked accumulation.

Extends nn.Module so that all Cayley tables are registered as non-persistent buffers. This means model.to(device) automatically moves the tables to the target device.

Supports degenerate (null) dimensions via the r parameter: Cl(p, q, r) has p positive, q negative, and r null basis vectors (e_i^2 = 0).

Attributes:

Name Type Description
p int

Positive signature dimensions.

q int

Negative signature dimensions.

r int

Degenerate (null) dimensions.

n int

Total dimensions (p + q + r).

dim int

Total basis elements (2^n).
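
For orientation, a small pure-Python sketch (independent of the torch implementation) of how these attributes follow from the signature. The variable names are illustrative only:

```python
# Blade index i encodes a basis blade as a bitmask over the n generators;
# its grade is the popcount of i.
p, q, r = 3, 0, 1          # e.g. the PGA-like signature Cl(3, 0, 1)
n = p + q + r              # total generators
dim = 2 ** n               # total basis blades
grades = [bin(i).count("1") for i in range(dim)]

print(n, dim)              # 4 16
print(grades[:8])          # [0, 1, 1, 2, 1, 2, 2, 3]
```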

Source code in core/algebra.py
class CliffordAlgebra(nn.Module):
    """Differentiable Clifford algebra kernel with memory-optimized blocked accumulation.

    Extends ``nn.Module`` so that all Cayley tables are registered as
    non-persistent buffers. This means ``model.to(device)`` automatically
    moves the tables to the target device.

    Supports degenerate (null) dimensions via the ``r`` parameter:
    ``Cl(p, q, r)`` has ``p`` positive, ``q`` negative, and ``r`` null
    basis vectors (``e_i^2 = 0``).

    Attributes:
        p (int): Positive signature dimensions.
        q (int): Negative signature dimensions.
        r (int): Degenerate (null) dimensions.
        n (int): Total dimensions (p + q + r).
        dim (int): Total basis elements (2^n).
    """
    _CACHED_TABLES = {}

    def __init__(self, p: int, q: int = 0, r: int = 0, device='cuda',
                 dtype: torch.dtype = torch.float32,
                 exp_policy: str = 'auto'):
        """Initialize the algebra and cache the Cayley table.

        Args:
            p (int): Positive dimensions (+1).
            q (int, optional): Negative dimensions (-1). Defaults to 0.
            r (int, optional): Degenerate dimensions (0). Defaults to 0.
            device (str, optional): The device on which computations are performed. Defaults to 'cuda'.
            dtype (torch.dtype, optional): Floating-point dtype for algebra tables.
                Defaults to ``torch.float32``.  Pass ``torch.bfloat16`` or
                ``torch.float16`` when the model will be trained in a reduced
                precision (e.g. AMP on CUDA bfloat16 mode).
            exp_policy (str or ExpPolicy, optional): Bivector exp policy.
                ``'auto'`` (default), ``'fast'``, or ``'exact'``.
                See :class:`core.decomposition.ExpPolicy`.
        """
        super().__init__()

        assert p >= 0, f"p must be non-negative, got {p}"
        assert q >= 0, f"q must be non-negative, got {q}"
        assert r >= 0, f"r must be non-negative, got {r}"
        assert p + q + r <= 12, f"p + q + r must be <= 12, got {p + q + r}"

        self.p, self.q, self.r = p, q, r
        self.n = p + q + r
        self.dim = 2 ** self.n

        # Exp regime: dispatch at init
        if p == 0 or q == 0:
            self._exp_regime = 'elliptic'
        elif p == 1 and q == 1 and r == 0:
            self._exp_regime = 'hyperbolic'
        else:
            self._exp_regime = 'mixed'

        # Exp policy: controls decomposition strategy
        from core.decomposition import ExpPolicy
        if isinstance(exp_policy, str):
            self._exp_policy = ExpPolicy(exp_policy)
        else:
            self._exp_policy = exp_policy

        # Cache Cayley tables to avoid recomputation
        cache_key = (p, q, r, str(device), str(dtype))
        if cache_key not in CliffordAlgebra._CACHED_TABLES:
            CliffordAlgebra._CACHED_TABLES[cache_key] = self._generate_cayley_table(device, dtype)

        (
            cayley_indices, cayley_signs, gp_signs, grade_masks_list,
            rev_signs, bv_sq_scalar, wedge_gp_signs, inner_gp_signs,
            grade_index, rc_action, lc_gp_signs, conj_signs,
            comm_gp_signs, anti_comm_gp_signs,
        ) = CliffordAlgebra._CACHED_TABLES[cache_key]

        # Register all tables as non-persistent buffers so .to(device) moves them
        self.register_buffer('cayley_indices', cayley_indices, persistent=False)
        self.register_buffer('cayley_signs', cayley_signs, persistent=False)
        self.register_buffer('gp_signs', gp_signs, persistent=False)
        self.register_buffer('rev_signs', rev_signs, persistent=False)
        self.register_buffer('bv_sq_scalar', bv_sq_scalar, persistent=False)
        self.register_buffer('wedge_gp_signs', wedge_gp_signs, persistent=False)
        self.register_buffer('inner_gp_signs', inner_gp_signs, persistent=False)
        self.register_buffer('grade_index', grade_index, persistent=False)
        self.register_buffer('rc_action', rc_action, persistent=False)
        self.register_buffer('lc_gp_signs', lc_gp_signs, persistent=False)
        self.register_buffer('conj_signs', conj_signs, persistent=False)
        self.register_buffer('comm_gp_signs', comm_gp_signs, persistent=False)
        self.register_buffer('anti_comm_gp_signs', anti_comm_gp_signs, persistent=False)

        # Grade involution signs: (-1)^k per basis element
        inv_signs = ((-1.0) ** grade_index.float()).to(dtype=conj_signs.dtype)
        self.register_buffer('_involution_signs', inv_signs, persistent=False)

        # Stack grade masks: [n+1, dim] bool and float
        stacked = torch.stack(grade_masks_list)                 # [n+1, dim]
        self.register_buffer('_grade_masks', stacked, persistent=False)
        self.register_buffer('_grade_masks_float', stacked.to(dtype=cayley_signs.dtype), persistent=False)

        # Bivector indices
        if self.n >= 2:
            bv_idx = stacked[2].nonzero(as_tuple=False).squeeze(-1)
        else:
            bv_idx = torch.zeros(0, dtype=torch.long, device=device)
        self.register_buffer('_bv_indices', bv_idx, persistent=False)

        # Pre-initialize derived tables (sandwich_product / pseudoscalar_product)
        self._init_derived_tables()

        # Precomputed finfo-derived tolerances for dtype-aware numerical guards.
        # Plain floats for zero-overhead usage in clamp/where operations.
        _finfo = torch.finfo(self.cayley_signs.dtype)
        self.eps: float = float(_finfo.eps)
        self.eps_sq: float = float(_finfo.eps ** 2)

    @property
    def device(self):
        """Return the device of the algebra tables."""
        return self.cayley_indices.device

    @property
    def dtype(self) -> torch.dtype:
        """Return the floating-point dtype of the algebra tables.

        Reflects the current state — updated automatically when the algebra
        is moved via ``.to(dtype=...)``.
        """
        return self.cayley_signs.dtype

    def _apply(self, fn):
        """Propagate device/dtype moves and keep eps tolerances in sync."""
        result = super()._apply(fn)
        _finfo = torch.finfo(self.cayley_signs.dtype)
        self.eps = float(_finfo.eps)
        self.eps_sq = float(_finfo.eps ** 2)
        return result

    @property
    def grade_masks(self):
        """Grade masks indexed by grade: ``grade_masks[k]`` -> ``[dim]`` bool."""
        return self._grade_masks

    @property
    def grade_masks_float(self):
        """Float grade masks indexed by grade: ``grade_masks_float[k]`` -> ``[dim]`` float."""
        return self._grade_masks_float

    @property
    def exp_policy(self):
        """Active :class:`ExpPolicy` controlling ``exp()`` dispatch."""
        return self._exp_policy

    @exp_policy.setter
    def exp_policy(self, value):
        from core.decomposition import ExpPolicy
        if isinstance(value, str):
            value = ExpPolicy(value)
        self._exp_policy = value

    def _init_derived_tables(self):
        """Precompute derived tables for sandwich_product and pseudoscalar_product.

        Called once from ``__init__``. Tables move automatically via
        ``register_buffer`` when ``.to(device)`` is called.
        """
        D = self.dim
        k_range = torch.arange(D, device=self.cayley_indices.device)
        ci = self.cayley_indices

        # For sandwich_product: left-sign table
        ls = self.gp_signs[ci, k_range.unsqueeze(0).expand(D, D)]
        self.register_buffer('_left_sign_T', ls.T.contiguous(), persistent=False)

        # For pseudoscalar_product: permutation and signs
        self.register_buffer('_ps_source', k_range ^ (D - 1), persistent=False)
        self.register_buffer('_ps_signs', self.gp_signs[self._ps_source, k_range], persistent=False)

        # Diagonal of cayley_signs: sign of e_I^2 for each basis element
        cayley_diag = torch.diagonal(self.cayley_signs).clone()
        self.register_buffer('_cayley_diag', cayley_diag, persistent=False)

        # Pre-merged signs for norm_sq: rev_signs * cayley_diag
        self.register_buffer('_norm_sq_signs',
                             (self.rev_signs * cayley_diag).clone(), persistent=False)

        # Hermitian signs: conj_signs * cayley_diag
        # Equivalent to the full _hermitian_signs() computation but vectorized
        self.register_buffer('_hermitian_signs',
                             (self.conj_signs * cayley_diag).clone(), persistent=False)

    @property
    def num_grades(self) -> int:
        """Counts the number of grades (n + 1).

        Returns:
            int: Number of grades.
        """
        return self.n + 1

    def embed_vector(self, vectors: torch.Tensor) -> torch.Tensor:
        """Injects vectors into the Grade-1 subspace.

        Args:
            vectors (torch.Tensor): Raw vectors [..., n].

        Returns:
            torch.Tensor: Multivector coefficients [..., dim].
        """
        g1_idx = (1 << torch.arange(self.n, device=vectors.device)).long()
        mv = torch.zeros(*vectors.shape[:-1], self.dim,
                          device=vectors.device, dtype=vectors.dtype)
        mv.scatter_(-1, g1_idx.expand_as(vectors), vectors)
        return mv
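
Scalar-wise, the grade-1 slots targeted by the scatter above are the power-of-two blade indices (a minimal sketch, not the tensor implementation):

```python
# Grade-1 basis vector e_i lives at blade index 1 << i.
n = 3
g1_idx = [1 << i for i in range(n)]   # [1, 2, 4]
v = [0.5, -1.0, 2.0]                  # raw vector coefficients

mv = [0.0] * (2 ** n)                 # zero multivector of Cl(3)
for idx, coeff in zip(g1_idx, v):
    mv[idx] = coeff
# mv = [0.0, 0.5, -1.0, 0.0, 2.0, 0.0, 0.0, 0.0]
```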

    def get_grade_norms(self, mv: torch.Tensor) -> torch.Tensor:
        """Calculates norms per grade. Useful for invariant features.

        Vectorized via scatter_add.

        Args:
            mv (torch.Tensor): Input multivector [..., dim].

        Returns:
            torch.Tensor: Grade norms [..., num_grades].
        """
        gi = self.grade_index
        batch_shape = mv.shape[:-1]
        sq = mv.pow(2)
        flat = sq.reshape(-1, self.dim)
        idx = gi.unsqueeze(0).expand_as(flat)
        result = torch.zeros(flat.shape[0], self.num_grades, device=mv.device, dtype=mv.dtype)
        result.scatter_add_(1, idx, flat)
        return result.reshape(*batch_shape, self.num_grades).clamp(min=self.eps).sqrt()

    def _generate_cayley_table(self, device, dtype: torch.dtype = torch.float32):
        """Precompute the Cayley table, grade masks, and reversion signs.

        Args:
            device: The device to create tensors on.
            dtype: Floating-point dtype for sign tables.

        Returns:
            tuple: Cached tensors for algebra operations.
        """
        indices = torch.arange(self.dim, device=device)

        # Result index = A XOR B
        cayley_indices = indices.unsqueeze(0) ^ indices.unsqueeze(1)
        cayley_signs = self._compute_signs(indices, device, dtype)

        # Precompute signs for geometric_product accumulation
        gp_signs = torch.gather(cayley_signs, 1, cayley_indices)

        # Grade index: maps each basis element to its grade (popcount)
        # Compute once, derive grade_masks and rev_signs from it.
        grade_index = torch.tensor([bin(i).count('1') for i in range(self.dim)],
                                   dtype=torch.long, device=device)

        # Grade masks: one bool tensor per grade
        grade_masks = [grade_index == k for k in range(self.n + 1)]

        # Reverse signs: blade i gets sign (-1)^(k(k-1)/2) where k = grade(i)
        gk = grade_index
        rev_signs = ((-1.0) ** (gk * (gk - 1) // 2)).to(dtype=cayley_signs.dtype)

        # Bivector squared scalars: for each basis bivector e_ab,
        # (e_ab)^2 = -s_a * s_b where s_i = +1 if i < p, -1 if p <= i < p+q, 0 if i >= p+q.
        # Used by closed-form exp for arbitrary signature.
        if self.n >= 2:
            bv_mask = grade_masks[2]
            bv_indices = bv_mask.nonzero(as_tuple=False).squeeze(-1)
            bv_sq_scalar = torch.zeros(len(bv_indices), dtype=cayley_signs.dtype,
                                       device=device)
            for idx_pos, blade_idx in enumerate(bv_indices.tolist()):
                bits = []
                for bit in range(self.n):
                    if blade_idx & (1 << bit):
                        bits.append(bit)
                if len(bits) == 2:
                    a, b = bits
                    s_a = 1.0 if a < self.p else (-1.0 if a < self.p + self.q else 0.0)
                    s_b = 1.0 if b < self.p else (-1.0 if b < self.p + self.q else 0.0)
                    bv_sq_scalar[idx_pos] = -s_a * s_b
        else:
            bv_sq_scalar = torch.zeros(0, dtype=cayley_signs.dtype,
                                       device=device)

        # Precomputed signs for single-pass wedge and inner product
        # wedge(A,B) = (AB - BA)/2 uses antisymmetric part of signs
        # inner(A,B) = (AB + BA)/2 uses symmetric part of signs
        wedge_cayley_signs = (cayley_signs - cayley_signs.T) / 2.0
        inner_cayley_signs = (cayley_signs + cayley_signs.T) / 2.0
        wedge_gp_signs = torch.gather(wedge_cayley_signs, 1, cayley_indices)
        inner_gp_signs = torch.gather(inner_cayley_signs, 1, cayley_indices)

        # Precomputed signs for commutator [A,B] = AB - BA and
        # anti-commutator {A,B} = AB + BA (no 1/2 factor).
        comm_cayley_signs = cayley_signs - cayley_signs.T
        anti_comm_cayley_signs = cayley_signs + cayley_signs.T
        comm_gp_signs = torch.gather(comm_cayley_signs, 1, cayley_indices)
        anti_comm_gp_signs = torch.gather(anti_comm_cayley_signs, 1, cayley_indices)

        # Precomputed right-contraction action matrices for bivector-vector case
        # rc_action[b, i, j] encodes how basis bivector b acts on grade-1 vectors
        if self.n >= 2:
            bv_mask_idx = bv_indices.tolist()
            n = self.n
            rc_action = torch.zeros(len(bv_mask_idx), n, n,
                                    dtype=cayley_signs.dtype, device=device)
            for idx_pos, blade_idx in enumerate(bv_mask_idx):
                bits = []
                for bit in range(n):
                    if blade_idx & (1 << bit):
                        bits.append(bit)
                if len(bits) == 2:
                    a, b_bit = bits
                    s_a = 1.0 if a < self.p else (-1.0 if a < self.p + self.q else 0.0)
                    s_b = 1.0 if b_bit < self.p else (-1.0 if b_bit < self.p + self.q else 0.0)
                    # e_{ab} . e_b = s_b * e_a  (right contraction)
                    rc_action[idx_pos, a, b_bit] = s_b
                    # e_{ab} . e_a = -s_a * e_b
                    rc_action[idx_pos, b_bit, a] = -s_a
        else:
            rc_action = torch.zeros(0, self.n, self.n,
                                    dtype=cayley_signs.dtype, device=device)

        # Precomputed signs for left contraction: A _| B
        # In the [i, k] indexing (where j = i^k): valid when
        # grade(i) <= grade(j=i^k) and grade(k) = grade(j) - grade(i)
        gi = grade_index.unsqueeze(1)      # [D, 1] - grade of summation index i
        gj = grade_index[cayley_indices]   # [D, D] - grade of j = i^k
        gk = grade_index.unsqueeze(0)      # [1, D] - grade of result index k
        lc_valid = (gi <= gj) & (gk == gj - gi)
        lc_gp_signs = gp_signs * lc_valid.to(dtype=gp_signs.dtype)

        # Clifford conjugation signs: (-1)^k * (-1)^{k(k-1)/2}
        conj_signs = (((-1.0) ** grade_index) * rev_signs).to(
            dtype=cayley_signs.dtype)

        return (cayley_indices, cayley_signs, gp_signs, grade_masks,
                rev_signs, bv_sq_scalar, wedge_gp_signs, inner_gp_signs,
                grade_index, rc_action, lc_gp_signs, conj_signs,
                comm_gp_signs, anti_comm_gp_signs)

    def _compute_signs(self, indices: torch.Tensor, device,
                       dtype: torch.dtype = torch.float32) -> torch.Tensor:
        """Compute the sign matrix from commutation parity and metric signature.

        Handles three signature types:
        - Positive (i < p): e_i^2 = +1
        - Negative (p <= i < p+q): e_i^2 = -1
        - Null (i >= p+q): e_i^2 = 0

        Args:
            indices (torch.Tensor): Basis indices.
            device: The device to create tensors on.
            dtype: Floating-point dtype for the returned sign matrix.

        Returns:
            torch.Tensor: Sign matrix.
        """
        # 1. Commutation Sign: Count swaps needed to reorder basis vectors
        # A bit-wise comparison counts inversions
        A = indices.unsqueeze(1) # Row
        B = indices.unsqueeze(0) # Col

        swap_counts = torch.zeros((self.dim, self.dim), dtype=torch.long, device=device)
        for i in range(self.n):
            a_i = (A >> i) & 1
            # Count set bits in B strictly lower than bit i
            lower_mask = (1 << i) - 1
            b_lower = (B & lower_mask)

            # Count bits in b_lower
            b_lower_cnt = torch.zeros_like(B)
            temp_b = b_lower
            for _ in range(self.n):
                b_lower_cnt += (temp_b & 1)
                temp_b = temp_b >> 1

            swap_counts += a_i * b_lower_cnt

        commutator_sign = (-1) ** swap_counts

        # 2. Metric Sign: e_i^2 = -1 if p <= i < p+q, 0 if i >= p+q
        intersection = A & B

        # Mask for negative signature dimensions (p <= i < p+q)
        q_mask = 0
        for i in range(self.p, self.p + self.q):
            q_mask |= (1 << i)

        neg_intersection = intersection & q_mask

        # Count set bits in negative intersection
        neg_cnt = torch.zeros_like(neg_intersection)
        temp_neg = neg_intersection
        for _ in range(self.n):
            neg_cnt += (temp_neg & 1)
            temp_neg = temp_neg >> 1

        metric_sign = (-1) ** neg_cnt

        # 3. Null dimensions: if any null basis vector appears in the intersection
        # (i.e., e_i^2 = 0 for i >= p+q), the entire product is killed.
        if self.r > 0:
            r_mask = 0
            for i in range(self.p + self.q, self.n):
                r_mask |= (1 << i)
            null_intersection = intersection & r_mask
            # Any set bit in null_intersection means a null vector is squared -> 0
            has_null = (null_intersection != 0).to(metric_sign.dtype)
            metric_sign = metric_sign * (1 - has_null)

        return (commutator_sign * metric_sign).to(dtype=dtype)
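
The swap-counting and metric logic above condenses to a scalar pure-Python sketch for a single pair of blades (the function name blade_sign is illustrative, not part of the Versor API):

```python
def blade_sign(a: int, b: int, p: int, q: int, r: int) -> int:
    """Sign of e_A * e_B in Cl(p, q, r), with blades encoded as bitmasks."""
    n = p + q + r
    # 1. Commutation parity: each set bit of A crosses the lower set bits of B.
    swaps = 0
    for i in range(n):
        if (a >> i) & 1:
            swaps += bin(b & ((1 << i) - 1)).count("1")
    sign = -1 if swaps % 2 else 1
    # 2. Metric: each shared negative generator (p <= i < p+q) flips the sign.
    neg_mask = ((1 << (p + q)) - 1) ^ ((1 << p) - 1)
    if bin(a & b & neg_mask).count("1") % 2:
        sign = -sign
    # 3. Any shared null generator (i >= p+q) squares to zero and kills the term.
    null_mask = ((1 << n) - 1) ^ ((1 << (p + q)) - 1)
    if a & b & null_mask:
        sign = 0
    return sign

print(blade_sign(0b01, 0b10, 2, 0, 0))  # 1   (e1 e2 = +e12)
print(blade_sign(0b10, 0b01, 2, 0, 0))  # -1  (e2 e1 = -e12)
print(blade_sign(0b11, 0b11, 2, 0, 0))  # -1  (e12^2 = -1 in Cl(2,0))
```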

    def geometric_product(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
        """Computes the Geometric Product.

        Uses vectorized gather + broadcast multiply + sum.

        Args:
            A (torch.Tensor): Left operand [..., Dim].
            B (torch.Tensor): Right operand [..., Dim].

        Returns:
            torch.Tensor: The product AB [..., Dim].
        """
        check_multivector(A, self, "geometric_product(A)")
        check_multivector(B, self, "geometric_product(B)")

        # Gather B coefficients according to Cayley table: B_gathered[..., i, k] = B[..., cayley[i,k]]
        B_gathered = B[..., self.cayley_indices]  # [..., D, D]

        # result[..., k] = sum_i A[..., i] * B[..., cayley[i,k]] * signs[i,k]
        return torch.matmul(A.unsqueeze(-2), B_gathered * self.gp_signs).squeeze(-2)
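
Scalar-wise, the gather-multiply-sum above is the XOR-indexed accumulation below, shown for Cl(2,0) with a hand-written sign table (SIGN and gp are illustrative helpers, not the Versor API):

```python
# Basis order [1, e1, e2, e12]; result index is i ^ j, sign from the table.
SIGN = [[1,  1, 1,  1],
        [1,  1, 1,  1],
        [1, -1, 1, -1],
        [1, -1, 1, -1]]

def gp(A, B):
    out = [0.0] * 4
    for i in range(4):
        for j in range(4):
            out[i ^ j] += SIGN[i][j] * A[i] * B[j]
    return out

e1 = [0.0, 1.0, 0.0, 0.0]
e2 = [0.0, 0.0, 1.0, 0.0]
print(gp(e1, e2))   # [0.0, 0.0, 0.0, 1.0]   e1 e2 = e12
print(gp(e2, e1))   # [0.0, 0.0, 0.0, -1.0]  e2 e1 = -e12
```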

    def grade_projection(self, mv: torch.Tensor, grade: int) -> torch.Tensor:
        """Isolates a specific grade.

        Uses multiplicative masking (mv * float_mask) instead of boolean
        indexing to avoid ``nonzero`` calls that break ``torch.compile``.

        Args:
            mv (torch.Tensor): Multivector [..., Dim].
            grade (int): Target grade.

        Returns:
            torch.Tensor: Projected multivector [..., Dim].
        """
        mask = self.grade_masks_float[grade]
        if mask.dtype != mv.dtype:
            mask = mask.to(dtype=mv.dtype)
        return mv * mask
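
A scalar sketch of the multiplicative masking, with the mask built from popcount exactly as in _generate_cayley_table (illustrative values):

```python
# Keep only the grade-2 blades of Cl(3): indices 3, 5, 6.
dim, grade = 8, 2
mask = [1.0 if bin(i).count("1") == grade else 0.0 for i in range(dim)]
mv = [7.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0]
projected = [c * m for c, m in zip(mv, mask)]
# projected = [0.0, 0.0, 0.0, 3.0, 0.0, 5.0, 6.0, 0.0]
```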

    def reverse(self, mv: torch.Tensor) -> torch.Tensor:
        """Computes the reversion. The Clifford conjugate.

        Args:
            mv (torch.Tensor): Input multivector [..., Dim].

        Returns:
            torch.Tensor: Reversed multivector [..., Dim].
        """
        rev = self.rev_signs
        if rev.dtype != mv.dtype:
            rev = rev.to(dtype=mv.dtype)
        return mv * rev
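
The reversion signs can be tabulated scalar-wise (a sketch of the precomputed rev_signs buffer for Cl(3)):

```python
# Reversion flips the order of generators in each blade:
# a grade-k blade picks up sign (-1)^(k*(k-1)//2).
dim = 8
rev_signs = [(-1) ** (k * (k - 1) // 2)
             for k in (bin(i).count("1") for i in range(dim))]
# Grades 0 and 1 keep their sign; grades 2 and 3 flip.
print(rev_signs)  # [1, 1, 1, -1, 1, -1, -1, -1]
```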

    def wedge(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
        """Computes the wedge (outer) product: A ^ B = (AB - BA)/2.

        Single-pass implementation using precomputed antisymmetric signs.

        Reference:
            Pence, T., Yamada, D., & Singh, V. (2025). "Composing Linear Layers
            from Irreducibles." arXiv:2507.11688v1 [cs.LG]

        Args:
            A (torch.Tensor): Left operand [..., dim].
            B (torch.Tensor): Right operand [..., dim].

        Returns:
            torch.Tensor: Wedge product A ^ B [..., dim].
        """
        B_gathered = B[..., self.cayley_indices]
        return torch.matmul(A.unsqueeze(-2), B_gathered * self.wedge_gp_signs).squeeze(-2)
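
For two grade-1 vectors in Cl(2,0), the antisymmetric part (AB - BA)/2 reduces to the familiar 2D cross term (a hand-worked check, not library code):

```python
a1, a2 = 2.0, 1.0
b1, b2 = 0.5, 3.0
# ab = (a1 b1 + a2 b2) + (a1 b2 - a2 b1) e12; ba flips the sign of the e12 part,
# so (ab - ba)/2 keeps only the e12 coefficient:
e12 = a1 * b2 - a2 * b1
# e12 = 2*3 - 1*0.5 = 5.5
```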

    def right_contraction(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
        """Computes the right contraction: A _| B.

        Fast path for bivector-vector case using precomputed skew-symmetric
        action matrices (avoids full geometric product + grade projection).

        Reference:
            Pence, T., Yamada, D., & Singh, V. (2025). "Composing Linear Layers
            from Irreducibles." arXiv:2507.11688v1 [cs.LG], Algorithm 2

        Args:
            A (torch.Tensor): Left operand (bivector) [..., dim].
            B (torch.Tensor): Right operand (vector) [..., dim].

        Returns:
            torch.Tensor: Right contraction A _| B [..., dim].
        """
        # Use gather instead of boolean indexing (compile-friendly)
        bv_idx_exp = self._bv_indices.expand(*A.shape[:-1], -1)
        bv_coeffs = torch.gather(A, -1, bv_idx_exp)          # [..., num_bv]

        # Grade-1 indices: powers of 2 for basis vectors
        g1_idx = torch.arange(self.n, device=A.device)
        g1_idx = (1 << g1_idx).long()                         # [n]
        g1_idx_exp = g1_idx.expand(*B.shape[:-1], -1)
        v_coeffs = torch.gather(B, -1, g1_idx_exp)            # [..., n]

        rc = self.rc_action
        if rc.dtype != A.dtype:
            rc = rc.to(dtype=A.dtype)

        # M[..., i, j] = sum_b bv_coeffs[..., b] * rc_action[b, i, j]
        M = torch.einsum('...b, bij -> ...ij', bv_coeffs, rc)  # [..., n, n]
        result_v = torch.matmul(M, v_coeffs.unsqueeze(-1)).squeeze(-1)  # [..., n]

        result = torch.zeros_like(A)
        result.scatter_(-1, g1_idx_exp, result_v)
        return result
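
A hand-worked check of the fast path for the single bivector e12 in Cl(2,0); the matrix M below plays the role of one slice of rc_action (illustrative, not the tensor code):

```python
# rc_action for e12 in Cl(2, 0): e12 . e2 = e1 and e12 . e1 = -e2,
# written as a 2x2 matrix acting on vector coefficients (v1, v2).
M = [[0.0, 1.0],     # e1-row: picks up +v2
     [-1.0, 0.0]]    # e2-row: picks up -v1
v = [1.0, 0.0]       # the vector e1
out = [sum(M[i][j] * v[j] for j in range(2)) for i in range(2)]
# out = [0.0, -1.0], i.e. e12 . e1 = -e2
```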

    def inner_product(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
        """Computes the inner product: A . B = (AB + BA)/2.

        Single-pass implementation using precomputed symmetric signs.

        Reference:
            Pence, T., Yamada, D., & Singh, V. (2025). "Composing Linear Layers
            from Irreducibles." arXiv:2507.11688v1 [cs.LG]

        Args:
            A (torch.Tensor): Left operand [..., dim].
            B (torch.Tensor): Right operand [..., dim].

        Returns:
            torch.Tensor: Inner product A . B [..., dim].
        """
        B_gathered = B[..., self.cayley_indices]
        return torch.matmul(A.unsqueeze(-2), B_gathered * self.inner_gp_signs).squeeze(-2)
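
For two grade-1 vectors the symmetric part (AB + BA)/2 reduces to the ordinary dot product (hand-worked check):

```python
a1, a2 = 2.0, 1.0
b1, b2 = 0.5, 3.0
# The e12 parts of ab and ba cancel; the scalar part survives:
dot = a1 * b1 + a2 * b2
# dot = 2*0.5 + 1*3 = 4.0
```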

    def commutator(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
        """Computes the commutator (Lie bracket): [A, B] = AB - BA.

        Single-pass implementation using precomputed antisymmetric signs
        (same structure as :meth:`wedge` but without the 1/2 factor).

        Args:
            A (torch.Tensor): Left operand [..., dim].
            B (torch.Tensor): Right operand [..., dim].

        Returns:
            torch.Tensor: Commutator [A, B] [..., dim].
        """
        B_gathered = B[..., self.cayley_indices]
        return torch.matmul(A.unsqueeze(-2), B_gathered * self.comm_gp_signs).squeeze(-2)

    def anti_commutator(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
        """Computes the anti-commutator: {A, B} = AB + BA.

        Single-pass implementation using precomputed symmetric signs
        (same structure as :meth:`inner_product` but without the 1/2 factor).

        Args:
            A (torch.Tensor): Left operand [..., dim].
            B (torch.Tensor): Right operand [..., dim].

        Returns:
            torch.Tensor: Anti-commutator {A, B} [..., dim].
        """
        B_gathered = B[..., self.cayley_indices]
        return torch.matmul(A.unsqueeze(-2), B_gathered * self.anti_comm_gp_signs).squeeze(-2)

    def blade_inverse(self, blade: torch.Tensor) -> torch.Tensor:
        """Compute the inverse of a blade: B^{-1} = B_rev / <B * B_rev>_0.

        Works for any simple blade (non-degenerate). For null blades the
        scalar denominator is clamped to avoid division by zero.

        Args:
            blade (torch.Tensor): Blade multivector [..., dim].

        Returns:
            torch.Tensor: Inverse blade [..., dim].
        """
        blade_rev = self.reverse(blade)
        blade_sq = self.geometric_product(blade, blade_rev)
        scalar = blade_sq[..., 0:1].clamp(min=self.eps_sq)
        return blade_rev / scalar
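
For a Euclidean grade-1 vector the formula reduces to v^{-1} = v / |v|^2, since v~ = v and <v v~>_0 = |v|^2 (a scalar check, not the tensor path):

```python
v = [3.0, 4.0]
norm_sq = sum(c * c for c in v)          # 25.0
v_inv = [c / norm_sq for c in v]         # [0.12, 0.16]
# Check the defining property: v . v_inv = 1
assert abs(sum(a * b for a, b in zip(v, v_inv)) - 1.0) < 1e-12
```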

    def sandwich_product(self, R: torch.Tensor, x: torch.Tensor,
                         R_rev: torch.Tensor = None) -> torch.Tensor:
        """Optimized sandwich product R x R~ via action matrix.

        Builds a [N, D, D] sandwich action matrix from the rotor, then applies
        it to all C channels via a single batched matmul.  This is much faster
        than two separate ``geometric_product`` calls when x has extra channel
        dimensions that R does not.

        Memory: O(N*D*D) where N = batch (without channels).
        Compare to naive: O(N*C*D*D) - a factor of C improvement.

        Args:
            R: Rotors [N, D] (2-D, batch-flattened).
            x: Multivectors [N, C, D] (3-D, C channels per rotor).
            R_rev: Optional precomputed reverse of R [N, D].

        Returns:
            Sandwiched result [N, C, D].
        """
        if R_rev is None:
            R_rev = self.reverse(R)

        ci = self.cayley_indices  # [D, D], ci[i, j] = i ^ j

        # Left-multiplication matrix L_R:  L_R[n, k, j] = R[n, j^k] * gp_signs[j^k, k]
        R_gathered = R[:, ci]  # [N, D(j), D(k)]
        L_R = R_gathered.permute(0, 2, 1) * self._left_sign_T.unsqueeze(0)

        # Right-multiplication matrix R_{R~}:  R_Rr[n, k, i] = R~[n, i^k] * gp_signs[i, k]
        gp_T = self.gp_signs.T
        Rr_gathered = R_rev[:, ci]  # [N, D(i), D(k)]
        R_Rr = Rr_gathered.permute(0, 2, 1) * gp_T.unsqueeze(0)

        # Sandwich matrix:  M = R_Rr @ L_R   ->   (R x R~)[k] = sum_j M[k, j] * x[j]
        M = torch.bmm(R_Rr, L_R)  # [N, D, D]

        # Apply to all channels:  result[n, c, k] = sum_j M[n, k, j] * x[n, c, j]
        return torch.matmul(x, M.transpose(-2, -1))

    def per_channel_sandwich(self, R: torch.Tensor, x: torch.Tensor,
                              R_rev: torch.Tensor = None) -> torch.Tensor:
        """Sandwich product with per-channel rotors via action matrices.

        Each channel c has its own rotor R[c]. Builds a [C, D, D] action matrix
        (one per channel), then applies to all batch elements in one matmul.

        Memory: O(C*D*D + B*C*D) vs naive two-GP: O(2*B*C*D*D).

        Args:
            R: Per-channel rotors [C, D].
            x: Batched multivectors [B, C, D].
            R_rev: Optional precomputed reverse of R [C, D].

        Returns:
            Sandwiched result [B, C, D].
        """
        if R_rev is None:
            R_rev = self.reverse(R)

        ci = self.cayley_indices  # [D, D]

        # Build per-channel action matrices M[c, k, j]
        R_gathered = R[:, ci]                                    # [C, D, D]
        L_R = R_gathered.permute(0, 2, 1) * self._left_sign_T.unsqueeze(0)

        gp_T = self.gp_signs.T
        Rr_gathered = R_rev[:, ci]                               # [C, D, D]
        R_Rr = Rr_gathered.permute(0, 2, 1) * gp_T.unsqueeze(0)

        M = torch.bmm(R_Rr, L_R)  # [C, D, D]

        # Apply: result[b, c, k] = sum_j M[c, k, j] * x[b, c, j]
        # x: [B, C, D], M.transpose(-2, -1): [C, D, D] -> per-channel einsum
        return torch.einsum('bcd,cdk->bck', x, M.transpose(-2, -1))

    def multi_rotor_sandwich(self, R: torch.Tensor, x: torch.Tensor,
                              R_rev: torch.Tensor = None) -> torch.Tensor:
        """Sandwich product with K rotors applied to C-channel input.

        Builds K action matrices [K, D, D] once, then applies all K
        rotors to x in a single einsum.  This replaces the naive
        two-sequential-geometric-product approach used by MultiRotorLayer.

        Memory: O(K*D*D) setup + O(B*C*K*D) apply.
        Compare to naive two-GP: O(2*B*C*K*D*D).

        Args:
            R: Per-rotor versors [K, D].
            x: Batched multivectors [B, C, D].
            R_rev: Optional precomputed reverse/inverse of R [K, D].

        Returns:
            Per-rotor sandwiched result [B, C, K, D].
        """
        if R_rev is None:
            R_rev = self.reverse(R)

        ci = self.cayley_indices  # [D, D]

        R_gathered = R[:, ci]                                    # [K, D, D]
        L_R = R_gathered.permute(0, 2, 1) * self._left_sign_T.unsqueeze(0)

        gp_T = self.gp_signs.T
        Rr_gathered = R_rev[:, ci]                               # [K, D, D]
        R_Rr = Rr_gathered.permute(0, 2, 1) * gp_T.unsqueeze(0)

        M = torch.bmm(R_Rr, L_R)  # [K, D, D]

        # Apply all K rotors: x[B,C,D] @ M[K,D,D].T -> [B,C,K,D]
        return torch.einsum('bcd,kde->bcke', x, M.transpose(-2, -1))

    def pseudoscalar_product(self, x: torch.Tensor) -> torch.Tensor:
        """Multiply by the unit pseudoscalar: x * I.

        Maps grade-k to grade-(n-k) (Hodge dual).  Computed as a simple
        permutation with sign flips - no geometric product needed.

        Args:
            x: Multivector [..., D].

        Returns:
            Result [..., D].
        """
        ps_signs = self._ps_signs
        if ps_signs.dtype != x.dtype:
            ps_signs = ps_signs.to(dtype=x.dtype)

        return x[..., self._ps_source] * ps_signs

    def blade_project(self, mv: torch.Tensor, blade: torch.Tensor) -> torch.Tensor:
        """Project multivector onto blade subspace: (mv . B) B^{-1}.

        Args:
            mv (torch.Tensor): Multivector to project [..., dim].
            blade (torch.Tensor): Blade defining the subspace [..., dim].

        Returns:
            torch.Tensor: Projected multivector [..., dim].
        """
        inner = self.inner_product(mv, blade)
        return self.geometric_product(inner, self.blade_inverse(blade))

    def blade_reject(self, mv: torch.Tensor, blade: torch.Tensor) -> torch.Tensor:
        """Reject multivector from blade subspace: mv - proj_B(mv).

        The orthogonal complement of the projection onto blade.

        Args:
            mv (torch.Tensor): Multivector to reject [..., dim].
            blade (torch.Tensor): Blade defining the subspace [..., dim].

        Returns:
            torch.Tensor: Rejected multivector [..., dim].
        """
        return mv - self.blade_project(mv, blade)

    def grade_involution(self, mv: torch.Tensor) -> torch.Tensor:
        """Grade involution (main involution): x_hat = sum (-1)^k <x>_k.

        Flips sign of all odd-grade components, preserves even-grade.
        This is an algebra automorphism: (AB)^ = A_hat B_hat.

        Args:
            mv (torch.Tensor): Input multivector [..., dim].

        Returns:
            torch.Tensor: Involuted multivector [..., dim].
        """
        signs = self._involution_signs
        if signs.dtype != mv.dtype:
            signs = signs.to(dtype=mv.dtype)
        return mv * signs

    def clifford_conjugation(self, mv: torch.Tensor) -> torch.Tensor:
        """Clifford conjugation: bar{x} = grade_involution(reverse(x)).

        Combines reversion and grade involution. For a k-blade:
            bar{e_I} = (-1)^k * (-1)^{k(k-1)/2} * e_I

        This is an anti-automorphism: bar{AB} = bar{B} bar{A}.

        Args:
            mv (torch.Tensor): Input multivector [..., dim].

        Returns:
            torch.Tensor: Conjugated multivector [..., dim].
        """
        cs = self.conj_signs
        if cs.dtype != mv.dtype:
            cs = cs.to(dtype=mv.dtype)
        return mv * cs

    def norm_sq(self, mv: torch.Tensor) -> torch.Tensor:
        """Squared norm: <x * reverse(x)>_0.

        Returns the scalar (grade-0) part of the product of a multivector
        with its reverse. For blades, this equals the square of the
        magnitude with the appropriate sign from the metric.

        Optimized: the scalar component of A*~A is ``sum_i a_i^2 * rev_signs[i]
        * cayley_signs[i, i]``. No full geometric product needed.

        Args:
            mv (torch.Tensor): Input multivector [..., dim].

        Returns:
            torch.Tensor: Scalar norm squared [..., 1].
        """
        # <x ~x>_0 = sum_i x_i^2 * rev_signs[i] * diag_signs[i]
        signs = self._norm_sq_signs
        if signs.dtype != mv.dtype:
            signs = signs.to(dtype=mv.dtype)
        return (mv * mv * signs).sum(dim=-1, keepdim=True)

    def left_contraction(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
        """Left contraction: A _| B.

        Selects components where grade(A) <= grade(B) and
        grade(result) = grade(B) - grade(A). This is the standard
        contraction used in GA for projection-like operations.

        Args:
            A (torch.Tensor): Left operand [..., dim].
            B (torch.Tensor): Right operand [..., dim].

        Returns:
            torch.Tensor: Left contraction A _| B [..., dim].
        """
        B_gathered = B[..., self.cayley_indices]
        return torch.matmul(A.unsqueeze(-2), B_gathered * self.lc_gp_signs).squeeze(-2)

    def dual(self, mv: torch.Tensor) -> torch.Tensor:
        """Hodge dual: x* = x I^{-1}, maps grade-k to grade-(n-k).

        Equivalent to ``pseudoscalar_product`` but with conventional name.

        Args:
            mv (torch.Tensor): Input multivector [..., dim].

        Returns:
            torch.Tensor: Dual multivector [..., dim].
        """
        return self.pseudoscalar_product(mv)

    def reflect(self, x: torch.Tensor, n: torch.Tensor) -> torch.Tensor:
        """Reflect x through the hyperplane orthogonal to vector n.

        Implements the versor reflection: x' = -n x n^{-1}.

        Uses the sandwich product action-matrix when x has a channel
        dimension (3-D input), falling back to two sequential geometric
        products for general shapes.

        Args:
            x (torch.Tensor): Multivector to reflect [..., dim].
            n (torch.Tensor): Normal vector (grade-1) [..., dim].

        Returns:
            torch.Tensor: Reflected multivector [..., dim].
        """
        n_hat = self.grade_involution(n)   # -n for grade-1
        n_inv = self.blade_inverse(n)

        # Use sandwich machinery when shapes allow it
        if x.dim() == 3 and n.dim() == 2 and x.shape[0] != n.shape[0]:
            # n: [C, D], x: [B, C, D] -> per_channel_sandwich
            return self.per_channel_sandwich(n_hat, x, n_inv)
        if x.dim() == 3 and n.dim() == 2 and x.shape[0] == n.shape[0]:
            # n: [N, D], x: [N, C, D] -> sandwich_product
            return self.sandwich_product(n_hat, x, n_inv)

        return self.geometric_product(
            self.geometric_product(n_hat, x), n_inv
        )

    def versor_product(self, V: torch.Tensor, x: torch.Tensor) -> torch.Tensor:
        """General versor transformation: V x V^{-1} or hat{V} x V^{-1}.

        For even versors (rotors), this is the sandwich product.
        For odd versors (reflections), the grade involution is applied.
        The parity is determined from the grade structure of V.

        In practice, this computes: grade_involution(V) * x * V^{-1},
        which correctly handles both even and odd versors.

        Args:
            V (torch.Tensor): Versor [..., dim].
            x (torch.Tensor): Multivector to transform [..., dim].

        Returns:
            torch.Tensor: Transformed multivector [..., dim].
        """
        V_inv = self.blade_inverse(V)
        V_hat = self.grade_involution(V)
        return self.geometric_product(
            self.geometric_product(V_hat, x), V_inv
        )

    def exp(self, mv: torch.Tensor) -> torch.Tensor:
        """Exponentiates a bivector to produce a rotor.

        Dispatches by :attr:`exp_policy`:

        - ``FAST``  -- closed-form only (fastest, approximate for
          non-simple bivectors in n >= 4).
        - ``AUTO``  (default) -- closed-form for n <= 3 (exact),
          compiled-safe decomposition for n >= 4.
        - ``EXACT`` -- always compiled-safe decomposition (exact for
          all bivectors, ``torch.compile``-safe).

        Three signature regimes handled in the closed-form path:
            - Elliptic  (B^2 < 0): exp(B) = cos(th) + sin(th)/th * B
            - Hyperbolic (B^2 > 0): exp(B) = cosh(th) + sinh(th)/th * B
            - Parabolic  (B^2 ~ 0): exp(B) ~ 1 + B

        Args:
            mv (torch.Tensor): Pure bivector [..., dim].

        Returns:
            torch.Tensor: Rotor exp(mv) [..., dim].
        """
        from core.decomposition import ExpPolicy

        policy = self._exp_policy

        if policy == ExpPolicy.FAST:
            return self._exp_bivector_closed(mv)

        if policy == ExpPolicy.AUTO:
            if self.n <= 3:
                return self._exp_bivector_closed(mv)
            return self._exp_compiled_safe(mv)

        # ExpPolicy.EXACT
        return self._exp_compiled_safe(mv)

    def _exp_bivector_closed(self, B: torch.Tensor) -> torch.Tensor:
        """Closed-form exponential for bivectors in arbitrary signature.

        For a bivector B, computes B^2 (scalar part) using the metric:
            B^2_scalar = Sum_k b_k^2 . (e_k)^2   where (e_ab)^2 = -s_a.s_b

        Three regimes:
            - B^2 < 0 (elliptic): exp(B) = cos(theta) + sin(theta)/theta . B,  theta = Sqrt(-B^2)
            - B^2 > 0 (hyperbolic): exp(B) = cosh(theta) + sinh(theta)/theta . B,  theta = Sqrt(B^2)
            - B^2 ~= 0 (parabolic): exp(B) ~= 1 + B

        Uses zero geometric products. Exact for simple bivectors in any
        Clifford algebra Cl(p,q,r).

        Args:
            B (torch.Tensor): Pure bivector [..., dim].

        Returns:
            torch.Tensor: Rotor exp(B) [..., dim].
        """
        idx_expanded = self._bv_indices.expand(*B.shape[:-1], -1)
        bv_coeffs = torch.gather(B, -1, idx_expanded)  # [..., num_bivectors]

        # Signed squared norm: alpha = Sum_k b_k^2 . (e_k)^2
        # alpha < 0 -> elliptic (Euclidean-like), alpha > 0 -> hyperbolic
        alpha = (bv_coeffs * bv_coeffs * self.bv_sq_scalar).sum(dim=-1, keepdim=True)

        abs_alpha = alpha.abs().clamp(min=self.eps_sq)
        theta = torch.sqrt(abs_alpha)  # [..., 1]

        g0_mask = self.grade_masks_float[0]
        if g0_mask.dtype != B.dtype:
            g0_mask = g0_mask.to(dtype=B.dtype)

        # Dispatch by signature regime (Python branch, no graph break)
        if self._exp_regime == 'elliptic':
            # Pure Euclidean: alpha is always negative, only cos/sinc needed
            cos_theta = torch.cos(theta)
            sinc_theta = torch.where(
                theta > self.eps,
                torch.sin(theta) / theta,
                1.0 - abs_alpha / 6.0,
            )
            return cos_theta * g0_mask + sinc_theta * B

        if self._exp_regime == 'hyperbolic':
            # Pure negative: alpha is always positive, only cosh/sinhc needed
            cosh_theta = torch.cosh(theta)
            sinhc_theta = torch.where(
                theta > self.eps,
                torch.sinh(theta) / theta,
                1.0 + abs_alpha / 6.0,
            )
            return cosh_theta * g0_mask + sinhc_theta * B

        # Mixed signature: need both branches + runtime select
        cos_theta = torch.cos(theta)
        sinc_theta = torch.where(
            theta > self.eps,
            torch.sin(theta) / theta,
            1.0 - abs_alpha / 6.0,
        )
        cosh_theta = torch.cosh(theta)
        sinhc_theta = torch.where(
            theta > self.eps,
            torch.sinh(theta) / theta,
            1.0 + abs_alpha / 6.0,
        )

        is_elliptic = alpha < -self.eps_sq
        is_hyperbolic = alpha > self.eps_sq

        scalar_part = torch.where(
            is_elliptic, cos_theta,
            torch.where(is_hyperbolic, cosh_theta, torch.ones_like(theta))
        )
        coeff_part = torch.where(
            is_elliptic, sinc_theta,
            torch.where(is_hyperbolic, sinhc_theta, torch.ones_like(theta))
        )

        return scalar_part * g0_mask + coeff_part * B

    def _exp_compiled_safe(self, B: torch.Tensor) -> torch.Tensor:
        """Compiled-safe exponential: runs both closed-form and decomposed,
        selects per-element via ``torch.where`` based on simplicity.

        A bivector is simple iff ``B*B`` has no grade-4 component (i.e.
        ``B^2`` is purely scalar).  Both paths are computed unconditionally
        so there is no data-dependent branching.

        Args:
            B (torch.Tensor): Pure bivector [..., dim].

        Returns:
            torch.Tensor: Rotor exp(B) [..., dim].
        """
        from core.decomposition import compiled_safe_decomposed_exp

        R_closed = self._exp_bivector_closed(B)
        R_decomposed = compiled_safe_decomposed_exp(self, B)

        BB = self.geometric_product(B, B)
        # Subtract scalar part, check if residual is negligible
        scalar_part = self.grade_projection(BB, 0)
        non_scalar_energy = (BB - scalar_part).norm(dim=-1, keepdim=True)
        is_simple = non_scalar_energy < self.eps * 100

        return torch.where(is_simple, R_closed, R_decomposed)

    def _exp_taylor(self, mv: torch.Tensor, order: int = 8) -> torch.Tensor:
        """Taylor series exponential with scaling-and-squaring (fallback).

        Args:
            mv (torch.Tensor): General multivector [..., dim].
            order (int, optional): Taylor order. Defaults to 8.

        Returns:
            torch.Tensor: exp(mv) [..., dim].
        """
        norm = mv.norm(dim=-1, keepdim=True)
        k = torch.ceil(torch.log2(torch.clamp(norm, min=1.0))).int()

        max_k = k.max().item()
        if max_k > 0:
            mv_scaled = mv / (2.0 ** max_k)
        else:
            mv_scaled = mv

        res = torch.zeros_like(mv)
        res[..., 0] = 1.0

        term = torch.zeros_like(mv)
        term[..., 0] = 1.0

        for i in range(1, order + 1):
            term = self.geometric_product(term, mv_scaled)
            res = res + term / math.factorial(i)

        if max_k > 0:
            for _ in range(int(max_k)):
                res = self.geometric_product(res, res)

        return res

    def exp_decomposed(self, mv: torch.Tensor, **kwargs) -> torch.Tensor:
        """Exponentiates a bivector via decomposition into simple components.

        .. deprecated::
            Set :attr:`exp_policy` and call :meth:`exp` instead.

        Args:
            mv (torch.Tensor): Pure bivector [..., dim].
            **kwargs: Legacy kwargs (``use_decomposition``, ``k``, etc.).

        Returns:
            torch.Tensor: Rotor exp(mv) [..., dim].
        """
        import warnings
        warnings.warn(
            "exp_decomposed() is deprecated. Set algebra.exp_policy and "
            "use algebra.exp() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # Legacy compat: use_decomposition=False means closed-form
        if kwargs.get('use_decomposition') is False:
            return self._exp_bivector_closed(mv)
        return self.exp(mv)

device property

Return the device of the algebra tables.

dtype property

Return the floating-point dtype of the algebra tables.

Reflects the current state — updated automatically when the algebra is moved via .to(dtype=...).

grade_masks property

Grade masks indexed by grade: grade_masks[k] -> [dim] bool.

grade_masks_float property

Float grade masks indexed by grade: grade_masks_float[k] -> [dim] float.

exp_policy property writable

Active :class:ExpPolicy controlling exp() dispatch.

num_grades property

Counts the number of grades (n + 1).

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `int` | `int` | Number of grades. |

__init__(p, q=0, r=0, device='cuda', dtype=torch.float32, exp_policy='auto')

Initialize the algebra and cache the Cayley table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `p` | `int` | Positive dimensions (+1). | *required* |
| `q` | `int` | Negative dimensions (-1). | `0` |
| `r` | `int` | Degenerate dimensions (0). | `0` |
| `device` | `str` | The device on which computations are performed. | `'cuda'` |
| `dtype` | `dtype` | Floating-point dtype for algebra tables. Pass `torch.bfloat16` or `torch.float16` when the model will be trained in reduced precision (e.g. AMP on CUDA in bfloat16 mode). | `torch.float32` |
| `exp_policy` | `str or ExpPolicy` | Bivector exp policy: `'auto'` (default), `'fast'`, or `'exact'`. See `core.decomposition.ExpPolicy`. | `'auto'` |
Source code in core/algebra.py
def __init__(self, p: int, q: int = 0, r: int = 0, device='cuda',
             dtype: torch.dtype = torch.float32,
             exp_policy: str = 'auto'):
    """Initialize the algebra and cache the Cayley table.

    Args:
        p (int): Positive dimensions (+1).
        q (int, optional): Negative dimensions (-1). Defaults to 0.
        r (int, optional): Degenerate dimensions (0). Defaults to 0.
        device (str, optional): The device on which computations are performed. Defaults to 'cuda'.
        dtype (torch.dtype, optional): Floating-point dtype for algebra tables.
            Defaults to ``torch.float32``.  Pass ``torch.bfloat16`` or
            ``torch.float16`` when the model will be trained in a reduced
            precision (e.g. AMP on CUDA bfloat16 mode).
        exp_policy (str or ExpPolicy, optional): Bivector exp policy.
            ``'auto'`` (default), ``'fast'``, or ``'exact'``.
            See :class:`core.decomposition.ExpPolicy`.
    """
    super().__init__()

    assert p >= 0, f"p must be non-negative, got {p}"
    assert q >= 0, f"q must be non-negative, got {q}"
    assert r >= 0, f"r must be non-negative, got {r}"
    assert p + q + r <= 12, f"p + q + r must be <= 12, got {p + q + r}"

    self.p, self.q, self.r = p, q, r
    self.n = p + q + r
    self.dim = 2 ** self.n

    # Exp regime: dispatch at init
    if p == 0 or q == 0:
        self._exp_regime = 'elliptic'
    elif p == 1 and q == 1 and r == 0:
        self._exp_regime = 'hyperbolic'
    else:
        self._exp_regime = 'mixed'

    # Exp policy: controls decomposition strategy
    from core.decomposition import ExpPolicy
    if isinstance(exp_policy, str):
        self._exp_policy = ExpPolicy(exp_policy)
    else:
        self._exp_policy = exp_policy

    # Cache Cayley tables to avoid recomputation
    cache_key = (p, q, r, str(device), str(dtype))
    if cache_key not in CliffordAlgebra._CACHED_TABLES:
        CliffordAlgebra._CACHED_TABLES[cache_key] = self._generate_cayley_table(device, dtype)

    (
        cayley_indices, cayley_signs, gp_signs, grade_masks_list,
        rev_signs, bv_sq_scalar, wedge_gp_signs, inner_gp_signs,
        grade_index, rc_action, lc_gp_signs, conj_signs,
        comm_gp_signs, anti_comm_gp_signs,
    ) = CliffordAlgebra._CACHED_TABLES[cache_key]

    # Register all tables as non-persistent buffers so .to(device) moves them
    self.register_buffer('cayley_indices', cayley_indices, persistent=False)
    self.register_buffer('cayley_signs', cayley_signs, persistent=False)
    self.register_buffer('gp_signs', gp_signs, persistent=False)
    self.register_buffer('rev_signs', rev_signs, persistent=False)
    self.register_buffer('bv_sq_scalar', bv_sq_scalar, persistent=False)
    self.register_buffer('wedge_gp_signs', wedge_gp_signs, persistent=False)
    self.register_buffer('inner_gp_signs', inner_gp_signs, persistent=False)
    self.register_buffer('grade_index', grade_index, persistent=False)
    self.register_buffer('rc_action', rc_action, persistent=False)
    self.register_buffer('lc_gp_signs', lc_gp_signs, persistent=False)
    self.register_buffer('conj_signs', conj_signs, persistent=False)
    self.register_buffer('comm_gp_signs', comm_gp_signs, persistent=False)
    self.register_buffer('anti_comm_gp_signs', anti_comm_gp_signs, persistent=False)

    # Grade involution signs: (-1)^k per basis element
    inv_signs = ((-1.0) ** grade_index.float()).to(dtype=conj_signs.dtype)
    self.register_buffer('_involution_signs', inv_signs, persistent=False)

    # Stack grade masks: [n+1, dim] bool and float
    stacked = torch.stack(grade_masks_list)                 # [n+1, dim]
    self.register_buffer('_grade_masks', stacked, persistent=False)
    self.register_buffer('_grade_masks_float', stacked.to(dtype=cayley_signs.dtype), persistent=False)

    # Bivector indices
    if self.n >= 2:
        bv_idx = stacked[2].nonzero(as_tuple=False).squeeze(-1)
    else:
        bv_idx = torch.zeros(0, dtype=torch.long, device=device)
    self.register_buffer('_bv_indices', bv_idx, persistent=False)

    # Pre-initialize derived tables (sandwich_product / pseudoscalar_product)
    self._init_derived_tables()

    # Precomputed finfo-derived tolerances for dtype-aware numerical guards.
    # Plain floats for zero-overhead usage in clamp/where operations.
    _finfo = torch.finfo(self.cayley_signs.dtype)
    self.eps: float = float(_finfo.eps)
    self.eps_sq: float = float(_finfo.eps ** 2)

embed_vector(vectors)

Injects vectors into the Grade-1 subspace.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `vectors` | `Tensor` | Raw vectors [..., n]. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tensor` | Multivector coefficients [..., dim]. |

Source code in core/algebra.py
def embed_vector(self, vectors: torch.Tensor) -> torch.Tensor:
    """Injects vectors into the Grade-1 subspace.

    Args:
        vectors (torch.Tensor): Raw vectors [..., n].

    Returns:
        torch.Tensor: Multivector coefficients [..., dim].
    """
    g1_idx = (1 << torch.arange(self.n, device=vectors.device)).long()
    mv = torch.zeros(*vectors.shape[:-1], self.dim,
                      device=vectors.device, dtype=vectors.dtype)
    mv.scatter_(-1, g1_idx.expand_as(vectors), vectors)
    return mv
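The index arithmetic above relies on the bitmask blade ordering: basis vector e_i occupies slot 2^i of the 2^n coefficient array, which is exactly what `1 << torch.arange(self.n)` produces. A pure-Python sketch of the same mapping (illustrative, no torch):

```python
def embed_vector(vec):
    """Place an n-dimensional vector into the grade-1 slots of a
    2^n multivector. Blade e_i lives at bitmask index 2^i:
    e1 -> 1, e2 -> 2, e3 -> 4, ...
    """
    n = len(vec)
    mv = [0.0] * (2 ** n)
    for i, v in enumerate(vec):
        mv[1 << i] = v
    return mv

mv = embed_vector([1.0, 2.0, 3.0])
# slots 1, 2 and 4 carry the vector; all other blades stay zero
```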

get_grade_norms(mv)

Calculates norms per grade. Useful for invariant features.

Vectorized via scatter_add.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `mv` | `Tensor` | Input multivector [..., dim]. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tensor` | Grade norms [..., num_grades]. |

Source code in core/algebra.py
def get_grade_norms(self, mv: torch.Tensor) -> torch.Tensor:
    """Calculates norms per grade. Useful for invariant features.

    Vectorized via scatter_add.

    Args:
        mv (torch.Tensor): Input multivector [..., dim].

    Returns:
        torch.Tensor: Grade norms [..., num_grades].
    """
    gi = self.grade_index
    batch_shape = mv.shape[:-1]
    sq = mv.pow(2)
    flat = sq.reshape(-1, self.dim)
    idx = gi.unsqueeze(0).expand_as(flat)
    result = torch.zeros(flat.shape[0], self.num_grades, device=mv.device, dtype=mv.dtype)
    result.scatter_add_(1, idx, flat)
    return result.reshape(*batch_shape, self.num_grades).clamp(min=self.eps).sqrt()
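The `scatter_add` groups squared coefficients by grade, and in this bitmask layout the grade of a blade is just the popcount of its index. A pure-Python equivalent (a sketch of the same grouping, not the library's implementation):

```python
import math

def grade_norms(mv, eps=1e-30):
    """Per-grade L2 norms: sum squared coefficients by blade grade
    (popcount of the bitmask index), then clamp and sqrt."""
    n = (len(mv) - 1).bit_length()          # dim = 2^n
    sums = [0.0] * (n + 1)
    for i, c in enumerate(mv):
        sums[bin(i).count("1")] += c * c
    return [math.sqrt(max(s, eps)) for s in sums]

# Cl(2) multivector [scalar, e1, e2, e12] = [1, 3, 4, 2]
norms = grade_norms([1.0, 3.0, 4.0, 2.0])
# grade 0 -> 1.0, grade 1 -> sqrt(9 + 16) = 5.0, grade 2 -> 2.0
```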

geometric_product(A, B)

Computes the Geometric Product.

Uses vectorized gather + broadcast multiply + sum.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `A` | `Tensor` | Left operand [..., Dim]. | *required* |
| `B` | `Tensor` | Right operand [..., Dim]. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tensor` | The product AB [..., Dim]. |

Source code in core/algebra.py
def geometric_product(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
    """Computes the Geometric Product.

    Uses vectorized gather + broadcast multiply + sum.

    Args:
        A (torch.Tensor): Left operand [..., Dim].
        B (torch.Tensor): Right operand [..., Dim].

    Returns:
        torch.Tensor: The product AB [..., Dim].
    """
    check_multivector(A, self, "geometric_product(A)")
    check_multivector(B, self, "geometric_product(B)")

    # Gather B coefficients according to Cayley table: B_gathered[..., i, k] = B[..., cayley[i,k]]
    B_gathered = B[..., self.cayley_indices]  # [..., D, D]

    # result[..., k] = sum_i A[..., i] * B[..., cayley[i,k]] * signs[i,k]
    return torch.matmul(A.unsqueeze(-2), B_gathered * self.gp_signs).squeeze(-2)
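The gather-based product is driven by a Cayley table: the blade product e_a e_b always lands at index a ^ b, with a sign determined by reordering swaps plus the metric signs of shared basis vectors. A self-contained pure-Python sketch for Cl(2,0) (illustrative names; not the library's actual table builder):

```python
def gp_sign(a, b, n=2, metric=(1, 1)):
    """Sign of the blade product e_a * e_b, with blades as bitmasks.
    Counts the transpositions needed to merge the factors, then folds
    in the metric sign for each shared basis vector."""
    sign = 1
    for i in range(n):
        if not (b & (1 << i)):
            continue
        # e_i hops left past every generator of a with a higher index
        if bin(a >> (i + 1)).count("1") % 2:
            sign = -sign
        if a & (1 << i):              # e_i * e_i = metric[i]
            sign *= metric[i]
            a &= ~(1 << i)
        else:
            a |= 1 << i
    return sign

def geometric_product(A, B, n=2, metric=(1, 1)):
    """Cayley-table sum: result[a ^ b] += A[a] * B[b] * sign(a, b)."""
    out = [0.0] * (2 ** n)
    for a in range(2 ** n):
        for b in range(2 ** n):
            out[a ^ b] += A[a] * B[b] * gp_sign(a, b, n, metric)
    return out

# In Cl(2,0): e1 e2 = e12 and e12^2 = -1
e1 = [0.0, 1.0, 0.0, 0.0]
e2 = [0.0, 0.0, 1.0, 0.0]
e12 = geometric_product(e1, e2)
```

The vectorized kernel performs the same double sum, but as a single gather (`B[..., cayley_indices]`) followed by one matmul against the precomputed sign table.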

grade_projection(mv, grade)

Isolates a specific grade.

Uses multiplicative masking (mv * float_mask) instead of boolean indexing to avoid nonzero calls that break torch.compile.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `mv` | `Tensor` | Multivector [..., Dim]. | *required* |
| `grade` | `int` | Target grade. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tensor` | Projected multivector [..., Dim]. |

Source code in core/algebra.py
def grade_projection(self, mv: torch.Tensor, grade: int) -> torch.Tensor:
    """Isolates a specific grade.

    Uses multiplicative masking (mv * float_mask) instead of boolean
    indexing to avoid ``nonzero`` calls that break ``torch.compile``.

    Args:
        mv (torch.Tensor): Multivector [..., Dim].
        grade (int): Target grade.

    Returns:
        torch.Tensor: Projected multivector [..., Dim].
    """
    mask = self.grade_masks_float[grade]
    if mask.dtype != mv.dtype:
        mask = mask.to(dtype=mv.dtype)
    return mv * mask
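The float-mask trick amounts to zeroing every coefficient whose blade grade (popcount of the index) differs from the target grade. A minimal sketch (`grade_mask` is an illustrative helper, not part of the library):

```python
def grade_mask(n, k):
    """Float mask over 2^n blades selecting those of grade k
    (grade = popcount of the bitmask index)."""
    return [1.0 if bin(i).count("1") == k else 0.0 for i in range(2 ** n)]

mv = [1.0, 2.0, 3.0, 4.0]                  # Cl(2): [1, e1, e2, e12]
proj = [c * m for c, m in zip(mv, grade_mask(2, 1))]
# keeps only the grade-1 part: [0.0, 2.0, 3.0, 0.0]
```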

reverse(mv)

Computes the reversion: flips the sign of each grade-k component by (-1)^(k(k-1)/2). This is an anti-automorphism, distinct from Clifford conjugation (see clifford_conjugation).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `mv` | `Tensor` | Input multivector [..., Dim]. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tensor` | Reversed multivector [..., Dim]. |

Source code in core/algebra.py
def reverse(self, mv: torch.Tensor) -> torch.Tensor:
    """Computes the reversion. The Clifford conjugate.

    Args:
        mv (torch.Tensor): Input multivector [..., Dim].

    Returns:
        torch.Tensor: Reversed multivector [..., Dim].
    """
    rev = self.rev_signs
    if rev.dtype != mv.dtype:
        rev = rev.to(dtype=mv.dtype)
    return mv * rev
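Reversion multiplies each grade-k component by (-1)^(k(k-1)/2), which is what the precomputed `rev_signs` buffer stores per blade. The sign pattern repeats with period 4 over the grades:

```python
def rev_sign(k):
    """Reversion sign for a grade-k blade: (-1)^(k(k-1)/2)."""
    return (-1) ** (k * (k - 1) // 2)

signs = [rev_sign(k) for k in range(6)]
# grades 0..5 -> [1, 1, -1, -1, 1, 1]: scalars and vectors are fixed,
# bivectors and trivectors flip sign
```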

wedge(A, B)

Computes the wedge (outer) product: A ^ B = (AB - BA)/2.

Single-pass implementation using precomputed antisymmetric signs.

Reference

Pence, T., Yamada, D., & Singh, V. (2025). "Composing Linear Layers from Irreducibles." arXiv:2507.11688v1 [cs.LG]

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `A` | `Tensor` | Left operand [..., dim]. | *required* |
| `B` | `Tensor` | Right operand [..., dim]. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tensor` | Wedge product A ^ B [..., dim]. |

Source code in core/algebra.py
def wedge(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
    """Computes the wedge (outer) product: A ^ B = (AB - BA)/2.

    Single-pass implementation using precomputed antisymmetric signs.

    Reference:
        Pence, T., Yamada, D., & Singh, V. (2025). "Composing Linear Layers
        from Irreducibles." arXiv:2507.11688v1 [cs.LG]

    Args:
        A (torch.Tensor): Left operand [..., dim].
        B (torch.Tensor): Right operand [..., dim].

    Returns:
        torch.Tensor: Wedge product A ^ B [..., dim].
    """
    B_gathered = B[..., self.cayley_indices]
    return torch.matmul(A.unsqueeze(-2), B_gathered * self.wedge_gp_signs).squeeze(-2)
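For grade-1 vectors the antisymmetrised product isolates the bivector part: ab = a.b + a^b and ba = a.b - a^b, so (AB - BA)/2 keeps only a^b. A tiny check in Cl(2,0), where the single bivector coefficient is the signed area (illustrative helper, not the library API):

```python
def wedge_vectors_2d(a, b):
    """a ^ b for grade-1 vectors in Cl(2,0): the e12 coefficient
    of (ab - ba)/2, i.e. the signed area a1*b2 - a2*b1."""
    return a[0] * b[1] - a[1] * b[0]

area = wedge_vectors_2d([1.0, 0.0], [0.0, 1.0])   # e1 ^ e2 -> +1.0
```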

right_contraction(A, B)

Computes the right contraction: A _| B.

Fast path for bivector-vector case using precomputed skew-symmetric action matrices (avoids full geometric product + grade projection).

Reference

Pence, T., Yamada, D., & Singh, V. (2025). "Composing Linear Layers from Irreducibles." arXiv:2507.11688v1 [cs.LG], Algorithm 2

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `A` | `Tensor` | Left operand (bivector) [..., dim]. | *required* |
| `B` | `Tensor` | Right operand (vector) [..., dim]. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tensor` | Right contraction A _| B [..., dim]. |

Source code in core/algebra.py
def right_contraction(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
    """Computes the right contraction: A _| B.

    Fast path for bivector-vector case using precomputed skew-symmetric
    action matrices (avoids full geometric product + grade projection).

    Reference:
        Pence, T., Yamada, D., & Singh, V. (2025). "Composing Linear Layers
        from Irreducibles." arXiv:2507.11688v1 [cs.LG], Algorithm 2

    Args:
        A (torch.Tensor): Left operand (bivector) [..., dim].
        B (torch.Tensor): Right operand (vector) [..., dim].

    Returns:
        torch.Tensor: Right contraction A _| B [..., dim].
    """
    # Use gather instead of boolean indexing (compile-friendly)
    bv_idx_exp = self._bv_indices.expand(*A.shape[:-1], -1)
    bv_coeffs = torch.gather(A, -1, bv_idx_exp)          # [..., num_bv]

    # Grade-1 indices: powers of 2 for basis vectors
    g1_idx = torch.arange(self.n, device=A.device)
    g1_idx = (1 << g1_idx).long()                         # [n]
    g1_idx_exp = g1_idx.expand(*B.shape[:-1], -1)
    v_coeffs = torch.gather(B, -1, g1_idx_exp)            # [..., n]

    rc = self.rc_action
    if rc.dtype != A.dtype:
        rc = rc.to(dtype=A.dtype)

    # M[..., i, j] = sum_b bv_coeffs[..., b] * rc_action[b, i, j]
    M = torch.einsum('...b, bij -> ...ij', bv_coeffs, rc)  # [..., n, n]
    result_v = torch.matmul(M, v_coeffs.unsqueeze(-1)).squeeze(-1)  # [..., n]

    result = torch.zeros_like(A)
    result.scatter_(-1, g1_idx_exp, result_v)
    return result

inner_product(A, B)

Computes the inner product: A . B = (AB + BA)/2.

Single-pass implementation using precomputed symmetric signs.

Reference

Pence, T., Yamada, D., & Singh, V. (2025). "Composing Linear Layers from Irreducibles." arXiv:2507.11688v1 [cs.LG]

Parameters:

Name Type Description Default
A Tensor

Left operand [..., dim].

required
B Tensor

Right operand [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Inner product A . B [..., dim].

Source code in core/algebra.py
def inner_product(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
    """Computes the inner product: A . B = (AB + BA)/2.

    Single-pass implementation using precomputed symmetric signs.

    Reference:
        Pence, T., Yamada, D., & Singh, V. (2025). "Composing Linear Layers
        from Irreducibles." arXiv:2507.11688v1 [cs.LG]

    Args:
        A (torch.Tensor): Left operand [..., dim].
        B (torch.Tensor): Right operand [..., dim].

    Returns:
        torch.Tensor: Inner product A . B [..., dim].
    """
    B_gathered = B[..., self.cayley_indices]
    return torch.matmul(A.unsqueeze(-2), B_gathered * self.inner_gp_signs).squeeze(-2)
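The `B[..., self.cayley_indices]` gather works because basis blades encoded as bitmasks multiply as `e_a e_b = ±e_{a XOR b}`. A minimal pure-Python sketch of the sign computation for the Euclidean case Cl(n, 0) (`gp_sign` is a hypothetical helper; the real kernel additionally folds in metric signs for negative and null dimensions):

```python
def gp_sign(a: int, b: int, n: int) -> int:
    """Sign in e_a * e_b = sign * e_{a ^ b}, blades as bitmasks, Cl(n, 0)."""
    sign = 1
    for i in range(n):
        if b & (1 << i):
            # e_i must hop over every vector of `a` with a higher index
            if bin(a >> (i + 1)).count("1") % 2:
                sign = -sign
    return sign

# e1*e2 = +e12,  e2*e1 = -e12,  e12*e12 = -1  (scalar slot: 0b11 ^ 0b11 == 0)
assert gp_sign(0b01, 0b10, 2) == +1
assert gp_sign(0b10, 0b01, 2) == -1
assert gp_sign(0b11, 0b11, 2) == -1 and (0b11 ^ 0b11) == 0
```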

commutator(A, B)

Computes the commutator (Lie bracket): [A, B] = AB - BA.

Single-pass implementation using precomputed antisymmetric signs (same structure as :meth:wedge but without the 1/2 factor).

Parameters:

Name Type Description Default
A Tensor

Left operand [..., dim].

required
B Tensor

Right operand [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Commutator [A, B] [..., dim].

Source code in core/algebra.py
def commutator(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
    """Computes the commutator (Lie bracket): [A, B] = AB - BA.

    Single-pass implementation using precomputed antisymmetric signs
    (same structure as :meth:`wedge` but without the 1/2 factor).

    Args:
        A (torch.Tensor): Left operand [..., dim].
        B (torch.Tensor): Right operand [..., dim].

    Returns:
        torch.Tensor: Commutator [A, B] [..., dim].
    """
    B_gathered = B[..., self.cayley_indices]
    return torch.matmul(A.unsqueeze(-2), B_gathered * self.comm_gp_signs).squeeze(-2)

anti_commutator(A, B)

Computes the anti-commutator: {A, B} = AB + BA.

Single-pass implementation using precomputed symmetric signs (same structure as :meth:inner_product but without the 1/2 factor).

Parameters:

Name Type Description Default
A Tensor

Left operand [..., dim].

required
B Tensor

Right operand [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Anti-commutator {A, B} [..., dim].

Source code in core/algebra.py
def anti_commutator(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
    """Computes the anti-commutator: {A, B} = AB + BA.

    Single-pass implementation using precomputed symmetric signs
    (same structure as :meth:`inner_product` but without the 1/2 factor).

    Args:
        A (torch.Tensor): Left operand [..., dim].
        B (torch.Tensor): Right operand [..., dim].

    Returns:
        torch.Tensor: Anti-commutator {A, B} [..., dim].
    """
    B_gathered = B[..., self.cayley_indices]
    return torch.matmul(A.unsqueeze(-2), B_gathered * self.anti_comm_gp_signs).squeeze(-2)
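The two brackets split a product into its antisymmetric and symmetric halves: orthogonal basis vectors anticommute, so {e1, e2} = 0 while [e1, e2] = 2 e1e2. A sketch using the standard 2x2 real-matrix representation of Cl(2, 0) (not the Versor kernel):

```python
# 2x2 matrix representation of Cl(2, 0): e1 -> diag(1, -1), e2 -> off-diagonal
E1 = [[1, 0], [0, -1]]
E2 = [[0, 1], [1, 0]]

def mm(A, B):
    return [[sum(A[i][k] * B[k][j] for k in range(2)) for j in range(2)]
            for i in range(2)]

AB, BA = mm(E1, E2), mm(E2, E1)
comm  = [[AB[i][j] - BA[i][j] for j in range(2)] for i in range(2)]
acomm = [[AB[i][j] + BA[i][j] for j in range(2)] for i in range(2)]

assert acomm == [[0, 0], [0, 0]]      # {e1, e2} = 0
assert comm == [[0, 2], [-2, 0]]      # 2 * (matrix representing e12)
```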

blade_inverse(blade)

Compute the inverse of a blade: B^{-1} = B_rev / &lt;B B_rev&gt;_0.

Works for any simple blade (non-degenerate). For null blades the scalar denominator is clamped to avoid division by zero.

Parameters:

Name Type Description Default
blade Tensor

Blade multivector [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Inverse blade [..., dim].

Source code in core/algebra.py
def blade_inverse(self, blade: torch.Tensor) -> torch.Tensor:
    """Compute the inverse of a blade: B^{-1} = B_rev / <B * B_rev>_0.

    Works for any simple blade (non-degenerate). For null blades the
    scalar denominator is clamped to avoid division by zero.

    Args:
        blade (torch.Tensor): Blade multivector [..., dim].

    Returns:
        torch.Tensor: Inverse blade [..., dim].
    """
    blade_rev = self.reverse(blade)
    blade_sq = self.geometric_product(blade, blade_rev)
    scalar = blade_sq[..., 0:1].clamp(min=self.eps_sq)
    return blade_rev / scalar
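For a grade-1 blade the formula reduces to ordinary vector arithmetic, since a vector is its own reverse. A worked sketch in Cl(2, 0):

```python
# v = 3 e1 + 4 e2:  v~ = v, so <v v~>_0 = |v|^2 = 25 and v^{-1} = v / 25
x, y = 3.0, 4.0
nsq = x * x + y * y
inv = (x / nsq, y / nsq)

# v * v^{-1} has scalar part 1; the e12 part vanishes (v is parallel to itself)
assert abs(x * inv[0] + y * inv[1] - 1.0) < 1e-12
```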

sandwich_product(R, x, R_rev=None)

Optimized sandwich product R x R~ via action matrix.

Builds a [N, D, D] sandwich action matrix from the rotor, then applies it to all C channels via a single batched matmul. This is much faster than two separate geometric_product calls when x has extra channel dimensions that R does not.

Memory: O(N*D*D) where N = batch (without channels). Compare to naive: O(N*C*D*D) - a factor of C improvement.

Parameters:

Name Type Description Default
R Tensor

Rotors [N, D] (2-D, batch-flattened).

required
x Tensor

Multivectors [N, C, D] (3-D, C channels per rotor).

required
R_rev Tensor

Optional precomputed reverse of R [N, D].

None

Returns:

Type Description
Tensor

Sandwiched result [N, C, D].

Source code in core/algebra.py
def sandwich_product(self, R: torch.Tensor, x: torch.Tensor,
                     R_rev: torch.Tensor = None) -> torch.Tensor:
    """Optimized sandwich product R x R~ via action matrix.

    Builds a [N, D, D] sandwich action matrix from the rotor, then applies
    it to all C channels via a single batched matmul.  This is much faster
    than two separate ``geometric_product`` calls when x has extra channel
    dimensions that R does not.

    Memory: O(N*D*D) where N = batch (without channels).
    Compare to naive: O(N*C*D*D) - a factor of C improvement.

    Args:
        R: Rotors [N, D] (2-D, batch-flattened).
        x: Multivectors [N, C, D] (3-D, C channels per rotor).
        R_rev: Optional precomputed reverse of R [N, D].

    Returns:
        Sandwiched result [N, C, D].
    """
    if R_rev is None:
        R_rev = self.reverse(R)

    ci = self.cayley_indices  # [D, D], ci[i, j] = i ^ j

    # Left-multiplication matrix L_R:  L_R[n, k, j] = R[n, j^k] * gp_signs[j^k, k]
    R_gathered = R[:, ci]  # [N, D(j), D(k)]
    L_R = R_gathered.permute(0, 2, 1) * self._left_sign_T.unsqueeze(0)

    # Right-multiplication matrix R_{R~}:  R_Rr[n, k, i] = R~[n, i^k] * gp_signs[i, k]
    gp_T = self.gp_signs.T
    Rr_gathered = R_rev[:, ci]  # [N, D(i), D(k)]
    R_Rr = Rr_gathered.permute(0, 2, 1) * gp_T.unsqueeze(0)

    # Sandwich matrix:  M = R_Rr @ L_R   ->   (R x R~)[k] = sum_j M[k, j] * x[j]
    M = torch.bmm(R_Rr, L_R)  # [N, D, D]

    # Apply to all channels:  result[n, c, k] = sum_j M[n, k, j] * x[n, c, j]
    return torch.matmul(x, M.transpose(-2, -1))
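The fuse-then-apply trick can be sketched with quaternions, the even subalgebra of Cl(3, 0): build left- and right-multiplication matrices once, multiply them into a single 4x4 sandwich matrix M, and check it reproduces two direct products. All helper names here are hypothetical, not Versor API:

```python
def qmul(a, b):
    """Hamilton product a * b, quaternions as (w, x, y, z) tuples."""
    aw, ax, ay, az = a
    bw, bx, by, bz = b
    return (aw*bw - ax*bx - ay*by - az*bz,
            aw*bx + ax*bw + ay*bz - az*by,
            aw*by - ax*bz + ay*bw + az*bx,
            aw*bz + ax*by - ay*bx + az*bw)

def left_mat(q):           # L(q) @ p == q * p
    w, x, y, z = q
    return [[w, -x, -y, -z], [x, w, -z, y], [y, z, w, -x], [z, -y, x, w]]

def right_mat(q):          # R(q) @ p == p * q
    w, x, y, z = q
    return [[w, -x, -y, -z], [x, w, z, -y], [y, -z, w, x], [z, y, -x, w]]

def matmul(A, B):
    return [[sum(A[i][k] * B[k][j] for k in range(4)) for j in range(4)]
            for i in range(4)]

def matvec(A, v):
    return tuple(sum(A[i][k] * v[k] for k in range(4)) for i in range(4))

q = (0.8, 0.6, 0.0, 0.0)                       # unit rotor
q_rev = (q[0], -q[1], -q[2], -q[3])
M = matmul(right_mat(q_rev), left_mat(q))      # built once, reused per channel
p = (0.0, 1.0, 2.0, 3.0)
direct = qmul(qmul(q, p), q_rev)               # the naive two-product path
assert all(abs(u - w) < 1e-12 for u, w in zip(matvec(M, p), direct))
```

The matmul fusing `right_mat` and `left_mat` is the analogue of `torch.bmm(R_Rr, L_R)` above: amortizing the rotor structure into one matrix before touching any channel data.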

per_channel_sandwich(R, x, R_rev=None)

Sandwich product with per-channel rotors via action matrices.

Each channel c has its own rotor R[c]. Builds a [C, D, D] action matrix (one per channel), then applies to all batch elements in one matmul.

Memory: O(C*D*D + B*C*D) vs naive two-GP: O(2*B*C*D*D).

Parameters:

Name Type Description Default
R Tensor

Per-channel rotors [C, D].

required
x Tensor

Batched multivectors [B, C, D].

required
R_rev Tensor

Optional precomputed reverse of R [C, D].

None

Returns:

Type Description
Tensor

Sandwiched result [B, C, D].

Source code in core/algebra.py
def per_channel_sandwich(self, R: torch.Tensor, x: torch.Tensor,
                          R_rev: torch.Tensor = None) -> torch.Tensor:
    """Sandwich product with per-channel rotors via action matrices.

    Each channel c has its own rotor R[c]. Builds a [C, D, D] action matrix
    (one per channel), then applies to all batch elements in one matmul.

    Memory: O(C*D*D + B*C*D) vs naive two-GP: O(2*B*C*D*D).

    Args:
        R: Per-channel rotors [C, D].
        x: Batched multivectors [B, C, D].
        R_rev: Optional precomputed reverse of R [C, D].

    Returns:
        Sandwiched result [B, C, D].
    """
    if R_rev is None:
        R_rev = self.reverse(R)

    ci = self.cayley_indices  # [D, D]

    # Build per-channel action matrices M[c, k, j]
    R_gathered = R[:, ci]                                    # [C, D, D]
    L_R = R_gathered.permute(0, 2, 1) * self._left_sign_T.unsqueeze(0)

    gp_T = self.gp_signs.T
    Rr_gathered = R_rev[:, ci]                               # [C, D, D]
    R_Rr = Rr_gathered.permute(0, 2, 1) * gp_T.unsqueeze(0)

    M = torch.bmm(R_Rr, L_R)  # [C, D, D]

    # Apply: result[b, c, k] = sum_j M[c, j, k] * x[b, c, j]
    # x: [B, C, D], M.T: [C, D, D] -> einsum or matmul with broadcast
    return torch.einsum('bcd,cdk->bck', x, M.transpose(-2, -1))

multi_rotor_sandwich(R, x, R_rev=None)

Sandwich product with K rotors applied to C-channel input.

Builds K action matrices [K, D, D] once, then applies all K rotors to x in a single einsum. This replaces the naive two-sequential-geometric-product approach used by MultiRotorLayer.

Memory: O(K*D*D) setup + O(B*C*K*D) apply. Compare to naive two-GP: O(2*B*C*K*D*D).

Parameters:

Name Type Description Default
R Tensor

Per-rotor versors [K, D].

required
x Tensor

Batched multivectors [B, C, D].

required
R_rev Tensor

Optional precomputed reverse/inverse of R [K, D].

None

Returns:

Type Description
Tensor

Per-rotor sandwiched result [B, C, K, D].

Source code in core/algebra.py
def multi_rotor_sandwich(self, R: torch.Tensor, x: torch.Tensor,
                          R_rev: torch.Tensor = None) -> torch.Tensor:
    """Sandwich product with K rotors applied to C-channel input.

    Builds K action matrices [K, D, D] once, then applies all K
    rotors to x in a single einsum.  This replaces the naive
    two-sequential-geometric-product approach used by MultiRotorLayer.

    Memory: O(K*D*D) setup + O(B*C*K*D) apply.
    Compare to naive two-GP: O(2*B*C*K*D*D).

    Args:
        R: Per-rotor versors [K, D].
        x: Batched multivectors [B, C, D].
        R_rev: Optional precomputed reverse/inverse of R [K, D].

    Returns:
        Per-rotor sandwiched result [B, C, K, D].
    """
    if R_rev is None:
        R_rev = self.reverse(R)

    ci = self.cayley_indices  # [D, D]

    R_gathered = R[:, ci]                                    # [K, D, D]
    L_R = R_gathered.permute(0, 2, 1) * self._left_sign_T.unsqueeze(0)

    gp_T = self.gp_signs.T
    Rr_gathered = R_rev[:, ci]                               # [K, D, D]
    R_Rr = Rr_gathered.permute(0, 2, 1) * gp_T.unsqueeze(0)

    M = torch.bmm(R_Rr, L_R)  # [K, D, D]

    # Apply all K rotors: x[B,C,D] @ M[K,D,D].T -> [B,C,K,D]
    return torch.einsum('bcd,kde->bcke', x, M.transpose(-2, -1))

pseudoscalar_product(x)

Multiply by the unit pseudoscalar: x * I.

Maps grade-k to grade-(n-k) (Hodge dual). Computed as a simple permutation with sign flips - no geometric product needed.

Parameters:

Name Type Description Default
x Tensor

Multivector [..., D].

required

Returns:

Type Description
Tensor

Result [..., D].

Source code in core/algebra.py
def pseudoscalar_product(self, x: torch.Tensor) -> torch.Tensor:
    """Multiply by the unit pseudoscalar: x * I.

    Maps grade-k to grade-(n-k) (Hodge dual).  Computed as a simple
    permutation with sign flips - no geometric product needed.

    Args:
        x: Multivector [..., D].

    Returns:
        Result [..., D].
    """
    ps_signs = self._ps_signs
    if ps_signs.dtype != x.dtype:
        ps_signs = ps_signs.to(dtype=x.dtype)

    return x[..., self._ps_source] * ps_signs
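The permutation-with-signs structure is easy to tabulate by hand in Cl(2, 0), where I = e12. A standalone sketch (the `source`/`signs` tables below are worked out for this one algebra, mirroring what `_ps_source`/`_ps_signs` precompute in general):

```python
# Cl(2, 0), basis order [1, e1, e2, e12], I = e12:
#   1*I = e12,  e1*I = e2,  e2*I = -e1,  e12*I = -1
# so component k of x*I reads x[source[k]] * signs[k]:
source = [3, 2, 1, 0]
signs  = [-1, -1, 1, 1]

def ps_product(x):
    return [x[s] * g for s, g in zip(source, signs)]

assert ps_product([1, 0, 0, 0]) == [0, 0, 0, 1]    # 1   -> e12
assert ps_product([0, 0, 1, 0]) == [0, -1, 0, 0]   # e2  -> -e1
assert ps_product([0, 0, 0, 1]) == [-1, 0, 0, 0]   # e12 -> -1
```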

blade_project(mv, blade)

Project multivector onto blade subspace: (mv . B) B^{-1}.

Parameters:

Name Type Description Default
mv Tensor

Multivector to project [..., dim].

required
blade Tensor

Blade defining the subspace [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Projected multivector [..., dim].

Source code in core/algebra.py
def blade_project(self, mv: torch.Tensor, blade: torch.Tensor) -> torch.Tensor:
    """Project multivector onto blade subspace: (mv . B) B^{-1}.

    Args:
        mv (torch.Tensor): Multivector to project [..., dim].
        blade (torch.Tensor): Blade defining the subspace [..., dim].

    Returns:
        torch.Tensor: Projected multivector [..., dim].
    """
    inner = self.inner_product(mv, blade)
    return self.geometric_product(inner, self.blade_inverse(blade))

blade_reject(mv, blade)

Reject multivector from blade subspace: mv - proj_B(mv).

The orthogonal complement of the projection onto blade.

Parameters:

Name Type Description Default
mv Tensor

Multivector to reject [..., dim].

required
blade Tensor

Blade defining the subspace [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Rejected multivector [..., dim].

Source code in core/algebra.py
def blade_reject(self, mv: torch.Tensor, blade: torch.Tensor) -> torch.Tensor:
    """Reject multivector from blade subspace: mv - proj_B(mv).

    The orthogonal complement of the projection onto blade.

    Args:
        mv (torch.Tensor): Multivector to reject [..., dim].
        blade (torch.Tensor): Blade defining the subspace [..., dim].

    Returns:
        torch.Tensor: Rejected multivector [..., dim].
    """
    return mv - self.blade_project(mv, blade)
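For grade-1 inputs both operations reduce to the familiar vector projection and its orthogonal complement. A worked sketch in Cl(2, 0) (hypothetical helper, not the Versor API):

```python
# Projecting a vector v onto a vector blade b:
#   (v . b) b^{-1} = ((v·b) / |b|^2) b,   rejection = v - projection
def project(vx, vy, bx, by):
    c = (vx * bx + vy * by) / (bx * bx + by * by)
    return (c * bx, c * by)

px, py = project(1.0, 2.0, 3.0, 0.0)     # project (1, 2) onto the e1 axis
rx, ry = 1.0 - px, 2.0 - py
assert abs(px - 1.0) < 1e-12 and py == 0.0   # component along e1
assert abs(rx * 3.0 + ry * 0.0) < 1e-12      # rejection is orthogonal to b
```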

grade_involution(mv)

Grade involution (main involution): x_hat = sum_k (-1)^k &lt;x&gt;_k.

Flips sign of all odd-grade components, preserves even-grade. This is an algebra automorphism: (AB)^ = A_hat B_hat.

Parameters:

Name Type Description Default
mv Tensor

Input multivector [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Involuted multivector [..., dim].

Source code in core/algebra.py
def grade_involution(self, mv: torch.Tensor) -> torch.Tensor:
    """Grade involution (main involution): x_hat = sum (-1)^k <x>_k.

    Flips sign of all odd-grade components, preserves even-grade.
    This is an algebra automorphism: (AB)^ = A_hat B_hat.

    Args:
        mv (torch.Tensor): Input multivector [..., dim].

    Returns:
        torch.Tensor: Involuted multivector [..., dim].
    """
    signs = self._involution_signs
    if signs.dtype != mv.dtype:
        signs = signs.to(dtype=mv.dtype)
    return mv * signs
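Because the popcount of a blade's bitmask index is its grade, the sign table is just `(-1)^popcount(index)`. A one-line sketch for n = 3:

```python
# _involution_signs equivalent for Cl with n = 3 (8 basis blades)
inv_signs = [(-1) ** bin(i).count("1") for i in range(8)]
assert inv_signs == [1, -1, -1, 1, -1, 1, 1, -1]   # grades 0,1,1,2,1,2,2,3
```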

clifford_conjugation(mv)

Clifford conjugation: bar{x} = grade_involution(reverse(x)).

Combines reversion and grade involution. For a k-blade: bar{e_I} = (-1)^k * (-1)^{k(k-1)/2} * e_I

This is an anti-automorphism: bar{AB} = bar{B} bar{A}.

Parameters:

Name Type Description Default
mv Tensor

Input multivector [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Conjugated multivector [..., dim].

Source code in core/algebra.py
def clifford_conjugation(self, mv: torch.Tensor) -> torch.Tensor:
    """Clifford conjugation: bar{x} = grade_involution(reverse(x)).

    Combines reversion and grade involution. For a k-blade:
        bar{e_I} = (-1)^k * (-1)^{k(k-1)/2} * e_I

    This is an anti-automorphism: bar{AB} = bar{B} bar{A}.

    Args:
        mv (torch.Tensor): Input multivector [..., dim].

    Returns:
        torch.Tensor: Conjugated multivector [..., dim].
    """
    cs = self.conj_signs
    if cs.dtype != mv.dtype:
        cs = cs.to(dtype=mv.dtype)
    return mv * cs
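The composed sign table can be checked grade by grade: reversion contributes (-1)^{k(k-1)/2}, involution contributes (-1)^k, and their product collapses to (-1)^{k(k+1)/2}. A quick sketch:

```python
def grade(i: int) -> int:
    return bin(i).count("1")       # popcount of the blade bitmask = grade

# conj sign = reversion sign * involution sign, per basis blade (n = 4 here)
conj = [(-1) ** (grade(i) * (grade(i) - 1) // 2) * (-1) ** grade(i)
        for i in range(16)]
assert conj == [(-1) ** (grade(i) * (grade(i) + 1) // 2) for i in range(16)]
```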

norm_sq(mv)

Squared norm: &lt;x * reverse(x)&gt;_0.

Returns the scalar (grade-0) part of the product of a multivector with its reverse. For blades, this equals the square of the magnitude with the appropriate sign from the metric.

Optimized: the scalar component of A*~A is sum_i a_i^2 * rev_signs[i] * cayley_signs[i, i]. No full geometric product needed.

Parameters:

Name Type Description Default
mv Tensor

Input multivector [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Scalar norm squared [..., 1].

Source code in core/algebra.py
def norm_sq(self, mv: torch.Tensor) -> torch.Tensor:
    """Squared norm: <x * reverse(x)>_0.

    Returns the scalar (grade-0) part of the product of a multivector
    with its reverse. For blades, this equals the square of the
    magnitude with the appropriate sign from the metric.

    Optimized: the scalar component of A*~A is ``sum_i a_i^2 * rev_signs[i]
    * cayley_signs[i, i]``. No full geometric product needed.

    Args:
        mv (torch.Tensor): Input multivector [..., dim].

    Returns:
        torch.Tensor: Scalar norm squared [..., 1].
    """
    # <x ~x>_0 = sum_i x_i^2 * rev_signs[i] * diag_signs[i]
    signs = self._norm_sq_signs
    if signs.dtype != mv.dtype:
        signs = signs.to(dtype=mv.dtype)
    return (mv * mv * signs).sum(dim=-1, keepdim=True)
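In Cl(2, 0) the two sign tables cancel each other exactly, so the squared norm is a plain sum of squares. A worked sketch of the precomputed-signs idea:

```python
# Cl(2, 0), basis [1, e1, e2, e12]:
rev  = [1, 1, 1, -1]       # (-1)^{k(k-1)/2} per blade
diag = [1, 1, 1, -1]       # e_I e_I (e12 e12 = -1)
signs = [r * d for r, d in zip(rev, diag)]

def norm_sq(mv):
    return sum(c * c * s for c, s in zip(mv, signs))

assert signs == [1, 1, 1, 1]
assert norm_sq([1.0, 2.0, 3.0, 4.0]) == 30.0
```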

left_contraction(A, B)

Left contraction: A _| B.

Selects components where grade(A) <= grade(B) and grade(result) = grade(B) - grade(A). This is the standard contraction used in GA for projection-like operations.

Parameters:

Name Type Description Default
A Tensor

Left operand [..., dim].

required
B Tensor

Right operand [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Left contraction A _| B [..., dim].

Source code in core/algebra.py
def left_contraction(self, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
    """Left contraction: A _| B.

    Selects components where grade(A) <= grade(B) and
    grade(result) = grade(B) - grade(A). This is the standard
    contraction used in GA for projection-like operations.

    Args:
        A (torch.Tensor): Left operand [..., dim].
        B (torch.Tensor): Right operand [..., dim].

    Returns:
        torch.Tensor: Left contraction A _| B [..., dim].
    """
    B_gathered = B[..., self.cayley_indices]
    return torch.matmul(A.unsqueeze(-2), B_gathered * self.lc_gp_signs).squeeze(-2)

dual(mv)

Hodge dual: x* = x I^{-1}, maps grade-k to grade-(n-k).

Equivalent to pseudoscalar_product but with conventional name.

Parameters:

Name Type Description Default
mv Tensor

Input multivector [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Dual multivector [..., dim].

Source code in core/algebra.py
def dual(self, mv: torch.Tensor) -> torch.Tensor:
    """Hodge dual: x* = x I^{-1}, maps grade-k to grade-(n-k).

    Equivalent to ``pseudoscalar_product`` but with conventional name.

    Args:
        mv (torch.Tensor): Input multivector [..., dim].

    Returns:
        torch.Tensor: Dual multivector [..., dim].
    """
    return self.pseudoscalar_product(mv)

reflect(x, n)

Reflect x through the hyperplane orthogonal to vector n.

Implements the versor reflection: x' = -n x n^{-1}.

Uses the sandwich product action-matrix when x has a channel dimension (3-D input), falling back to two sequential geometric products for general shapes.

Parameters:

Name Type Description Default
x Tensor

Multivector to reflect [..., dim].

required
n Tensor

Normal vector (grade-1) [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Reflected multivector [..., dim].

Source code in core/algebra.py
def reflect(self, x: torch.Tensor, n: torch.Tensor) -> torch.Tensor:
    """Reflect x through the hyperplane orthogonal to vector n.

    Implements the versor reflection: x' = -n x n^{-1}.

    Uses the sandwich product action-matrix when x has a channel
    dimension (3-D input), falling back to two sequential geometric
    products for general shapes.

    Args:
        x (torch.Tensor): Multivector to reflect [..., dim].
        n (torch.Tensor): Normal vector (grade-1) [..., dim].

    Returns:
        torch.Tensor: Reflected multivector [..., dim].
    """
    n_hat = self.grade_involution(n)   # -n for grade-1
    n_inv = self.blade_inverse(n)

    # Use sandwich machinery when shapes allow it
    if x.dim() == 3 and n.dim() == 2 and x.shape[0] != n.shape[0]:
        # n: [C, D], x: [B, C, D] -> per_channel_sandwich
        return self.per_channel_sandwich(n_hat, x, n_inv)
    if x.dim() == 3 and n.dim() == 2 and x.shape[0] == n.shape[0]:
        # n: [N, D], x: [N, C, D] -> sandwich_product
        return self.sandwich_product(n_hat, x, n_inv)

    return self.geometric_product(
        self.geometric_product(n_hat, x), n_inv
    )
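The versor formula agrees with the classical mirror formula v - 2(v·n)n/|n|^2. A sketch expanding -n v n^{-1} by hand in Cl(2, 0) for grade-1 inputs (hypothetical helpers, not the Versor API):

```python
def reflect_versor(vx, vy, nx, ny):
    """x' = -n v n^{-1} in Cl(2, 0), expanded for grade-1 n and v."""
    s = nx * vx + ny * vy              # scalar part of n v
    b = nx * vy - ny * vx              # e12 part of n v
    nsq = nx * nx + ny * ny            # <n n~>_0
    return (-(s * nx + b * ny) / nsq, -(s * ny - b * nx) / nsq)

def reflect_classic(vx, vy, nx, ny):
    """Mirror formula v - 2 (v·n) n / |n|^2."""
    d = (vx * nx + vy * ny) / (nx * nx + ny * ny)
    return (vx - 2 * d * nx, vy - 2 * d * ny)

a = reflect_versor(1.0, 2.0, 3.0, 4.0)
c = reflect_classic(1.0, 2.0, 3.0, 4.0)
assert all(abs(u - w) < 1e-12 for u, w in zip(a, c))
assert reflect_versor(5.0, 7.0, 1.0, 0.0) == (-5.0, 7.0)  # flips e1 component
```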

versor_product(V, x)

General versor transformation: V x V^{-1} or hat{V} x V^{-1}.

For even versors (rotors), this is the sandwich product. For odd versors (reflections), the grade involution is applied. The parity is determined from the grade structure of V.

In practice, this computes: grade_involution(V) * x * V^{-1}, which correctly handles both even and odd versors.

Parameters:

Name Type Description Default
V Tensor

Versor [..., dim].

required
x Tensor

Multivector to transform [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Transformed multivector [..., dim].

Source code in core/algebra.py
def versor_product(self, V: torch.Tensor, x: torch.Tensor) -> torch.Tensor:
    """General versor transformation: V x V^{-1} or hat{V} x V^{-1}.

    For even versors (rotors), this is the sandwich product.
    For odd versors (reflections), the grade involution is applied.
    The parity is determined from the grade structure of V.

    In practice, this computes: grade_involution(V) * x * V^{-1},
    which correctly handles both even and odd versors.

    Args:
        V (torch.Tensor): Versor [..., dim].
        x (torch.Tensor): Multivector to transform [..., dim].

    Returns:
        torch.Tensor: Transformed multivector [..., dim].
    """
    V_inv = self.blade_inverse(V)
    V_hat = self.grade_involution(V)
    return self.geometric_product(
        self.geometric_product(V_hat, x), V_inv
    )

exp(mv)

Exponentiates a bivector to produce a rotor.

Dispatches by :attr:exp_policy:

  • FAST -- closed-form only (fastest, approximate for non-simple bivectors in n >= 4).
  • AUTO (default) -- closed-form for n <= 3 (exact), compiled-safe decomposition for n >= 4.
  • EXACT -- always compiled-safe decomposition (exact for all bivectors, torch.compile-safe).

Three signature regimes handled in the closed-form path:
  • Elliptic (B^2 < 0): exp(B) = cos(th) + sin(th)/th * B
  • Hyperbolic (B^2 > 0): exp(B) = cosh(th) + sinh(th)/th * B
  • Parabolic (B^2 ~ 0): exp(B) ~ 1 + B

Parameters:

Name Type Description Default
mv Tensor

Pure bivector [..., dim].

required

Returns:

Type Description
Tensor

torch.Tensor: Rotor exp(mv) [..., dim].

Source code in core/algebra.py
def exp(self, mv: torch.Tensor) -> torch.Tensor:
    """Exponentiates a bivector to produce a rotor.

    Dispatches by :attr:`exp_policy`:

    - ``FAST``  -- closed-form only (fastest, approximate for
      non-simple bivectors in n >= 4).
    - ``AUTO``  (default) -- closed-form for n <= 3 (exact),
      compiled-safe decomposition for n >= 4.
    - ``EXACT`` -- always compiled-safe decomposition (exact for
      all bivectors, ``torch.compile``-safe).

    Three signature regimes handled in the closed-form path:
        - Elliptic  (B^2 < 0): exp(B) = cos(th) + sin(th)/th * B
        - Hyperbolic (B^2 > 0): exp(B) = cosh(th) + sinh(th)/th * B
        - Parabolic  (B^2 ~ 0): exp(B) ~ 1 + B

    Args:
        mv (torch.Tensor): Pure bivector [..., dim].

    Returns:
        torch.Tensor: Rotor exp(mv) [..., dim].
    """
    from core.decomposition import ExpPolicy

    policy = self._exp_policy

    if policy == ExpPolicy.FAST:
        return self._exp_bivector_closed(mv)

    if policy == ExpPolicy.AUTO:
        if self.n <= 3:
            return self._exp_bivector_closed(mv)
        return self._exp_compiled_safe(mv)

    # ExpPolicy.EXACT
    return self._exp_compiled_safe(mv)
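The three closed-form regimes reduce to choosing a pair of coefficients from the scalar B^2. A standalone sketch for a simple bivector (the `exp_coeffs` helper is hypothetical; the real path also handles batching and the eps threshold differently):

```python
import math

def exp_coeffs(b_sq: float, eps: float = 1e-12):
    """Return (c0, c1) with exp(B) = c0 + c1 * B, given the scalar B^2 = b_sq."""
    if b_sq < -eps:                               # elliptic: B^2 < 0
        th = math.sqrt(-b_sq)
        return math.cos(th), math.sin(th) / th
    if b_sq > eps:                                # hyperbolic: B^2 > 0
        th = math.sqrt(b_sq)
        return math.cosh(th), math.sinh(th) / th
    return 1.0, 1.0                               # parabolic: exp(B) ~ 1 + B

# B = (pi/2) e12 in Cl(2, 0): B^2 = -(pi/2)^2, exp(B) = cos(pi/2) + sin(pi/2) e12/(pi/2) * B-coeff
c0, c1 = exp_coeffs(-(math.pi / 2) ** 2)
assert abs(c0) < 1e-12 and abs(c1 - 2 / math.pi) < 1e-12
assert exp_coeffs(0.0) == (1.0, 1.0)
```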

exp_decomposed(mv, **kwargs)

Exponentiates a bivector via decomposition into simple components.

.. deprecated:: Set :attr:exp_policy and call :meth:exp instead.

Parameters:

Name Type Description Default
mv Tensor

Pure bivector [..., dim].

required
**kwargs

Legacy kwargs (use_decomposition, k, etc.).

{}

Returns:

Type Description
Tensor

torch.Tensor: Rotor exp(mv) [..., dim].

Source code in core/algebra.py
def exp_decomposed(self, mv: torch.Tensor, **kwargs) -> torch.Tensor:
    """Exponentiates a bivector via decomposition into simple components.

    .. deprecated::
        Set :attr:`exp_policy` and call :meth:`exp` instead.

    Args:
        mv (torch.Tensor): Pure bivector [..., dim].
        **kwargs: Legacy kwargs (``use_decomposition``, ``k``, etc.).

    Returns:
        torch.Tensor: Rotor exp(mv) [..., dim].
    """
    import warnings
    warnings.warn(
        "exp_decomposed() is deprecated. Set algebra.exp_policy and "
        "use algebra.exp() instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    # Legacy compat: use_decomposition=False means closed-form
    if kwargs.get('use_decomposition') is False:
        return self._exp_bivector_closed(mv)
    return self.exp(mv)

Multivector

Multivector

Object-oriented multivector wrapper with operator overloading.

Wraps a raw coefficient tensor and its parent CliffordAlgebra, exposing every core algebra operation as a method or Python operator.

Attributes:

Name Type Description
algebra CliffordAlgebra

The backend.

tensor Tensor

The raw data [..., Dim].

Source code in core/multivector.py
class Multivector:
    """Object-oriented multivector wrapper with operator overloading.

    Wraps a raw coefficient tensor and its parent ``CliffordAlgebra``,
    exposing every core algebra operation as a method or Python operator.

    Attributes:
        algebra (CliffordAlgebra): The backend.
        tensor (torch.Tensor): The raw data [..., Dim].
    """

    __slots__ = ("algebra", "tensor")

    def __init__(self, algebra: CliffordAlgebra, tensor: torch.Tensor):
        self.algebra = algebra
        self.tensor = tensor

    @classmethod
    def from_vectors(cls, algebra: CliffordAlgebra, vectors: torch.Tensor) -> Multivector:
        """Promotes vectors to multivectors (Grade 1)."""
        return cls(algebra, algebra.embed_vector(vectors))

    @classmethod
    def scalar(cls, algebra: CliffordAlgebra, value: float | torch.Tensor,
               batch_shape: tuple[int, ...] = ()) -> Multivector:
        """Creates a scalar multivector (grade 0 only)."""
        dim = 2 ** algebra.n
        t = torch.zeros(*batch_shape, dim, device=algebra.device,
                         dtype=algebra.dtype)
        t[..., 0] = value
        return cls(algebra, t)

    def __repr__(self):
        return (f"Multivector(shape={self.tensor.shape}, "
                f"algebra=Cl({self.algebra.p},{self.algebra.q},{self.algebra.r}))")

    def _check_algebra(self, other: Multivector) -> None:
        s, o = self.algebra, other.algebra
        if (s.p, s.q, s.r) != (o.p, o.q, o.r):
            raise ValueError(
                f"Algebra mismatch: Cl({s.p},{s.q},{s.r}) "
                f"vs Cl({o.p},{o.q},{o.r})")

    def _wrap(self, tensor: torch.Tensor) -> Multivector:
        return Multivector(self.algebra, tensor)

    def __add__(self, other):
        if isinstance(other, Multivector):
            self._check_algebra(other)
            return self._wrap(self.tensor + other.tensor)
        if isinstance(other, (int, float, torch.Tensor)):
            return self._wrap(self.tensor + other)
        return NotImplemented

    def __radd__(self, other):
        return self.__add__(other)

    def __sub__(self, other):
        if isinstance(other, Multivector):
            self._check_algebra(other)
            return self._wrap(self.tensor - other.tensor)
        if isinstance(other, (int, float, torch.Tensor)):
            return self._wrap(self.tensor - other)
        return NotImplemented

    def __rsub__(self, other):
        if isinstance(other, (int, float, torch.Tensor)):
            return self._wrap(other - self.tensor)
        return NotImplemented

    def __neg__(self):
        return self._wrap(-self.tensor)

    def __mul__(self, other):
        """Geometric product ``A * B``, or scalar scaling."""
        if isinstance(other, Multivector):
            self._check_algebra(other)
            return self._wrap(self.algebra.geometric_product(self.tensor, other.tensor))
        if isinstance(other, (int, float)):
            return self._wrap(self.tensor * other)
        if isinstance(other, torch.Tensor):
            return self._wrap(self.tensor * other)
        return NotImplemented

    def __rmul__(self, other):
        if isinstance(other, (int, float, torch.Tensor)):
            return self._wrap(self.tensor * other)
        return NotImplemented

    def __truediv__(self, other):
        if isinstance(other, (int, float)):
            return self._wrap(self.tensor / other)
        if isinstance(other, torch.Tensor):
            return self._wrap(self.tensor / other)
        return NotImplemented

    def __xor__(self, other):
        """Wedge (outer) product ``A ^ B``."""
        if isinstance(other, Multivector):
            self._check_algebra(other)
            return self._wrap(self.algebra.wedge(self.tensor, other.tensor))
        return NotImplemented

    def __or__(self, other):
        """Inner product ``A | B``."""
        if isinstance(other, Multivector):
            self._check_algebra(other)
            return self._wrap(self.algebra.inner_product(self.tensor, other.tensor))
        return NotImplemented

    def __invert__(self):
        """Reversion ``~A``."""
        return self._wrap(self.algebra.reverse(self.tensor))

    def grade(self, k: int) -> Multivector:
        """Extract the grade-k component."""
        return self._wrap(self.algebra.grade_projection(self.tensor, k))

    def reverse(self) -> Multivector:
        """Reversion (same as ``~self``)."""
        return self._wrap(self.algebra.reverse(self.tensor))

    def grade_involution(self) -> Multivector:
        """Grade involution (main involution): flips odd-grade signs."""
        return self._wrap(self.algebra.grade_involution(self.tensor))

    def clifford_conjugation(self) -> Multivector:
        """Clifford conjugation: grade_involution(reverse(x))."""
        return self._wrap(self.algebra.clifford_conjugation(self.tensor))

    def dual(self) -> Multivector:
        """Hodge dual: maps grade-k to grade-(n-k)."""
        return self._wrap(self.algebra.dual(self.tensor))

    def inverse(self) -> Multivector:
        """Blade inverse: B^{-1} = ~B / <B~B>_0."""
        return self._wrap(self.algebra.blade_inverse(self.tensor))

    def geometric_product(self, other: Multivector) -> Multivector:
        """Explicit geometric product (same as ``self * other``)."""
        self._check_algebra(other)
        return self._wrap(self.algebra.geometric_product(self.tensor, other.tensor))

    def wedge(self, other: Multivector) -> Multivector:
        """Wedge (outer) product (same as ``self ^ other``)."""
        self._check_algebra(other)
        return self._wrap(self.algebra.wedge(self.tensor, other.tensor))

    def inner(self, other: Multivector) -> Multivector:
        """Inner product (same as ``self | other``)."""
        self._check_algebra(other)
        return self._wrap(self.algebra.inner_product(self.tensor, other.tensor))

    def left_contraction(self, other: Multivector) -> Multivector:
        """Left contraction: ``self _| other``."""
        self._check_algebra(other)
        return self._wrap(self.algebra.left_contraction(self.tensor, other.tensor))

    def right_contraction(self, other: Multivector) -> Multivector:
        """Right contraction: ``self |_ other``."""
        self._check_algebra(other)
        return self._wrap(self.algebra.right_contraction(self.tensor, other.tensor))

    def commutator(self, other: Multivector) -> Multivector:
        """Commutator (Lie bracket): ``[self, other] = self*other - other*self``."""
        self._check_algebra(other)
        return self._wrap(self.algebra.commutator(self.tensor, other.tensor))

    def anti_commutator(self, other: Multivector) -> Multivector:
        """Anti-commutator: ``{self, other} = self*other + other*self``."""
        self._check_algebra(other)
        return self._wrap(self.algebra.anti_commutator(self.tensor, other.tensor))

    def norm(self) -> torch.Tensor:
        """Induced metric norm (returns scalar tensor)."""
        from core.metric import induced_norm
        return induced_norm(self.algebra, self.tensor)

    def norm_sq(self) -> torch.Tensor:
        """Squared norm: <x * ~x>_0 (returns scalar tensor)."""
        return self.algebra.norm_sq(self.tensor)

    def get_grade_norms(self) -> torch.Tensor:
        """Per-grade L2 norms."""
        return self.algebra.get_grade_norms(self.tensor)

    def exp(self) -> Multivector:
        """Exponential map (bivector -> rotor)."""
        return self._wrap(self.algebra.exp(self.tensor))

    def sandwich(self, x: Multivector) -> Multivector:
        """Sandwich product: ``self * x * ~self``.

        Falls back to two geometric products when the tensor shapes
        don't match the optimized [N, D] + [N, C, D] layout.
        """
        self._check_algebra(x)
        R, xt = self.tensor, x.tensor
        # Optimized path: R is [N, D], x is [N, C, D]
        if R.dim() == 2 and xt.dim() == 3:
            return self._wrap(self.algebra.sandwich_product(R, xt))
        # General fallback: two GPs
        R_rev = self.algebra.reverse(R)
        return self._wrap(
            self.algebra.geometric_product(
                self.algebra.geometric_product(R, xt), R_rev))

    def reflect(self, n: Multivector) -> Multivector:
        """Reflect self through hyperplane orthogonal to vector n."""
        self._check_algebra(n)
        return self._wrap(self.algebra.reflect(self.tensor, n.tensor))

    def versor_product(self, x: Multivector) -> Multivector:
        """General versor action: ``hat(self) * x * self^{-1}``."""
        self._check_algebra(x)
        return self._wrap(self.algebra.versor_product(self.tensor, x.tensor))

    def blade_project(self, blade: Multivector) -> Multivector:
        """Project onto blade subspace: ``(self · B) B^{-1}``."""
        self._check_algebra(blade)
        return self._wrap(self.algebra.blade_project(self.tensor, blade.tensor))

    def blade_reject(self, blade: Multivector) -> Multivector:
        """Reject from blade subspace: ``self - proj_B(self)``."""
        self._check_algebra(blade)
        return self._wrap(self.algebra.blade_reject(self.tensor, blade.tensor))

    def to(self, *args, **kwargs) -> Multivector:
        """Move/cast the underlying tensor (same API as ``torch.Tensor.to``)."""
        return self._wrap(self.tensor.to(*args, **kwargs))

    def detach(self) -> Multivector:
        """Detach from computation graph."""
        return self._wrap(self.tensor.detach())

    def clone(self) -> Multivector:
        """Clone the underlying tensor."""
        return self._wrap(self.tensor.clone())

    def requires_grad_(self, requires_grad: bool = True) -> Multivector:
        """Set requires_grad in-place."""
        self.tensor.requires_grad_(requires_grad)
        return self

    @property
    def shape(self) -> torch.Size:
        return self.tensor.shape

    @property
    def device(self) -> torch.device:
        return self.tensor.device

    @property
    def dtype(self) -> torch.dtype:
        return self.tensor.dtype

from_vectors(algebra, vectors) classmethod

Promotes vectors to multivectors (Grade 1).

Source code in core/multivector.py
@classmethod
def from_vectors(cls, algebra: CliffordAlgebra, vectors: torch.Tensor) -> Multivector:
    """Promotes vectors to multivectors (Grade 1)."""
    return cls(algebra, algebra.embed_vector(vectors))

scalar(algebra, value, batch_shape=()) classmethod

Creates a scalar multivector (grade 0 only).

Source code in core/multivector.py
@classmethod
def scalar(cls, algebra: CliffordAlgebra, value: float | torch.Tensor,
           batch_shape: tuple[int, ...] = ()) -> Multivector:
    """Creates a scalar multivector (grade 0 only)."""
    dim = 2 ** algebra.n
    t = torch.zeros(*batch_shape, dim, device=algebra.device,
                     dtype=algebra.dtype)
    t[..., 0] = value
    return cls(algebra, t)

grade(k)

Extract the grade-k component.

Source code in core/multivector.py
def grade(self, k: int) -> Multivector:
    """Extract the grade-k component."""
    return self._wrap(self.algebra.grade_projection(self.tensor, k))

reverse()

Reversion (same as ~self).

Source code in core/multivector.py
def reverse(self) -> Multivector:
    """Reversion (same as ``~self``)."""
    return self._wrap(self.algebra.reverse(self.tensor))

grade_involution()

Grade involution (main involution): flips odd-grade signs.

Source code in core/multivector.py
def grade_involution(self) -> Multivector:
    """Grade involution (main involution): flips odd-grade signs."""
    return self._wrap(self.algebra.grade_involution(self.tensor))

clifford_conjugation()

Clifford conjugation: grade_involution(reverse(x)).

Source code in core/multivector.py
def clifford_conjugation(self) -> Multivector:
    """Clifford conjugation: grade_involution(reverse(x))."""
    return self._wrap(self.algebra.clifford_conjugation(self.tensor))

dual()

Hodge dual: maps grade-k to grade-(n-k).

Source code in core/multivector.py
def dual(self) -> Multivector:
    """Hodge dual: maps grade-k to grade-(n-k)."""
    return self._wrap(self.algebra.dual(self.tensor))

inverse()

Blade inverse: B^{-1} = ~B / <B~B>_0.

Source code in core/multivector.py
def inverse(self) -> Multivector:
    """Blade inverse: B^{-1} = ~B / <B~B>_0."""
    return self._wrap(self.algebra.blade_inverse(self.tensor))

geometric_product(other)

Explicit geometric product (same as self * other).

Source code in core/multivector.py
def geometric_product(self, other: Multivector) -> Multivector:
    """Explicit geometric product (same as ``self * other``)."""
    self._check_algebra(other)
    return self._wrap(self.algebra.geometric_product(self.tensor, other.tensor))

wedge(other)

Wedge (outer) product (same as self ^ other).

Source code in core/multivector.py
def wedge(self, other: Multivector) -> Multivector:
    """Wedge (outer) product (same as ``self ^ other``)."""
    self._check_algebra(other)
    return self._wrap(self.algebra.wedge(self.tensor, other.tensor))

inner(other)

Inner product (same as self | other).

Source code in core/multivector.py
def inner(self, other: Multivector) -> Multivector:
    """Inner product (same as ``self | other``)."""
    self._check_algebra(other)
    return self._wrap(self.algebra.inner_product(self.tensor, other.tensor))

left_contraction(other)

Left contraction: self _| other.

Source code in core/multivector.py
def left_contraction(self, other: Multivector) -> Multivector:
    """Left contraction: ``self _| other``."""
    self._check_algebra(other)
    return self._wrap(self.algebra.left_contraction(self.tensor, other.tensor))

right_contraction(other)

Right contraction: self |_ other.

Source code in core/multivector.py
def right_contraction(self, other: Multivector) -> Multivector:
    """Right contraction: ``self |_ other``."""
    self._check_algebra(other)
    return self._wrap(self.algebra.right_contraction(self.tensor, other.tensor))

commutator(other)

Commutator (Lie bracket): [self, other] = self*other - other*self.

Source code in core/multivector.py
def commutator(self, other: Multivector) -> Multivector:
    """Commutator (Lie bracket): ``[self, other] = self*other - other*self``."""
    self._check_algebra(other)
    return self._wrap(self.algebra.commutator(self.tensor, other.tensor))

anti_commutator(other)

Anti-commutator: {self, other} = self*other + other*self.

Source code in core/multivector.py
def anti_commutator(self, other: Multivector) -> Multivector:
    """Anti-commutator: ``{self, other} = self*other + other*self``."""
    self._check_algebra(other)
    return self._wrap(self.algebra.anti_commutator(self.tensor, other.tensor))

norm()

Induced metric norm (returns scalar tensor).

Source code in core/multivector.py
def norm(self) -> torch.Tensor:
    """Induced metric norm (returns scalar tensor)."""
    from core.metric import induced_norm
    return induced_norm(self.algebra, self.tensor)

norm_sq()

Squared norm: <x * ~x>_0 (returns scalar tensor).

Source code in core/multivector.py
def norm_sq(self) -> torch.Tensor:
    """Squared norm: <x * ~x>_0 (returns scalar tensor)."""
    return self.algebra.norm_sq(self.tensor)

get_grade_norms()

Per-grade L2 norms.

Source code in core/multivector.py
def get_grade_norms(self) -> torch.Tensor:
    """Per-grade L2 norms."""
    return self.algebra.get_grade_norms(self.tensor)

exp()

Exponential map (bivector -> rotor).

Source code in core/multivector.py
def exp(self) -> Multivector:
    """Exponential map (bivector -> rotor)."""
    return self._wrap(self.algebra.exp(self.tensor))

sandwich(x)

Sandwich product: self * x * ~self.

Falls back to two geometric products when the tensor shapes don't match the optimized [N, D] + [N, C, D] layout.

Source code in core/multivector.py
def sandwich(self, x: Multivector) -> Multivector:
    """Sandwich product: ``self * x * ~self``.

    Falls back to two geometric products when the tensor shapes
    don't match the optimized [N, D] + [N, C, D] layout.
    """
    self._check_algebra(x)
    R, xt = self.tensor, x.tensor
    # Optimized path: R is [N, D], x is [N, C, D]
    if R.dim() == 2 and xt.dim() == 3:
        return self._wrap(self.algebra.sandwich_product(R, xt))
    # General fallback: two GPs
    R_rev = self.algebra.reverse(R)
    return self._wrap(
        self.algebra.geometric_product(
            self.algebra.geometric_product(R, xt), R_rev))
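The sandwich pattern `R * x * ~R` can be checked in a self-contained way with a tiny hand-rolled Cl(2,0) geometric product. This is an illustrative sketch only, not the Versor API: multivectors are plain lists of coefficients over the basis (1, e1, e2, e12).

```python
import math

# Multivector in Cl(2,0) as coefficients [1, e1, e2, e12].
def gp(a, b):
    """Geometric product in Cl(2,0) (e1^2 = e2^2 = +1, e12^2 = -1)."""
    return [
        a[0]*b[0] + a[1]*b[1] + a[2]*b[2] - a[3]*b[3],
        a[0]*b[1] + a[1]*b[0] - a[2]*b[3] + a[3]*b[2],
        a[0]*b[2] + a[2]*b[0] + a[1]*b[3] - a[3]*b[1],
        a[0]*b[3] + a[3]*b[0] + a[1]*b[2] - a[2]*b[1],
    ]

def reverse(a):
    """Reversion: flips the sign of the grade-2 part in Cl(2,0)."""
    return [a[0], a[1], a[2], -a[3]]

def sandwich(R, x):
    """R * x * ~R -- the same pattern as the fallback path above."""
    return gp(gp(R, x), reverse(R))

# Rotor for a 90-degree rotation: R = cos(theta/2) - sin(theta/2) e12.
theta = math.pi / 2
R = [math.cos(theta / 2), 0.0, 0.0, -math.sin(theta / 2)]
e1 = [0.0, 1.0, 0.0, 0.0]

rotated = sandwich(R, e1)  # e1 rotated by 90 degrees -> e2
```

Note the half-angle in the rotor: the sandwich applies the rotation twice, once from each side.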

reflect(n)

Reflect self through hyperplane orthogonal to vector n.

Source code in core/multivector.py
def reflect(self, n: Multivector) -> Multivector:
    """Reflect self through hyperplane orthogonal to vector n."""
    self._check_algebra(n)
    return self._wrap(self.algebra.reflect(self.tensor, n.tensor))

versor_product(x)

General versor action: hat(self) * x * self^{-1}.

Source code in core/multivector.py
def versor_product(self, x: Multivector) -> Multivector:
    """General versor action: ``hat(self) * x * self^{-1}``."""
    self._check_algebra(x)
    return self._wrap(self.algebra.versor_product(self.tensor, x.tensor))

blade_project(blade)

Project onto blade subspace: (self · B) B^{-1}.

Source code in core/multivector.py
def blade_project(self, blade: Multivector) -> Multivector:
    """Project onto blade subspace: ``(self · B) B^{-1}``."""
    self._check_algebra(blade)
    return self._wrap(self.algebra.blade_project(self.tensor, blade.tensor))

blade_reject(blade)

Reject from blade subspace: self - proj_B(self).

Source code in core/multivector.py
def blade_reject(self, blade: Multivector) -> Multivector:
    """Reject from blade subspace: ``self - proj_B(self)``."""
    self._check_algebra(blade)
    return self._wrap(self.algebra.blade_reject(self.tensor, blade.tensor))

to(*args, **kwargs)

Move/cast the underlying tensor (same API as torch.Tensor.to).

Source code in core/multivector.py
def to(self, *args, **kwargs) -> Multivector:
    """Move/cast the underlying tensor (same API as ``torch.Tensor.to``)."""
    return self._wrap(self.tensor.to(*args, **kwargs))

detach()

Detach from computation graph.

Source code in core/multivector.py
def detach(self) -> Multivector:
    """Detach from computation graph."""
    return self._wrap(self.tensor.detach())

clone()

Clone the underlying tensor.

Source code in core/multivector.py
def clone(self) -> Multivector:
    """Clone the underlying tensor."""
    return self._wrap(self.tensor.clone())

requires_grad_(requires_grad=True)

Set requires_grad in-place.

Source code in core/multivector.py
def requires_grad_(self, requires_grad: bool = True) -> Multivector:
    """Set requires_grad in-place."""
    self.tensor.requires_grad_(requires_grad)
    return self

Metric

metric

Metric definitions for Clifford algebras.

Provides distances, norms, and inner products that respect the metric signature.

geometric_distance(algebra, A, B)

Computes geometric distance.

dist(A, B) = ||A - B||.

Parameters:

Name Type Description Default
algebra CliffordAlgebra

The algebra instance.

required
A Tensor

First multivector.

required
B Tensor

Second multivector.

required

Returns:

Type Description
Tensor

torch.Tensor: Distance.

Source code in core/metric.py
def geometric_distance(algebra: CliffordAlgebra, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
    """Computes geometric distance.

    dist(A, B) = ||A - B||.

    Args:
        algebra (CliffordAlgebra): The algebra instance.
        A (torch.Tensor): First multivector.
        B (torch.Tensor): Second multivector.

    Returns:
        torch.Tensor: Distance.
    """
    diff = A - B
    return induced_norm(algebra, diff)

clifford_conjugate(algebra, mv)

Clifford conjugation (bar involution).

Combines reversion with grade involution:

A_bar_k = (-1)^k * (-1)^{k(k-1)/2} * A_k

This is the natural *-involution on Cl(p,q). Useful for algebraic computations (e.g., spinor norms, Lipschitz groups).

Parameters:

Name Type Description Default
algebra CliffordAlgebra

The algebra instance.

required
mv Tensor

Multivector [..., Dim].

required

Returns:

Type Description
Tensor

Conjugated multivector [..., Dim].

Source code in core/metric.py
def clifford_conjugate(algebra: CliffordAlgebra, mv: torch.Tensor) -> torch.Tensor:
    """Clifford conjugation (bar involution).

    Combines reversion with grade involution:
        A_bar_k = (-1)^k * (-1)^{k(k-1)/2} * A_k

    This is the natural *-involution on Cl(p,q). Useful for
    algebraic computations (e.g., spinor norms, Lipschitz groups).

    Args:
        algebra: The algebra instance.
        mv: Multivector [..., Dim].

    Returns:
        Conjugated multivector [..., Dim].
    """
    result = mv.clone()
    for i in range(algebra.dim):
        k = bin(i).count('1')
        sign = ((-1) ** k) * ((-1) ** (k * (k - 1) // 2))
        if sign == -1:
            result[..., i] = -result[..., i]
    return result
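The per-blade sign in the loop above depends only on the grade k, i.e. the popcount of the basis-blade index. A standalone sketch of the same sign pattern:

```python
def conjugation_signs(dim):
    """Per-basis-blade signs for Clifford conjugation, mirroring the
    loop in clifford_conjugate: grade k is the popcount of the blade
    index, and the sign is (-1)^k * (-1)^(k*(k-1)//2)."""
    out = []
    for i in range(dim):
        k = bin(i).count('1')
        out.append(((-1) ** k) * ((-1) ** (k * (k - 1) // 2)))
    return out

# Cl(2,0): blades 1, e1, e2, e12 -> grades 0, 1, 1, 2 -> signs +1, -1, -1, -1
signs = conjugation_signs(4)
```

Precomputing this sign vector once and multiplying elementwise is a vectorized alternative to the per-index loop.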

hermitian_inner_product(algebra, A, B)

Hermitian inner product on Cl(p,q): <bar{A} B>_0.

<A, B>_H = Sum_I (conj_sign_I * metric_sign_I) * a_I * b_I

Uses precomputed sign arrays so that the result equals the scalar part of the geometric product of the Clifford conjugate of A with B. For Euclidean algebras (q=0), all signs are +1 and this reduces to the simple coefficient inner product Sum a_I b_I.

Parameters:

Name Type Description Default
algebra CliffordAlgebra

The algebra instance.

required
A Tensor

First multivector [..., Dim].

required
B Tensor

Second multivector [..., Dim].

required

Returns:

Type Description
Tensor

Scalar inner product [..., 1].

Source code in core/metric.py
def hermitian_inner_product(algebra: CliffordAlgebra, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
    """Hermitian inner product on Cl(p,q): <bar{A} B>_0.

    <A, B>_H = Sum_I (conj_sign_I * metric_sign_I) * a_I * b_I

    Uses precomputed sign arrays so that the result equals the scalar
    part of the geometric product of the Clifford conjugate of A with B.
    For Euclidean algebras (q=0), all signs are +1 and this reduces to
    the simple coefficient inner product Sum a_I b_I.

    Args:
        algebra: The algebra instance.
        A: First multivector [..., Dim].
        B: Second multivector [..., Dim].

    Returns:
        Scalar inner product [..., 1].
    """
    signs = _hermitian_signs(algebra)
    if signs.dtype != A.dtype:
        signs = signs.to(dtype=A.dtype)
    return (signs * A * B).sum(dim=-1, keepdim=True)

hermitian_norm(algebra, A)

Hermitian norm: ||A||_H = sqrt(|<A, A>_H|).

Always real and non-negative for any signature. Uses abs() since the signed inner product can produce negative self-products in mixed-signature algebras.

Parameters:

Name Type Description Default
algebra CliffordAlgebra

The algebra instance.

required
A Tensor

Multivector [..., Dim].

required

Returns:

Type Description
Tensor

Norm [..., 1]. Always >= 0.

Source code in core/metric.py
def hermitian_norm(algebra: CliffordAlgebra, A: torch.Tensor) -> torch.Tensor:
    """Hermitian norm: ||A||_H = sqrt(|<A, A>_H|).

    Always real and non-negative for any signature.
    Uses abs() since the signed inner product can produce negative
    self-products in mixed-signature algebras.

    Args:
        algebra: The algebra instance.
        A: Multivector [..., Dim].

    Returns:
        Norm [..., 1]. Always >= 0.
    """
    sq = hermitian_inner_product(algebra, A, A)
    return torch.sqrt(torch.abs(sq))

hermitian_distance(algebra, A, B)

Hermitian distance: d_H(A, B) = ||A - B||_H.

Positive-definite metric distance for any signature. Satisfies: non-negativity, symmetry, triangle inequality, identity.

Parameters:

Name Type Description Default
algebra CliffordAlgebra

The algebra instance.

required
A Tensor

First multivector [..., Dim].

required
B Tensor

Second multivector [..., Dim].

required

Returns:

Type Description
Tensor

Distance [..., 1]. Always >= 0.

Source code in core/metric.py
def hermitian_distance(algebra: CliffordAlgebra, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
    """Hermitian distance: d_H(A, B) = ||A - B||_H.

    Positive-definite metric distance for any signature.
    Satisfies: non-negativity, symmetry, triangle inequality, identity.

    Args:
        algebra: The algebra instance.
        A: First multivector [..., Dim].
        B: Second multivector [..., Dim].

    Returns:
        Distance [..., 1]. Always >= 0.
    """
    return hermitian_norm(algebra, A - B)
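For a Euclidean algebra (q = 0) all signs are +1, so the Hermitian distance reduces to the ordinary Euclidean distance between coefficient vectors. A self-contained sketch (plain Python, not the Versor API) checking the claimed metric axioms numerically in that case:

```python
import math
import random

def h_dist(a, b):
    """Hermitian distance with all-plus signs (Euclidean case):
    sqrt(sum_I (a_I - b_I)^2)."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

random.seed(0)
A, B, C = ([random.gauss(0, 1) for _ in range(8)] for _ in range(3))

symmetric = abs(h_dist(A, B) - h_dist(B, A)) < 1e-12
triangle = h_dist(A, C) <= h_dist(A, B) + h_dist(B, C) + 1e-12
identity = h_dist(A, A) == 0.0
```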

hermitian_angle(algebra, A, B)

Hermitian angle between multivectors.

cos(theta) = <A, B>_H / (||A||_H * ||B||_H)

Parameters:

Name Type Description Default
algebra CliffordAlgebra

The algebra instance.

required
A Tensor

First multivector [..., Dim].

required
B Tensor

Second multivector [..., Dim].

required

Returns:

Type Description
Tensor

Angle in radians [..., 1].

Source code in core/metric.py
def hermitian_angle(algebra: CliffordAlgebra, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
    """Hermitian angle between multivectors.

    cos(theta) = <A, B>_H / (||A||_H * ||B||_H)

    Args:
        algebra: The algebra instance.
        A: First multivector [..., Dim].
        B: Second multivector [..., Dim].

    Returns:
        Angle in radians [..., 1].
    """
    signs = _hermitian_signs(algebra)
    if signs.dtype != A.dtype:
        signs = signs.to(dtype=A.dtype)
    ip = (signs * A * B).sum(dim=-1, keepdim=True)
    sq_a = (signs * A * A).sum(dim=-1, keepdim=True)
    sq_b = (signs * B * B).sum(dim=-1, keepdim=True)
    # Use sqrt(sq_a * sq_b) instead of sqrt(sq_a)*sqrt(sq_b) to avoid
    # float32 precision loss from two separate sqrt operations.
    denom = torch.sqrt(torch.abs(sq_a) * torch.abs(sq_b)).clamp(min=algebra.eps)
    cos_theta = ip / denom
    cos_theta = torch.clamp(cos_theta, -1.0, 1.0)
    return torch.acos(cos_theta)
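The clamp before acos matters: accumulated float error can push the cosine marginally past +/-1, where acos is undefined. A minimal sketch of the same guard in plain Python:

```python
import math

def safe_angle(ip, norm_a, norm_b, eps=1e-12):
    """Angle from an inner product, with the same clamp-before-acos
    guard used by hermitian_angle."""
    cos_theta = ip / max(norm_a * norm_b, eps)
    cos_theta = min(1.0, max(-1.0, cos_theta))
    return math.acos(cos_theta)

# Parallel case: float error may report ip slightly above ||a|| * ||b||;
# the clamp maps it back to cos = 1, i.e. angle 0.
angle = safe_angle(1.0 + 1e-9, 1.0, 1.0)
right = safe_angle(0.0, 1.0, 1.0)  # orthogonal -> pi/2
```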

hermitian_grade_spectrum(algebra, A)

Full Hermitian grade spectrum.

Returns |<A_k, A_k>_H| for each grade k = 0, ..., n. Uses abs() to ensure non-negative values in mixed signatures.

Parameters:

Name Type Description Default
algebra CliffordAlgebra

The algebra instance.

required
A Tensor

Multivector [..., Dim].

required

Returns:

Type Description
Tensor

Grade energies [..., n+1]. Each entry >= 0.

Source code in core/metric.py
def hermitian_grade_spectrum(algebra: CliffordAlgebra, A: torch.Tensor) -> torch.Tensor:
    """Full Hermitian grade spectrum.

    Returns |<A_k, A_k>_H| for each grade k = 0, ..., n.
    Uses abs() to ensure non-negative values in mixed signatures.

    Args:
        algebra: The algebra instance.
        A: Multivector [..., Dim].

    Returns:
        Grade energies [..., n+1]. Each entry >= 0.
    """
    signs = _hermitian_signs(algebra)
    if signs.dtype != A.dtype:
        signs = signs.to(dtype=A.dtype)
    spectrum = []
    for k in range(algebra.n + 1):
        A_k = algebra.grade_projection(A, k)
        sq = (signs * A_k * A_k).sum(dim=-1, keepdim=True)
        spectrum.append(torch.abs(sq))
    return torch.cat(spectrum, dim=-1)

signature_trace_form(algebra, A, B)

Signature-aware trace form: <~A B>_0.

The standard Clifford algebra scalar product. NOT positive-definite in mixed signatures. Use hermitian_inner_product for optimization.

This form is signature-aware and useful for:

- Rotor normalization (R~R = 1)
- Versor validation
- Spinor norm computation

Parameters:

Name Type Description Default
algebra CliffordAlgebra

The algebra instance.

required
A Tensor

First multivector [..., Dim].

required
B Tensor

Second multivector [..., Dim].

required

Returns:

Type Description
Tensor

Scalar trace form [..., 1]. Can be negative in mixed signatures.

Source code in core/metric.py
def signature_trace_form(algebra: CliffordAlgebra, A: torch.Tensor, B: torch.Tensor) -> torch.Tensor:
    """Signature-aware trace form: <~A B>_0.

    The standard Clifford algebra scalar product. NOT positive-definite
    in mixed signatures. Use hermitian_inner_product for optimization.

    This form is signature-aware and useful for:
    - Rotor normalization (R~R = 1)
    - Versor validation
    - Spinor norm computation

    Args:
        algebra: The algebra instance.
        A: First multivector [..., Dim].
        B: Second multivector [..., Dim].

    Returns:
        Scalar trace form [..., 1]. Can be negative in mixed signatures.
    """
    A_rev = algebra.reverse(A)
    prod = algebra.geometric_product(A_rev, B)
    return prod[..., 0:1]

signature_norm_squared(algebra, A)

Signature-aware squared norm: <A~A>_0.

Can be negative in mixed-signature algebras. Returns the raw value without absolute value, preserving causal structure information.

For Cl(n,0): always non-negative. For Cl(p,q) with q>0: sign encodes causal character.

Parameters:

Name Type Description Default
algebra CliffordAlgebra

The algebra instance.

required
A Tensor

Multivector [..., Dim].

required

Returns:

Type Description
Tensor

Signed squared norm [..., 1].

Source code in core/metric.py
def signature_norm_squared(algebra: CliffordAlgebra, A: torch.Tensor) -> torch.Tensor:
    """Signature-aware squared norm: <A~A>_0.

    Can be negative in mixed-signature algebras. Returns the raw value
    without absolute value, preserving causal structure information.

    For Cl(n,0): always non-negative.
    For Cl(p,q) with q>0: sign encodes causal character.

    Args:
        algebra: The algebra instance.
        A: Multivector [..., Dim].

    Returns:
        Signed squared norm [..., 1].
    """
    return signature_trace_form(algebra, A, A)
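For a vector v = a e1 + b e2 in Cl(1,1) (e1^2 = +1, e2^2 = -1), reversion fixes grade 1, so <v ~v>_0 = a^2 - b^2 and the sign encodes the causal character. A standalone numeric check of that behaviour (plain Python, not the Versor API):

```python
def vector_norm_sq_cl11(a, b):
    """<v ~v>_0 for v = a*e1 + b*e2 in Cl(1,1): reversal fixes grade 1,
    and e1^2 = +1, e2^2 = -1, so the scalar part is a^2 - b^2."""
    return a * a - b * b

positive = vector_norm_sq_cl11(1.0, 0.0)  # +1: positive-signature direction
negative = vector_norm_sq_cl11(0.0, 1.0)  # -1: negative-signature direction
null     = vector_norm_sq_cl11(1.0, 1.0)  #  0: null (light-cone) direction
```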

Decomposition

decomposition

Bivector decomposition via GA power iteration.

Decomposes a general bivector into simple (blade) components that can each be exponentiated with the closed-form formula.

Reference

Pence, T., Yamada, D., & Singh, V. (2025). "Composing Linear Layers from Irreducibles." arXiv:2507.11688v1 [cs.LG]

ExpPolicy

Bases: Enum

Policy controlling how CliffordAlgebra.exp() handles bivectors.

  • AUTO -- closed-form for n <= 3 (all bivectors simple), compiled-safe decomposition for n >= 4.
  • FAST -- closed-form only (_exp_bivector_closed). Ignores residual error for non-simple bivectors. Fastest path.
  • EXACT -- always use compiled-safe decomposition. Exact for all bivectors, torch.compile-safe (no CPU sync).
Source code in core/decomposition.py
class ExpPolicy(enum.Enum):
    """Policy controlling how ``CliffordAlgebra.exp()`` handles bivectors.

    - ``AUTO``  -- closed-form for n <= 3 (all bivectors simple),
                   compiled-safe decomposition for n >= 4.
    - ``FAST``  -- closed-form only (``_exp_bivector_closed``). Ignores
                   residual error for non-simple bivectors. Fastest path.
    - ``EXACT`` -- always use compiled-safe decomposition. Exact for all
                   bivectors, ``torch.compile``-safe (no CPU sync).
    """
    AUTO = "auto"
    FAST = "fast"
    EXACT = "exact"
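The routing implied by the three policy values can be sketched as follows. The helper name is hypothetical; the real dispatch lives inside `CliffordAlgebra.exp()`:

```python
import enum

class ExpPolicy(enum.Enum):
    AUTO = "auto"
    FAST = "fast"
    EXACT = "exact"

def chooses_closed_form(policy, n):
    """True when the closed-form path would be taken, per the policy
    table: FAST always, EXACT never, AUTO only for n <= 3 (where all
    bivectors are simple). Hypothetical sketch, not the Versor code."""
    if policy is ExpPolicy.FAST:
        return True
    if policy is ExpPolicy.EXACT:
        return False
    return n <= 3  # AUTO
```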

ga_power_iteration(algebra, b, v_init=None, threshold=1e-06, max_iterations=100)

Find the dominant simple bivector component via power iteration.

Implements Algorithm 2 from Pence et al. (2025). Iterates v <- (b _| v) / ||b _| v|| until convergence, then recovers the simple projection b_s = sigma * (u ^ v).

Parameters:

Name Type Description Default
algebra CliffordAlgebra

CliffordAlgebra instance.

required
b Tensor

Bivector to decompose [..., dim].

required
v_init Optional[Tensor]

Initial grade-1 vector (random if None).

None
threshold float

Convergence tolerance on ||v - v_prev||.

1e-06
max_iterations int

Iteration cap.

100

Returns:

Type Description
Tuple[Tensor, Tensor]

(b_s, v) where b_s is the simple projection and v the converged
vector, both shaped [..., dim].

Source code in core/decomposition.py
def ga_power_iteration(
    algebra,
    b: torch.Tensor,
    v_init: Optional[torch.Tensor] = None,
    threshold: float = 1e-6,
    max_iterations: int = 100
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Find the dominant simple bivector component via power iteration.

    Implements Algorithm 2 from Pence et al. (2025).  Iterates
    ``v <- (b _| v) / ||b _| v||`` until convergence, then recovers
    the simple projection ``b_s = sigma * (u ^ v)``.

    Args:
        algebra (CliffordAlgebra): CliffordAlgebra instance.
        b: Bivector to decompose [..., dim].
        v_init: Initial grade-1 vector (random if None).
        threshold: Convergence tolerance on ``||v - v_prev||``.
        max_iterations: Iteration cap.

    Returns:
        (b_s, v) where b_s is the simple projection and v the converged
        vector, both shaped [..., dim].
    """
    batch_shape = b.shape[:-1]
    device = b.device
    dtype = b.dtype

    if v_init is None:
        v_raw = torch.randn(*batch_shape, algebra.n, device=device, dtype=dtype)
        v = algebra.embed_vector(v_raw)
    else:
        v = v_init

    v_norm = v.norm(dim=-1, keepdim=True)
    v = v / v_norm.clamp(min=algebra.eps)

    for _ in range(max_iterations):
        v_prev = v
        v = algebra.right_contraction(b, v)
        v_norm = v.norm(dim=-1, keepdim=True)
        v = v / v_norm.clamp(min=algebra.eps)

        if (v - v_prev).norm(dim=-1).max() < threshold:
            break

    u = algebra.right_contraction(b, v)
    u_norm = u.norm(dim=-1, keepdim=True)
    u = u / u_norm.clamp(min=algebra.eps)

    b_s = u_norm * algebra.wedge(u, v)

    return b_s, v
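In Euclidean Cl(n,0) a bivector acts on vectors like a skew-symmetric matrix, so the power-iteration idea can be sketched in that matrix picture with numpy. This is an analogy built on a matrix with known invariant planes, not the Versor implementation:

```python
import numpy as np

# Skew-symmetric matrix with two known invariant planes:
# rotation rate 2.0 in the (x0, x1) plane, 0.5 in the (x2, x3) plane.
B = np.zeros((4, 4))
B[0, 1], B[1, 0] = -2.0, 2.0
B[2, 3], B[3, 2] = -0.5, 0.5

# Power iteration: v <- B v / ||B v||, mirroring ga_power_iteration.
v = np.array([1.0, 1.0, 1.0, 1.0])
v /= np.linalg.norm(v)
for _ in range(60):
    w = B @ v
    v = w / np.linalg.norm(w)

u = B @ v
sigma = np.linalg.norm(u)  # dominant rotation rate (-> 2.0)
u /= sigma

# v has been pulled into the dominant (x0, x1) plane, and u is an
# orthogonal vector in that same plane, as in b_s = sigma * (u ^ v).
leak = np.linalg.norm(v[2:])   # residual weight outside the plane
ortho = float(u @ v)           # u is orthogonal to v
```

Convergence is geometric in the ratio of plane rates (here 0.5 / 2.0 per step), which is why a modest iteration cap suffices.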

differentiable_invariant_decomposition(algebra, b, k=None, threshold=1e-06, max_iterations=100)

Decompose a bivector into simple components via greedy projection.

Implements Algorithm 1 from Pence et al. (2025). Iteratively extracts the dominant simple component and subtracts it from the residual.

Parameters:

Name Type Description Default
algebra CliffordAlgebra

CliffordAlgebra instance.

required
b Tensor

Bivector [..., dim].

required
k Optional[int]

Number of components (auto = n(n-1)/2 if None).

None
threshold float

Stop when residual norm falls below this.

1e-06
max_iterations int

Per-component power iteration cap.

100

Returns:

Type Description
Tuple[List[Tensor], List[Tensor]]

(decomp, vectors): lists of simple bivectors and their
associated vectors.

Source code in core/decomposition.py
def differentiable_invariant_decomposition(
    algebra,
    b: torch.Tensor,
    k: Optional[int] = None,
    threshold: float = 1e-6,
    max_iterations: int = 100
) -> Tuple[List[torch.Tensor], List[torch.Tensor]]:
    """Decompose a bivector into simple components via greedy projection.

    Implements Algorithm 1 from Pence et al. (2025).  Iteratively
    extracts the dominant simple component and subtracts it from the
    residual.

    Args:
        algebra (CliffordAlgebra): CliffordAlgebra instance.
        b: Bivector [..., dim].
        k: Number of components (auto = n(n-1)/2 if None).
        threshold: Stop when residual norm falls below this.
        max_iterations: Per-component power iteration cap.

    Returns:
        (decomp, vectors): lists of simple bivectors and their
        associated vectors.
    """
    n = algebra.n
    k_max = (n * (n - 1)) // 2
    k = min(k, k_max) if k is not None else k_max

    decomp: List[torch.Tensor] = []
    vectors: List[torch.Tensor] = []
    residual = b.clone()

    for _ in range(k):
        if residual.norm(dim=-1).max() < threshold:
            break

        b_i, v_i = ga_power_iteration(
            algebra, residual, threshold=threshold, max_iterations=max_iterations
        )
        decomp.append(b_i)
        vectors.append(v_i)
        residual = residual - b_i

    return decomp, vectors
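The control flow is a standard greedy deflation loop: cap k at its maximum, extract the dominant component, subtract it from the residual, and stop early once the residual is negligible. A toy standalone sketch of that loop, where the extractor just peels off the largest single coefficient as a stand-in for `ga_power_iteration`:

```python
def greedy_deflate(coeffs, k=None, threshold=1e-6):
    """Greedy deflation with the same stopping logic as
    differentiable_invariant_decomposition: clamp k, break early when
    the residual drops below threshold, subtract each extracted part.
    The 'dominant component' here is just the largest coefficient."""
    k_max = len(coeffs)
    k = min(k, k_max) if k is not None else k_max

    parts = []
    residual = list(coeffs)
    for _ in range(k):
        if max(abs(c) for c in residual) < threshold:
            break
        i = max(range(len(residual)), key=lambda j: abs(residual[j]))
        part = [0.0] * len(residual)
        part[i] = residual[i]
        residual[i] = 0.0
        parts.append(part)
    return parts, residual

parts, residual = greedy_deflate([3.0, -1.0, 0.0, 2.0], k=2)
```

With k=2 the two largest components (3.0 and 2.0) are extracted and the -1.0 component remains in the residual, exactly as a truncated decomposition leaves a non-simple remainder.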

exp_simple_bivector(algebra, b)

Closed-form exponential of a simple bivector.

Delegates to algebra._exp_bivector_closed which handles all three signature regimes (elliptic, hyperbolic, parabolic).

Parameters:

Name Type Description Default
algebra CliffordAlgebra

CliffordAlgebra instance.

required
b Tensor

Simple bivector [..., dim].

required

Returns:

Type Description
Tensor

Rotor exp(b) [..., dim].

Source code in core/decomposition.py
def exp_simple_bivector(algebra, b: torch.Tensor) -> torch.Tensor:
    """Closed-form exponential of a simple bivector.

    Delegates to ``algebra._exp_bivector_closed`` which handles all
    three signature regimes (elliptic, hyperbolic, parabolic).

    Args:
        algebra (CliffordAlgebra): CliffordAlgebra instance.
        b: Simple bivector [..., dim].

    Returns:
        Rotor exp(b) [..., dim].
    """
    return algebra._exp_bivector_closed(b)

exp_decomposed(algebra, b, use_decomposition=True, k=None, threshold=1e-06, max_iterations=100)

Exponentiate a bivector via decomposition into simple components.

Deprecated: Use algebra.exp(b) with algebra.exp_policy set instead. This function is kept for backward compatibility.

When use_decomposition is True the bivector is decomposed into simple blades (via differentiable_invariant_decomposition), each is exponentiated in closed form, and the rotors are composed via geometric product.

During training the power iteration loop is detached (run in forward-only mode) so that gradients do not flow through the normalization divisions. Gradients instead flow through the closed-form exp of each component and the final GP composition. This is stable for all bivector magnitudes.

Parameters:

Name Type Description Default
algebra CliffordAlgebra

CliffordAlgebra instance.

required
b Tensor

Bivector [..., dim].

required
use_decomposition bool

Enable decomposition (False -> algebra._exp_bivector_closed).

True
k Optional[int]

Number of simple components (auto if None).

None
threshold float

Convergence threshold.

1e-06
max_iterations int

Power iteration cap.

100

Returns:

Type Description
Tensor

Rotor exp(b) [..., dim].

Source code in core/decomposition.py
def exp_decomposed(
    algebra,
    b: torch.Tensor,
    use_decomposition: bool = True,
    k: Optional[int] = None,
    threshold: float = 1e-6,
    max_iterations: int = 100
) -> torch.Tensor:
    """Exponentiate a bivector via decomposition into simple components.

    .. deprecated::
        Use ``algebra.exp(b)`` with ``algebra.exp_policy`` set instead.
        This function is kept for backward compatibility.

    When ``use_decomposition`` is True the bivector is decomposed into
    simple blades (via ``differentiable_invariant_decomposition``), each
    is exponentiated in closed form, and the rotors are composed via
    geometric product.

    During training the power iteration loop is **detached** (run in
    forward-only mode) so that gradients do not flow through the
    normalization divisions.  Gradients instead flow through the
    closed-form exp of each component and the final GP composition.
    This is stable for all bivector magnitudes.

    Args:
        algebra (CliffordAlgebra): CliffordAlgebra instance.
        b: Bivector [..., dim].
        use_decomposition: Enable decomposition (False -> ``algebra._exp_bivector_closed``).
        k: Number of simple components (auto if None).
        threshold: Convergence threshold.
        max_iterations: Power iteration cap.

    Returns:
        Rotor exp(b) [..., dim].
    """
    warnings.warn(
        "exp_decomposed() is deprecated. Set algebra.exp_policy and use "
        "algebra.exp() instead.",
        DeprecationWarning,
        stacklevel=2,
    )

    if not use_decomposition:
        return algebra._exp_bivector_closed(b)

    # Detach for decomposition (power iteration is not differentiable)
    # then re-project the original bivector onto the discovered planes.
    with torch.no_grad():
        decomp, _ = differentiable_invariant_decomposition(
            algebra, b.detach(), k=k, threshold=threshold,
            max_iterations=max_iterations
        )

    if len(decomp) == 0:
        result = torch.zeros_like(b)
        result[..., 0] = 1.0
        return result

    bv_mask = algebra.grade_masks[2]
    if bv_mask.device != b.device:
        bv_mask = bv_mask.to(b.device)

    rotors = []
    residual = b
    for b_i_detached in decomp:
        # Plane direction (unit simple bivector) -- detached
        plane_norm = b_i_detached.norm(dim=-1, keepdim=True).clamp(min=algebra.eps_sq)
        plane_dir = b_i_detached / plane_norm  # detached unit plane

        # Project the live bivector onto this plane
        bv_live = residual[..., bv_mask]
        plane_bv = plane_dir[..., bv_mask]
        coeff = (bv_live * plane_bv).sum(dim=-1, keepdim=True)

        b_i_live = coeff * plane_dir
        residual = residual - b_i_live

        rotors.append(exp_simple_bivector(algebra, b_i_live))

    result = rotors[0]
    for R_i in rotors[1:]:
        result = algebra.geometric_product(result, R_i)

    return result
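The closed-form step applied to each simple component can be illustrated without the library: a simple bivector B with B^2 = -theta^2 (the elliptic case) exponentiates to exp(B) = cos(theta) + (sin(theta)/theta) * B. A minimal pure-Python sketch of that formula (the helper name and coefficient layout are illustrative, not Versor API):

```python
import math

def exp_simple_elliptic(b_coeffs):
    """Closed-form exp of a simple elliptic bivector B (B^2 = -theta^2).

    b_coeffs: bivector coefficients, e.g. (e12, e13, e23) in Cl(3,0).
    Returns (scalar_part, bivector_part) of the rotor
    cos(theta) + sin(theta) * B_hat.
    """
    theta = math.sqrt(sum(c * c for c in b_coeffs))
    if theta == 0.0:
        return 1.0, list(b_coeffs)       # exp(0) = identity rotor
    scale = math.sin(theta) / theta      # sinc factor multiplying B
    return math.cos(theta), [scale * c for c in b_coeffs]

# Quarter-turn rotor in the e12 plane:
scalar, bv = exp_simple_elliptic([math.pi / 2, 0.0, 0.0])
```

Composing several such rotors with the geometric product reproduces the decomposed exponential; `exp_decomposed` automates the decomposition and composition.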

compiled_safe_decomposed_exp(algebra, b, k=None, fixed_iterations=20)

Compile-safe decomposed exponential -- no CPU sync.

Decomposes b into simple blades under torch.no_grad(), re-projects the live (gradient-carrying) bivector onto each discovered plane, exponentiates each in closed form, and composes via geometric product.

Parameters:

    algebra: CliffordAlgebra instance. Required.
    b (Tensor): Bivector [..., dim]. Required.
    k (Optional[int]): Number of simple components (default n // 2). Default: None.
    fixed_iterations (int): Power iteration steps per component. Default: 20.

Returns:

    Tensor: Rotor exp(b) [..., dim].

Source code in core/decomposition.py
def compiled_safe_decomposed_exp(
    algebra,
    b: torch.Tensor,
    k: Optional[int] = None,
    fixed_iterations: int = 20,
) -> torch.Tensor:
    """Compile-safe decomposed exponential -- no CPU sync.

    Decomposes ``b`` into simple blades under ``torch.no_grad()``,
    re-projects the live (gradient-carrying) bivector onto each
    discovered plane, exponentiates each in closed form, and composes
    via geometric product.

    Args:
        algebra: CliffordAlgebra instance.
        b: Bivector [..., dim].
        k: Number of simple components (default ``n // 2``).
        fixed_iterations: Power iteration steps per component.

    Returns:
        Rotor exp(b) [..., dim].
    """
    n = algebra.n
    k_actual = k if k is not None else n // 2
    k_actual = max(k_actual, 1)

    # Identity rotor fallback
    identity = torch.zeros_like(b)
    identity[..., 0] = 1.0

    # Decompose (no grad -- power iteration not differentiable)
    with torch.no_grad():
        decomp = _decompose_compiled_safe(
            algebra, b.detach(), k=k_actual, fixed_iterations=fixed_iterations
        )

    bv_mask = algebra.grade_masks[2]

    # Re-project live bivector and compose rotors
    result = identity
    residual = b
    for b_i_detached in decomp:
        plane_norm = b_i_detached.norm(dim=-1, keepdim=True).clamp(min=algebra.eps_sq)
        plane_dir = b_i_detached / plane_norm

        bv_live = residual[..., bv_mask]
        plane_bv = plane_dir[..., bv_mask]
        coeff = (bv_live * plane_bv).sum(dim=-1, keepdim=True)

        b_i_live = coeff * plane_dir
        residual = residual - b_i_live

        R_i = algebra._exp_bivector_closed(b_i_live)
        result = algebra.geometric_product(result, R_i)

    return result
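The re-projection step is what keeps gradients alive: the plane directions come from the detached decomposition, but the coefficient along each plane is recomputed from the live bivector, and the residual carries over to the next component. Stripped of autograd, the per-plane update is a dot product and a subtraction (plain lists standing in for tensors; names are illustrative):

```python
def project_onto_plane(live_bv, plane_dir):
    """Project live bivector coefficients onto a unit plane direction.

    Returns (component, residual) so that live_bv = component + residual.
    """
    coeff = sum(x * p for x, p in zip(live_bv, plane_dir))  # <b, plane_dir>
    component = [coeff * p for p in plane_dir]
    residual = [x - c for x, c in zip(live_bv, component)]
    return component, residual

b = [3.0, 4.0, 0.0]    # live bivector coefficients (e12, e13, e23)
e12 = [1.0, 0.0, 0.0]  # detached unit plane direction
comp, res = project_onto_plane(b, e12)
```

In the real code `coeff` carries the gradient while `plane_dir` is detached, so backprop sees only the projection, never the power iteration that discovered the plane.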

Analysis

MetricSearch

Learns optimal (p, q, r) signature via GBN probe training and bivector energy analysis.

Trains small single-rotor GBN probes on conformally-lifted data using coherence + curvature as the loss. After training, reads the learned bivector energy distribution to infer the optimal signature.

Multiple probes with biased initialization combat local minima.

Source code in core/analysis/signature.py
class MetricSearch:
    """Learns optimal (p, q, r) signature via GBN probe training and bivector
    energy analysis.

    Trains small single-rotor GBN probes on conformally-lifted data using
    coherence + curvature as the loss.  After training, reads the learned
    bivector energy distribution to infer the optimal signature.

    Multiple probes with biased initialization combat local minima.
    """

    def __init__(
        self,
        device: str = 'cpu',
        num_probes: int = 6,
        probe_epochs: int = 80,
        probe_lr: float = 0.005,
        probe_channels: int = 4,
        k: int = 8,
        energy_threshold: float = 0.05,
        curvature_weight: float = 0.3,
        sparsity_weight: float = 0.01,
        max_workers: Optional[int] = None,
        micro_batch_size: Optional[int] = None,
        early_stop_patience: int = 0,
    ):
        self.device = device
        self.num_probes = num_probes
        self.probe_epochs = probe_epochs
        self.probe_lr = probe_lr
        self.probe_channels = probe_channels
        self.k = k
        self.energy_threshold = energy_threshold
        self.curvature_weight = curvature_weight
        self.sparsity_weight = sparsity_weight
        self.max_workers = max_workers
        self.micro_batch_size = micro_batch_size
        self.early_stop_patience = early_stop_patience

    def _lift_data(self, data: torch.Tensor) -> Tuple[torch.Tensor, CliffordAlgebra]:
        """Lifts [N, X] data into Cl(X+1, 1, 0) via CGA-style embedding."""
        data = data.to(self.device)
        N, X = data.shape

        if X + 2 > 12:
            warnings.warn(
                f"Data dimension {X} yields algebra dim 2^{X+2}={2**(X+2)}. "
                f"Consider PCA pre-reduction to X <= 10 for tractable computation."
            )

        norm_sq = 0.5 * (data ** 2).sum(dim=-1, keepdim=True)
        ones = torch.ones(N, 1, device=self.device, dtype=data.dtype)
        lifted = torch.cat([data, norm_sq, ones], dim=-1)

        algebra = CliffordAlgebra(X + 1, 1, 0, device=self.device)
        mv = algebra.embed_vector(lifted)
        mv = mv.unsqueeze(1)
        return mv, algebra

    def _train_probe(
        self,
        mv_data: torch.Tensor,
        algebra: CliffordAlgebra,
        bias_type: str = 'random',
    ) -> Dict:
        """Trains a single probe and returns results."""
        probe = _SignatureProbe(algebra, channels=self.probe_channels)
        probe.to(self.device)
        _apply_biased_init(probe, algebra, bias_type)

        gf = GeodesicFlow(algebra, k=self.k)
        optimizer = torch.optim.Adam(probe.parameters(), lr=self.probe_lr)

        best_loss = float('inf')
        best_state = None
        patience_counter = 0
        N = mv_data.shape[0]

        for _ in range(self.probe_epochs):
            if self.micro_batch_size and self.micro_batch_size < N:
                idx = torch.randperm(N, device=mv_data.device)[:self.micro_batch_size]
                batch = mv_data[idx]
            else:
                batch = mv_data

            optimizer.zero_grad()
            output = probe(batch)
            output_flat = output.squeeze(1)

            coherence_t = gf._coherence_tensor(output_flat)
            curvature_t = gf._curvature_tensor(output_flat)

            sparsity = sum(
                r.sparsity_loss() for r in probe.get_rotor_layers()
            )

            loss = (
                -coherence_t
                + self.curvature_weight * curvature_t
                + self.sparsity_weight * sparsity
            )

            loss.backward()
            optimizer.step()

            loss_val = loss.item()
            if loss_val < best_loss:
                best_loss = loss_val
                best_state = copy.deepcopy(probe.state_dict())
                patience_counter = 0
            elif self.early_stop_patience > 0:
                patience_counter += 1
                if patience_counter >= self.early_stop_patience:
                    break

        if best_state is not None:
            probe.load_state_dict(best_state)

        with torch.no_grad():
            output = probe(mv_data).squeeze(1)
            coh = gf.coherence(output)
            curv = gf.curvature(output)

        return {
            'loss': best_loss,
            'coherence': coh,
            'curvature': curv,
            'probe': probe,
        }

    def _analyze_bivector_energy(
        self,
        probe: _SignatureProbe,
        algebra: CliffordAlgebra,
        original_dim: int,
    ) -> Tuple[Tuple[int, int, int], Dict]:
        """Maps learned bivector energy to (p, q, r) signature."""
        bv_sq = algebra.bv_sq_scalar
        bv_mask = algebra.grade_masks[2]
        bv_indices = bv_mask.nonzero(as_tuple=False).squeeze(-1)

        total_energy = torch.zeros(len(bv_indices), device=self.device)
        n_layers = 0
        for rotor in probe.get_rotor_layers():
            with torch.no_grad():
                energy = (rotor.bivector_weights ** 2).mean(dim=0)
                total_energy += energy
                n_layers += 1

        if n_layers > 0:
            total_energy /= n_layers

        max_energy = total_energy.max().clamp(min=algebra.eps)
        normalized_energy = total_energy / max_energy

        n = algebra.n
        base_type_energy: dict = {}
        base_active: dict = {}

        for bv_idx_pos, blade_idx in enumerate(bv_indices.tolist()):
            energy_val = normalized_energy[bv_idx_pos].item()
            bits = []
            for bit in range(n):
                if blade_idx & (1 << bit):
                    bits.append(bit)
            if len(bits) != 2:
                continue

            sq_val = bv_sq[bv_idx_pos].item()
            if sq_val < CONSTANTS.bv_sq_elliptic_bound:
                sig_type = 'elliptic'
            elif sq_val > CONSTANTS.bv_sq_hyperbolic_bound:
                sig_type = 'hyperbolic'
            else:
                sig_type = 'null'

            for b in bits:
                if b not in base_type_energy:
                    base_type_energy[b] = {
                        'elliptic': 0.0, 'hyperbolic': 0.0, 'null': 0.0,
                    }
                    base_active[b] = 0.0
                base_type_energy[b][sig_type] += energy_val
                base_active[b] = max(base_active[b], energy_val)

        active_positive = 0
        active_negative = 0
        active_null = 0

        for b_idx in range(n):
            if b_idx not in base_active or base_active[b_idx] < self.energy_threshold:
                continue
            type_energy = base_type_energy[b_idx]
            dominant = max(type_energy, key=type_energy.get)
            if dominant == 'null':
                active_null += 1
            elif dominant == 'hyperbolic':
                active_negative += 1
            else:
                active_positive += 1

        p = max(0, active_positive - 1)
        q = max(0, active_negative - 1)
        r = active_null

        total = p + q + r
        if total > original_dim:
            scale = original_dim / max(total, 1)
            p = max(1, round(p * scale))
            q = round(q * scale)
            r = round(r * scale)
            while p + q + r > original_dim:
                if r > 0:
                    r -= 1
                elif q > 0:
                    q -= 1
                else:
                    p -= 1
        elif total == 0:
            p = original_dim

        energy_breakdown = {
            'per_bivector_energy': normalized_energy.tolist(),
            'active_positive': active_positive,
            'active_negative': active_negative,
            'active_null': active_null,
            'bv_sq_scalar': bv_sq.tolist(),
        }

        return (p, q, r), energy_breakdown

    def search(self, data: torch.Tensor) -> Tuple[int, int, int]:
        """Returns optimal (p, q, r) signature for the data.

        Args:
            data (torch.Tensor): Input data [N, D].

        Returns:
            Tuple[int, int, int]: Optimal signature (p, q, r).
        """
        result = self.search_detailed(data)
        return result['signature']

    def search_detailed(self, data: torch.Tensor) -> Dict:
        """Returns signature and full diagnostics.

        Args:
            data (torch.Tensor): Input data [N, D].

        Returns:
            Dict: Diagnostics with 'signature', 'coherence', 'curvature',
                'energy_breakdown', 'per_probe_results'.
        """
        data = data.to(self.device)
        N, X = data.shape

        mv_data, algebra = self._lift_data(data)

        bias_types = ['euclidean', 'minkowski', 'projective']
        while len(bias_types) < self.num_probes:
            bias_types.append('random')
        bias_types = bias_types[:self.num_probes]

        def _run_probe(bias_type):
            return self._train_probe(mv_data, algebra, bias_type)

        if self.num_probes <= 2:
            probe_results = [_run_probe(bt) for bt in bias_types]
        else:
            max_w = self.max_workers or min(self.num_probes, 4)
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_w) as pool:
                futures = [pool.submit(_run_probe, bt) for bt in bias_types]
                probe_results = [f.result() for f in futures]

        best_idx = min(
            range(len(probe_results)), key=lambda i: probe_results[i]['loss']
        )
        best = probe_results[best_idx]

        signature, energy_breakdown = self._analyze_bivector_energy(
            best['probe'], algebra, X
        )

        return {
            'signature': signature,
            'coherence': best['coherence'],
            'curvature': best['curvature'],
            'energy_breakdown': energy_breakdown,
            'per_probe_results': [
                {
                    'loss': r['loss'],
                    'coherence': r['coherence'],
                    'curvature': r['curvature'],
                }
                for r in probe_results
            ],
        }
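The conformal-style lift in `_lift_data` appends two extra coordinates to each point, 0.5 * ||x||^2 and a constant 1, before embedding into Cl(X+1, 1, 0). A plain-Python sketch of that mapping (list-based, standing in for the tensor version):

```python
def cga_style_lift(x):
    """Lift a raw point x in R^X to an (X+2)-vector: [x, 0.5*||x||^2, 1]."""
    norm_sq = 0.5 * sum(v * v for v in x)
    return list(x) + [norm_sq, 1.0]

lifted = cga_style_lift([3.0, 4.0])
# lifted == [3.0, 4.0, 12.5, 1.0]
```

The quadratic coordinate is what lets rotors in the lifted algebra express translations and dilations of the original data, not just rotations.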

search(data)

Returns optimal (p, q, r) signature for the data.

Parameters:

    data (Tensor): Input data [N, D]. Required.

Returns:

    Tuple[int, int, int]: Optimal signature (p, q, r).

Source code in core/analysis/signature.py
def search(self, data: torch.Tensor) -> Tuple[int, int, int]:
    """Returns optimal (p, q, r) signature for the data.

    Args:
        data (torch.Tensor): Input data [N, D].

    Returns:
        Tuple[int, int, int]: Optimal signature (p, q, r).
    """
    result = self.search_detailed(data)
    return result['signature']

search_detailed(data)

Returns signature and full diagnostics.

Parameters:

    data (Tensor): Input data [N, D]. Required.

Returns:

    Dict: Diagnostics with 'signature', 'coherence', 'curvature', 'energy_breakdown', 'per_probe_results'.

Source code in core/analysis/signature.py
def search_detailed(self, data: torch.Tensor) -> Dict:
    """Returns signature and full diagnostics.

    Args:
        data (torch.Tensor): Input data [N, D].

    Returns:
        Dict: Diagnostics with 'signature', 'coherence', 'curvature',
            'energy_breakdown', 'per_probe_results'.
    """
    data = data.to(self.device)
    N, X = data.shape

    mv_data, algebra = self._lift_data(data)

    bias_types = ['euclidean', 'minkowski', 'projective']
    while len(bias_types) < self.num_probes:
        bias_types.append('random')
    bias_types = bias_types[:self.num_probes]

    def _run_probe(bias_type):
        return self._train_probe(mv_data, algebra, bias_type)

    if self.num_probes <= 2:
        probe_results = [_run_probe(bt) for bt in bias_types]
    else:
        max_w = self.max_workers or min(self.num_probes, 4)
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_w) as pool:
            futures = [pool.submit(_run_probe, bt) for bt in bias_types]
            probe_results = [f.result() for f in futures]

    best_idx = min(
        range(len(probe_results)), key=lambda i: probe_results[i]['loss']
    )
    best = probe_results[best_idx]

    signature, energy_breakdown = self._analyze_bivector_energy(
        best['probe'], algebra, X
    )

    return {
        'signature': signature,
        'coherence': best['coherence'],
        'curvature': best['curvature'],
        'energy_breakdown': energy_breakdown,
        'per_probe_results': [
            {
                'loss': r['loss'],
                'coherence': r['coherence'],
                'curvature': r['curvature'],
            }
            for r in probe_results
        ],
    }
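Signature inference in `_analyze_bivector_energy` rests on how each basis bivector squares: B^2 < 0 generates rotations (elliptic), B^2 > 0 generates boosts (hyperbolic), and B^2 ~= 0 is null. A toy version of that classification rule (the bounds here are illustrative stand-ins for `CONSTANTS.bv_sq_elliptic_bound` / `CONSTANTS.bv_sq_hyperbolic_bound`):

```python
def classify_bivector_square(sq_val, elliptic_bound=-0.5, hyperbolic_bound=0.5):
    """Classify a bivector plane by the sign of its scalar square."""
    if sq_val < elliptic_bound:
        return 'elliptic'    # rotation plane -> positive-signature bases
    if sq_val > hyperbolic_bound:
        return 'hyperbolic'  # boost plane -> contributes a negative basis
    return 'null'            # degenerate plane -> null basis

labels = [classify_bivector_square(s) for s in (-1.0, 1.0, 0.0)]
```

Summing learned energy per basis vector over these three classes, then thresholding, is what turns a trained probe into a (p, q, r) estimate.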

GeodesicFlow

Geodesic flow analysis in Clifford algebra.

Interprets data points as grade-1 multivectors and computes the flow field -- a bivector at each point that encodes the direction of shortest algebraic paths to its k-nearest neighbours.

The flow is computed as the mean of connection bivectors:

B_ij = <x_i . ~x_j>_2   (grade-2 part of the geometric product)

This bivector encodes the rotational "turn" needed to map x_i toward x_j, analogous to the parallel transport connection on a Lie group.

The coherence and curvature of this field reveal whether the data has causal (directional) structure:

  • High coherence, low curvature -> the flow is smooth and aligned in one direction. Causality is visible.
  • Low coherence, high curvature -> the flow is fragmented and collides with itself. The signal is dominated by noise.
Source code in core/analysis/geodesic.py
class GeodesicFlow:
    """Geodesic flow analysis in Clifford algebra.

    Interprets data points as grade-1 multivectors and computes the *flow
    field* -- a bivector at each point that encodes the direction of shortest
    algebraic paths to its k-nearest neighbours.

    The flow is computed as the mean of *connection bivectors*:

        B_ij = <x_i . ~x_j>_2   (grade-2 part of the geometric product)

    This bivector encodes the rotational "turn" needed to map x_i toward
    x_j, analogous to the parallel transport connection on a Lie group.

    The coherence and curvature of this field reveal whether the data has
    causal (directional) structure:

    - **High coherence, low curvature** -> the flow is smooth and aligned in
      one direction.  Causality is visible.
    - **Low coherence, high curvature** -> the flow is fragmented and
      collides with itself.  The signal is dominated by noise.
    """

    def __init__(self, algebra: CliffordAlgebra, k: int = 8):
        """Initialize Geodesic Flow.

        Args:
            algebra (CliffordAlgebra): The algebra instance.
            k (int): Number of nearest neighbours for the flow field.
        """
        self.algebra = algebra
        self.k = k

    def _embed(self, data: torch.Tensor) -> torch.Tensor:
        """Embeds raw vectors into the grade-1 multivector subspace.

        Args:
            data (torch.Tensor): ``[N, d]`` where d <= algebra.n.

        Returns:
            torch.Tensor: ``[N, algebra.dim]`` grade-1 multivectors.
        """
        N, d = data.shape
        n = self.algebra.n
        if d > n:
            raise ValueError(
                f"Data dimension {d} exceeds algebra dimension {n}. "
                f"Use DimensionLifter to lift data before flow analysis."
            )
        if d < n:
            pad = torch.zeros(N, n - d, device=data.device, dtype=data.dtype)
            data = torch.cat([data, pad], dim=-1)
        return self.algebra.embed_vector(data)

    def _knn(self, mv: torch.Tensor) -> torch.Tensor:
        """Returns k-nearest neighbour indices in multivector coefficient space.

        Args:
            mv (torch.Tensor): ``[N, dim]`` multivectors.

        Returns:
            torch.Tensor: ``[N, k]`` neighbour indices.
        """
        N = mv.shape[0]
        k = min(self.k, N - 1)
        diff = mv.unsqueeze(1) - mv.unsqueeze(0)     # [N, N, dim]
        dists = diff.norm(dim=-1)                    # [N, N]
        dists.fill_diagonal_(float('inf'))
        _, idx = dists.topk(k, dim=-1, largest=False)
        return idx                                    # [N, k]

    def _connection_bivectors(self, mv: torch.Tensor) -> torch.Tensor:
        """Computes unit connection bivectors for all (point, neighbour) pairs.

        The connection bivector from x_i to x_j encodes the rotational "turn"
        in the algebra needed to map one vector toward the other:

            B_ij = unit( <x_i . ~x_j>_2 )

        Args:
            mv (torch.Tensor): ``[N, dim]`` grade-1 multivectors.

        Returns:
            torch.Tensor: ``[N, k, dim]`` unit connection bivectors.
        """
        N, D = mv.shape
        k = min(self.k, N - 1)
        nn_idx = self._knn(mv)

        neighbors = mv[nn_idx]                          # [N, k, dim]
        xi = mv.unsqueeze(1).expand(N, k, D).reshape(N * k, D)
        xj_rev = self.algebra.reverse(neighbors.reshape(N * k, D))

        # For grade-1 inputs, <xi * ~xj>_2 = wedge(xi, xj_rev) -- single pass
        bv_raw = self.algebra.wedge(xi, xj_rev)                   # [N*k, dim]
        bv_norm = bv_raw.norm(dim=-1, keepdim=True).clamp(min=self.algebra.eps)
        return (bv_raw / bv_norm).reshape(N, k, D)               # [N, k, dim]

    def flow_bivectors(self, mv: torch.Tensor) -> torch.Tensor:
        """Computes the mean flow bivector at each data point.

        For each point x_i, aggregates the unit connection bivectors to its
        k-nearest neighbours:

            B_i = mean_j { unit( <x_i . ~x_j>_2 ) }

        .. note::
            For perfectly symmetric data (e.g. a closed circle) the mean
            cancels to zero -- which is geometrically correct since there is
            no preferred flow direction.  Use :meth:`coherence` to measure
            structure without this cancellation.

        Args:
            mv (torch.Tensor): ``[N, dim]`` grade-1 multivectors.

        Returns:
            torch.Tensor: ``[N, dim]`` mean flow bivectors.
        """
        bv = self._connection_bivectors(mv)   # [N, k, dim]
        return bv.mean(dim=1)                 # [N, dim]

    def _coherence_tensor(self, mv: torch.Tensor) -> torch.Tensor:
        """Differentiable coherence -- returns a scalar tensor with grad_fn.

        Args:
            mv (torch.Tensor): ``[N, dim]`` multivectors.

        Returns:
            torch.Tensor: Scalar coherence in [0, 1].
        """
        bv = self._connection_bivectors(mv)   # [N, k, dim]
        N, k, D = bv.shape

        bi = bv.unsqueeze(2)                  # [N, k, 1, dim]
        bj = bv.unsqueeze(1)                  # [N, 1, k, dim]
        abs_cos = (bi * bj).sum(dim=-1).abs() # [N, k, k]

        mask = ~torch.eye(k, dtype=torch.bool, device=mv.device)  # [k, k]
        off_diag = abs_cos[:, mask]           # [N, k*(k-1)]
        return off_diag.mean()

    def coherence(self, mv: torch.Tensor) -> float:
        """Measures concentration of connection bivectors within each neighbourhood.

        For each point, computes the mean **absolute** cosine similarity between
        all pairs of its k connection bivectors.  This captures how consistently
        the neighbourhood connections lie on the same rotation plane.

        - **1.0**: all connections at every point are parallel or anti-parallel
          (maximally structured).
        - **1/num_bivectors** (~= baseline): connections point in random directions.

        .. note::
            In Cl(2,0) the grade-2 space is 1-dimensional (only e_12), so
            coherence is trivially 1.0 for any data -- use at least Cl(3,0)
            for meaningful discrimination.

        Args:
            mv (torch.Tensor): ``[N, dim]`` multivectors.

        Returns:
            float: Coherence score in [0, 1].
        """
        return self._coherence_tensor(mv).item()

    def _curvature_tensor(self, mv: torch.Tensor) -> torch.Tensor:
        """Differentiable curvature -- returns a scalar tensor with grad_fn.

        Args:
            mv (torch.Tensor): ``[N, dim]`` multivectors.

        Returns:
            torch.Tensor: Scalar curvature in [0, 1].
        """
        bv = self._connection_bivectors(mv)   # [N, k, dim]
        N, k, D = bv.shape
        nn_idx = self._knn(mv)                # [N, k_nn]

        bi = bv.unsqueeze(2)                             # [N, k, 1, dim]
        bj_all = bv[nn_idx]                              # [N, k_nn, k, dim]

        bj = bj_all[:, 0]                                # [N, k, dim]
        bj = bj.unsqueeze(1)                             # [N, 1, k, dim]

        cross_cos = (bi * bj).sum(dim=-1).abs()          # [N, k, k]
        alignment = cross_cos.mean(dim=(-1, -2))         # [N]

        return 1.0 - alignment.mean()

    def curvature(self, mv: torch.Tensor) -> float:
        """Measures how much connection structure changes across the manifold.

        Computes the mean **dissimilarity** of connection bivectors between
        neighbouring pairs of points:

            dissimilarity(i, j) = 1 - mean_abs_cos( {B_ia}, {B_jb} )

        where {B_ia} is the set of k unit connection bivectors at point i and
        {B_jb} at point j, and mean_abs_cos is the cross-set absolute cosine
        similarity.

        - **0.0**: all neighbouring points have the same connection structure
          (flat geodesics, smooth manifold).
        - **High**: the connection direction changes rapidly between neighbours
          (high curvature, fragmented flow).

        Args:
            mv (torch.Tensor): ``[N, dim]`` multivectors.

        Returns:
            float: Curvature score in [0, 1].
        """
        return self._curvature_tensor(mv).item()

    def interpolate(
        self,
        a: torch.Tensor,
        b: torch.Tensor,
        steps: int = 10,
    ) -> torch.Tensor:
        """Interpolates along the geodesic from ``a`` to ``b``.

        Uses the *Lie group exponential map* on the transition element:

            T = a_inv . b
            log(T) ~= <T - 1>_2     (grade-2 approximation for small angles)
            gamma(t) = a . exp(t . log(T))

        Exact when a and b are close; a first-order approximation otherwise.

        Args:
            a (torch.Tensor): Start multivector ``[dim]``.
            b (torch.Tensor): End multivector ``[dim]``.
            steps (int): Number of interpolation steps (including endpoints).

        Returns:
            torch.Tensor: ``[steps, dim]`` sequence of multivectors.
        """
        a = a.unsqueeze(0)  # [1, dim]
        b = b.unsqueeze(0)  # [1, dim]

        # a_inv = ~a / <a . ~a>_0
        a_rev = self.algebra.reverse(a)
        a_sq = (a * a_rev)[..., 0:1].clamp(min=self.algebra.eps)   # grade-0 scalar
        a_inv = a_rev / a_sq                            # [1, dim]

        # Transition element T = a_inv . b
        T = self.algebra.geometric_product(a_inv, b)    # [1, dim]

        # Log approximation: grade-2 part of (T - 1)
        T_shift = T.clone()
        T_shift[..., 0] -= 1.0
        log_T = self.algebra.grade_projection(T_shift, 2)  # [1, dim]

        # Sample t in [0, 1]
        ts = torch.linspace(0.0, 1.0, steps, device=a.device, dtype=a.dtype)

        # Batched: scale log_T by all t values, exp, then GP
        scaled = ts.unsqueeze(-1) * log_T  # [steps, dim]
        exp_all = self.algebra.exp(scaled)  # [steps, dim]
        return self.algebra.geometric_product(a, exp_all)  # [steps, dim]

    def _random_baseline_coherence(self) -> float:
        """Expected coherence for random unit vectors in the bivector subspace.

        For d-dimensional random unit vectors, ``E[|cos theta|] ~= sqrt(2/(pi*d))``.
        In Cl(n), the bivector space has ``n*(n-1)/2`` dimensions.
        """
        import math
        n = self.algebra.n
        d = n * (n - 1) // 2
        if d <= 1:
            return 1.0
        return math.sqrt(2.0 / (math.pi * d))

    def causal_report(self, data: torch.Tensor) -> Dict:
        """Full geodesic flow analysis with a causal interpretation.

        Embeds data, computes coherence and curvature, and returns a
        human-readable verdict.

        The causal threshold is adaptive: coherence must exceed the
        midpoint between random baseline and 1.0 (i.e. the measured
        coherence must be at least halfway between chance and perfect
        alignment).  Curvature must be below 0.5.

        Args:
            data (torch.Tensor): ``[N, d]`` raw data.

        Returns:
            Dict: report with keys ``coherence``, ``curvature``,
            ``causal``, ``baseline``, ``threshold``, ``label``.
        """
        mv = self._embed(data)
        coh = self.coherence(mv)
        curv = self.curvature(mv)
        baseline = self._random_baseline_coherence()
        threshold = (1.0 + baseline) / 2.0
        is_causal = (coh > threshold) and (curv < CONSTANTS.curvature_causal_threshold)
        return {
            'coherence': coh,
            'curvature': curv,
            'baseline': baseline,
            'threshold': threshold,
            'causal': is_causal,
            'label': (
                'Causal - smooth, aligned flow (low curvature)'
                if is_causal else
                'Noisy - fragmented, colliding flow (high curvature)'
            ),
        }

    def per_point_coherence(self, mv: torch.Tensor) -> torch.Tensor:
        """Per-point coherence scores for stratified sampling.

        Returns a scalar coherence value per data point, measuring how
        well-aligned that point's neighbourhood connections are.

        Args:
            mv (torch.Tensor): ``[N, dim]`` multivectors.

        Returns:
            torch.Tensor: ``[N]`` coherence scores in [0, 1].
        """
        bv = self._connection_bivectors(mv)   # [N, k, dim]
        N, k, D = bv.shape

        bi = bv.unsqueeze(2)                  # [N, k, 1, dim]
        bj = bv.unsqueeze(1)                  # [N, 1, k, dim]
        abs_cos = (bi * bj).sum(dim=-1).abs() # [N, k, k]

        mask = ~torch.eye(k, dtype=torch.bool, device=mv.device)  # [k, k]
        # Mean over off-diagonal pairs per point
        off_diag = abs_cos[:, mask].reshape(N, -1)  # [N, k*(k-1)]
        return off_diag.mean(dim=1)                 # [N]
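For grade-1 inputs the connection bivector reduces to a wedge product (the reverse of a vector is the vector itself), so in Cl(3,0) it can be written out by hand over the (e12, e13, e23) components. A self-contained sketch of that reduction (helper names are illustrative, not Versor API):

```python
import math

def wedge3(x, y):
    """Wedge of two 3-vectors -> bivector coefficients (e12, e13, e23)."""
    return [x[0] * y[1] - x[1] * y[0],
            x[0] * y[2] - x[2] * y[0],
            x[1] * y[2] - x[2] * y[1]]

def unit(bv, eps=1e-12):
    """Normalize bivector coefficients to a unit plane direction."""
    n = math.sqrt(sum(c * c for c in bv))
    return [c / max(n, eps) for c in bv]

# Connection bivector from e1 toward e2 lies in the e12 plane:
b = unit(wedge3([1.0, 0.0, 0.0], [0.0, 1.0, 0.0]))
```

`_connection_bivectors` does the same thing batched over all (point, neighbour) pairs, with `algebra.wedge` playing the role of `wedge3`.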

__init__(algebra, k=8)

Initialize Geodesic Flow.

Parameters:

    algebra (CliffordAlgebra): The algebra instance. Required.
    k (int): Number of nearest neighbours for the flow field. Default: 8.

Source code in core/analysis/geodesic.py
def __init__(self, algebra: CliffordAlgebra, k: int = 8):
    """Initialize Geodesic Flow.

    Args:
        algebra (CliffordAlgebra): The algebra instance.
        k (int): Number of nearest neighbours for the flow field.
    """
    self.algebra = algebra
    self.k = k

flow_bivectors(mv)

Computes the mean flow bivector at each data point.

For each point x_i, aggregates the unit connection bivectors to its k-nearest neighbours:

B_i = mean_j { unit( <x_i . ~x_j>_2 ) }

Note: For perfectly symmetric data (e.g. a closed circle) the mean cancels to zero -- which is geometrically correct, since there is no preferred flow direction. Use coherence to measure structure without this cancellation.

Parameters:

    mv (Tensor): [N, dim] grade-1 multivectors. Required.

Returns:

    Tensor: [N, dim] mean flow bivectors.

Source code in core/analysis/geodesic.py
def flow_bivectors(self, mv: torch.Tensor) -> torch.Tensor:
    """Computes the mean flow bivector at each data point.

    For each point x_i, aggregates the unit connection bivectors to its
    k-nearest neighbours:

        B_i = mean_j { unit( <x_i . ~x_j>_2 ) }

    .. note::
        For perfectly symmetric data (e.g. a closed circle) the mean
        cancels to zero -- which is geometrically correct since there is
        no preferred flow direction.  Use :meth:`coherence` to measure
        structure without this cancellation.

    Args:
        mv (torch.Tensor): ``[N, dim]`` grade-1 multivectors.

    Returns:
        torch.Tensor: ``[N, dim]`` mean flow bivectors.
    """
    bv = self._connection_bivectors(mv)   # [N, k, dim]
    return bv.mean(dim=1)                 # [N, dim]

coherence(mv)

Measures concentration of connection bivectors within each neighbourhood.

For each point, computes the mean absolute cosine similarity between all pairs of its k connection bivectors. This captures how consistently the neighbourhood connections lie on the same rotation plane.

  • 1.0: all connections at every point are parallel or anti-parallel (maximally structured).
  • 1/num_bivectors (~= baseline): connections point in random directions.

Note: In Cl(2,0) the grade-2 space is 1-dimensional (only e_12), so coherence is trivially 1.0 for any data -- use at least Cl(3,0) for meaningful discrimination.

Parameters:

  • mv (Tensor) -- [N, dim] multivectors. Required.

Returns:

  • float -- Coherence score in [0, 1].

Source code in core/analysis/geodesic.py
def coherence(self, mv: torch.Tensor) -> float:
    """Measures concentration of connection bivectors within each neighbourhood.

    For each point, computes the mean **absolute** cosine similarity between
    all pairs of its k connection bivectors.  This captures how consistently
    the neighbourhood connections lie on the same rotation plane.

    - **1.0**: all connections at every point are parallel or anti-parallel
      (maximally structured).
    - **1/num_bivectors** (~= baseline): connections point in random directions.

    .. note::
        In Cl(2,0) the grade-2 space is 1-dimensional (only e_12), so
        coherence is trivially 1.0 for any data -- use at least Cl(3,0)
        for meaningful discrimination.

    Args:
        mv (torch.Tensor): ``[N, dim]`` multivectors.

    Returns:
        float: Coherence score in [0, 1].
    """
    return self._coherence_tensor(mv).item()
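The coherence metric is just the mean absolute pairwise cosine similarity over unit connection bivectors. A minimal framework-free NumPy sketch (illustrative only, not part of the Versor API) shows the two extremes described above:

```python
import numpy as np

def pairwise_abs_cos_coherence(vectors: np.ndarray) -> float:
    """Mean absolute cosine similarity over all off-diagonal pairs.

    vectors: [k, d] array of connection-bivector coefficients.
    """
    unit = vectors / np.linalg.norm(vectors, axis=-1, keepdims=True)
    cos = unit @ unit.T                        # [k, k] pairwise cosines
    k = cos.shape[0]
    off = np.abs(cos[~np.eye(k, dtype=bool)])  # drop self-similarity
    return float(off.mean())

# Parallel / anti-parallel bivectors -> maximally structured
aligned = np.array([[1.0, 0.0], [-2.0, 0.0], [3.0, 0.0]])
print(pairwise_abs_cos_coherence(aligned))     # 1.0

# Orthogonal bivectors -> no shared rotation plane
orthogonal = np.array([[1.0, 0.0], [0.0, 1.0]])
print(pairwise_abs_cos_coherence(orthogonal))  # 0.0
```

Note that anti-parallel vectors still score 1.0: the absolute value deliberately ignores orientation, so only the rotation *plane* matters.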

curvature(mv)

Measures how much connection structure changes across the manifold.

Computes the mean dissimilarity of connection bivectors between neighbouring pairs of points:

dissimilarity(i, j) = 1 - mean_abs_cos( {B_ia}, {B_jb} )

where {B_ia} is the set of k unit connection bivectors at point i and {B_jb} at point j, and mean_abs_cos is the cross-set absolute cosine similarity.

  • 0.0: all neighbouring points have the same connection structure (flat geodesics, smooth manifold).
  • High: the connection direction changes rapidly between neighbours (high curvature, fragmented flow).

Parameters:

  • mv (Tensor) -- [N, dim] multivectors. Required.

Returns:

  • float -- Curvature score in [0, 1].

Source code in core/analysis/geodesic.py
def curvature(self, mv: torch.Tensor) -> float:
    """Measures how much connection structure changes across the manifold.

    Computes the mean **dissimilarity** of connection bivectors between
    neighbouring pairs of points:

        dissimilarity(i, j) = 1 - mean_abs_cos( {B_ia}, {B_jb} )

    where {B_ia} is the set of k unit connection bivectors at point i and
    {B_jb} at point j, and mean_abs_cos is the cross-set absolute cosine
    similarity.

    - **0.0**: all neighbouring points have the same connection structure
      (flat geodesics, smooth manifold).
    - **High**: the connection direction changes rapidly between neighbours
      (high curvature, fragmented flow).

    Args:
        mv (torch.Tensor): ``[N, dim]`` multivectors.

    Returns:
        float: Curvature score in [0, 1].
    """
    return self._curvature_tensor(mv).item()
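The cross-set dissimilarity in the formula above can be sketched directly. This is an illustrative NumPy version (not the Versor implementation), comparing the connection-bivector sets of two neighbouring points:

```python
import numpy as np

def cross_set_dissimilarity(B_i: np.ndarray, B_j: np.ndarray) -> float:
    """1 - mean absolute cosine similarity across two bivector sets.

    B_i, B_j: [k, d] connection bivectors at neighbouring points i and j.
    """
    ui = B_i / np.linalg.norm(B_i, axis=-1, keepdims=True)
    uj = B_j / np.linalg.norm(B_j, axis=-1, keepdims=True)
    cross = np.abs(ui @ uj.T)          # [k, k] all cross-pairs
    return float(1.0 - cross.mean())

# Both points share the same rotation plane -> flat (0.0)
flat = cross_set_dissimilarity(np.array([[1.0, 0.0], [2.0, 0.0]]),
                               np.array([[3.0, 0.0], [-1.0, 0.0]]))  # 0.0

# Connection direction flips between neighbours -> high curvature (1.0)
curved = cross_set_dissimilarity(np.array([[1.0, 0.0]]),
                                 np.array([[0.0, 1.0]]))             # 1.0
```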

interpolate(a, b, steps=10)

Interpolates along the geodesic from a to b.

Uses the Lie group exponential map on the transition element:

T = a_inv . b
log(T) ~= <T - 1>_2     (grade-2 approximation for small angles)
gamma(t) = a . exp(t . log(T))

Exact when a and b are close; a first-order approximation otherwise.

Parameters:

  • a (Tensor) -- Start multivector [dim]. Required.
  • b (Tensor) -- End multivector [dim]. Required.
  • steps (int) -- Number of interpolation steps (including endpoints). Default: 10.

Returns:

  • Tensor -- [steps, dim] sequence of multivectors.

Source code in core/analysis/geodesic.py
def interpolate(
    self,
    a: torch.Tensor,
    b: torch.Tensor,
    steps: int = 10,
) -> torch.Tensor:
    """Interpolates along the geodesic from ``a`` to ``b``.

    Uses the *Lie group exponential map* on the transition element:

        T = a_inv . b
        log(T) ~= <T - 1>_2     (grade-2 approximation for small angles)
        gamma(t) = a . exp(t . log(T))

    Exact when a and b are close; a first-order approximation otherwise.

    Args:
        a (torch.Tensor): Start multivector ``[dim]``.
        b (torch.Tensor): End multivector ``[dim]``.
        steps (int): Number of interpolation steps (including endpoints).

    Returns:
        torch.Tensor: ``[steps, dim]`` sequence of multivectors.
    """
    a = a.unsqueeze(0)  # [1, dim]
    b = b.unsqueeze(0)  # [1, dim]

    # a_inv = ~a / <a . ~a>_0
    a_rev = self.algebra.reverse(a)
    a_sq = (a * a_rev)[..., 0:1].clamp(min=self.algebra.eps)   # grade-0 scalar
    a_inv = a_rev / a_sq                            # [1, dim]

    # Transition element T = a_inv . b
    T = self.algebra.geometric_product(a_inv, b)    # [1, dim]

    # Log approximation: grade-2 part of (T - 1)
    T_shift = T.clone()
    T_shift[..., 0] -= 1.0
    log_T = self.algebra.grade_projection(T_shift, 2)  # [1, dim]

    # Sample t in [0, 1]
    ts = torch.linspace(0.0, 1.0, steps, device=a.device, dtype=a.dtype)

    # Batched: scale log_T by all t values, exp, then GP
    scaled = ts.unsqueeze(-1) * log_T  # [steps, dim]
    exp_all = self.algebra.exp(scaled)  # [steps, dim]
    return self.algebra.geometric_product(a, exp_all)  # [steps, dim]
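The log/exp recipe can be seen in miniature using complex numbers, since the even subalgebra of Cl(2,0) (scalar + e_12) is isomorphic to the complex plane. This is an analogy only, not the Versor API: the imaginary part plays the role of the grade-2 projection.

```python
import cmath

def interpolate_rotors(a: complex, b: complex, steps: int = 5):
    """Geodesic interpolation for unit rotors modelled as complex numbers.

    Mirrors the recipe above: T = a^-1 . b, log(T) ~= grade-2 part of
    (T - 1) (here: the imaginary part), gamma(t) = a . exp(t . log(T)).
    """
    T = b / a
    log_T = complex(0.0, (T - 1).imag)   # small-angle grade-2 log approximation
    return [a * cmath.exp(t * log_T)
            for t in (i / (steps - 1) for i in range(steps))]

a = cmath.exp(0.0j)   # identity rotor
b = cmath.exp(0.1j)   # small rotation by 0.1 rad
path = interpolate_rotors(a, b, steps=5)
# path[0] equals a exactly; path[-1] matches b to first order in the angle,
# consistent with "exact when a and b are close" above.
```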

causal_report(data)

Full geodesic flow analysis with a causal interpretation.

Embeds data, computes coherence and curvature, and returns a human-readable verdict.

The causal threshold is adaptive: coherence must exceed the midpoint between random baseline and 1.0 (i.e. the measured coherence must be at least halfway between chance and perfect alignment). Curvature must be below 0.5.

Parameters:

  • data (Tensor) -- [N, d] raw data. Required.

Returns:

  • Dict -- report with keys coherence, curvature, causal, baseline, threshold, label.

Source code in core/analysis/geodesic.py
def causal_report(self, data: torch.Tensor) -> Dict:
    """Full geodesic flow analysis with a causal interpretation.

    Embeds data, computes coherence and curvature, and returns a
    human-readable verdict.

    The causal threshold is adaptive: coherence must exceed the
    midpoint between random baseline and 1.0 (i.e. the measured
    coherence must be at least halfway between chance and perfect
    alignment).  Curvature must be below 0.5.

    Args:
        data (torch.Tensor): ``[N, d]`` raw data.

    Returns:
        Dict: report with keys ``coherence``, ``curvature``,
        ``causal``, ``baseline``, ``threshold``, ``label``.
    """
    mv = self._embed(data)
    coh = self.coherence(mv)
    curv = self.curvature(mv)
    baseline = self._random_baseline_coherence()
    threshold = (1.0 + baseline) / 2.0
    is_causal = (coh > threshold) and (curv < CONSTANTS.curvature_causal_threshold)
    return {
        'coherence': coh,
        'curvature': curv,
        'baseline': baseline,
        'threshold': threshold,
        'causal': is_causal,
        'label': (
            'Causal - smooth, aligned flow (low curvature)'
            if is_causal else
            'Noisy - fragmented, colliding flow (high curvature)'
        ),
    }
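The adaptive threshold is plain arithmetic and can be sketched standalone. An illustrative version (the function name is ours, not the Versor API; 0.5 is the curvature threshold stated above):

```python
def causal_verdict(coherence: float, curvature: float,
                   baseline: float, curvature_threshold: float = 0.5) -> bool:
    """Adaptive causal test: coherence must clear the midpoint between the
    random baseline and perfect alignment (1.0), and curvature must stay
    below the fixed threshold."""
    threshold = (1.0 + baseline) / 2.0
    return coherence > threshold and curvature < curvature_threshold

# With a random baseline of 0.2 the coherence bar is (1 + 0.2) / 2 = 0.6
print(causal_verdict(coherence=0.75, curvature=0.3, baseline=0.2))  # True
print(causal_verdict(coherence=0.55, curvature=0.3, baseline=0.2))  # False
```

Because the bar rises with the baseline, a high measured coherence in a low-dimensional algebra (where random data already scores well) is not mistaken for causal structure.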

per_point_coherence(mv)

Per-point coherence scores for stratified sampling.

Returns a scalar coherence value per data point, measuring how well-aligned that point's neighbourhood connections are.

Parameters:

  • mv (Tensor) -- [N, dim] multivectors. Required.

Returns:

  • Tensor -- [N] coherence scores in [0, 1].

Source code in core/analysis/geodesic.py
def per_point_coherence(self, mv: torch.Tensor) -> torch.Tensor:
    """Per-point coherence scores for stratified sampling.

    Returns a scalar coherence value per data point, measuring how
    well-aligned that point's neighbourhood connections are.

    Args:
        mv (torch.Tensor): ``[N, dim]`` multivectors.

    Returns:
        torch.Tensor: ``[N]`` coherence scores in [0, 1].
    """
    bv = self._connection_bivectors(mv)   # [N, k, dim]
    N, k, D = bv.shape

    bi = bv.unsqueeze(2)                  # [N, k, 1, dim]
    bj = bv.unsqueeze(1)                  # [N, 1, k, dim]
    abs_cos = (bi * bj).sum(dim=-1).abs() # [N, k, k]

    mask = ~torch.eye(k, dtype=torch.bool, device=mv.device)  # [k, k]
    # Mean over off-diagonal pairs per point
    off_diag = abs_cos[:, mask].reshape(N, -1)  # [N, k*(k-1)]
    return off_diag.mean(dim=1)                 # [N]

DimensionLifter

Tests whether lifting data to a higher-dimensional algebra reveals structure.

The hypothesis: data living on an n-dimensional manifold may possess latent structure that only becomes visible in Cl(n+1, q) or Cl(n, q+1).

Lifting appends extra coordinates to the grade-1 embedding:

  • Positive lift Cl(p, q) -> Cl(p+1, q): adds a spacelike dimension. The extra coordinate is set to 1 (projective / homogeneous lift).
  • Null lift Cl(p, q) -> Cl(p, q+1): adds a timelike dimension. The extra coordinate is set to 0 (null vector lift for conformal-like embeddings).

After lifting, geodesic flow coherence is re-measured. An improvement indicates that the extra dimension captures hidden geometric structure.

Source code in core/analysis/dimension.py
class DimensionLifter:
    """Tests whether lifting data to a higher-dimensional algebra reveals structure.

    The hypothesis: data living on an n-dimensional manifold may possess
    latent structure that only becomes visible in Cl(n+1, q) or Cl(n, q+1).

    Lifting appends extra coordinates to the grade-1 embedding:

    - **Positive lift** ``Cl(p, q) -> Cl(p+1, q)``: adds a spacelike dimension.
      The extra coordinate is set to 1 (projective / homogeneous lift).
    - **Null lift** ``Cl(p, q) -> Cl(p, q+1)``: adds a timelike dimension.
      The extra coordinate is set to 0 (null vector lift for conformal-like
      embeddings).

    After lifting, geodesic flow coherence is re-measured.  An improvement
    indicates that the extra dimension captures hidden geometric structure.
    """

    def __init__(self, device: str = 'cpu'):
        self.device = device

    def lift(
        self,
        data: torch.Tensor,
        target_algebra: CliffordAlgebra,
        fill: float = 1.0,
    ) -> torch.Tensor:
        """Lifts data into the grade-1 subspace of a higher-dimensional algebra.

        Pads each data vector with ``fill`` values in the new dimensions,
        then embeds as a grade-1 multivector.

        Args:
            data: ``[N, d]`` source data.
            target_algebra: Target algebra with n >= d.
            fill: Coordinate value for the new dimensions.
                Use 1.0 for a projective (homogeneous) lift,
                0.0 for a null-vector (conformal) lift.

        Returns:
            ``[N, target_algebra.dim]`` grade-1 multivectors.
        """
        N, d = data.shape
        n = target_algebra.n
        if n < d:
            raise ValueError(f"Target algebra dim {n} < source data dim {d}.")
        if n == d:
            return target_algebra.embed_vector(data.to(self.device))

        pad = torch.full(
            (N, n - d), fill,
            device=self.device, dtype=data.dtype,
        )
        lifted = torch.cat([data.to(self.device), pad], dim=-1)
        return target_algebra.embed_vector(lifted)

    def test(
        self,
        data: torch.Tensor,
        p: int,
        q: int,
        k: int = 8,
    ) -> Dict:
        """Compares geodesic flow coherence before and after dimension lifting.

        Tests three algebras:

        1. **Original** Cl(p, q): baseline coherence and curvature.
        2. **Positive lift** Cl(p+1, q): spacelike extra dimension, fill=1.
        3. **Null lift** Cl(p, q+1): timelike extra dimension, fill=0.

        Args:
            data: ``[N, d]`` data where d = p + q.
            p: Original positive signature.
            q: Original negative signature.
            k: Nearest neighbours for geodesic flow.

        Returns:
            Dict with keys ``original``, ``lift_positive``, ``lift_null``,
            and ``best`` (name of the algebra with highest coherence).
        """
        from .geodesic import GeodesicFlow

        data = data.to(self.device)
        results: Dict = {}

        def _measure(alg: CliffordAlgebra, mv: torch.Tensor) -> Dict:
            gf = GeodesicFlow(alg, k=k)
            coh = gf.coherence(mv)
            curv = gf.curvature(mv)
            baseline = gf._random_baseline_coherence()
            coh_threshold = (1.0 + baseline) / 2.0
            return {
                'signature': (alg.p, alg.q),
                'coherence': coh,
                'curvature': curv,
                'causal': (coh > coh_threshold) and (curv < CONSTANTS.curvature_causal_threshold),
            }

        alg_orig = CliffordAlgebra(p, q, device=self.device)
        mv_orig = alg_orig.embed_vector(data[..., :alg_orig.n])
        results['original'] = _measure(alg_orig, mv_orig)

        alg_pos = CliffordAlgebra(p + 1, q, device=self.device)
        mv_pos = self.lift(data, alg_pos, fill=1.0)
        results['lift_positive'] = _measure(alg_pos, mv_pos)

        alg_null = CliffordAlgebra(p, q + 1, device=self.device)
        mv_null = self.lift(data, alg_null, fill=0.0)
        results['lift_null'] = _measure(alg_null, mv_null)

        best = max(
            ('original', 'lift_positive', 'lift_null'),
            key=lambda key: results[key]['coherence'],
        )
        results['best'] = best

        return results

    def format_report(self, results: Dict) -> str:
        """Renders a lifting test result as a human-readable string."""
        lines = ['Dimension Lifting Report', '=' * 40]
        for key in ('original', 'lift_positive', 'lift_null'):
            r = results[key]
            p, q = r['signature']
            coh = r['coherence']
            curv = r['curvature']
            causal = 'O Causal' if r['causal'] else 'X Noisy'
            lines.append(
                f"  Cl({p},{q})  coherence={coh:+.3f}  curvature={curv:.3f}  {causal}"
            )
        lines.append(f"\n  Best algebra: {results['best']}")
        return '\n'.join(lines)
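The coordinate-padding step at the heart of lift() is easy to demonstrate without the algebra machinery. An illustrative NumPy sketch of the two fill conventions:

```python
import numpy as np

def lift_coordinates(data: np.ndarray, n: int, fill: float) -> np.ndarray:
    """Pad [N, d] vectors to [N, n] with a constant fill coordinate, as
    done before grade-1 embedding in the target algebra."""
    N, d = data.shape
    if n < d:
        raise ValueError(f"Target dimension {n} < source dimension {d}.")
    pad = np.full((N, n - d), fill, dtype=data.dtype)
    return np.concatenate([data, pad], axis=-1)

points = np.array([[1.0, 2.0], [3.0, 4.0]])          # 2-D source data
projective = lift_coordinates(points, 3, fill=1.0)   # homogeneous lift
null_lift = lift_coordinates(points, 3, fill=0.0)    # null-vector lift
# projective -> [[1, 2, 1], [3, 4, 1]];  null_lift -> [[1, 2, 0], [3, 4, 0]]
```

In the library itself the padded vectors are then passed to target_algebra.embed_vector, which places them in the grade-1 subspace.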

lift(data, target_algebra, fill=1.0)

Lifts data into the grade-1 subspace of a higher-dimensional algebra.

Pads each data vector with fill values in the new dimensions, then embeds as a grade-1 multivector.

Parameters:

  • data (Tensor) -- [N, d] source data. Required.
  • target_algebra (CliffordAlgebra) -- Target algebra with n >= d. Required.
  • fill (float) -- Coordinate value for the new dimensions. Use 1.0 for a projective (homogeneous) lift, 0.0 for a null-vector (conformal) lift. Default: 1.0.

Returns:

  • Tensor -- [N, target_algebra.dim] grade-1 multivectors.

Source code in core/analysis/dimension.py
def lift(
    self,
    data: torch.Tensor,
    target_algebra: CliffordAlgebra,
    fill: float = 1.0,
) -> torch.Tensor:
    """Lifts data into the grade-1 subspace of a higher-dimensional algebra.

    Pads each data vector with ``fill`` values in the new dimensions,
    then embeds as a grade-1 multivector.

    Args:
        data: ``[N, d]`` source data.
        target_algebra: Target algebra with n >= d.
        fill: Coordinate value for the new dimensions.
            Use 1.0 for a projective (homogeneous) lift,
            0.0 for a null-vector (conformal) lift.

    Returns:
        ``[N, target_algebra.dim]`` grade-1 multivectors.
    """
    N, d = data.shape
    n = target_algebra.n
    if n < d:
        raise ValueError(f"Target algebra dim {n} < source data dim {d}.")
    if n == d:
        return target_algebra.embed_vector(data.to(self.device))

    pad = torch.full(
        (N, n - d), fill,
        device=self.device, dtype=data.dtype,
    )
    lifted = torch.cat([data.to(self.device), pad], dim=-1)
    return target_algebra.embed_vector(lifted)

test(data, p, q, k=8)

Compares geodesic flow coherence before and after dimension lifting.

Tests three algebras:

  1. Original Cl(p, q): baseline coherence and curvature.
  2. Positive lift Cl(p+1, q): spacelike extra dimension, fill=1.
  3. Null lift Cl(p, q+1): timelike extra dimension, fill=0.

Parameters:

  • data (Tensor) -- [N, d] data where d = p + q. Required.
  • p (int) -- Original positive signature. Required.
  • q (int) -- Original negative signature. Required.
  • k (int) -- Nearest neighbours for geodesic flow. Default: 8.

Returns:

  • Dict -- keys original, lift_positive, lift_null, and best (name of the algebra with highest coherence).

Source code in core/analysis/dimension.py
def test(
    self,
    data: torch.Tensor,
    p: int,
    q: int,
    k: int = 8,
) -> Dict:
    """Compares geodesic flow coherence before and after dimension lifting.

    Tests three algebras:

    1. **Original** Cl(p, q): baseline coherence and curvature.
    2. **Positive lift** Cl(p+1, q): spacelike extra dimension, fill=1.
    3. **Null lift** Cl(p, q+1): timelike extra dimension, fill=0.

    Args:
        data: ``[N, d]`` data where d = p + q.
        p: Original positive signature.
        q: Original negative signature.
        k: Nearest neighbours for geodesic flow.

    Returns:
        Dict with keys ``original``, ``lift_positive``, ``lift_null``,
        and ``best`` (name of the algebra with highest coherence).
    """
    from .geodesic import GeodesicFlow

    data = data.to(self.device)
    results: Dict = {}

    def _measure(alg: CliffordAlgebra, mv: torch.Tensor) -> Dict:
        gf = GeodesicFlow(alg, k=k)
        coh = gf.coherence(mv)
        curv = gf.curvature(mv)
        baseline = gf._random_baseline_coherence()
        coh_threshold = (1.0 + baseline) / 2.0
        return {
            'signature': (alg.p, alg.q),
            'coherence': coh,
            'curvature': curv,
            'causal': (coh > coh_threshold) and (curv < CONSTANTS.curvature_causal_threshold),
        }

    alg_orig = CliffordAlgebra(p, q, device=self.device)
    mv_orig = alg_orig.embed_vector(data[..., :alg_orig.n])
    results['original'] = _measure(alg_orig, mv_orig)

    alg_pos = CliffordAlgebra(p + 1, q, device=self.device)
    mv_pos = self.lift(data, alg_pos, fill=1.0)
    results['lift_positive'] = _measure(alg_pos, mv_pos)

    alg_null = CliffordAlgebra(p, q + 1, device=self.device)
    mv_null = self.lift(data, alg_null, fill=0.0)
    results['lift_null'] = _measure(alg_null, mv_null)

    best = max(
        ('original', 'lift_positive', 'lift_null'),
        key=lambda key: results[key]['coherence'],
    )
    results['best'] = best

    return results

format_report(results)

Renders a lifting test result as a human-readable string.

Source code in core/analysis/dimension.py
def format_report(self, results: Dict) -> str:
    """Renders a lifting test result as a human-readable string."""
    lines = ['Dimension Lifting Report', '=' * 40]
    for key in ('original', 'lift_positive', 'lift_null'):
        r = results[key]
        p, q = r['signature']
        coh = r['coherence']
        curv = r['curvature']
        causal = 'O Causal' if r['causal'] else 'X Noisy'
        lines.append(
            f"  Cl({p},{q})  coherence={coh:+.3f}  curvature={curv:.3f}  {causal}"
        )
    lines.append(f"\n  Best algebra: {results['best']}")
    return '\n'.join(lines)

GeometricAnalyzer

Top-level orchestrator for the geometric analysis toolkit.

Runs a configurable subset of analyzers in the correct dependency order and returns an AnalysisReport.

Input modes:

  • data.ndim == 2 and algebra is None -- raw mode: full pipeline from sampling through signature search to GA analyses.
  • data.ndim == 3 and algebra is not None -- pre-embedded mode: skip sampling / dimension / signature; run spectral, symmetry, and commutator analyses directly.
  • data.ndim == 2 and algebra is not None -- raw + known algebra: embed data, then run GA analyses.

Parameters:

  • config (Optional[AnalysisConfig]) -- Master analysis configuration. Default: None.
Source code in core/analysis/pipeline.py
class GeometricAnalyzer:
    """Top-level orchestrator for the geometric analysis toolkit.

    Runs a configurable subset of analyzers in the correct dependency
    order and returns an :class:`AnalysisReport`.

    **Input modes:**

    * ``data.ndim == 2`` and ``algebra is None`` -- **raw mode**: full
      pipeline from sampling through signature search to GA analyses.
    * ``data.ndim == 3`` and ``algebra is not None`` -- **pre-embedded
      mode**: skip sampling / dimension / signature; run spectral,
      symmetry, and commutator analyses directly.
    * ``data.ndim == 2`` and ``algebra is not None`` -- **raw + known
      algebra**: embed data, then run GA analyses.

    Args:
        config: Master analysis configuration.
    """

    def __init__(self, config: Optional[AnalysisConfig] = None):
        self.config = config or AnalysisConfig()

    def analyze(
        self,
        data: torch.Tensor,
        algebra: Optional[CliffordAlgebra] = None,
    ) -> AnalysisReport:
        """Run the full geometric analysis pipeline.

        Args:
            data: Raw ``[N, D]`` or pre-embedded ``[N, C, 2^n]`` tensor.
            algebra: Required when *data* is pre-embedded.  Optional
                when raw -- will be created from the signature search.

        Returns:
            :class:`AnalysisReport`.
        """
        cfg = self.config
        report = AnalysisReport()
        t0 = time.time()

        data = data.to(cfg.device)
        report.metadata["data_shape"] = list(data.shape)
        report.metadata["config_device"] = cfg.device

        if data.ndim == 3 and algebra is not None:
            # Pre-embedded mode
            report = self._run_ga_analyses(data, algebra, report)
        elif data.ndim == 2 and algebra is not None:
            # Raw + known algebra
            mv_data = self._embed_raw(data, algebra)
            report = self._run_ga_analyses(mv_data, algebra, report)
        elif data.ndim == 2:
            # Full raw mode
            report = self._run_full_pipeline(data, report)
        else:
            raise ValueError(
                f"Unexpected data shape {data.shape}. Expected [N, D] or [N, C, dim]."
            )

        report.metadata["elapsed_seconds"] = round(time.time() - t0, 2)
        return report

    def _run_full_pipeline(
        self, data: torch.Tensor, report: AnalysisReport
    ) -> AnalysisReport:
        cfg = self.config

        # 1. Sampling
        sampled, sample_meta = StatisticalSampler.sample(data, cfg.sampling)
        if isinstance(sampled, list):
            # bootstrap returns list -- use first for pipeline, rest for CI
            sampled = sampled[0]
        report.metadata["sampling"] = sample_meta

        # 2. Dimension analysis
        dim_result = None
        if cfg.run_dimension:
            da = EffectiveDimensionAnalyzer(
                device=cfg.device,
                energy_threshold=cfg.energy_threshold,
            )
            dim_result = da.analyze(sampled)
            report.dimension = dim_result

        # 3. Signature search
        sig_result = None
        if cfg.run_signature:
            ssa = SignatureSearchAnalyzer(device=cfg.device)
            sig_result = ssa.analyze(sampled, dim_result=dim_result)
            report.signature = sig_result

        # 4. Create algebra and embed (n >= 2 for meaningful GA structure)
        if sig_result is not None:
            p, q, r = sig_result.signature
        elif dim_result is not None:
            p, q, r = dim_result.intrinsic_dim, 0, 0
        else:
            p, q, r = min(sampled.shape[1], 6), 0, 0
        # Ensure n >= 2 so grade-2 (bivectors) exist for GA analyses
        if p + q + r < 2:
            p = max(p, 2 - q - r)

        algebra = CliffordAlgebra(p, q, r, device=cfg.device)
        mv_data = self._embed_raw(sampled, algebra)

        # 5. GA analyses (parallel)
        report = self._run_ga_analyses(mv_data, algebra, report)

        return report

    def _run_ga_analyses(
        self,
        mv_data: torch.Tensor,
        algebra: CliffordAlgebra,
        report: AnalysisReport,
    ) -> AnalysisReport:
        cfg = self.config

        tasks = {}
        if cfg.run_spectral:
            tasks["spectral"] = lambda: SpectralAnalyzer(algebra).analyze(mv_data)
        if cfg.run_symmetry:
            tasks["symmetry"] = lambda: SymmetryDetector(
                algebra, null_threshold=cfg.energy_threshold
            ).analyze(mv_data)
        if cfg.run_commutator:
            tasks["commutator"] = lambda: CommutatorAnalyzer(algebra).analyze(mv_data)

        if len(tasks) <= 1:
            # Sequential
            for name, fn in tasks.items():
                setattr(report, name, fn())
        else:
            # Parallel
            with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
                futures = {name: pool.submit(fn) for name, fn in tasks.items()}
                for name, fut in futures.items():
                    setattr(report, name, fut.result())

        # Refine continuous symmetries with commutator result
        if (
            cfg.run_symmetry
            and cfg.run_commutator
            and report.symmetry is not None
            and report.commutator is not None
        ):
            if mv_data.ndim == 3:
                flat = mv_data.mean(dim=1)
            else:
                flat = mv_data
            detector = SymmetryDetector(
                algebra, null_threshold=cfg.energy_threshold
            )
            report.symmetry.continuous_symmetry_dim = (
                detector.detect_continuous_symmetries(
                    flat, commutator_result=report.commutator
                )
            )

        return report

    @staticmethod
    def _embed_raw(data: torch.Tensor, algebra: CliffordAlgebra) -> torch.Tensor:
        """Embed raw ``[N, D]`` data as grade-1 multivectors ``[N, 1, dim]``."""
        n = algebra.n
        D = data.shape[1]
        if D > n:
            data = data[:, :n]
        elif D < n:
            pad = torch.zeros(
                data.shape[0], n - D, device=data.device, dtype=data.dtype
            )
            data = torch.cat([data, pad], dim=-1)
        mv = algebra.embed_vector(data.float())  # [N, dim]
        return mv.unsqueeze(1)  # [N, 1, dim]

analyze(data, algebra=None)

Run the full geometric analysis pipeline.

Parameters:

  • data (Tensor) -- Raw [N, D] or pre-embedded [N, C, 2^n] tensor. Required.
  • algebra (Optional[CliffordAlgebra]) -- Required when data is pre-embedded; optional when raw (created from the signature search). Default: None.

Returns:

  • AnalysisReport -- the populated analysis report.

Source code in core/analysis/pipeline.py
def analyze(
    self,
    data: torch.Tensor,
    algebra: Optional[CliffordAlgebra] = None,
) -> AnalysisReport:
    """Run the full geometric analysis pipeline.

    Args:
        data: Raw ``[N, D]`` or pre-embedded ``[N, C, 2^n]`` tensor.
        algebra: Required when *data* is pre-embedded.  Optional
            when raw -- will be created from the signature search.

    Returns:
        :class:`AnalysisReport`.
    """
    cfg = self.config
    report = AnalysisReport()
    t0 = time.time()

    data = data.to(cfg.device)
    report.metadata["data_shape"] = list(data.shape)
    report.metadata["config_device"] = cfg.device

    if data.ndim == 3 and algebra is not None:
        # Pre-embedded mode
        report = self._run_ga_analyses(data, algebra, report)
    elif data.ndim == 2 and algebra is not None:
        # Raw + known algebra
        mv_data = self._embed_raw(data, algebra)
        report = self._run_ga_analyses(mv_data, algebra, report)
    elif data.ndim == 2:
        # Full raw mode
        report = self._run_full_pipeline(data, report)
    else:
        raise ValueError(
            f"Unexpected data shape {data.shape}. Expected [N, D] or [N, C, dim]."
        )

    report.metadata["elapsed_seconds"] = round(time.time() - t0, 2)
    return report
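The three input modes reduce to a small dispatch on tensor rank and algebra presence. A hedged standalone sketch of that rule (the function name is illustrative, not part of the API):

```python
def select_input_mode(ndim: int, has_algebra: bool) -> str:
    """Dispatch rule for the analyzer's three input modes."""
    if ndim == 3 and has_algebra:
        return "pre-embedded"          # [N, C, dim] + algebra: GA analyses only
    if ndim == 2 and has_algebra:
        return "raw + known algebra"   # [N, D] + algebra: embed, then GA analyses
    if ndim == 2:
        return "full raw pipeline"     # [N, D]: sampling -> signature -> GA
    raise ValueError("Expected [N, D] or [N, C, dim].")

print(select_input_mode(2, has_algebra=False))  # full raw pipeline
print(select_input_mode(3, has_algebra=True))   # pre-embedded
```

Note that a 3-D tensor without an algebra falls through to the ValueError, matching the shape check in analyze().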

SpectralAnalyzer

Spectral analysis of multivector data.

Three independent analyses are combined:

  1. Grade energy spectrum -- population-level distribution of Hermitian grade energy across all grades.
  2. Bivector field spectrum -- singular values from decomposing the mean bivector into simple components (rotation planes).
  3. GP operator spectrum -- eigenvalues of the left-multiplication operator L_x(y) = x . y (only for small algebras).

Parameters:

  • algebra (CliffordAlgebra) -- The CliffordAlgebra instance. Required.
  • max_simple_components (int) -- Maximum number of simple components to extract from the mean bivector. Default: 5.
Source code in core/analysis/spectral.py
class SpectralAnalyzer:
    """Spectral analysis of multivector data.

    Three independent analyses are combined:

    1. **Grade energy spectrum** -- population-level distribution of
       Hermitian grade energy across all grades.
    2. **Bivector field spectrum** -- singular values from decomposing
       the mean bivector into simple components (rotation planes).
    3. **GP operator spectrum** -- eigenvalues of the left-multiplication
       operator :math:`L_x(y) = x \\cdot y` (only for small algebras).

    Args:
        algebra: :class:`CliffordAlgebra` instance.
        max_simple_components: Maximum number of simple components to
            extract from the mean bivector.
    """

    def __init__(
        self,
        algebra: CliffordAlgebra,
        max_simple_components: int = 5,
    ):
        self.algebra = algebra
        self.max_simple_components = max_simple_components

    def analyze(self, mv_data: torch.Tensor) -> SpectralResult:
        """Full spectral analysis.

        Args:
            mv_data: Multivector data.  Accepted shapes:

                * ``[N, dim]`` -- single-channel batch.
                * ``[N, C, dim]`` -- multi-channel batch (channels are
                  averaged before bivector / GP analysis).

        Returns:
            :class:`SpectralResult`.
        """
        if mv_data.ndim == 2:
            mv_data = mv_data.unsqueeze(1)  # [N, 1, dim]

        grade_energy = self.grade_energy_spectrum(mv_data)
        bv_spectrum, simple_comps = self.bivector_field_spectrum(mv_data)

        gp_eigs = None
        if self.algebra.n <= CONSTANTS.gp_spectrum_max_n:
            gp_eigs = self.gp_operator_spectrum(mv_data)

        return SpectralResult(
            grade_energy=grade_energy,
            bivector_spectrum=bv_spectrum,
            simple_components=simple_comps,
            gp_eigenvalues=gp_eigs,
        )

    def grade_energy_spectrum(self, mv_data: torch.Tensor) -> torch.Tensor:
        """Mean Hermitian grade energy across the batch.

        Args:
            mv_data: ``[N, C, dim]`` multivector data.

        Returns:
            ``[n+1]`` tensor of mean grade energies.
        """
        # hermitian_grade_spectrum expects [..., dim]
        # Average over channels first to get [N, dim]
        flat = mv_data.mean(dim=1)  # [N, dim]
        spectrum = hermitian_grade_spectrum(self.algebra, flat)  # [N, n+1]
        return spectrum.mean(dim=0)  # [n+1]

    def bivector_field_spectrum(
        self, mv_data: torch.Tensor
    ) -> tuple:
        """Decompose the mean bivector into simple components.

        Args:
            mv_data: ``[N, C, dim]`` multivector data.

        Returns:
            ``(singular_values, simple_components)`` where
            *singular_values* is a 1-D tensor of component norms and
            *simple_components* is a list of ``[dim]`` simple bivectors.
        """
        flat = mv_data.mean(dim=1)  # [N, dim]
        mean_mv = flat.mean(dim=0)  # [dim]

        # Guard: algebra needs at least grade 2 for bivectors
        if self.algebra.n < 2:
            return (
                torch.zeros(1, device=mv_data.device),
                [torch.zeros_like(mean_mv)],
            )

        # Extract grade-2 (bivector) part
        mean_bv = self.algebra.grade_projection(mean_mv, 2)

        bv_norm = mean_bv.norm()
        if bv_norm < self.algebra.eps_sq:
            return (
                torch.zeros(1, device=mv_data.device),
                [torch.zeros_like(mean_bv)],
            )

        decomp, _ = differentiable_invariant_decomposition(
            self.algebra,
            mean_bv.unsqueeze(0),  # [1, dim]
            k=self.max_simple_components,
        )

        # decomp is a list of [1, dim] tensors
        norms = []
        components = []
        for comp in decomp:
            c = comp.squeeze(0)  # [dim]
            n = c.norm()
            if n > self.algebra.eps_sq:
                norms.append(n)
                components.append(c)

        if not norms:
            return (
                torch.zeros(1, device=mv_data.device),
                [torch.zeros_like(mean_bv)],
            )

        sv = torch.stack(norms)
        # Sort descending
        order = sv.argsort(descending=True)
        sv = sv[order]
        components = [components[i] for i in order.tolist()]

        return sv, components

    def gp_operator_spectrum(
        self, mv_data: torch.Tensor, n_samples: Optional[int] = None
    ) -> torch.Tensor:
        """Eigenvalue magnitudes of the left-multiplication operator.

        For a subsample of data points, constructs the explicit matrix
        representation of :math:`L_x(y) = x \\cdot y` and computes
        eigenvalues.

        Args:
            mv_data: ``[N, C, dim]`` multivector data.
            n_samples: Number of data points to sample.

        Returns:
            Sorted (descending) eigenvalue magnitudes.
        """
        if n_samples is None:
            n_samples = CONSTANTS.gp_spectrum_n_samples
        dim = self.algebra.dim
        flat = mv_data.mean(dim=1)  # [N, dim]
        N = flat.shape[0]

        k = min(N, n_samples)
        if k < N:
            idx = torch.randperm(N)[:k]
            flat = flat[idx]

        # Build GP matrix: L[:, j] = gp(mean_x, e_j)
        basis = torch.eye(dim, device=flat.device, dtype=flat.dtype)
        mean_x = flat.mean(dim=0)  # [dim]
        # Batched GP: expand mean_x to [dim, dim], basis is [dim, dim]
        # Result[j, :] = gp(mean_x, e_j) = L[:, j], so transpose
        L = self.algebra.geometric_product(
            mean_x.unsqueeze(0).expand(dim, -1), basis
        ).T

        eigvals = torch.linalg.eigvals(L)  # complex
        magnitudes = eigvals.abs()
        return magnitudes.sort(descending=True).values

analyze(mv_data)

Full spectral analysis.

Parameters:

    mv_data (Tensor, required): Multivector data. Accepted shapes:

      • [N, dim] -- single-channel batch.
      • [N, C, dim] -- multi-channel batch (channels are averaged before bivector / GP analysis).

Returns:

    SpectralResult.

Source code in core/analysis/spectral.py
def analyze(self, mv_data: torch.Tensor) -> SpectralResult:
    """Full spectral analysis.

    Args:
        mv_data: Multivector data.  Accepted shapes:

            * ``[N, dim]`` -- single-channel batch.
            * ``[N, C, dim]`` -- multi-channel batch (channels are
              averaged before bivector / GP analysis).

    Returns:
        :class:`SpectralResult`.
    """
    if mv_data.ndim == 2:
        mv_data = mv_data.unsqueeze(1)  # [N, 1, dim]

    grade_energy = self.grade_energy_spectrum(mv_data)
    bv_spectrum, simple_comps = self.bivector_field_spectrum(mv_data)

    gp_eigs = None
    if self.algebra.n <= CONSTANTS.gp_spectrum_max_n:
        gp_eigs = self.gp_operator_spectrum(mv_data)

    return SpectralResult(
        grade_energy=grade_energy,
        bivector_spectrum=bv_spectrum,
        simple_components=simple_comps,
        gp_eigenvalues=gp_eigs,
    )

grade_energy_spectrum(mv_data)

Mean Hermitian grade energy across the batch.

Parameters:

    mv_data (Tensor, required): [N, C, dim] multivector data.

Returns:

    Tensor: [n+1] tensor of mean grade energies.

Source code in core/analysis/spectral.py
def grade_energy_spectrum(self, mv_data: torch.Tensor) -> torch.Tensor:
    """Mean Hermitian grade energy across the batch.

    Args:
        mv_data: ``[N, C, dim]`` multivector data.

    Returns:
        ``[n+1]`` tensor of mean grade energies.
    """
    # hermitian_grade_spectrum expects [..., dim]
    # Average over channels first to get [N, dim]
    flat = mv_data.mean(dim=1)  # [N, dim]
    spectrum = hermitian_grade_spectrum(self.algebra, flat)  # [N, n+1]
    return spectrum.mean(dim=0)  # [n+1]
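The idea behind a grade-energy spectrum can be sketched in plain Python, without the library: in a Clifford algebra the basis blade at coefficient index i has grade equal to the popcount of i, so energies partition by bit count. This is only a sketch with plain squared coefficients; Versor's `hermitian_grade_spectrum` additionally accounts for the Hermitian metric.

```python
# Grade-energy spectrum sketch for Cl(3): 8 basis blades, grade = popcount of index.
n = 3
dim = 1 << n  # 8 basis blades

def grade(idx: int) -> int:
    """Grade of the basis blade at coefficient index idx (number of set bits)."""
    return bin(idx).count("1")

def grade_energy(coeffs):
    """Sum squared coefficients per grade -> list of length n + 1."""
    energy = [0.0] * (n + 1)
    for idx, c in enumerate(coeffs):
        energy[grade(idx)] += c * c
    return energy

# A rotor-like multivector: scalar + bivector parts only.
x = [1.0, 0, 0, 0.5, 0, 0.5, 0.5, 0]  # indices 3, 5, 6 are the bivectors
print(grade_energy(x))  # [1.0, 0.0, 0.75, 0.0]: energy in grades 0 and 2 only
```

Averaging such per-sample spectra over a batch gives exactly the population-level view `grade_energy_spectrum` returns.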

bivector_field_spectrum(mv_data)

Decompose the mean bivector into simple components.

Parameters:

    mv_data (Tensor, required): [N, C, dim] multivector data.

Returns:

    tuple: (singular_values, simple_components) where singular_values is a 1-D tensor of component norms and simple_components is a list of [dim] simple bivectors.

Source code in core/analysis/spectral.py
def bivector_field_spectrum(
    self, mv_data: torch.Tensor
) -> tuple:
    """Decompose the mean bivector into simple components.

    Args:
        mv_data: ``[N, C, dim]`` multivector data.

    Returns:
        ``(singular_values, simple_components)`` where
        *singular_values* is a 1-D tensor of component norms and
        *simple_components* is a list of ``[dim]`` simple bivectors.
    """
    flat = mv_data.mean(dim=1)  # [N, dim]
    mean_mv = flat.mean(dim=0)  # [dim]

    # Guard: algebra needs at least grade 2 for bivectors
    if self.algebra.n < 2:
        return (
            torch.zeros(1, device=mv_data.device),
            [torch.zeros_like(mean_mv)],
        )

    # Extract grade-2 (bivector) part
    mean_bv = self.algebra.grade_projection(mean_mv, 2)

    bv_norm = mean_bv.norm()
    if bv_norm < self.algebra.eps_sq:
        return (
            torch.zeros(1, device=mv_data.device),
            [torch.zeros_like(mean_bv)],
        )

    decomp, _ = differentiable_invariant_decomposition(
        self.algebra,
        mean_bv.unsqueeze(0),  # [1, dim]
        k=self.max_simple_components,
    )

    # decomp is a list of [1, dim] tensors
    norms = []
    components = []
    for comp in decomp:
        c = comp.squeeze(0)  # [dim]
        n = c.norm()
        if n > self.algebra.eps_sq:
            norms.append(n)
            components.append(c)

    if not norms:
        return (
            torch.zeros(1, device=mv_data.device),
            [torch.zeros_like(mean_bv)],
        )

    sv = torch.stack(norms)
    # Sort descending
    order = sv.argsort(descending=True)
    sv = sv[order]
    components = [components[i] for i in order.tolist()]

    return sv, components
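The grade-2 coefficients this method extracts live at blade indices with exactly two bits set. A minimal sketch of that enumeration, mirroring the `(1 << i) | (1 << j)` construction used elsewhere in the analysis code:

```python
# Enumerate grade-2 (bivector) blade indices for an n-dimensional algebra.
# Each basis bivector e_i ^ e_j sits at coefficient index (1 << i) | (1 << j),
# i.e. the blade index with bits i and j set.
n = 4

bivector_indices = [
    (1 << i) | (1 << j)
    for i in range(n)
    for j in range(i + 1, n)
]
print(bivector_indices)  # [3, 5, 9, 6, 10, 12]: C(4, 2) = 6 bivectors
```

Every listed index has popcount 2, which is exactly the membership test for the grade-2 subspace.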

gp_operator_spectrum(mv_data, n_samples=None)

Eigenvalue magnitudes of the left-multiplication operator.

Constructs the explicit matrix representation of L_x(y) = x · y for the mean of a subsample of data points and computes its eigenvalues.

Parameters:

    mv_data (Tensor, required): [N, C, dim] multivector data.
    n_samples (Optional[int], default None): Number of data points to sample.

Returns:

    Tensor: Sorted (descending) eigenvalue magnitudes.

Source code in core/analysis/spectral.py
def gp_operator_spectrum(
    self, mv_data: torch.Tensor, n_samples: Optional[int] = None
) -> torch.Tensor:
    """Eigenvalue magnitudes of the left-multiplication operator.

    For a subsample of data points, constructs the explicit matrix
    representation of :math:`L_x(y) = x \\cdot y` and computes
    eigenvalues.

    Args:
        mv_data: ``[N, C, dim]`` multivector data.
        n_samples: Number of data points to sample.

    Returns:
        Sorted (descending) eigenvalue magnitudes.
    """
    if n_samples is None:
        n_samples = CONSTANTS.gp_spectrum_n_samples
    dim = self.algebra.dim
    flat = mv_data.mean(dim=1)  # [N, dim]
    N = flat.shape[0]

    k = min(N, n_samples)
    if k < N:
        idx = torch.randperm(N)[:k]
        flat = flat[idx]

    # Build GP matrix: L[:, j] = gp(mean_x, e_j)
    basis = torch.eye(dim, device=flat.device, dtype=flat.dtype)
    mean_x = flat.mean(dim=0)  # [dim]
    # Batched GP: expand mean_x to [dim, dim], basis is [dim, dim]
    # Result[j, :] = gp(mean_x, e_j) = L[:, j], so transpose
    L = self.algebra.geometric_product(
        mean_x.unsqueeze(0).expand(dim, -1), basis
    ).T

    eigvals = torch.linalg.eigvals(L)  # complex
    magnitudes = eigvals.abs()
    return magnitudes.sort(descending=True).values
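To see what the GP operator spectrum measures, consider the smallest non-trivial case. This sketch (not Versor code) uses Cl(0,1), which is isomorphic to the complex numbers: left multiplication by x = a + b·e1 (with e1² = -1) on the basis {1, e1} has a 2×2 matrix whose columns are x·e_j in coefficients, exactly the construction used above.

```python
import math

# Left-multiplication matrix L_x on the basis {1, e1} of Cl(0,1), e1^2 = -1.
# x * 1  = a + b e1          -> column (a, b)
# x * e1 = b e1^2 + a e1     -> column (-b, a)
a, b = 3.0, 4.0
L = [[a, -b],
     [b, a]]

# This rotation-scaling matrix has the conjugate eigenvalue pair a +/- i b,
# so both magnitudes equal sqrt(det L) = sqrt(a^2 + b^2).
det = L[0][0] * L[1][1] - L[0][1] * L[1][0]
magnitude = math.sqrt(det)
print(magnitude)  # 5.0
```

For larger algebras the same column-by-column construction is done in one batched geometric product, and `torch.linalg.eigvals` replaces the hand calculation.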

SymmetryDetector

Detect symmetries, null directions, and invariances.

Parameters:

    algebra (CliffordAlgebra, required): CliffordAlgebra instance.
    null_threshold (float, default 0.01): Energy threshold below which a direction is considered effectively null.
Source code in core/analysis/symmetry.py
class SymmetryDetector:
    """Detect symmetries, null directions, and invariances.

    Args:
        algebra: :class:`CliffordAlgebra` instance.
        null_threshold: Energy threshold below which a direction is
            considered effectively null.
    """

    def __init__(
        self,
        algebra: CliffordAlgebra,
        null_threshold: float = 0.01,
    ):
        self.algebra = algebra
        self.null_threshold = null_threshold

    def analyze(
        self,
        mv_data: torch.Tensor,
        commutator_result: Optional[CommutatorResult] = None,
    ) -> SymmetryResult:
        """Full symmetry analysis.

        Args:
            mv_data: Multivector data.  Accepted shapes:

                * ``[N, dim]`` -- single-channel batch.
                * ``[N, C, dim]`` -- multi-channel batch (channels
                  averaged for scalar-valued analyses).

            commutator_result: Pre-computed commutator results for
                continuous-symmetry refinement.

        Returns:
            :class:`SymmetryResult`.
        """
        if mv_data.ndim == 3:
            flat = mv_data.mean(dim=1)
        else:
            flat = mv_data  # [N, dim]

        null_dirs, null_scores = self.detect_null_directions(flat)
        inv_sym = self.detect_involution_symmetry(flat)
        refl_syms = self.detect_reflection_symmetries(flat)
        cont_dim = self.detect_continuous_symmetries(
            flat, commutator_result=commutator_result
        )

        return SymmetryResult(
            null_directions=null_dirs,
            null_scores=null_scores,
            involution_symmetry=inv_sym,
            reflection_symmetries=refl_syms,
            continuous_symmetry_dim=cont_dim,
        )

    def detect_null_directions(
        self, mv_data: torch.Tensor
    ) -> Tuple[List[int], torch.Tensor]:
        """Detect effectively null basis-vector directions.

        For each basis vector ``e_i``, computes the mean squared
        projection energy of the data onto that direction.  Directions
        below :attr:`null_threshold` are flagged.

        Args:
            mv_data: ``[N, dim]`` multivector data.

        Returns:
            ``(null_indices, scores)`` where *scores* is ``[n]`` and
            *null_indices* lists those with score < threshold.
        """
        n = self.algebra.n
        g1_idx = (1 << torch.arange(n, device=mv_data.device)).long()

        # Energy on each grade-1 component
        g1_coeffs = mv_data[:, g1_idx]  # [N, n]
        scores = (g1_coeffs ** 2).mean(dim=0)  # [n]

        # Normalise so max is 1
        smax = scores.max()
        if smax > self.algebra.eps_sq:
            scores = scores / smax

        null_dirs = (scores < self.null_threshold).nonzero(as_tuple=True)[0]
        return null_dirs.tolist(), scores

    def detect_involution_symmetry(self, mv_data: torch.Tensor) -> float:
        """Measure grade-involution symmetry of the data distribution.

        Computes the fraction of total energy in odd-grade components:

            score = E[ ||x_odd||^2 / ||x||^2 ]

        where ``x_odd = (x - alpha(x)) / 2`` and ``alpha`` is the grade
        involution (flips odd-grade components).

        Returns:
            Score in ``[0, 1]``.  0 means the data lives entirely in
            the even sub-algebra; 1 means entirely odd grades.
        """
        alpha = self.algebra.grade_involution(mv_data)  # [N, dim]
        odd_part = (mv_data - alpha) / 2.0
        odd_energy = (odd_part ** 2).sum(dim=-1)
        total_energy = (mv_data ** 2).sum(dim=-1).clamp(min=self.algebra.eps_sq)
        return (odd_energy / total_energy).mean().item()

    def detect_reflection_symmetries(
        self, mv_data: torch.Tensor
    ) -> List[Dict]:
        """Test reflection symmetry along each basis-vector direction.

        For each basis vector ``e_i``, reflects the data
        ``x' = -e_i x e_i^{-1}`` and compares the reflected
        distribution to the original.

        Returns:
            List of dicts ``{"direction": i, "score": float}`` sorted
            by score ascending (lower = more symmetric).
        """
        n = self.algebra.n
        dim = self.algebra.dim
        N = mv_data.shape[0]

        # Build all n basis vectors: [n, dim]
        basis_vecs = torch.zeros(n, dim, device=mv_data.device, dtype=mv_data.dtype)
        blade_indices = (1 << torch.arange(n, device=mv_data.device)).long()
        basis_vecs[torch.arange(n, device=mv_data.device), blade_indices] = 1.0

        # Batch reflect: [n, N, dim]
        mv_exp = mv_data.unsqueeze(0).expand(n, N, dim)
        basis_exp = basis_vecs.unsqueeze(1).expand(n, N, dim)
        reflected = self.algebra.reflect(mv_exp, basis_exp)  # [n, N, dim]

        # Distributional distance for all directions at once
        orig_sorted = mv_data.sort(dim=0).values.unsqueeze(0).expand(n, N, dim)
        refl_sorted = reflected.sort(dim=1).values  # sort along N
        dist = ((orig_sorted - refl_sorted) ** 2).sum(dim=-1).mean(dim=-1)  # [n]
        norm = (mv_data ** 2).sum(dim=-1).mean().clamp(min=self.algebra.eps_sq)
        scores = dist / norm  # [n]

        results = [{"direction": i, "score": scores[i].item()} for i in range(n)]
        results.sort(key=lambda r: r["score"])
        return results

    def detect_continuous_symmetries(
        self,
        mv_data: torch.Tensor,
        commutator_result: Optional[CommutatorResult] = None,
        threshold: Optional[float] = None,
    ) -> int:
        """Estimate the dimension of the continuous symmetry group.

        A bivector ``B_j`` generates a continuous symmetry if
        ``E[||[B_j, x_i]||]`` is near zero for all data points.

        If a :class:`CommutatorResult` is provided, its exchange
        spectrum is used directly (eigenvalues near zero -> symmetry
        generators).  Otherwise the computation is done from scratch.

        Args:
            mv_data: ``[N, dim]`` multivector data.
            commutator_result: Pre-computed commutator analysis.
            threshold: Normalised commutator norm below which a bivector
                is considered a symmetry generator.

        Returns:
            Estimated dimension of the continuous symmetry group.
        """
        if threshold is None:
            threshold = CONSTANTS.continuous_symmetry_threshold

        if commutator_result is not None:
            spec = commutator_result.exchange_spectrum
            if spec.numel() > 0:
                max_val = spec.abs().max().clamp(min=self.algebra.eps_sq)
                return int((spec.abs() / max_val < threshold).sum().item())

        # Compute from scratch: test each basis bivector
        n = self.algebra.n
        dim = self.algebra.dim
        N = mv_data.shape[0]

        ii, jj = torch.triu_indices(n, n, offset=1)
        bv_indices = ((1 << ii) | (1 << jj)).tolist()

        if not bv_indices:
            return 0

        n_bv = len(bv_indices)
        # Build all bivector bases: [n_bv, dim]
        bv_bases = torch.zeros(n_bv, dim, device=mv_data.device, dtype=mv_data.dtype)
        bv_idx_tensor = torch.tensor(bv_indices, dtype=torch.long, device=mv_data.device)
        bv_bases[torch.arange(n_bv, device=mv_data.device), bv_idx_tensor] = 1.0

        # Batch commutator: [n_bv, N, dim]
        bv_exp = bv_bases.unsqueeze(1).expand(n_bv, N, dim)
        mv_exp = mv_data.unsqueeze(0).expand(n_bv, N, dim)
        comm = self.algebra.commutator(bv_exp, mv_exp)

        comm_norms = comm.norm(dim=-1).mean(dim=-1)  # [n_bv]
        data_norm = mv_data.norm(dim=-1).mean().clamp(min=self.algebra.eps_sq)

        return int((comm_norms / data_norm < threshold).sum().item())

analyze(mv_data, commutator_result=None)

Full symmetry analysis.

Parameters:

    mv_data (Tensor, required): Multivector data. Accepted shapes:

      • [N, dim] -- single-channel batch.
      • [N, C, dim] -- multi-channel batch (channels averaged for scalar-valued analyses).

    commutator_result (Optional[CommutatorResult], default None): Pre-computed commutator results for continuous-symmetry refinement.

Returns:

    SymmetryResult.

Source code in core/analysis/symmetry.py
def analyze(
    self,
    mv_data: torch.Tensor,
    commutator_result: Optional[CommutatorResult] = None,
) -> SymmetryResult:
    """Full symmetry analysis.

    Args:
        mv_data: Multivector data.  Accepted shapes:

            * ``[N, dim]`` -- single-channel batch.
            * ``[N, C, dim]`` -- multi-channel batch (channels
              averaged for scalar-valued analyses).

        commutator_result: Pre-computed commutator results for
            continuous-symmetry refinement.

    Returns:
        :class:`SymmetryResult`.
    """
    if mv_data.ndim == 3:
        flat = mv_data.mean(dim=1)
    else:
        flat = mv_data  # [N, dim]

    null_dirs, null_scores = self.detect_null_directions(flat)
    inv_sym = self.detect_involution_symmetry(flat)
    refl_syms = self.detect_reflection_symmetries(flat)
    cont_dim = self.detect_continuous_symmetries(
        flat, commutator_result=commutator_result
    )

    return SymmetryResult(
        null_directions=null_dirs,
        null_scores=null_scores,
        involution_symmetry=inv_sym,
        reflection_symmetries=refl_syms,
        continuous_symmetry_dim=cont_dim,
    )

detect_null_directions(mv_data)

Detect effectively null basis-vector directions.

For each basis vector e_i, computes the mean squared projection energy of the data onto that direction. Directions below null_threshold are flagged.

Parameters:

    mv_data (Tensor, required): [N, dim] multivector data.

Returns:

    Tuple[List[int], Tensor]: (null_indices, scores) where scores is [n] and null_indices lists those with score < threshold.

Source code in core/analysis/symmetry.py
def detect_null_directions(
    self, mv_data: torch.Tensor
) -> Tuple[List[int], torch.Tensor]:
    """Detect effectively null basis-vector directions.

    For each basis vector ``e_i``, computes the mean squared
    projection energy of the data onto that direction.  Directions
    below :attr:`null_threshold` are flagged.

    Args:
        mv_data: ``[N, dim]`` multivector data.

    Returns:
        ``(null_indices, scores)`` where *scores* is ``[n]`` and
        *null_indices* lists those with score < threshold.
    """
    n = self.algebra.n
    g1_idx = (1 << torch.arange(n, device=mv_data.device)).long()

    # Energy on each grade-1 component
    g1_coeffs = mv_data[:, g1_idx]  # [N, n]
    scores = (g1_coeffs ** 2).mean(dim=0)  # [n]

    # Normalise so max is 1
    smax = scores.max()
    if smax > self.algebra.eps_sq:
        scores = scores / smax

    null_dirs = (scores < self.null_threshold).nonzero(as_tuple=True)[0]
    return null_dirs.tolist(), scores
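The bit-index convention (the grade-1 coefficient of e_i sits at blade index 2^i) and the normalise-then-threshold scoring can be sketched in plain Python, independent of the library. The data here is synthetic, constructed so that the e3 direction carries no energy:

```python
# Null-direction scoring sketch: mean squared grade-1 energy per direction,
# normalised so the largest score is 1, then thresholded.
n = 3
g1_idx = [1 << i for i in range(n)]  # blade indices of e1, e2, e3: [1, 2, 4]

# Synthetic batch of multivector coefficient rows [N, 2^n]; the e3 component
# (blade index 4) is identically zero, i.e. an effectively null direction.
data = [
    [0, 1.0, 0.5, 0, 0.0, 0, 0, 0],
    [0, 0.8, -0.5, 0, 0.0, 0, 0, 0],
]

scores = [sum(row[idx] ** 2 for row in data) / len(data) for idx in g1_idx]
smax = max(scores)
scores = [s / smax for s in scores]  # normalise so the max score is 1

null_threshold = 0.01
null_dirs = [i for i, s in enumerate(scores) if s < null_threshold]
print(null_dirs)  # [2]: only e3 is flagged
```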

detect_involution_symmetry(mv_data)

Measure grade-involution symmetry of the data distribution.

Computes the fraction of total energy in odd-grade components:

score = E[ ||x_odd||^2 / ||x||^2 ]

where x_odd = (x - alpha(x)) / 2 and alpha is the grade involution (flips odd-grade components).

Returns:

    float: Score in [0, 1]. 0 means the data lives entirely in the even sub-algebra; 1 means entirely odd grades.

Source code in core/analysis/symmetry.py
def detect_involution_symmetry(self, mv_data: torch.Tensor) -> float:
    """Measure grade-involution symmetry of the data distribution.

    Computes the fraction of total energy in odd-grade components:

        score = E[ ||x_odd||^2 / ||x||^2 ]

    where ``x_odd = (x - alpha(x)) / 2`` and ``alpha`` is the grade
    involution (flips odd-grade components).

    Returns:
        Score in ``[0, 1]``.  0 means the data lives entirely in
        the even sub-algebra; 1 means entirely odd grades.
    """
    alpha = self.algebra.grade_involution(mv_data)  # [N, dim]
    odd_part = (mv_data - alpha) / 2.0
    odd_energy = (odd_part ** 2).sum(dim=-1)
    total_energy = (mv_data ** 2).sum(dim=-1).clamp(min=self.algebra.eps_sq)
    return (odd_energy / total_energy).mean().item()
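Grade involution multiplies the coefficient at blade index i by (-1)^popcount(i), so the odd part and the score above can be sketched per-sample in plain Python (a minimal sketch, not Versor's batched implementation):

```python
# Grade-involution score sketch for Cl(2): blades {1, e1, e2, e12}.
def alpha(coeffs):
    """Grade involution: negate odd-grade (odd-popcount) coefficients."""
    return [c * (-1) ** bin(i).count("1") for i, c in enumerate(coeffs)]

def involution_score(coeffs):
    """Fraction of squared norm in odd grades: ||x_odd||^2 / ||x||^2."""
    odd = [(c - a) / 2 for c, a in zip(coeffs, alpha(coeffs))]
    return sum(o * o for o in odd) / sum(c * c for c in coeffs)

rotor = [1.0, 0.0, 0.0, 0.5]   # scalar + bivector: entirely even
vector = [0.0, 1.0, 2.0, 0.0]  # grade 1: entirely odd
print(involution_score(rotor), involution_score(vector))  # 0.0 1.0
```

The two extremes match the documented interpretation: 0 for data in the even sub-algebra, 1 for purely odd-grade data.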

detect_reflection_symmetries(mv_data)

Test reflection symmetry along each basis-vector direction.

For each basis vector e_i, reflects the data x' = -e_i x e_i^{-1} and compares the reflected distribution to the original.

Returns:

    List[Dict]: List of dicts {"direction": i, "score": float} sorted by score ascending (lower = more symmetric).

Source code in core/analysis/symmetry.py
def detect_reflection_symmetries(
    self, mv_data: torch.Tensor
) -> List[Dict]:
    """Test reflection symmetry along each basis-vector direction.

    For each basis vector ``e_i``, reflects the data
    ``x' = -e_i x e_i^{-1}`` and compares the reflected
    distribution to the original.

    Returns:
        List of dicts ``{"direction": i, "score": float}`` sorted
        by score ascending (lower = more symmetric).
    """
    n = self.algebra.n
    dim = self.algebra.dim
    N = mv_data.shape[0]

    # Build all n basis vectors: [n, dim]
    basis_vecs = torch.zeros(n, dim, device=mv_data.device, dtype=mv_data.dtype)
    blade_indices = (1 << torch.arange(n, device=mv_data.device)).long()
    basis_vecs[torch.arange(n, device=mv_data.device), blade_indices] = 1.0

    # Batch reflect: [n, N, dim]
    mv_exp = mv_data.unsqueeze(0).expand(n, N, dim)
    basis_exp = basis_vecs.unsqueeze(1).expand(n, N, dim)
    reflected = self.algebra.reflect(mv_exp, basis_exp)  # [n, N, dim]

    # Distributional distance for all directions at once
    orig_sorted = mv_data.sort(dim=0).values.unsqueeze(0).expand(n, N, dim)
    refl_sorted = reflected.sort(dim=1).values  # sort along N
    dist = ((orig_sorted - refl_sorted) ** 2).sum(dim=-1).mean(dim=-1)  # [n]
    norm = (mv_data ** 2).sum(dim=-1).mean().clamp(min=self.algebra.eps_sq)
    scores = dist / norm  # [n]

    results = [{"direction": i, "score": scores[i].item()} for i in range(n)]
    results.sort(key=lambda r: r["score"])
    return results
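The reflection formula x' = -e_i x e_i^{-1} can be checked concretely in the 2×2 matrix representation of Cl(2,0), where e1 ↦ [[1,0],[0,-1]] and e2 ↦ [[0,1],[1,0]] (a hand-rolled sketch, not the library's `reflect`):

```python
# Reflection sketch in Cl(2,0) via its 2x2 matrix representation:
# a vector v = x e1 + y e2 is the matrix [[x, y], [y, -x]], and the
# reflection x' = -e1 v e1^{-1} (with e1^{-1} = e1, since e1^2 = 1)
# should negate the e1 component only.

def matmul(A, B):
    return [[sum(A[i][k] * B[k][j] for k in range(2)) for j in range(2)]
            for i in range(2)]

def neg(A):
    return [[-a for a in row] for row in A]

e1 = [[1, 0], [0, -1]]
x, y = 2.0, 3.0
v = [[x, y], [y, -x]]

reflected = neg(matmul(matmul(e1, v), e1))
print(reflected)  # [[-2.0, 3.0], [3.0, 2.0]], i.e. -x e1 + y e2
```

Data that is statistically unchanged by this map scores near zero in the distributional distance above.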

detect_continuous_symmetries(mv_data, commutator_result=None, threshold=None)

Estimate the dimension of the continuous symmetry group.

A bivector B_j generates a continuous symmetry if E[||[B_j, x_i]||] is near zero for all data points.

If a CommutatorResult is provided, its exchange spectrum is used directly (eigenvalues near zero -> symmetry generators). Otherwise the computation is done from scratch.

Parameters:

    mv_data (Tensor, required): [N, dim] multivector data.
    commutator_result (Optional[CommutatorResult], default None): Pre-computed commutator analysis.
    threshold (Optional[float], default None): Normalised commutator norm below which a bivector is considered a symmetry generator.

Returns:

    int: Estimated dimension of the continuous symmetry group.

Source code in core/analysis/symmetry.py
def detect_continuous_symmetries(
    self,
    mv_data: torch.Tensor,
    commutator_result: Optional[CommutatorResult] = None,
    threshold: Optional[float] = None,
) -> int:
    """Estimate the dimension of the continuous symmetry group.

    A bivector ``B_j`` generates a continuous symmetry if
    ``E[||[B_j, x_i]||]`` is near zero for all data points.

    If a :class:`CommutatorResult` is provided, its exchange
    spectrum is used directly (eigenvalues near zero -> symmetry
    generators).  Otherwise the computation is done from scratch.

    Args:
        mv_data: ``[N, dim]`` multivector data.
        commutator_result: Pre-computed commutator analysis.
        threshold: Normalised commutator norm below which a bivector
            is considered a symmetry generator.

    Returns:
        Estimated dimension of the continuous symmetry group.
    """
    if threshold is None:
        threshold = CONSTANTS.continuous_symmetry_threshold

    if commutator_result is not None:
        spec = commutator_result.exchange_spectrum
        if spec.numel() > 0:
            max_val = spec.abs().max().clamp(min=self.algebra.eps_sq)
            return int((spec.abs() / max_val < threshold).sum().item())

    # Compute from scratch: test each basis bivector
    n = self.algebra.n
    dim = self.algebra.dim
    N = mv_data.shape[0]

    ii, jj = torch.triu_indices(n, n, offset=1)
    bv_indices = ((1 << ii) | (1 << jj)).tolist()

    if not bv_indices:
        return 0

    n_bv = len(bv_indices)
    # Build all bivector bases: [n_bv, dim]
    bv_bases = torch.zeros(n_bv, dim, device=mv_data.device, dtype=mv_data.dtype)
    bv_idx_tensor = torch.tensor(bv_indices, dtype=torch.long, device=mv_data.device)
    bv_bases[torch.arange(n_bv, device=mv_data.device), bv_idx_tensor] = 1.0

    # Batch commutator: [n_bv, N, dim]
    bv_exp = bv_bases.unsqueeze(1).expand(n_bv, N, dim)
    mv_exp = mv_data.unsqueeze(0).expand(n_bv, N, dim)
    comm = self.algebra.commutator(bv_exp, mv_exp)

    comm_norms = comm.norm(dim=-1).mean(dim=-1)  # [n_bv]
    data_norm = mv_data.norm(dim=-1).mean().clamp(min=self.algebra.eps_sq)

    return int((comm_norms / data_norm < threshold).sum().item())
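The vanishing-commutator criterion can be illustrated in the 2×2 matrix representation of Cl(2,0): the bivector e12 ↦ J = [[0,1],[-1,0]] generates rotations, and every even element a + b·e12 commutes with it, while grade-1 data does not (a standalone sketch, not Versor's batched `commutator`):

```python
# Continuous-symmetry sketch: in Cl(2,0), e12 -> J = [[0,1],[-1,0]], and even
# elements a + b e12 -> [[a, b], [-b, a]] commute with J, so the commutator
# test would count e12 as a symmetry generator for even-subalgebra data.

def matmul(A, B):
    return [[sum(A[i][k] * B[k][j] for k in range(2)) for j in range(2)]
            for i in range(2)]

def commutator(A, B):
    AB, BA = matmul(A, B), matmul(B, A)
    return [[AB[i][j] - BA[i][j] for j in range(2)] for i in range(2)]

J = [[0, 1], [-1, 0]]
even = [[2.0, 0.5], [-0.5, 2.0]]   # a + b e12 with a = 2, b = 0.5
odd = [[1.0, 0.0], [0.0, -1.0]]    # e1: a grade-1 element

print(commutator(J, even))  # zero matrix: e12 is a symmetry generator here
print(commutator(J, odd))   # non-zero: grade-1 data breaks the symmetry
```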

CommutatorAnalyzer

Analyze algebraic exchange properties via commutators.

The commutator [A, B] = AB - BA measures non-commutativity. In Clifford algebras, commutators of grade-1 elements yield grade-2 elements (bivectors), directly related to rotation planes.

Parameters:

    algebra (CliffordAlgebra, required): CliffordAlgebra instance.
    max_bivectors (int, default 15): Maximum number of bivectors for Lie-bracket closure analysis (guards combinatorial cost).
Source code in core/analysis/commutator.py
class CommutatorAnalyzer:
    """Analyze algebraic exchange properties via commutators.

    The commutator ``[A, B] = AB - BA`` measures non-commutativity.
    In Clifford algebras, commutators of grade-1 elements yield grade-2
    elements (bivectors), directly related to rotation planes.

    Args:
        algebra: :class:`CliffordAlgebra` instance.
        max_bivectors: Maximum number of bivectors for Lie-bracket
            closure analysis (guards combinatorial cost).
    """

    def __init__(
        self,
        algebra: CliffordAlgebra,
        max_bivectors: int = 15,
    ):
        self.algebra = algebra
        self.max_bivectors = max_bivectors

    def analyze(self, mv_data: torch.Tensor) -> CommutatorResult:
        """Full commutator analysis.

        Args:
            mv_data: Multivector data.  Accepted shapes:

                * ``[N, dim]`` -- single-channel batch.
                * ``[N, C, dim]`` -- multi-channel batch (channels
                  averaged).

        Returns:
            :class:`CommutatorResult`.
        """
        if mv_data.ndim == 3:
            flat = mv_data.mean(dim=1)
        else:
            flat = mv_data  # [N, dim]

        comm_matrix = self.commutativity_matrix(flat)
        ex_spectrum = self.exchange_spectrum(flat)
        mcn = self.mean_commutator_norm(flat)
        lie_struct = self.lie_bracket_closure(flat)

        return CommutatorResult(
            commutativity_matrix=comm_matrix,
            exchange_spectrum=ex_spectrum,
            mean_commutator_norm=mcn,
            lie_bracket_structure=lie_struct,
        )

    def commutativity_matrix(self, mv_data: torch.Tensor) -> torch.Tensor:
        """Pairwise commutativity index for input dimensions.

        For each pair ``(i, j)`` of the *n* basis-vector directions,
        computes ``E[||[x_i, x_j]||]`` where ``x_i`` is the data
        projected onto ``e_i``.

        Args:
            mv_data: ``[N, dim]`` multivector data.

        Returns:
            ``[n, n]`` symmetric matrix of commutativity indices.
        """
        n = self.algebra.n
        dim = self.algebra.dim
        device = mv_data.device
        dtype = mv_data.dtype

        # Build per-direction projections: keep only the e_i component
        g1_idx = (1 << torch.arange(n, device=device)).long()

        N = mv_data.shape[0]

        # Build all n projected multivectors: [n, N, dim]
        xi_all = torch.zeros(n, N, dim, device=device, dtype=dtype)
        coeffs = mv_data[:, g1_idx]  # [N, n]
        row_idx = torch.arange(n, device=device).unsqueeze(1).expand(n, N)
        batch_idx = torch.arange(N, device=device).unsqueeze(0).expand(n, N)
        col_idx = g1_idx.unsqueeze(1).expand(n, N)
        xi_all[row_idx, batch_idx, col_idx] = coeffs.T

        # All (i, j) pairs with i < j
        i_idx, j_idx = torch.triu_indices(n, n, offset=1, device=device)

        # Batched commutator: [n_pairs, N, dim]
        comm = self.algebra.commutator(xi_all[i_idx], xi_all[j_idx])
        vals = comm.norm(dim=-1).mean(dim=-1)  # [n_pairs]

        matrix = torch.zeros(n, n, device=device, dtype=dtype)
        matrix[i_idx, j_idx] = vals
        matrix[j_idx, i_idx] = vals

        return matrix

    def exchange_spectrum(self, mv_data: torch.Tensor) -> torch.Tensor:
        """Eigenvalue magnitudes of the adjoint operator ``ad_mu``.

        Constructs the explicit matrix for ``ad_mu(x) = [mu, x]``
        where ``mu = E[x]`` and diagonalises it.

        Args:
            mv_data: ``[N, dim]`` multivector data.

        Returns:
            Eigenvalue magnitudes sorted descending.  Returns an
            empty tensor if the algebra is too large.
        """
        n = self.algebra.n
        dim = self.algebra.dim
        device = mv_data.device
        dtype = mv_data.dtype

        if n > CONSTANTS.adjoint_max_n:
            return torch.tensor([], device=device, dtype=dtype)

        mu = mv_data.mean(dim=0)  # [dim]
        basis = torch.eye(dim, device=device, dtype=dtype)

        # Batched commutator: [dim, dim] x [dim, dim] -> [dim, dim], transpose
        ad_mu = self.algebra.commutator(
            mu.unsqueeze(0).expand(dim, -1), basis
        ).T

        eigvals = torch.linalg.eigvals(ad_mu)  # complex
        magnitudes = eigvals.abs()
        return magnitudes.sort(descending=True).values

    def mean_commutator_norm(self, mv_data: torch.Tensor) -> float:
        """``E[||[x_i, mu]||_2]`` -- scalar non-commutativity summary.

        Generalises the *Geometric Uncertainty Index* from
        :func:`core.analysis.compute_uncertainty_and_alignment`.

        Args:
            mv_data: ``[N, dim]`` multivector data.

        Returns:
            Mean commutator norm (float).
        """
        mu = mv_data.mean(dim=0, keepdim=True)  # [1, dim]
        comm = self.algebra.commutator(mv_data, mu.expand_as(mv_data))
        return comm.norm(dim=-1).mean().item()

    def lie_bracket_closure(self, mv_data: torch.Tensor) -> Dict:
        """Test whether the data bivectors close under the Lie bracket.

        Selects the top-*k* energetic bivectors from the batch,
        computes all pairwise brackets ``[B_i, B_j]``, and measures
        how well the results lie in the span of the original set.

        Args:
            mv_data: ``[N, dim]`` multivector data.

        Returns:
            Dict with ``"structure_constants"`` (``[k, k, k]``),
            ``"closure_error"`` (scalar), and ``"basis_indices"`` (list
            of multivector-coefficient indices of the chosen bivectors).
        """
        dim = self.algebra.dim
        n = self.algebra.n
        device = mv_data.device
        dtype = mv_data.dtype

        if n < 2:
            return {
                "structure_constants": torch.zeros(0, 0, 0, device=device),
                "closure_error": 0.0,
                "basis_indices": [],
            }

        # Extract grade-2 part of mean per-sample
        bv_data = self.algebra.grade_projection(mv_data, 2)  # [N, dim]
        mean_bv = bv_data.mean(dim=0)  # [dim]

        # Identify bivector basis indices
        ii, jj = torch.triu_indices(n, n, offset=1)
        bv_blade_indices = ((1 << ii) | (1 << jj)).tolist()

        if not bv_blade_indices:
            return {
                "structure_constants": torch.zeros(0, 0, 0, device=device),
                "closure_error": 0.0,
                "basis_indices": [],
            }

        # Pick top-k by energy in the mean bivector
        bv_idx_tensor = torch.tensor(bv_blade_indices, dtype=torch.long, device=device)
        energies = mean_bv[bv_idx_tensor].abs()
        k = min(self.max_bivectors, len(bv_blade_indices))
        topk_pos = energies.topk(k).indices.tolist()

        selected_indices = [bv_blade_indices[p] for p in topk_pos]

        # Build basis bivector multivectors
        B = torch.zeros(k, dim, device=device, dtype=dtype)
        sel_tensor = torch.tensor(selected_indices, dtype=torch.long, device=device)
        B[torch.arange(k, device=device), sel_tensor] = 1.0

        # Compute structure constants c_{a,b,c} such that [B_a, B_b] ~= Sum_c c_{abc} B_c
        a_idx, b_idx = torch.triu_indices(k, k, offset=1, device=device)

        # Batched commutator and grade-2 projection
        brackets = self.algebra.commutator(B[a_idx], B[b_idx])  # [n_pairs, dim]
        brackets_bv = self.algebra.grade_projection(brackets, 2)  # [n_pairs, dim]

        # Project onto basis: coeffs[p, c] = <bracket_bv_p, B_c>
        coeffs = brackets_bv @ B.T  # [n_pairs, k]

        structure = torch.zeros(k, k, k, device=device, dtype=dtype)
        structure[a_idx, b_idx, :] = coeffs
        structure[b_idx, a_idx, :] = -coeffs  # antisymmetry

        # Closure errors: residual norm / bracket norm
        projected = coeffs @ B  # [n_pairs, dim]
        residuals = brackets_bv - projected
        res_norms = residuals.norm(dim=-1)  # [n_pairs]
        bracket_norms = brackets_bv.norm(dim=-1)  # [n_pairs]

        valid = bracket_norms > self.algebra.eps_sq
        if valid.any():
            closure_error = (res_norms[valid] / bracket_norms[valid]).mean().item()
        else:
            closure_error = 0.0

        return {
            "structure_constants": structure,
            "closure_error": closure_error,
            "basis_indices": selected_indices,
        }

analyze(mv_data)

Full commutator analysis.

Parameters:

Name Type Description Default
mv_data Tensor

Multivector data. Accepted shapes:

  • [N, dim] -- single-channel batch.
  • [N, C, dim] -- multi-channel batch (channels averaged).
required

Returns:

Type Description
CommutatorResult

A CommutatorResult instance.

Source code in core/analysis/commutator.py
def analyze(self, mv_data: torch.Tensor) -> CommutatorResult:
    """Full commutator analysis.

    Args:
        mv_data: Multivector data.  Accepted shapes:

            * ``[N, dim]`` -- single-channel batch.
            * ``[N, C, dim]`` -- multi-channel batch (channels
              averaged).

    Returns:
        :class:`CommutatorResult`.
    """
    if mv_data.ndim == 3:
        flat = mv_data.mean(dim=1)
    else:
        flat = mv_data  # [N, dim]

    comm_matrix = self.commutativity_matrix(flat)
    ex_spectrum = self.exchange_spectrum(flat)
    mcn = self.mean_commutator_norm(flat)
    lie_struct = self.lie_bracket_closure(flat)

    return CommutatorResult(
        commutativity_matrix=comm_matrix,
        exchange_spectrum=ex_spectrum,
        mean_commutator_norm=mcn,
        lie_bracket_structure=lie_struct,
    )
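The shape dispatch at the top of `analyze` can be exercised in isolation; the sketch below assumes arbitrary `N`, `C`, and `dim` values chosen purely for illustration:

```python
import torch

N, C, dim = 16, 4, 8
mv = torch.randn(N, C, dim)

# Multi-channel input is averaged over channels before analysis,
# mirroring the ndim == 3 branch of analyze() above.
flat = mv.mean(dim=1) if mv.ndim == 3 else mv
print(flat.shape)  # torch.Size([16, 8])
```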

commutativity_matrix(mv_data)

Pairwise commutativity index for input dimensions.

For each pair (i, j) of the n basis-vector directions, computes E[||[x_i, x_j]||] where x_i is the data projected onto e_i.

Parameters:

Name Type Description Default
mv_data Tensor

[N, dim] multivector data.

required

Returns:

Type Description
Tensor

[n, n] symmetric matrix of commutativity indices.

Source code in core/analysis/commutator.py
def commutativity_matrix(self, mv_data: torch.Tensor) -> torch.Tensor:
    """Pairwise commutativity index for input dimensions.

    For each pair ``(i, j)`` of the *n* basis-vector directions,
    computes ``E[||[x_i, x_j]||]`` where ``x_i`` is the data
    projected onto ``e_i``.

    Args:
        mv_data: ``[N, dim]`` multivector data.

    Returns:
        ``[n, n]`` symmetric matrix of commutativity indices.
    """
    n = self.algebra.n
    dim = self.algebra.dim
    device = mv_data.device
    dtype = mv_data.dtype

    # Build per-direction projections: keep only the e_i component
    g1_idx = (1 << torch.arange(n, device=device)).long()

    N = mv_data.shape[0]

    # Build all n projected multivectors: [n, N, dim]
    xi_all = torch.zeros(n, N, dim, device=device, dtype=dtype)
    coeffs = mv_data[:, g1_idx]  # [N, n]
    row_idx = torch.arange(n, device=device).unsqueeze(1).expand(n, N)
    batch_idx = torch.arange(N, device=device).unsqueeze(0).expand(n, N)
    col_idx = g1_idx.unsqueeze(1).expand(n, N)
    xi_all[row_idx, batch_idx, col_idx] = coeffs.T

    # All (i, j) pairs with i < j
    i_idx, j_idx = torch.triu_indices(n, n, offset=1, device=device)

    # Batched commutator: [n_pairs, N, dim]
    comm = self.algebra.commutator(xi_all[i_idx], xi_all[j_idx])
    vals = comm.norm(dim=-1).mean(dim=-1)  # [n_pairs]

    matrix = torch.zeros(n, n, device=device, dtype=dtype)
    matrix[i_idx, j_idx] = vals
    matrix[j_idx, i_idx] = vals

    return matrix
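The project-then-bracket pattern above can be sketched with plain tensors, using the matrix commutator `[X, Y] = XY - YX` as a stand-in for the algebra's commutator product (the `CliffordAlgebra` kernel is not assumed here):

```python
import torch

def commutator(x, y):
    # Matrix commutator [x, y] = xy - yx, a stand-in for the
    # algebra's bilinear commutator product.
    return x @ y - y @ x

torch.manual_seed(0)
n = 3
# Three random "directions", here 4x4 matrices instead of
# blade-projected multivectors.
X = torch.randn(n, 4, 4)

# All (i, j) pairs with i < j, as in the method above.
i_idx, j_idx = torch.triu_indices(n, n, offset=1)
vals = torch.stack([
    commutator(X[i], X[j]).norm()
    for i, j in zip(i_idx.tolist(), j_idx.tolist())
])

matrix = torch.zeros(n, n)
matrix[i_idx, j_idx] = vals
matrix[j_idx, i_idx] = vals  # symmetrise
print(matrix)
```

Generic random matrices almost never commute, so every off-diagonal entry is strictly positive while the diagonal stays zero.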

exchange_spectrum(mv_data)

Eigenvalue magnitudes of the adjoint operator ad_mu.

Constructs the explicit matrix for ad_mu(x) = [mu, x] where mu = E[x] and diagonalises it.

Parameters:

Name Type Description Default
mv_data Tensor

[N, dim] multivector data.

required

Returns:

Type Description
Tensor

Eigenvalue magnitudes sorted descending. Returns an empty
tensor if the algebra is too large.

Source code in core/analysis/commutator.py
def exchange_spectrum(self, mv_data: torch.Tensor) -> torch.Tensor:
    """Eigenvalue magnitudes of the adjoint operator ``ad_mu``.

    Constructs the explicit matrix for ``ad_mu(x) = [mu, x]``
    where ``mu = E[x]`` and diagonalises it.

    Args:
        mv_data: ``[N, dim]`` multivector data.

    Returns:
        Eigenvalue magnitudes sorted descending.  Returns an
        empty tensor if the algebra is too large.
    """
    n = self.algebra.n
    dim = self.algebra.dim
    device = mv_data.device
    dtype = mv_data.dtype

    if n > CONSTANTS.adjoint_max_n:
        return torch.tensor([], device=device, dtype=dtype)

    mu = mv_data.mean(dim=0)  # [dim]
    basis = torch.eye(dim, device=device, dtype=dtype)

    # Batched commutator: [dim, dim] x [dim, dim] -> [dim, dim], transpose
    ad_mu = self.algebra.commutator(
        mu.unsqueeze(0).expand(dim, -1), basis
    ).T

    eigvals = torch.linalg.eigvals(ad_mu)  # complex
    magnitudes = eigvals.abs()
    return magnitudes.sort(descending=True).values
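A minimal sketch of the same construction on a toy algebra of 2x2 matrices (an assumption for illustration, not the library's Clifford product): the explicit `ad_mu` matrix is built column by column from brackets with basis elements, then diagonalised:

```python
import torch

def commutator(x, y):
    return x @ y - y @ x

# Toy algebra: 2x2 real matrices with basis E11, E12, E21, E22.
basis = [torch.zeros(2, 2) for _ in range(4)]
for k, (r, c) in enumerate([(0, 0), (0, 1), (1, 0), (1, 1)]):
    basis[k][r, c] = 1.0

mu = torch.diag(torch.tensor([2.0, 1.0]))  # plays the role of E[x]

# Column c of ad_mu holds the coordinates of [mu, basis_c].
ad_mu = torch.stack(
    [commutator(mu, b).reshape(-1) for b in basis], dim=1
)

magnitudes = torch.linalg.eigvals(ad_mu).abs().sort(descending=True).values
print(magnitudes)  # tensor([1., 1., 0., 0.])
```

For diagonal `mu = diag(2, 1)` only the off-diagonal basis elements fail to commute, giving two modes of magnitude `|2 - 1| = 1` and two zero modes.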

mean_commutator_norm(mv_data)

E[||[x_i, mu]||_2] -- scalar non-commutativity summary.

Generalises the Geometric Uncertainty Index from core.analysis.compute_uncertainty_and_alignment.

Parameters:

Name Type Description Default
mv_data Tensor

[N, dim] multivector data.

required

Returns:

Type Description
float

Mean commutator norm (float).

Source code in core/analysis/commutator.py
def mean_commutator_norm(self, mv_data: torch.Tensor) -> float:
    """``E[||[x_i, mu]||_2]`` -- scalar non-commutativity summary.

    Generalises the *Geometric Uncertainty Index* from
    :func:`core.analysis.compute_uncertainty_and_alignment`.

    Args:
        mv_data: ``[N, dim]`` multivector data.

    Returns:
        Mean commutator norm (float).
    """
    mu = mv_data.mean(dim=0, keepdim=True)  # [1, dim]
    comm = self.algebra.commutator(mv_data, mu.expand_as(mv_data))
    return comm.norm(dim=-1).mean().item()
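A quick sanity check of the summary, again with the matrix commutator standing in for the algebra's product: fully commuting data (diagonal matrices) must drive the statistic to zero.

```python
import torch

def commutator(x, y):
    return x @ y - y @ x

torch.manual_seed(0)
# Diagonal matrices all commute, so E[||[x_i, mu]||] must vanish.
diag_data = torch.diag_embed(torch.randn(8, 3))  # [8, 3, 3]
mu = diag_data.mean(dim=0)
norms = torch.stack([commutator(x, mu).norm() for x in diag_data])
print(norms.mean().item())  # 0.0 up to floating-point error
```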

lie_bracket_closure(mv_data)

Test whether the data bivectors close under the Lie bracket.

Selects the top-k energetic bivectors from the batch, computes all pairwise brackets [B_i, B_j], and measures how well the results lie in the span of the original set.

Parameters:

Name Type Description Default
mv_data Tensor

[N, dim] multivector data.

required

Returns:

Type Description
Dict

Dict with "structure_constants" ([k, k, k]), "closure_error"
(scalar), and "basis_indices" (list of multivector-coefficient
indices of the chosen bivectors).

Source code in core/analysis/commutator.py
def lie_bracket_closure(self, mv_data: torch.Tensor) -> Dict:
    """Test whether the data bivectors close under the Lie bracket.

    Selects the top-*k* energetic bivectors from the batch,
    computes all pairwise brackets ``[B_i, B_j]``, and measures
    how well the results lie in the span of the original set.

    Args:
        mv_data: ``[N, dim]`` multivector data.

    Returns:
        Dict with ``"structure_constants"`` (``[k, k, k]``),
        ``"closure_error"`` (scalar), and ``"basis_indices"`` (list
        of multivector-coefficient indices of the chosen bivectors).
    """
    dim = self.algebra.dim
    n = self.algebra.n
    device = mv_data.device
    dtype = mv_data.dtype

    if n < 2:
        return {
            "structure_constants": torch.zeros(0, 0, 0, device=device),
            "closure_error": 0.0,
            "basis_indices": [],
        }

    # Extract grade-2 part of mean per-sample
    bv_data = self.algebra.grade_projection(mv_data, 2)  # [N, dim]
    mean_bv = bv_data.mean(dim=0)  # [dim]

    # Identify bivector basis indices
    ii, jj = torch.triu_indices(n, n, offset=1)
    bv_blade_indices = ((1 << ii) | (1 << jj)).tolist()

    if not bv_blade_indices:
        return {
            "structure_constants": torch.zeros(0, 0, 0, device=device),
            "closure_error": 0.0,
            "basis_indices": [],
        }

    # Pick top-k by energy in the mean bivector
    bv_idx_tensor = torch.tensor(bv_blade_indices, dtype=torch.long, device=device)
    energies = mean_bv[bv_idx_tensor].abs()
    k = min(self.max_bivectors, len(bv_blade_indices))
    topk_pos = energies.topk(k).indices.tolist()

    selected_indices = [bv_blade_indices[p] for p in topk_pos]

    # Build basis bivector multivectors
    B = torch.zeros(k, dim, device=device, dtype=dtype)
    sel_tensor = torch.tensor(selected_indices, dtype=torch.long, device=device)
    B[torch.arange(k, device=device), sel_tensor] = 1.0

    # Compute structure constants c_{a,b,c} such that [B_a, B_b] ~= Sum_c c_{abc} B_c
    a_idx, b_idx = torch.triu_indices(k, k, offset=1, device=device)

    # Batched commutator and grade-2 projection
    brackets = self.algebra.commutator(B[a_idx], B[b_idx])  # [n_pairs, dim]
    brackets_bv = self.algebra.grade_projection(brackets, 2)  # [n_pairs, dim]

    # Project onto basis: coeffs[p, c] = <bracket_bv_p, B_c>
    coeffs = brackets_bv @ B.T  # [n_pairs, k]

    structure = torch.zeros(k, k, k, device=device, dtype=dtype)
    structure[a_idx, b_idx, :] = coeffs
    structure[b_idx, a_idx, :] = -coeffs  # antisymmetry

    # Closure errors: residual norm / bracket norm
    projected = coeffs @ B  # [n_pairs, dim]
    residuals = brackets_bv - projected
    res_norms = residuals.norm(dim=-1)  # [n_pairs]
    bracket_norms = brackets_bv.norm(dim=-1)  # [n_pairs]

    valid = bracket_norms > self.algebra.eps_sq
    if valid.any():
        closure_error = (res_norms[valid] / bracket_norms[valid]).mean().item()
    else:
        closure_error = 0.0

    return {
        "structure_constants": structure,
        "closure_error": closure_error,
        "basis_indices": selected_indices,
    }
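The project-brackets-onto-the-basis step can be demonstrated on a classical case, using so(3) rotation generators in place of the algebra's bivectors (an illustrative assumption; the method itself works on Clifford bivectors). so(3) closes exactly, so the residual-based closure error is zero:

```python
import torch

def commutator(x, y):
    return x @ y - y @ x

# so(3) rotation generators: [L_x, L_y] = L_z (cyclic).
Lx = torch.tensor([[0., 0., 0.], [0., 0., -1.], [0., 1., 0.]])
Ly = torch.tensor([[0., 0., 1.], [0., 0., 0.], [-1., 0., 0.]])
Lz = torch.tensor([[0., -1., 0.], [1., 0., 0.], [0., 0., 0.]])
B = torch.stack([Lx, Ly, Lz]).reshape(3, -1)  # flattened; ||L_i||_F^2 = 2

a_idx, b_idx = torch.triu_indices(3, 3, offset=1)
brackets = torch.stack([
    commutator(B[a].reshape(3, 3), B[b].reshape(3, 3)).reshape(-1)
    for a, b in zip(a_idx.tolist(), b_idx.tolist())
])

# Structure constants via projection onto the orthogonal basis.
coeffs = brackets @ B.T / 2.0  # divide by ||L_i||_F^2

residual = (brackets - coeffs @ B).norm()
print(coeffs)           # rows: [0,0,1], [0,-1,0], [1,0,0]
print(residual.item())  # ~0: so(3) closes under the bracket
```

The recovered coefficients are the Levi-Civita structure constants `[L_x, L_y] = L_z`, `[L_x, L_z] = -L_y`, `[L_y, L_z] = L_x`.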

EffectiveDimensionAnalyzer

Estimate effective intrinsic dimensionality of data.

Uses the covariance eigenvalue spectrum together with the participation ratio (random matrix theory) and the broken-stick model (MacArthur 1957) for significance testing.

Parameters:

Name Type Description Default
device str

Torch device string.

'cpu'
dtype dtype

Floating-point dtype used for covariance computations. Defaults to torch.float32. Pass torch.float64 for higher-precision analyses.

float32
k_local int

Number of neighbours for local-dimension estimation.

20
energy_threshold float

Minimum normalised eigenvalue to count as active.

0.05
Source code in core/analysis/dimension.py
class EffectiveDimensionAnalyzer:
    """Estimate effective intrinsic dimensionality of data.

    Uses the covariance eigenvalue spectrum together with the
    participation ratio (random matrix theory) and the broken-stick
    model (MacArthur 1957) for significance testing.

    Args:
        device: Torch device string.
        dtype: Floating-point dtype used for covariance computations.
            Defaults to ``torch.float32``.  Pass ``torch.float64`` for
            higher-precision analyses.
        k_local: Number of neighbours for local-dimension estimation.
        energy_threshold: Minimum normalised eigenvalue to count as
            active.
    """

    def __init__(
        self,
        device: str = "cpu",
        dtype: torch.dtype = torch.float32,
        k_local: int = 20,
        energy_threshold: float = 0.05,
    ):
        self.device = device
        self.dtype = dtype
        self.k_local = k_local
        self.energy_threshold = energy_threshold
        self._eps_sq: float = float(torch.finfo(dtype).eps ** 2)

    def analyze(self, data: torch.Tensor) -> DimensionResult:
        """Full effective-dimension analysis.

        Args:
            data: ``[N, D]`` raw data tensor.

        Returns:
            :class:`DimensionResult` with eigenvalues, participation
            ratio, broken-stick threshold, and optional local dims.
        """
        data = data.to(device=self.device, dtype=self.dtype)
        N, D = data.shape

        eigenvalues = self._covariance_eigenvalues(data)  # descending
        pr = self._participation_ratio(eigenvalues)

        total = eigenvalues.sum()
        if total > 0:
            evr = eigenvalues / total
        else:
            evr = torch.zeros_like(eigenvalues)

        bs_threshold = self._broken_stick_threshold(eigenvalues, D)

        local_dims = None
        if N < 5000 and N > self.k_local + 1:
            local_dims = self._local_dimensions(data, self.k_local)

        intrinsic_dim = max(1, bs_threshold)

        return DimensionResult(
            intrinsic_dim=intrinsic_dim,
            participation_ratio=pr,
            eigenvalues=eigenvalues,
            broken_stick_threshold=bs_threshold,
            explained_variance_ratio=evr,
            local_dims=local_dims,
        )

    def reduce(self, data: torch.Tensor, target_dim: int) -> torch.Tensor:
        """PCA projection to *target_dim* dimensions.

        Args:
            data: ``[N, D]`` tensor.
            target_dim: Target dimensionality.

        Returns:
            ``[N, target_dim]`` projected tensor.
        """
        data = data.to(device=self.device, dtype=self.dtype)
        mean = data.mean(dim=0, keepdim=True)
        centered = data - mean
        # Economy SVD
        U, S, Vh = torch.linalg.svd(centered, full_matrices=False)
        return (U[:, :target_dim] * S[:target_dim].unsqueeze(0))

    @staticmethod
    def _covariance_eigenvalues(data: torch.Tensor) -> torch.Tensor:
        """Eigenvalues of the sample covariance, sorted descending."""
        N = data.shape[0]
        mean = data.mean(dim=0, keepdim=True)
        centered = data - mean
        cov = (centered.T @ centered) / max(N - 1, 1)
        # eigh returns ascending; flip
        eigvals = torch.linalg.eigvalsh(cov)
        return eigvals.flip(0).clamp(min=0.0)

    def _participation_ratio(self, eigenvalues: torch.Tensor) -> float:
        """``(Sum lam)^2 / Sum lam^2`` -- smooth dimensionality estimator."""
        s1 = eigenvalues.sum()
        s2 = (eigenvalues ** 2).sum()
        if s2 < self._eps_sq:
            return 0.0
        return (s1 ** 2 / s2).item()

    @staticmethod
    def _broken_stick(d: int, dtype: torch.dtype = torch.float32) -> torch.Tensor:
        """Expected eigenvalues under the broken-stick null model.

        The *k*-th expected value is ``(1/d) * Sum_{j=k+1}^{d} 1/j``
        (0-indexed).
        """
        inv = 1.0 / torch.arange(1, d + 1, dtype=torch.float64)
        # Reverse cumulative sum gives Sum_{j=k+1}^{d} 1/j for 0-indexed k
        expected = inv.flip(0).cumsum(0).flip(0) / d
        return expected.to(dtype=dtype)

    def _broken_stick_threshold(
        self, eigenvalues: torch.Tensor, d: int
    ) -> int:
        """Number of components exceeding the broken-stick null."""
        if d <= 0:
            return 0
        expected = self._broken_stick(d, dtype=eigenvalues.dtype).to(
            eigenvalues.device
        )
        total = eigenvalues.sum()
        if total < self._eps_sq:
            return 0
        normed = eigenvalues[:d] / total
        return int((normed > expected[:len(normed)]).sum().item())

    def _local_dimensions(
        self, data: torch.Tensor, k: int
    ) -> torch.Tensor:
        """Per-point local dimension via neighbourhood PCA."""
        dists = torch.cdist(data, data)  # [N, N]
        # k+1 because the closest point is itself
        _, knn_idx = dists.topk(k + 1, largest=False, dim=1)
        knn_idx = knn_idx[:, 1:]  # drop self

        # Batched neighborhood PCA
        all_nbrs = data[knn_idx]  # [N, k, D]
        centered = all_nbrs - all_nbrs.mean(dim=1, keepdim=True)
        k_count = centered.shape[1]
        cov = torch.bmm(centered.transpose(1, 2), centered) / max(k_count - 1, 1)
        eigvals = torch.linalg.eigvalsh(cov).flip(-1).clamp(min=0.0)  # [N, D] desc

        # Vectorized participation ratio
        s1 = eigvals.sum(dim=-1)
        s2 = (eigvals ** 2).sum(dim=-1)
        local_dims = torch.where(s2 > self._eps_sq, s1 ** 2 / s2, torch.zeros_like(s1))

        return local_dims
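The neighbourhood-PCA idea behind `_local_dimensions` can be sketched standalone; the example assumes synthetic points on a flat 2-D plane embedded in 3-D, so each neighbourhood covariance has rank 2 and the per-point participation ratio lands between 1 and 2:

```python
import torch

torch.manual_seed(0)
N, k = 200, 20
# A 2-D Gaussian cloud embedded in 3-D (third coordinate zero).
plane = torch.cat([torch.randn(N, 2), torch.zeros(N, 1)], dim=1)

dists = torch.cdist(plane, plane)
_, knn = dists.topk(k + 1, largest=False, dim=1)
nbrs = plane[knn[:, 1:]]                      # [N, k, 3], self dropped
centered = nbrs - nbrs.mean(dim=1, keepdim=True)
cov = torch.bmm(centered.transpose(1, 2), centered) / (k - 1)
eigs = torch.linalg.eigvalsh(cov).clamp(min=0.0)  # ascending per point

# Participation ratio per point: (sum lam)^2 / sum lam^2.
pr = eigs.sum(-1) ** 2 / (eigs ** 2).sum(-1)
print(pr.mean())  # between 1 (line-like) and 2 (plane-like)
```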

analyze(data)

Full effective-dimension analysis.

Parameters:

Name Type Description Default
data Tensor

[N, D] raw data tensor.

required

Returns:

Type Description
DimensionResult

A DimensionResult with eigenvalues, participation ratio,
broken-stick threshold, and optional local dims.

Source code in core/analysis/dimension.py
def analyze(self, data: torch.Tensor) -> DimensionResult:
    """Full effective-dimension analysis.

    Args:
        data: ``[N, D]`` raw data tensor.

    Returns:
        :class:`DimensionResult` with eigenvalues, participation
        ratio, broken-stick threshold, and optional local dims.
    """
    data = data.to(device=self.device, dtype=self.dtype)
    N, D = data.shape

    eigenvalues = self._covariance_eigenvalues(data)  # descending
    pr = self._participation_ratio(eigenvalues)

    total = eigenvalues.sum()
    if total > 0:
        evr = eigenvalues / total
    else:
        evr = torch.zeros_like(eigenvalues)

    bs_threshold = self._broken_stick_threshold(eigenvalues, D)

    local_dims = None
    if N < 5000 and N > self.k_local + 1:
        local_dims = self._local_dimensions(data, self.k_local)

    intrinsic_dim = max(1, bs_threshold)

    return DimensionResult(
        intrinsic_dim=intrinsic_dim,
        participation_ratio=pr,
        eigenvalues=eigenvalues,
        broken_stick_threshold=bs_threshold,
        explained_variance_ratio=evr,
        local_dims=local_dims,
    )
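The two significance ingredients can be checked by hand on a toy spectrum (the eigenvalues below are invented for illustration). The broken-stick expectation is the reverse cumulative sum from `_broken_stick`, and the participation ratio is `(sum lam)^2 / sum lam^2`:

```python
import torch

def broken_stick(d):
    # Expected normalised eigenvalues under the broken-stick null:
    # E_k = (1/d) * sum_{j=k+1}^{d} 1/j for 0-indexed k.
    inv = 1.0 / torch.arange(1, d + 1, dtype=torch.float64)
    return inv.flip(0).cumsum(0).flip(0) / d

eigs = torch.tensor([4.0, 1.0, 0.0], dtype=torch.float64)  # toy spectrum

# Participation ratio: (sum lam)^2 / sum lam^2.
pr = eigs.sum() ** 2 / (eigs ** 2).sum()

expected = broken_stick(3)
normed = eigs / eigs.sum()
n_significant = int((normed > expected).sum())

print(expected)       # tensor([0.6111, 0.2778, 0.1111], ...)
print(pr.item())      # 25/17, roughly 1.47
print(n_significant)  # 1: only the first eigenvalue beats the null
```

Only the first normalised eigenvalue (0.8) exceeds its broken-stick expectation (0.6111), so the threshold reports one significant component while the participation ratio gives the smoother estimate of about 1.47.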

reduce(data, target_dim)

PCA projection to target_dim dimensions.

Parameters:

Name Type Description Default
data Tensor

[N, D] tensor.

required
target_dim int

Target dimensionality.

required

Returns:

Type Description
Tensor

[N, target_dim] projected tensor.

Source code in core/analysis/dimension.py
def reduce(self, data: torch.Tensor, target_dim: int) -> torch.Tensor:
    """PCA projection to *target_dim* dimensions.

    Args:
        data: ``[N, D]`` tensor.
        target_dim: Target dimensionality.

    Returns:
        ``[N, target_dim]`` projected tensor.
    """
    data = data.to(device=self.device, dtype=self.dtype)
    mean = data.mean(dim=0, keepdim=True)
    centered = data - mean
    # Economy SVD
    U, S, Vh = torch.linalg.svd(centered, full_matrices=False)
    return (U[:, :target_dim] * S[:target_dim].unsqueeze(0))
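The SVD-based projection in `reduce` is equivalent to projecting the centred data onto the top right-singular vectors, since `U * S = centered @ V`. A short sketch with arbitrary toy data (the dominant-direction scaling is an assumption for illustration):

```python
import torch

torch.manual_seed(0)
data = torch.randn(100, 5)
data[:, 0] *= 10.0  # make one direction dominant

mean = data.mean(dim=0, keepdim=True)
centered = data - mean
U, S, Vh = torch.linalg.svd(centered, full_matrices=False)

target_dim = 2
reduced = U[:, :target_dim] * S[:target_dim].unsqueeze(0)

# Scores equal the projection onto the top right-singular vectors.
assert torch.allclose(reduced, centered @ Vh[:target_dim].T, atol=1e-4)
print(reduced.shape)  # torch.Size([100, 2])
```

Because singular values come out in descending order, the first retained component always carries the most variance.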