
PubSocket silently drops large messages due to one-shot try_send + noop waker #237

@ndohuu

Description

Summary

PubSocket::send silently drops any message whose wire size exceeds what the TCP send buffer can absorb in a single write() syscall, when the PUB socket is long-lived (i.e., reused across many sends). The caller receives Ok(()) and the subscriber never sees the message. No log, no error, no warning.

The bug is reliably reproducible on localhost with any payload larger than ~300 KiB (exact threshold depends on net.ipv4.tcp_wmem).

This is not the "subscriber can't keep up" case that issue #145 / ZMTP §29 talks about. It is the publisher-side case where FramedWrite's own internal buffer has not yet finished flushing the previous write — a condition that looks identical to Poll::Pending on poll_ready but has completely different semantics.

Reproducer

Save as tests/large_message_drop.rs in a fresh cargo project with zeromq = "0.4.1" (also reproduces on "0.6.0-pre.1"):

use zeromq::{PubSocket, SubSocket, Socket, SocketRecv, SocketSend, ZmqMessage};

#[tokio::test]
async fn pub_sub_delivers_300kib_message() {
    // Bind the broker-style SUB end first so the subscription is active
    // when the PUB connects.
    let mut sub = SubSocket::new();
    sub.bind("tcp://127.0.0.1:0").await.unwrap();
    sub.subscribe("").await.unwrap();

    // Extract the actual bound port.
    let endpoint = match sub.binds().keys().next().cloned().unwrap() {
        zeromq::Endpoint::Tcp(_, port) => format!("tcp://127.0.0.1:{port}"),
        _ => unreachable!(),
    };

    let mut pubs = PubSocket::new();
    pubs.connect(&endpoint).await.unwrap();

    // Small sleep so the PUB/SUB subscription handshake completes
    // before the first send (this is the standard slow-joiner workaround
    // and is not the bug under test).
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    // Payload: a single frame of ~300 KiB. This is larger than the
    // default Linux TCP send buffer can absorb in one write(), so
    // `poll_flush` on the publisher's FramedWrite sink will return
    // Pending after draining only part of it.
    let payload = vec![0xABu8; 300_000];
    let msg = ZmqMessage::from(payload.clone());

    pubs.send(msg).await.expect("PubSocket::send returned Err");

    // Give the runtime time to actually flush the bytes on the wire.
    tokio::time::sleep(std::time::Duration::from_millis(200)).await;

    let received = tokio::time::timeout(
        std::time::Duration::from_secs(2),
        sub.recv(),
    )
    .await
    .expect("recv timed out — the 300 KiB message was silently dropped");

    let received = received.expect("recv errored");
    assert_eq!(received.get(0).unwrap().as_ref(), payload.as_slice());
}

Actual result (0.4.1 and 0.6.0-pre.1):

---- pub_sub_delivers_300kib_message stdout ----
thread '...' panicked at 'recv timed out — the 300 KiB message was silently dropped'

Expected result: the message is delivered. Subscribers can keep up perfectly fine — there is only one message in flight and it is below any sensible queue limit.

For reference, the same reproducer with payload.len() = 100_000 passes reliably. The threshold is deterministically tied to the TCP send-buffer high-water mark: messages that fit in one write() are fine; messages that don't, disappear.
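To confirm the threshold on a given Linux box, the send-buffer sizing can be read straight out of procfs. A minimal sketch (the path is the standard procfs location for the sysctl; `parse_wmem` is a hypothetical helper, not part of the crate):

```rust
use std::fs;

/// Parse the three whitespace-separated tcp_wmem fields: min, default, max (bytes).
fn parse_wmem(s: &str) -> Vec<u64> {
    s.split_whitespace().filter_map(|f| f.parse().ok()).collect()
}

fn main() {
    // Linux-only sysctl; on other platforms the file simply is not there.
    match fs::read_to_string("/proc/sys/net/ipv4/tcp_wmem") {
        Ok(s) => println!("tcp_wmem (min, default, max) = {:?}", parse_wmem(&s)),
        Err(e) => println!("tcp_wmem not readable here: {e}"),
    }
}
```

The middle (default) value is the one the observed drop threshold should track, since it bounds what a single write() can absorb before the kernel pushes back.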

Root cause

src/codec/mod.rs:

impl TrySend for ZmqFramedWrite {
    fn try_send(mut self: Pin<&mut Self>, item: Message) -> ZmqResult<()> {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        match self.as_mut().poll_ready(&mut cx) {
            Poll::Ready(Ok(())) => {
                self.as_mut().start_send(item)?;
                let _ = self.as_mut().poll_flush(&mut cx); // ignore result just hope that it flush eventually
                Ok(())
            }
            Poll::Ready(Err(e)) => Err(e.into()),
            Poll::Pending => Err(ZmqError::BufferFull("Sink is full")),
        }
    }
}

And src/pub.rs::SocketSend for PubSocket:

let res = subscriber
    .send_queue
    .as_mut()
    .try_send(Message::Message(message.clone()));
match res {
    Ok(()) => {}
    Err(ZmqError::Codec(CodecError::Io(e))) => { /* broken pipe handling */ }
    Err(ZmqError::BufferFull(_)) => {
        // ignore silently. https://rfc.zeromq.org/spec/29/ says:
        //   SHALL silently drop the message if the queue for a subscriber is full.
    }
    Err(e) => { dbg!(e); todo!() }
}

The failure sequence for a 300 KiB message:

  1. try_send calls poll_ready with a noop waker. FramedWrite's sink reports Ready(Ok(())) (the sink accepts arbitrarily-sized items via start_send — nothing has been flushed yet).
  2. start_send copies the entire 300 KiB frame into the sink's internal BytesMut buffer.
  3. poll_flush is called once with the same noop waker. The tokio TCP write-half drains what fits in the OS send buffer (e.g. 128 KiB), returns Pending for the rest. The result is let _ = ... — dropped.
  4. Because the waker is noop, nothing is ever registered to re-wake and re-poll the sink. The remaining ~170 KiB stays in the FramedWrite buffer indefinitely.
  5. On the next call to PubSocket::send, try_send calls poll_ready again. FramedWrite's sink checks whether there's room to accept a new item on top of the already-buffered bytes, returns Pending because the buffer is non-empty.
  6. try_send maps Pending to Err(BufferFull("Sink is full")).
  7. PubSocket::send matches BufferFull(_) and silently drops the new message citing ZMTP §29.
  8. Since step 4 never re-polled, the partial flush from step 3 never completes. The original 300 KiB message is also lost — only its prefix reached the wire.

In practice, for a single-shot large send, the first message is lost. For repeated large sends, all of them after the first are lost.
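The whole sequence can be reproduced in miniature without tokio or the crate. The following toy model is an assumption-laden sketch, not the crate's code: `ToySink` stands in for FramedWrite plus the OS send buffer, `wmem` models the bytes one write() accepts, `try_send` mirrors the buggy one-shot path, and `send_await` mirrors what a properly awaited `SinkExt::send` would do:

```rust
/// Toy model of a FramedWrite sink whose flush drains at most `wmem` bytes per poll.
struct ToySink {
    buffered: usize, // bytes still sitting in the sink's internal buffer
    wmem: usize,     // bytes the OS accepts per write() call
    wire: usize,     // bytes that actually reached the wire
}

impl ToySink {
    fn new(wmem: usize) -> Self {
        Self { buffered: 0, wmem, wire: 0 }
    }

    /// poll_ready: Ready only when nothing from a previous send is still buffered.
    fn ready(&self) -> bool {
        self.buffered == 0
    }

    /// start_send: accepts the whole frame into the buffer, regardless of size.
    fn start_send(&mut self, len: usize) {
        self.buffered += len;
    }

    /// poll_flush: drains at most `wmem` bytes; returns true once fully flushed.
    fn poll_flush_once(&mut self) -> bool {
        let n = self.buffered.min(self.wmem);
        self.buffered -= n;
        self.wire += n;
        self.buffered == 0
    }

    /// The buggy path: one-shot poll, flush result ignored, never re-polled.
    fn try_send(&mut self, len: usize) -> Result<(), &'static str> {
        if !self.ready() {
            return Err("BufferFull"); // Poll::Pending mapped to an error
        }
        self.start_send(len);
        let _ = self.poll_flush_once(); // noop waker: nobody ever polls again
        Ok(())
    }

    /// The fixed path: keep polling until the flush completes, as an awaited
    /// SinkExt::send would under a real waker.
    fn send_await(&mut self, len: usize) {
        self.start_send(len);
        while !self.poll_flush_once() {}
    }
}

fn main() {
    // Buggy path: 300 KiB frame vs a 128 KiB send buffer.
    let mut s = ToySink::new(128 * 1024);
    assert!(s.try_send(300 * 1024).is_ok()); // caller sees Ok(())
    assert_eq!(s.wire, 128 * 1024);          // only a prefix reached the wire
    assert_eq!(s.try_send(300 * 1024), Err("BufferFull")); // next send dropped

    // Fixed path: every byte of both messages is delivered.
    let mut f = ToySink::new(128 * 1024);
    f.send_await(300 * 1024);
    f.send_await(300 * 1024);
    assert_eq!(f.wire, 600 * 1024);
}
```

The assertions land exactly on steps 1-7 above: `Ok(())` with a truncated wire, then BufferFull for every subsequent send.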

Why ZMTP §29 does not apply

Issue #145 and the // ignore silently comment cite RFC 29 §Processing Outgoing Messages:

For processing outgoing messages:
SHALL silently drop the message if the queue for a subscriber is full.

This rule is about subscriber backpressure — a situation where the subscriber side is genuinely unable to keep up with the publisher's send rate and the publisher would otherwise block or OOM waiting for queue space. Dropping in that case is the correct ZMTP behavior.

The bug above is a different condition: the publisher's own FramedWrite sink has not yet finished draining the previous write. No subscriber is backed up — the sink is self-backed up because a single message is larger than one write() syscall and nothing is ever polled to finish the flush.

These two conditions look identical from inside TrySend::try_send (both surface as Poll::Pending on poll_ready), which is the source of the conflation. They are genuinely distinct events:

  A. Subscriber queue full: poll_ready returns Pending and a properly-registered waker never fires, because the subscriber has stopped reading from its socket. Correct behavior per §29: drop.

  B. Sink mid-flush: poll_ready returns Pending, but a properly-registered waker will fire shortly after the OS socket buffer has room. Correct behavior: await the flush, then accept the new item.

The current code cannot distinguish A from B because it uses a noop waker and never gets woken for either case.
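The waker difference is the whole story, and it can be shown with std alone. In this sketch every name is hypothetical: `Flush` stands in for poll_flush on a partially drained sink (each poll moves one chunk and asks to be woken again), `noop_waker` is hand-rolled to match what the crate's try_send builds, and `drive` is a minimal executor that re-polls only when the waker actually fired:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Wake, Waker};

/// Stand-in for poll_flush on a partially drained sink: each poll drains one
/// chunk, requests a wake-up if chunks remain, and completes when empty.
struct Flush {
    remaining: u32,
}

impl Future for Flush {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        // The OS would signal writability later; model that as an immediate wake.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// A noop waker, same shape as the one try_send uses: wake() does nothing.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// A real waker: wake() records that the executor should poll again.
struct Flag(AtomicBool);
impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Poll once, then re-poll only while the waker has actually fired.
/// Returns how many chunks were left unflushed.
fn drive(mut fut: Flush, waker: Waker, woken: impl Fn() -> bool) -> u32 {
    let mut cx = Context::from_waker(&waker);
    loop {
        match Pin::new(&mut fut).poll(&mut cx) {
            Poll::Ready(()) => return 0,
            Poll::Pending if woken() => continue,
            Poll::Pending => return fut.remaining, // nobody will ever re-poll
        }
    }
}

fn main() {
    // Noop waker: the first poll drains one chunk, then the flush stalls forever.
    assert_eq!(drive(Flush { remaining: 3 }, noop_waker(), || false), 2);

    // Real waker: every Pending re-arms the executor and the flush completes.
    let flag = Arc::new(Flag(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());
    let left = drive(Flush { remaining: 3 }, waker, || {
        flag.0.swap(false, Ordering::SeqCst)
    });
    assert_eq!(left, 0);
}
```

With the noop waker the stall (condition B) is indistinguishable from a dead subscriber (condition A); with a registered waker, B resolves on its own and only A stays Pending forever.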

Proposed fix

Replace the try_send call site in PubSocket::send with the proper async SinkExt::send(item).await path. This awaits poll_ready correctly, drains the sink across multiple polls, and only returns when the item has actually been handed to the underlying write half.

Backpressure is then respected end-to-end: if a subscriber is actually slow, the publisher's send future pends until the flush completes, giving the application a natural signal. Callers who want the §29-style drop can wrap the send in tokio::time::timeout.

The other socket types (rep, router, req, sub, dealer via backend.rs) already use the send().await path for exactly this reason — PubSocket is the only outlier. Fixing it brings it in line with the rest of the crate.

I have a working patch (testing against v0.4.1) and will open a PR against main shortly; just wanted to surface the issue + reasoning first since #145 has existing context that should be referenced rather than re-argued from scratch.

Environment

  • zeromq = "0.4.1" and "0.6.0-pre.1" — both affected, identical code path
  • Linux 6.x on x86_64 and aarch64; macOS on aarch64 (all three reproduce)
  • Tokio 1.x multi-thread runtime
  • TCP transport (ipc transport likely has a different threshold because AF_UNIX buffer sizes differ, but has not been tested)
