feat: implement Handler<M> API with envelope-based mailbox#148
feat: implement Handler<M> API with envelope-based mailbox#148ElFantasma wants to merge 6 commits intomainfrom
Conversation
🤖 Codex Code ReviewFindings
Notes
If you want, I can propose concrete patches for the cancellation/wakeup and completion-guard issues. Automated review by OpenAI Codex · custom prompt |
Greptile OverviewGreptile SummaryThis PR implements the v0.5 actor API redesign by introducing a per-message The new Key issue to address before merge: in the async runtime, Confidence Score: 3/5
|
| Filename | Overview |
|---|---|
| concurrency/src/error.rs | Adds ActorError enum with conversions from task/thread mpsc SendError and basic std::error::Error test. |
| concurrency/src/lib.rs | Updates crate exports to new modules (error/message/tasks/threads) for Handler-based API. |
| concurrency/src/message.rs | Introduces Message trait and messages! macro generating message structs; macro splits unit vs field variants. |
| concurrency/src/tasks/actor.rs | Implements async actor runtime with envelope mailbox, Context/ActorRef/Recipient; found bug where cancel signal never races because same future is pinned and moved into select. |
| concurrency/src/tasks/mod.rs | Re-exports tasks actor API plus stream/time helpers for async backend rewrite. |
| concurrency/src/tasks/stream.rs | Rewrites stream utilities to integrate with Context/ActorRef and Handler-based messaging. |
| concurrency/src/tasks/stream_tests.rs | Updates stream tests to new typed message API for tasks backend. |
| concurrency/src/tasks/time.rs | Updates timer utilities (send_after/send_interval) to work with Context and typed messages. |
| concurrency/src/tasks/timer_tests.rs | Migrates timer tests to Handler/Messages API in tasks backend. |
| concurrency/src/threads/actor.rs | Implements sync actor runtime with envelope mailbox, Context/ActorRef/Recipient; mirrors send_message_on utility without select bug. |
| concurrency/src/threads/mod.rs | Re-exports threads actor API plus stream/time helpers for sync backend rewrite. |
| concurrency/src/threads/stream.rs | Rewrites threads stream utilities to integrate with new Context/ActorRef API. |
| concurrency/src/threads/time.rs | Updates thread timer utilities (send_after/send_interval) to work with Context and typed messages. |
| concurrency/src/threads/timer_tests.rs | Migrates thread timer tests to Handler/Messages API. |
| docs/API_REDESIGN.md | Updates API redesign docs to describe Handler, Message, Recipient, and envelope-based mailboxes. |
| examples/bank/src/main.rs | Migrates async bank example main to new Handler/Messages API. |
| examples/bank/src/messages.rs | Defines bank messages using Message trait/macro for async example. |
| examples/bank/src/server.rs | Migrates bank server actor to implement Handler per message type. |
| examples/bank_threads/src/main.rs | Migrates threaded bank example to new sync actor API. |
| examples/bank_threads/src/messages.rs | Defines bank thread messages; ensure it matches Handler impls and request API. |
| examples/bank_threads/src/server.rs | Threaded bank server updated to per-message Handler impls. |
| examples/blocking_genserver/main.rs | Updates blocking genserver example to use backend selection and new messaging API. |
| examples/busy_genserver_warning/main.rs | Updates busy warning example to new Handler API; verify WarnOnBlocking expectations in debug builds. |
| examples/name_server/src/main.rs | Name server example migrated to new messages/handlers and Recipient usage. |
| examples/name_server/src/messages.rs | Defines name server messages using macro; ensure message structs are public where needed. |
| examples/name_server/src/server.rs | Name server logic updated to per-message Handler implementations; verify reply types. |
| examples/ping_pong/src/consumer.rs | Ping-pong consumer updated to use Recipient to avoid circular deps. |
| examples/ping_pong/src/main.rs | Ping-pong example main updated for new API. |
| examples/ping_pong/src/messages.rs | Defines ping/pong messages with macro; verify handler result types. |
| examples/ping_pong/src/producer.rs | Ping-pong producer updated to send to Recipient and handle stop logic. |
| examples/signal_test/src/main.rs | Signal test example updated to send_message_on with new actor API. |
| examples/signal_test_threads/src/main.rs | Threaded signal test migrated to new sync API. |
| examples/updater/src/main.rs | Updater async example migrated to Context-based scheduling helpers. |
| examples/updater/src/messages.rs | Defines updater messages via macro; verify any mixed unit/field usage. |
| examples/updater/src/server.rs | Updater server implements Handler per message and uses Context for timers. |
| examples/updater_threads/src/main.rs | Updater threaded example migrated to sync Context-based scheduling. |
| examples/updater_threads/src/messages.rs | Defines updater thread messages; verify request/reply types and visibility. |
| examples/updater_threads/src/server.rs | Threaded updater server updated to per-message Handler implementations. |
Sequence Diagram
sequenceDiagram
autonumber
participant Caller
participant Ref as ActorRef
participant MPSC as "mpsc mailbox"
participant Runner as "run_actor"
participant Act as "Actor"
Caller->>Ref: request(msg)
Ref->>MPSC: send(MessageEnvelope)
Runner->>MPSC: recv().await
MPSC-->>Runner: envelope
Runner->>Act: handle(msg, ctx)
Act-->>Runner: result
Runner-->>Ref: oneshot.send(result)
Ref-->>Caller: oneshot resolves
Note over Runner: stop() cancels token only
Note over Runner: loop still awaits recv().await
| loop { | ||
| let msg = rx.recv().await; | ||
| match msg { | ||
| Some(envelope) => { | ||
| let result = AssertUnwindSafe(envelope.handle(&mut actor, &ctx)) | ||
| .catch_unwind() | ||
| .await; | ||
| if let Err(panic) = result { | ||
| tracing::error!("Panic in message handler: {panic:?}"); | ||
| break; | ||
| } | ||
| if cancellation_token.is_cancelled() { | ||
| break; | ||
| } | ||
| } | ||
| tracing::trace!("Stopping Actor"); | ||
| self | ||
| None => break, | ||
| } |
There was a problem hiding this comment.
Stop can hang mailbox
Context::stop() only flips the cancellation token, but run_actor will keep awaiting rx.recv().await until another message arrives (since the cancellation token isn’t selected against recv). This means ActorRef::join() can block indefinitely if an actor is stopped externally (via ctx.stop() from another task) and no further messages are sent to wake the mailbox.
This is observable whenever an actor is idle and a separate task calls Context::from_ref(&actor_ref).stop() (or other helpers do so) without also sending a message.
Prompt To Fix With AI
This is a comment left during a code review.
Path: concurrency/src/tasks/actor.rs
Line: 323:339
Comment:
**Stop can hang mailbox**
`Context::stop()` only flips the cancellation token, but `run_actor` will keep awaiting `rx.recv().await` until another message arrives (since the cancellation token isn’t selected against `recv`). This means `ActorRef::join()` can block indefinitely if an actor is stopped externally (via `ctx.stop()` from another task) and no further messages are sent to wake the mailbox.
This is observable whenever an actor is idle and a separate task calls `Context::from_ref(&actor_ref).stop()` (or other helpers do so) without also sending a message.
How can I resolve this? If you propose a fix, please make it concise.593eecb to
179437e
Compare
Summary
Implements the v0.5 API redesign (issues #144, #145) replacing the monolithic Actor trait with a per-message
Handler<M>pattern:Messagetrait +messages!macro — each message is its own struct with an associatedResulttype. TT-muncher macro supports mixing unit and field-based messages in a single invocation.Handler<M>trait — one impl per message type, replacing singlehandle_request/handle_messagewith enum matchingReceiver<M>trait +Recipient<M>— object-safe type-erased messaging for cross-actor communication (solves circular deps)Box<dyn Envelope<A>>Actortrait — juststarted()/stopped()lifecycle hooksContext<A>— passed to handlers instead ofActorRef<A>, withsend(),request(),send_request(),stop(), andDebugBoth
tasks(async) andthreads(sync) modules fully rewritten. All 11 examples migrated. 28 tests pass.API consistency
send_message_on,send_after,send_intervalall takeContext<A>(useactor_ref.context()from outside)send_requesthas a 5s default timeout in both tasks and threads modulessend_requestfree function (forRecipient<M>) takes an explicit timeout in both modulesctx.stop()instarted()correctly exits the actor loop without blocking on recvTest plan
cargo build --workspacecompiles cleancargo test --workspace— 28/28 tests passRecipient<M>(no circular deps)send_message_onwith new APIsend_interval/send_afterwithContext<A>Backend::Threadwith new APIsend_request(no racysend+sleep)