fix(connectors): prevent silent failures in FlatBuffer conversions#3431
fix(connectors): prevent silent failures in FlatBuffer conversions#3431Standing-Man wants to merge 1 commit into
Conversation
|
Thanks for the PR. It is labeled Slash commands (own line, regular comment) move it around the queue:
See CONTRIBUTING.md for details. |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #3431 +/- ##
=============================================
- Coverage 74.31% 57.01% -17.30%
Complexity 943 943
=============================================
Files 1245 1244 -1
Lines 121653 108619 -13034
Branches 97959 84925 -13034
=============================================
- Hits 90410 61933 -28477
- Misses 28288 43967 +15679
+ Partials 2955 2719 -236
🚀 New features to boost your workflow:
|
Signed-off-by: StandingMan <jmtangcs@gmail.com>
4194ac5 to
cca1079
Compare
hubcio
left a comment
There was a problem hiding this comment.
core fix is sound - removing the silent _ => message.payload catch-all, adding the payload/source-format guard, and dropping the double-application of field_mappings are all the right direction.
two things that don't land on changed lines so noting them here:
flatbuffer_convert.rs:137-141 - the FlatBuffer -> Json arm still does if let Payload::FlatBuffer(bytes) = message.payload { ... } else { return Err(InvalidPayloadType) }. the new front guard already proves the payload is flatbuffer whenever source_format == FlatBuffer, so that else is unreachable. it's also inconsistent with the sibling -> Text / -> Raw arms that just trust the guard. collapse to let Payload::FlatBuffer(bytes) = message.payload else { return Err(Error::InvalidPayloadType); }; - one line, still fail-closed.
behavior changes worth calling out in the pr description (all intended, no code change): unsupported pairs the old catch-all silently passed through (json -> text, text -> json, etc) now hard-error; the new guard rejects mismatched payloads the old encoder would have coped with; and json -> json with field_mappings no longer renames keys (the old pre-conversion step did).
| message.payload | ||
| (source, target) if source == target => message.payload, | ||
| (source, target) => { | ||
| return Err(Error::InvalidConfigValue(format!( |
There was a problem hiding this comment.
this unsupported-pair error only depends on source_format and target_format, both static config, but it's checked per message. a misconfigured pair like text -> flatbuffer won't fail at startup - the runtime just logs, drops the message, and continues, so every message on the stream is silently dropped forever. that's the same silent-failure class this pr kills, moved from passthrough to drop. make FlatBufferConvert::new return Result and validate the pair there. from_config already returns Result and threads ? for FilterFields::new / UnwrapEnvelope::new, so the plumbing's there. once validated in new, this arm is unreachable and can go.
| // Apply field mappings if configured | ||
| if let Some(field_mappings) = &self.config.field_mappings { | ||
| message.payload = self.apply_field_mappings(message.payload, field_mappings)?; | ||
| if !self.payload_matches_source_format(&message.payload) { |
There was a problem hiding this comment.
InvalidPayloadType is a unit variant - display is just "invalid payload type", no expected-vs-actual. when it fires the operator can't tell if they mislabeled source_format or an upstream decoder produced an unexpected variant. the sibling unsupported-conversion error embeds {source} -> {target}, so the two failure modes here are asymmetric. cheap fix: Error::InvalidConfigValue(format!("expected {} payload", self.config.source_format)) since Schema already implements Display.
| )); | ||
| } | ||
|
|
||
| #[test] |
There was a problem hiding this comment.
payload_matches_source_format enumerates all 6 schema/payload pairs but this test only hits json source with a text payload. the other 5 guards never see a mismatched payload, so a future edit that drops or mis-maps one arm (say avro -> raw) passes every test. exhaustiveness of this guard is the whole point of the fix - worth a table-driven test over each schema variant with a deliberately wrong payload, all asserting InvalidPayloadType.
|
|
||
| assert!(matches!( | ||
| result, | ||
| Err(Error::InvalidConfigValue(message)) |
There was a problem hiding this comment.
asserting the full string "unsupported FlatBuffer conversion: text -> flatbuffer" couples the test to the exact prose, the strum lowercasing, and the arrow format - any wording tweak breaks it for no behavioral reason. loosen to Err(Error::InvalidConfigValue(_)) and, if the message matters, assert it contains the source/target tokens instead of full equality.
| } | ||
| } | ||
|
|
||
| #[cfg(test)] |
There was a problem hiding this comment.
the default config is flatbuffer -> json (the most common real setup) yet none of the three FlatBuffer -> arms have a happy-path test - only json -> flatbuffer and the flatbuffer same==same passthrough are covered. a regression in the decoder wiring or the convert_format text/raw arms wouldn't be caught. worth one deterministic test each: -> text produces base64 Payload::Text, -> raw returns the bytes unchanged, -> json produces the flatbuffer_size/raw_data_base64 shape.
Which issue does this PR address?
Relates to #3430
Rationale
Prevent
FlatBufferConvertfrom silently passing through messages when theconfigured conversion is unsupported.
FlatBufferConvertpreviously used a catch-all match branch that returned theoriginal payload unchanged. For example, configuring
Text -> FlatBufferreturned success while leaving themessage as
Payload::Text.What changed?
FlatBufferStreamEncoder.Local Execution
Passed
Ran
AI Usage