Skip to content

feat: implement Handler<M> API with envelope-based mailbox#148

Open
ElFantasma wants to merge 6 commits intomainfrom
feat/handler-api-v0.5
Open

feat: implement Handler<M> API with envelope-based mailbox#148
ElFantasma wants to merge 6 commits intomainfrom
feat/handler-api-v0.5

Conversation

@ElFantasma
Copy link
Collaborator

@ElFantasma ElFantasma commented Feb 9, 2026

Summary

Implements the v0.5 API redesign (issues #144, #145) replacing the monolithic Actor trait with a per-message Handler<M> pattern:

  • Message trait + messages! macro — each message is its own struct with an associated Result type. TT-muncher macro supports mixing unit and field-based messages in a single invocation.
  • Handler<M> trait — one impl per message type, replacing single handle_request/handle_message with enum matching
  • Receiver<M> trait + Recipient<M> — object-safe type-erased messaging for cross-actor communication (solves circular deps)
  • Envelope-based dispatch — internal type erasure so the actor mailbox holds Box<dyn Envelope<A>>
  • Simplified Actor trait — just started()/stopped() lifecycle hooks
  • Context<A> — passed to handlers instead of ActorRef<A>, with send(), request(), send_request(), stop(), and Debug

Both tasks (async) and threads (sync) modules fully rewritten. All 11 examples migrated. 28 tests pass.

API consistency

  • send_message_on, send_after, send_interval all take Context<A> (use actor_ref.context() from outside)
  • send_request has a 5s default timeout in both tasks and threads modules
  • send_request free function (for Recipient<M>) takes an explicit timeout in both modules
  • ctx.stop() in started() correctly exits the actor loop without blocking on recv

Test plan

  • cargo build --workspace compiles clean
  • cargo test --workspace — 28/28 tests pass
  • Bank example works with typed request/reply (tasks + threads)
  • Ping-pong example works with Recipient<M> (no circular deps)
  • Signal test examples use send_message_on with new API
  • Updater examples use send_interval/send_after with Context<A>
  • Blocking/busy examples use Backend::Thread with new API
  • Name server uses send_request (no racy send + sleep)

@github-actions
Copy link

github-actions bot commented Feb 9, 2026

🤖 Codex Code Review

Findings

  • Critical: stop() doesn’t wake a blocked actor loop (shutdown can hang)
    Context::stop() only cancels the token, but both async and thread actor loops block on recv without selecting on cancellation. If the actor is idle, join() can wait forever and timers/ctrl‑c shutdown won’t stop the actor promptly.
    Affects concurrency/src/tasks/actor.rs:323339 and concurrency/src/threads/actor.rs:291307.
    Suggestion:

    • async: select! on cancellation_token.cancelled() and rx.recv(); exit on cancel.
    • threads: use CancellationToken::on_cancel to wake a Condvar/channel, or loop with recv_timeout and check is_cancelled().
  • High: panic in started()/stopped() can prevent completion signal
    In tasks backend, completion_tx.send(true) is only executed if run_actor returns normally. A panic in started() or stopped() will skip it, causing ActorRef::join() to hang.
    Affects concurrency/src/tasks/actor.rs:288291, 321, 343.
    Suggestion: wrap started()/stopped() in catch_unwind, or use a guard that always signals completion on drop (like the threads backend’s CompletionGuard).

  • Medium: cancellation checked only after handling a message
    After ctx.stop(), the actor still processes the next queued message before noticing cancellation. This may violate “stop means no more handling” expectations.
    concurrency/src/tasks/actor.rs:326335 and concurrency/src/threads/actor.rs:295304.
    Suggestion: check cancellation_token.is_cancelled() before handling an envelope.

  • Low (example): ping‑pong startup race can drop the loop
    SetConsumer is sent asynchronously, then Ping is sent immediately. If Ping is processed before SetConsumer, Producer drops the ping‑pong chain because consumer is None.
    examples/ping_pong/src/main.rs:3034.
    Suggestion: make SetConsumer a request/ack or send Ping only after a confirmed setup.

Notes

  • No tests were run locally.

If you want, I can propose concrete patches for the cancellation/wakeup and completion-guard issues.


Automated review by OpenAI Codex · custom prompt

@greptile-apps
Copy link
Contributor

greptile-apps bot commented Feb 9, 2026

Greptile Overview

Greptile Summary

This PR implements the v0.5 actor API redesign by introducing a per-message Handler<M> model with a Message trait (and messages! macro), and moving both the async (tasks) and sync (threads) runtimes to an envelope-based mailbox (Box<dyn Envelope<A>>) to enable type-erased dispatch.

The new Receiver<M>/Recipient<M> abstraction provides object-safe, type-erased cross-actor messaging while preserving typed request/reply via M::Result. Examples and tests are migrated to the new API.

Key issue to address before merge: in the async runtime, Context::stop() only cancels a token, but the actor loop blocks on rx.recv().await without selecting against cancellation, so an externally-triggered stop can leave the actor stuck idle and join() never completing unless another message arrives.

Confidence Score: 3/5

  • This PR is mostly safe to merge, but has a real shutdown/join hang edge case in the async runtime that should be fixed first.
  • Large refactor is cohesive and tests/examples are updated, but tasks::run_actor awaits mailbox recv without reacting to cancellation, so an external stop can leave actors stuck idle and join() never completing unless another message arrives.
  • concurrency/src/tasks/actor.rs

Important Files Changed

Filename Overview
concurrency/src/error.rs Adds ActorError enum with conversions from task/thread mpsc SendError and basic std::error::Error test.
concurrency/src/lib.rs Updates crate exports to new modules (error/message/tasks/threads) for Handler-based API.
concurrency/src/message.rs Introduces Message trait and messages! macro generating message structs; macro splits unit vs field variants.
concurrency/src/tasks/actor.rs Implements async actor runtime with envelope mailbox, Context/ActorRef/Recipient; found bug where cancel signal never races because same future is pinned and moved into select.
concurrency/src/tasks/mod.rs Re-exports tasks actor API plus stream/time helpers for async backend rewrite.
concurrency/src/tasks/stream.rs Rewrites stream utilities to integrate with Context/ActorRef and Handler-based messaging.
concurrency/src/tasks/stream_tests.rs Updates stream tests to new typed message API for tasks backend.
concurrency/src/tasks/time.rs Updates timer utilities (send_after/send_interval) to work with Context and typed messages.
concurrency/src/tasks/timer_tests.rs Migrates timer tests to Handler/Messages API in tasks backend.
concurrency/src/threads/actor.rs Implements sync actor runtime with envelope mailbox, Context/ActorRef/Recipient; mirrors send_message_on utility without select bug.
concurrency/src/threads/mod.rs Re-exports threads actor API plus stream/time helpers for sync backend rewrite.
concurrency/src/threads/stream.rs Rewrites threads stream utilities to integrate with new Context/ActorRef API.
concurrency/src/threads/time.rs Updates thread timer utilities (send_after/send_interval) to work with Context and typed messages.
concurrency/src/threads/timer_tests.rs Migrates thread timer tests to Handler/Messages API.
docs/API_REDESIGN.md Updates API redesign docs to describe Handler, Message, Recipient, and envelope-based mailboxes.
examples/bank/src/main.rs Migrates async bank example main to new Handler/Messages API.
examples/bank/src/messages.rs Defines bank messages using Message trait/macro for async example.
examples/bank/src/server.rs Migrates bank server actor to implement Handler per message type.
examples/bank_threads/src/main.rs Migrates threaded bank example to new sync actor API.
examples/bank_threads/src/messages.rs Defines bank thread messages; ensure it matches Handler impls and request API.
examples/bank_threads/src/server.rs Threaded bank server updated to per-message Handler impls.
examples/blocking_genserver/main.rs Updates blocking genserver example to use backend selection and new messaging API.
examples/busy_genserver_warning/main.rs Updates busy warning example to new Handler API; verify WarnOnBlocking expectations in debug builds.
examples/name_server/src/main.rs Name server example migrated to new messages/handlers and Recipient usage.
examples/name_server/src/messages.rs Defines name server messages using macro; ensure message structs are public where needed.
examples/name_server/src/server.rs Name server logic updated to per-message Handler implementations; verify reply types.
examples/ping_pong/src/consumer.rs Ping-pong consumer updated to use Recipient to avoid circular deps.
examples/ping_pong/src/main.rs Ping-pong example main updated for new API.
examples/ping_pong/src/messages.rs Defines ping/pong messages with macro; verify handler result types.
examples/ping_pong/src/producer.rs Ping-pong producer updated to send to Recipient and handle stop logic.
examples/signal_test/src/main.rs Signal test example updated to send_message_on with new actor API.
examples/signal_test_threads/src/main.rs Threaded signal test migrated to new sync API.
examples/updater/src/main.rs Updater async example migrated to Context-based scheduling helpers.
examples/updater/src/messages.rs Defines updater messages via macro; verify any mixed unit/field usage.
examples/updater/src/server.rs Updater server implements Handler per message and uses Context for timers.
examples/updater_threads/src/main.rs Updater threaded example migrated to sync Context-based scheduling.
examples/updater_threads/src/messages.rs Defines updater thread messages; verify request/reply types and visibility.
examples/updater_threads/src/server.rs Threaded updater server updated to per-message Handler implementations.

Sequence Diagram

sequenceDiagram
    autonumber
    participant Caller
    participant Ref as ActorRef
    participant MPSC as "mpsc mailbox"
    participant Runner as "run_actor"
    participant Act as "Actor"

    Caller->>Ref: request(msg)
    Ref->>MPSC: send(MessageEnvelope)
    Runner->>MPSC: recv().await
    MPSC-->>Runner: envelope
    Runner->>Act: handle(msg, ctx)
    Act-->>Runner: result
    Runner-->>Ref: oneshot.send(result)
    Ref-->>Caller: oneshot resolves

    Note over Runner: stop() cancels token only
    Note over Runner: loop still awaits recv().await
Loading

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

38 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

Comment on lines +323 to 339
loop {
let msg = rx.recv().await;
match msg {
Some(envelope) => {
let result = AssertUnwindSafe(envelope.handle(&mut actor, &ctx))
.catch_unwind()
.await;
if let Err(panic) = result {
tracing::error!("Panic in message handler: {panic:?}");
break;
}
if cancellation_token.is_cancelled() {
break;
}
}
tracing::trace!("Stopping Actor");
self
None => break,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stop can hang mailbox
Context::stop() only flips the cancellation token, but run_actor will keep awaiting rx.recv().await until another message arrives (since the cancellation token isn’t selected against recv). This means ActorRef::join() can block indefinitely if an actor is stopped externally (via ctx.stop() from another task) and no further messages are sent to wake the mailbox.

This is observable whenever an actor is idle and a separate task calls Context::from_ref(&actor_ref).stop() (or other helpers do so) without also sending a message.

Prompt To Fix With AI
This is a comment left during a code review.
Path: concurrency/src/tasks/actor.rs
Line: 323:339

Comment:
**Stop can hang mailbox**
`Context::stop()` only flips the cancellation token, but `run_actor` will keep awaiting `rx.recv().await` until another message arrives (since the cancellation token isn’t selected against `recv`). This means `ActorRef::join()` can block indefinitely if an actor is stopped externally (via `ctx.stop()` from another task) and no further messages are sent to wake the mailbox.

This is observable whenever an actor is idle and a separate task calls `Context::from_ref(&actor_ref).stop()` (or other helpers do so) without also sending a message.

How can I resolve this? If you propose a fix, please make it concise.

@ElFantasma ElFantasma force-pushed the feat/handler-api-v0.5 branch from 593eecb to 179437e Compare February 9, 2026 23:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant