Chained live-query with orderBy misses same-key value updates (TopKState.processElement) #1489

@vibl

Description

Summary

A chained live-query Q3 = q.from({ row: Q2 }).orderBy(row.name) over an upstream live-query collection Q2 = q.from(A).leftJoin(B).select({ ...A, fromB }) receives the initial snapshot but does not receive subsequent updates when the right-hand side of Q2's leftJoin arrives after the initial seed. Q2 updates correctly; the drop is in Q3.

Root cause: TopKState.processElement at packages/db-ivm/src/operators/topKState.ts:37-53 only emits a topK change when per-key multiplicity flips 0 ↔ positive. A same-key retract + insert pair with different values nets positive-positive throughout, so the operator returns { moveIn: null, moveOut: null } on both messages. The stored value in the underlying TopKArray never gets swapped for the new one, and no downstream output is emitted.
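The flip-only logic can be distilled as follows. This is a hypothetical sketch (names and shapes are illustrative, not the real TopKState code): changes are surfaced only when a key's multiplicity crosses zero, so a same-key value swap (+1 new value, −1 old value) never produces output.

```typescript
// Minimal sketch of the flip-only emission logic, assuming a per-key
// multiplicity map. A positive -> positive transition never compares or
// swaps the stored value, so same-key value changes are dropped.
type Move<V> = { moveIn: V | null; moveOut: V | null };

function processElement<V>(
  multiplicities: Map<string, number>,
  key: string,
  value: V,
  diff: number
): Move<V> {
  const oldMult = multiplicities.get(key) ?? 0;
  const newMult = oldMult + diff;
  multiplicities.set(key, newMult);
  if (oldMult <= 0 && newMult > 0) return { moveIn: value, moveOut: null };
  if (oldMult > 0 && newMult <= 0) return { moveIn: null, moveOut: value };
  // positive -> positive: no emission, stored value untouched
  return { moveIn: null, moveOut: null };
}

// Replaying the failing trace: key 'pub_a' starts at multiplicity 1.
const m = new Map([['pub_a', 1]]);
const a = processElement(m, 'pub_a', 'Alice', +1); // oldMult=1, newMult=2
const b = processElement(m, 'pub_a', '', -1);      // oldMult=2, newMult=1
// Both calls return { moveIn: null, moveOut: null }: no downstream output.
```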

Reproduction

import { describe, expect, test } from 'vitest';
import { createCollection, createLiveQueryCollection, coalesce, eq } from '@tanstack/db';

type Pub = { id: string; name: string; authorSubId: string | null };
type Author = { id: string; substackId: string; name: string };

const initialPubs: Pub[] = [
  { id: 'pub_a', name: 'Alpha', authorSubId: 'sub_1' },
  { id: 'pub_b', name: 'Bravo', authorSubId: 'sub_2' }
];

// Minimal in-memory collection helper.
function makeInMemory<T extends object>(id: string, getKey: (x: T) => string, initial: T[] = []) {
  let begin!: () => void;
  let write!: (op: { type: 'insert' | 'delete' | 'update'; value: T }) => void;
  let commit!: () => void;
  const col = createCollection<T, string>({
    id,
    getKey,
    startSync: true,
    sync: {
      sync: (p) => {
        begin = p.begin;
        write = p.write;
        commit = p.commit;
        begin();
        for (const item of initial) write({ type: 'insert', value: item });
        commit();
        p.markReady();
      }
    }
  });
  return Object.assign(col, {
    insert: (value: T) => {
      begin();
      write({ type: 'insert', value });
      commit();
    }
  });
}

describe('chained live-query over leftJoin misses upstream updates', () => {
  test('author inserted after seed — Layer 3 should see it (FAILS)', async () => {
    const pubs = makeInMemory<Pub>('pubs', (p) => p.id, initialPubs);
    const authors = makeInMemory<Author>('authors', (a) => a.id, []);

    const joined = createLiveQueryCollection({
      id: 'joined',
      startSync: true,
      query: (q) =>
        q
          .from({ pub: pubs })
          .leftJoin({ a: authors }, ({ pub, a }) => eq(a.substackId, pub.authorSubId))
          .select(({ pub, a }) => ({ ...pub, authorName: coalesce(a?.name, '') }))
    });

    const chained = createLiveQueryCollection({
      id: 'chained',
      startSync: true,
      query: (q) => q.from({ row: joined }).orderBy(({ row }) => row.name, 'asc')
    });

    await new Promise((r) => setTimeout(r, 0));

    authors.insert({ id: 'a_1', substackId: 'sub_1', name: 'Alice' });
    await new Promise((r) => setTimeout(r, 0));

    // Layer 2 sees the author (passes):
    expect(joined.toArray.find((r) => r.id === 'pub_a')?.authorName).toBe('Alice');
    // Layer 3 does not (FAILS):
    expect(chained.toArray.find((r) => r.id === 'pub_a')?.authorName).toBe('Alice');
  });
});

Bisect: removing .orderBy(...) from chained makes the bug disappear

Dropping .orderBy(({ row }) => row.name, 'asc') from the chained query removes the topKWithFractionalIndex operator from the D2 graph, and the upstream update propagates correctly. This pinpoints the drop site to the orderBy/topK path.

Trace (instrumented run)

After adding logs through the pipeline, the sequence on author insert is:

[DBG sendChanges alias=author]           insert 'a_1'
[DBG D2 output id=joined] count=2
  +1 '[[pub_a,a_1],undefined]'  authorName='Alice'
  −1 '[[pub_a,undefined],undefined]'  authorName=''
[DBG applyChanges id=joined] × 2          ← Layer 2 collection updates OK
[DBG sendChanges alias=row] [ +1 new, −1 old ]   ← Layer 3 subscriber receives pair
[DBG filterDuplicateInserts PASS]         (not the drop)
[DBG sendChangesToInput]                  ← both collapse to key='pub_a' via getKey(row)
  { key:'pub_a', mult:+1, authorName:'Alice' }
  { key:'pub_a', mult:−1, authorName:''      }
[DBG TopKFractional.run] msgs=1 items=2
  key=pub_a mult=+1  value.row.authorName='Alice'
  key=pub_a mult=−1  value.row.authorName=''
[DBG TopKFractional.run] result.length=0  ← DROP SITE
[DBG Consolidate.run] inputMessages=0 itemCount=0
(no D2 output for id=chained)

In TopKState.processElement:

  • +1 at pub_a: oldMult=1, newMult=2 → both > 0 → { moveIn: null, moveOut: null }
  • −1 at pub_a: oldMult=2, newMult=1 → both > 0 → { moveIn: null, moveOut: null }

The topK's stored value at pub_a remains oldValue forever.

Why the same-key collision happens

Layer 2's internal D2 keys are composite ([[pubKey, authorKey], evalKey], produced by processJoinResults in packages/db/src/query/compiler/joins.ts). Layer 2 emits the update correctly as a retract of the old composite key plus an insert of the new composite key.

Layer 3's subscription receives those composite-keyed messages. sendChangesToInput in packages/db/src/query/live/utils.ts then calls getKey(change.value) on the projected row to build the D2 input. Because the .select() uses ...pub spread, the projected row carries pub.id at top level and the downstream collection's default getKey resolves to 'pub_a' for both the old and new rows. The composite distinction is lost before reaching the topK operator.
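The collapse can be shown in isolation. This is a hypothetical sketch (the row shape and getKey are illustrative, not the real sendChangesToInput code): the upstream composite keys are distinct, but re-keying the projected rows via getKey makes both resolve to 'pub_a'.

```typescript
// Two projected rows that differ only in authorName: the ...pub spread puts
// pub.id at top level, and the default getKey reads it.
type Row = { id: string; name: string; authorName: string };

const getKey = (row: Row) => row.id; // downstream collection's default key

const oldRow: Row = { id: 'pub_a', name: 'Alpha', authorName: '' };
const newRow: Row = { id: 'pub_a', name: 'Alpha', authorName: 'Alice' };

// Upstream, the composite D2 keys distinguish the two rows...
const oldComposite = JSON.stringify([['pub_a', undefined], undefined]);
const newComposite = JSON.stringify([['pub_a', 'a_1'], undefined]);

// ...downstream, both collapse to the same key before reaching topK.
const collapsed = [getKey(oldRow), getKey(newRow)];
```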

Suggested fixes (in increasing scope)

  1. Detect same-key value changes in TopKState.processElement. When oldMult > 0 && newMult > 0, compare the incoming value against the stored one at key. If different, emit moveOut(oldValue) + moveIn(newValue). Targeted fix, mirrors how applyChanges at the collection level already merges retract + insert into an update.
  2. Insert consolidate() before topKWithFractionalIndex. MultiSet.#consolidateKeyed assigns distinct valueIds to distinct value references (via WeakMap), so +1(newVal) and −1(oldVal) at the same key do not cancel each other: both reach topK as net messages, with −1(oldVal) flipping oldVal's multiplicity to 0 and +1(newVal) flipping newVal's to 1. However, TopKState's multiplicity map is keyed by K alone, so it still collapses the pair; this option only works if that map is keyed by (K, V) as well.
  3. Key #multiplicities by (K, V) using the same identity the underlying TopK uses. Larger surgical change, aligns with D2 consolidation semantics.

Option 1 is the smallest change; option 3 is the most correct.
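Option 1 can be sketched as follows. This is a hypothetical shape, not a patch against the real topKState.ts: the stored map and the diff-direction check are assumptions, but the core idea matches the suggestion above, i.e. when multiplicity stays positive, compare the incoming value with the stored one and emit a moveOut/moveIn pair when they differ.

```typescript
// Sketch of fix option 1: detect same-key value changes on
// positive -> positive transitions.
type Move<V> = { moveIn: V | null; moveOut: V | null };

function processElementFixed<V>(
  multiplicities: Map<string, number>,
  stored: Map<string, V>, // assumed handle on the value the topK holds per key
  key: string,
  value: V,
  diff: number
): Move<V> {
  const oldMult = multiplicities.get(key) ?? 0;
  const newMult = oldMult + diff;
  multiplicities.set(key, newMult);
  if (oldMult <= 0 && newMult > 0) {
    stored.set(key, value);
    return { moveIn: value, moveOut: null };
  }
  if (oldMult > 0 && newMult <= 0) {
    stored.delete(key);
    return { moveIn: null, moveOut: value };
  }
  // positive -> positive: on an insert carrying a different value,
  // swap the stored value and emit a moveOut/moveIn pair.
  const oldValue = stored.get(key);
  if (diff > 0 && oldValue !== value) {
    stored.set(key, value);
    return { moveIn: value, moveOut: oldValue ?? null };
  }
  // the retract of the already-replaced old value nets to no-op
  return { moveIn: null, moveOut: null };
}

// Replaying the failing trace with the fix applied:
const mult = new Map([['pub_a', 1]]);
const st = new Map([['pub_a', '']]);
const r1 = processElementFixed(mult, st, 'pub_a', 'Alice', +1); // emits swap
const r2 = processElementFixed(mult, st, 'pub_a', '', -1);      // no-op
```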

Environment

  • @tanstack/db@0.6.5
  • Node 22.x, Vitest 3.2.4
  • No Zero / Svelte / adapter involvement — pure TanStack DB in Node test env.
