PubSocket silently drops large messages due to one-shot try_send + noop waker
Summary
PubSocket::send silently drops any message whose wire size exceeds what the TCP send buffer can absorb in a single write() syscall, when the PUB socket is long-lived (i.e., reused across many sends). The caller receives Ok(()) and the subscriber never sees the message. No log, no error, no warning.
The bug is reliably reproducible on localhost with any payload larger than ~300 KiB (exact threshold depends on net.ipv4.tcp_wmem).
This is not the "subscriber can't keep up" case that issue #145 / ZMTP §29 talks about. It is the publisher-side case where FramedWrite's own internal buffer has not yet finished flushing the previous write — a condition that looks identical to Poll::Pending on poll_ready but has completely different semantics.
Reproducer
Save as tests/large_message_drop.rs in a fresh cargo project with zeromq = "0.4.1" (also reproduces on "0.6.0-pre.1"):
use zeromq::{PubSocket, SubSocket, Socket, SocketRecv, SocketSend, ZmqMessage};
#[tokio::test]
async fn pub_sub_delivers_300kib_message() {
// Bind the broker-style SUB end first so the subscription is active
// when the PUB connects.
let mut sub = SubSocket::new();
sub.bind("tcp://127.0.0.1:0").await.unwrap();
sub.subscribe("").await.unwrap();
// Extract the actual bound port.
let endpoint = match sub.binds().keys().next().cloned().unwrap() {
zeromq::Endpoint::Tcp(_, port) => format!("tcp://127.0.0.1:{port}"),
_ => unreachable!(),
};
let mut pubs = PubSocket::new();
pubs.connect(&endpoint).await.unwrap();
// Small sleep so the PUB/SUB subscription handshake completes
// before the first send (this is the standard slow-joiner workaround
// and is not the bug under test).
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Payload: a single frame of ~300 KiB. This is larger than the
// default Linux TCP send buffer can absorb in one write(), so
// `poll_flush` on the publisher's FramedWrite sink will return
// Pending after draining only part of it.
let payload = vec![0xABu8; 300_000];
let msg = ZmqMessage::from(payload.clone());
pubs.send(msg).await.expect("PubSocket::send returned Err");
// Give the runtime time to actually flush the bytes on the wire.
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let received = tokio::time::timeout(
std::time::Duration::from_secs(2),
sub.recv(),
)
.await
.expect("recv timed out — the 300 KiB message was silently dropped");
let received = received.expect("recv errored");
assert_eq!(received.get(0).unwrap().as_ref(), payload.as_slice());
}
Actual result (0.4.1 and 0.6.0-pre.1):
---- pub_sub_delivers_300kib_message stdout ----
thread '...' panicked at 'recv timed out — the 300 KiB message was silently dropped'
Expected result: the message is delivered. Subscribers can keep up perfectly fine — there is only one message in flight and it is below any sensible queue limit.
For reference, the same reproducer with payload.len() = 100_000 passes reliably. The threshold is deterministically tied to the TCP send-buffer high-water mark: messages that fit in one write() are fine; messages that don't, disappear.
Root cause
src/codec/mod.rs:
impl TrySend for ZmqFramedWrite {
fn try_send(mut self: Pin<&mut Self>, item: Message) -> ZmqResult<()> {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match self.as_mut().poll_ready(&mut cx) {
Poll::Ready(Ok(())) => {
self.as_mut().start_send(item)?;
let _ = self.as_mut().poll_flush(&mut cx); // ignore result just hope that it flush eventually
Ok(())
}
Poll::Ready(Err(e)) => Err(e.into()),
Poll::Pending => Err(ZmqError::BufferFull("Sink is full")),
}
}
}
And src/pub.rs::SocketSend for PubSocket:
let res = subscriber
.send_queue
.as_mut()
.try_send(Message::Message(message.clone()));
match res {
Ok(()) => {}
Err(ZmqError::Codec(CodecError::Io(e))) => { /* broken pipe handling */ }
Err(ZmqError::BufferFull(_)) => {
// ignore silently. https://rfc.zeromq.org/spec/29/ says:
// SHALL silently drop the message if the queue for a subscriber is full.
}
Err(e) => { dbg!(e); todo!() }
}
The failure sequence for a 300 KiB message:
1. try_send calls poll_ready with a noop waker. FramedWrite's sink reports Ready(Ok(())) (the sink accepts arbitrarily-sized items via start_send — nothing has been flushed yet).
2. start_send copies the entire 300 KiB frame into the sink's internal BytesMut buffer.
3. poll_flush is called once with the same noop waker. The tokio TCP write-half drains what fits in the OS send buffer (e.g. 128 KiB) and returns Pending for the rest. The result is let _ = ... — dropped.
4. Because the waker is noop, nothing is ever registered to re-wake and re-poll the sink. The remaining ~170 KiB stays in the FramedWrite buffer indefinitely.
5. On the next call to PubSocket::send, try_send calls poll_ready again. FramedWrite's sink checks whether there's room to accept a new item on top of the already-buffered bytes and returns Pending because the buffer is non-empty.
6. try_send maps Pending to Err(BufferFull("Sink is full")).
7. PubSocket::send matches BufferFull(_) and silently drops the new message, citing ZMTP §29.
8. Since step 4 never re-polled, the partial flush from step 3 never completes. The original 300 KiB message is also lost — only its prefix reached the wire.
In practice, a single-shot large send loses that one message: only its prefix reaches the wire. Repeated large sends lose everything: the first partially, and every subsequent one dropped outright via the BufferFull path.
Why ZMTP §29 does not apply
Issue #145 and the // ignore silently comment cite RFC 29 §Processing Outgoing Messages:
For processing outgoing messages:
SHALL silently drop the message if the queue for a subscriber is full.
This rule is about subscriber backpressure — a situation where the subscriber side is genuinely unable to keep up with the publisher's send rate and the publisher would otherwise block or OOM waiting for queue space. Dropping in that case is the correct ZMTP behavior.
The bug above is a different condition: the publisher's own FramedWrite sink has not yet finished draining the previous write. No subscriber is backed up — the sink is self-backed up because a single message is larger than one write() syscall and nothing is ever polled to finish the flush.
These two conditions look identical from inside TrySend::try_send (both surface as Poll::Pending on poll_ready), which is the source of the conflation. They are genuinely distinct events:
| Condition | How to distinguish | Correct behavior |
| --- | --- | --- |
| A. Subscriber queue full | poll_ready returns Pending and a properly-registered waker never fires, because the subscriber has stopped reading from its socket. | §29: drop. |
| B. Sink mid-flush | poll_ready returns Pending, and a properly-registered waker will fire shortly after the OS socket buffer has room. | Await the flush, then accept the new item. |
The current code cannot distinguish A from B because it uses a noop waker and never gets woken for either case.
Proposed fix
Replace the try_send call site in PubSocket::send with the proper async SinkExt::send(item).await path. This awaits poll_ready correctly, drains the sink across multiple polls, and only returns when the item has actually been handed to the underlying write half.
Backpressure is then respected end-to-end: if a subscriber is actually slow, the publisher's send future pends until the flush completes, giving the application a natural signal. Callers who want the §29-style drop can wrap the send in tokio::time::timeout.
The other socket types (rep, router, req, sub, dealer via backend.rs) already use the send().await path for exactly this reason — PubSocket is the only outlier. Fixing it brings it in line with the rest of the crate.
I have a working patch (testing against v0.4.1) and will open a PR against main shortly; just wanted to surface the issue + reasoning first since #145 has existing context that should be referenced rather than re-argued from scratch.
Environment
- zeromq = "0.4.1" and "0.6.0-pre.1" — both affected, identical code path
- Linux 6.x on x86_64 and aarch64; macOS on aarch64 (all three reproduce)
- Tokio 1.x multi-thread runtime
- TCP transport (ipc transport likely has a different threshold because AF_UNIX buffer sizes differ, but has not been tested)