diff --git a/README.md b/README.md index e97efca..a90eebe 100644 --- a/README.md +++ b/README.md @@ -152,6 +152,8 @@ const transfers = try client.subscribe(.{ .logs = .{ .address = usdc_address, .topics = &.{transfer_event_topic}, } }); +// MEV searchers: stream full pending transactions (geth-style). +const pending = try client.subscribe(.{ .new_pending_transactions = .{ .full = true } }); while (true) { const event = try client.next(); @@ -162,6 +164,10 @@ while (true) { } else if (event.sub == transfers) { const log = try eth.subscription.parseLogFromNotification(allocator, event.payload); // ... handle Transfer log + } else if (event.sub == pending) { + const tx = try eth.subscription.parseTransactionFromNotification(allocator, event.payload); + defer eth.rpc_transaction.freeRpcTransaction(allocator, tx); + // ... evaluate the pending tx (sandwich, backrun, ...) } } ``` diff --git a/src/provider.zig b/src/provider.zig index 7c11257..94a8a20 100644 --- a/src/provider.zig +++ b/src/provider.zig @@ -6,6 +6,7 @@ const primitives = @import("primitives.zig"); const receipt_mod = @import("receipt.zig"); const block_mod = @import("block.zig"); const state_overrides_mod = @import("state_overrides.zig"); +const rpc_transaction_mod = @import("rpc_transaction.zig"); const HttpTransport = @import("http_transport.zig").HttpTransport; /// Read-only Ethereum JSON-RPC provider. @@ -837,6 +838,66 @@ fn parseTopics(allocator: std.mem.Allocator, obj: std.json.ObjectMap) ![]const [ return topics; } +/// Parse a single RpcTransaction from a JSON object as returned by +/// `eth_getTransactionByHash`, full-tx pending subscriptions, etc. +/// +/// Caller owns the returned transaction's `input` slice; use +/// `rpc_transaction.freeRpcTransaction` to release it. +pub fn parseSingleTransaction(allocator: std.mem.Allocator, obj: std.json.ObjectMap) !rpc_transaction_mod.RpcTransaction { + const hash = try parseHash(jsonGetString(obj, "hash") orelse return error.InvalidResponse); + const nonce = try parseHexU64(jsonGetString(obj, "nonce") orelse return error.InvalidResponse); + const block_hash = try parseOptionalHash(jsonGetString(obj, "blockHash")); + const block_number = try parseOptionalHexU64(jsonGetString(obj, "blockNumber")); + const tx_index: ?u32 = if (jsonGetString(obj, "transactionIndex")) |s| + parseHexU32(s) catch null + else + null; + + const from_addr = (try parseOptionalAddress(jsonGetString(obj, "from"))) orelse return error.InvalidResponse; + const to_addr = try parseOptionalAddress(jsonGetString(obj, "to")); + const value = try parseHexU256(jsonGetString(obj, "value") orelse "0x0"); + + const gas = try parseHexU64(jsonGetString(obj, "gas") orelse return error.InvalidResponse); + const gas_price: ?u256 = if (jsonGetString(obj, "gasPrice")) |s| try parseHexU256(s) else null; + const max_fee: ?u256 = if (jsonGetString(obj, "maxFeePerGas")) |s| try parseHexU256(s) else null; + const max_priority: ?u256 = if (jsonGetString(obj, "maxPriorityFeePerGas")) |s| try parseHexU256(s) else null; + const max_blob_fee: ?u256 = if (jsonGetString(obj, "maxFeePerBlobGas")) |s| try parseHexU256(s) else null; + + // `input` and `data` are aliases; geth uses `input`, parity used `data`. + const input_str = jsonGetString(obj, "input") orelse jsonGetString(obj, "data") orelse "0x"; + const input = try parseHexBytes(allocator, input_str); + errdefer allocator.free(input); + + const v = try parseHexU256(jsonGetString(obj, "v") orelse "0x0"); + const r = try parseHash(jsonGetString(obj, "r") orelse return error.InvalidResponse); + const s = try parseHash(jsonGetString(obj, "s") orelse return error.InvalidResponse); + + const type_val = parseHexU8(jsonGetString(obj, "type") orelse "0x0") catch 0; + const chain_id = try parseOptionalHexU64(jsonGetString(obj, "chainId")); + + return rpc_transaction_mod.RpcTransaction{ + .hash = hash, + .nonce = nonce, + .block_hash = block_hash, + .block_number = block_number, + .transaction_index = tx_index, + .from = from_addr, + .to = to_addr, + .value = value, + .gas = gas, + .gas_price = gas_price, + .max_fee_per_gas = max_fee, + .max_priority_fee_per_gas = max_priority, + .max_fee_per_blob_gas = max_blob_fee, + .input = input, + .v = v, + .r = r, + .s = s, + .type_ = type_val, + .chain_id = chain_id, + }; +} + /// Parse the logs response from eth_getLogs. fn parseLogsResponse(allocator: std.mem.Allocator, raw: []const u8) ![]receipt_mod.Log { const parsed = std.json.parseFromSlice(std.json.Value, allocator, raw, .{}) catch { @@ -1261,6 +1322,166 @@ test "parseTransactionReceipt - null result" { try std.testing.expect(receipt == null); } +test "parseSingleTransaction - pending EIP-1559" { + const allocator = std.testing.allocator; + const raw = + \\{"hash":"0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + \\ "nonce":"0x5", + \\ "blockHash":null, + \\ "blockNumber":null, + \\ "transactionIndex":null, + \\ "from":"0x1111111111111111111111111111111111111111", + \\ "to":"0x2222222222222222222222222222222222222222", + \\ "value":"0xde0b6b3a7640000", + \\ "gas":"0x5208", + \\ "maxFeePerGas":"0x4a817c800", + \\ "maxPriorityFeePerGas":"0x77359400", + \\ "input":"0xdeadbeef", + \\ "v":"0x1", + \\ "r":"0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc", + \\ "s":"0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd", + \\ "type":"0x2", + \\ "chainId":"0x1"} + ; + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, raw, .{}); + defer parsed.deinit(); + const tx = try parseSingleTransaction(allocator, parsed.value.object); + defer rpc_transaction_mod.freeRpcTransaction(allocator, tx); + + try std.testing.expectEqual(@as(u64, 5), tx.nonce); + try std.testing.expect(tx.block_hash == null); + try std.testing.expectEqual(@as(u8, 2), tx.type_); + try std.testing.expect(tx.gas_price == null); + try std.testing.expectEqual(@as(?u256, 20_000_000_000), tx.max_fee_per_gas); + try std.testing.expectEqual(@as(?u256, 2_000_000_000), tx.max_priority_fee_per_gas); + try std.testing.expectEqual(@as(?u64, 1), tx.chain_id); + try std.testing.expectEqualSlices(u8, &.{ 0xde, 0xad, 0xbe, 0xef }, tx.input); + try std.testing.expectEqual(@as(u256, 1_000_000_000_000_000_000), tx.value); +} + +test "parseSingleTransaction - mined legacy" { + const allocator = std.testing.allocator; + const raw = + \\{"hash":"0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + \\ "nonce":"0x10", + \\ "blockHash":"0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + \\ "blockNumber":"0xbc614e", + \\ "transactionIndex":"0x2a", + \\ "from":"0x1111111111111111111111111111111111111111", + \\ "to":"0x2222222222222222222222222222222222222222", + \\ "value":"0x0", + \\ "gas":"0x5208", + \\ "gasPrice":"0x4a817c800", + \\ "input":"0x", + \\ "v":"0x25", + \\ "r":"0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc", + \\ "s":"0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd", + \\ "type":"0x0", + \\ "chainId":"0x1"} + ; + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, raw, .{}); + defer parsed.deinit(); + const tx = try parseSingleTransaction(allocator, parsed.value.object); + defer rpc_transaction_mod.freeRpcTransaction(allocator, tx); + + try std.testing.expectEqual(@as(u8, 0), tx.type_); + try std.testing.expectEqual(@as(?u64, 12345678), tx.block_number); + try std.testing.expectEqual(@as(?u32, 42), tx.transaction_index); + try std.testing.expectEqual(@as(?u256, 20_000_000_000), tx.gas_price); + try std.testing.expect(tx.max_fee_per_gas == null); + try std.testing.expectEqual(@as(usize, 0), tx.input.len); + try std.testing.expectEqual(@as(u256, 0x25), tx.v); +} + +test "parseSingleTransaction - contract creation has null to" { + const allocator = std.testing.allocator; + const raw = + \\{"hash":"0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + \\ "nonce":"0x0", + \\ "blockHash":null, + \\ "blockNumber":null, + \\ "transactionIndex":null, + \\ "from":"0x1111111111111111111111111111111111111111", + \\ "to":null, + \\ "value":"0x0", + \\ "gas":"0x5208", + \\ "gasPrice":"0x4a817c800", + \\ "input":"0x6080", + \\ "v":"0x1c", + \\ "r":"0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc", + \\ "s":"0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd", + \\ "type":"0x0"} + ; + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, raw, .{}); + defer parsed.deinit(); + const tx = try parseSingleTransaction(allocator, parsed.value.object); + defer rpc_transaction_mod.freeRpcTransaction(allocator, tx); + + try std.testing.expect(tx.to == null); + try std.testing.expectEqualSlices(u8, &.{ 0x60, 0x80 }, tx.input); +} + +test "parseSingleTransaction - data alias falls back when input missing" { + const allocator = std.testing.allocator; + const raw = + \\{"hash":"0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + \\ "nonce":"0x0", + \\ "blockHash":null, + \\ "blockNumber":null, + \\ "transactionIndex":null, + \\ "from":"0x1111111111111111111111111111111111111111", + \\ "to":"0x2222222222222222222222222222222222222222", + \\ "value":"0x0", + \\ "gas":"0x5208", + \\ "gasPrice":"0x1", + \\ "data":"0xfeed", + \\ "v":"0x1", + \\ "r":"0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc", + \\ "s":"0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd", + \\ "type":"0x0"} + ; + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, raw, .{}); + defer parsed.deinit(); + const tx = try parseSingleTransaction(allocator, parsed.value.object); + defer rpc_transaction_mod.freeRpcTransaction(allocator, tx); + + try std.testing.expectEqualSlices(u8, &.{ 0xfe, 0xed }, tx.input); +} + +test "parseTransactionFromNotification - end-to-end pending tx" { + // Verify the subscription.zig wrapper round-trips: build a fake + // notification envelope wrapping a tx object and parse it. + const allocator = std.testing.allocator; + const raw = + \\{"jsonrpc":"2.0","method":"eth_subscription","params":{ + \\ "subscription":"0xfeedface", + \\ "result":{ + \\ "hash":"0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + \\ "nonce":"0x1", + \\ "blockHash":null, + \\ "blockNumber":null, + \\ "transactionIndex":null, + \\ "from":"0x1111111111111111111111111111111111111111", + \\ "to":"0x2222222222222222222222222222222222222222", + \\ "value":"0x0", + \\ "gas":"0x5208", + \\ "maxFeePerGas":"0x4a817c800", + \\ "maxPriorityFeePerGas":"0x77359400", + \\ "input":"0x", + \\ "v":"0x1", + \\ "r":"0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc", + \\ "s":"0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd", + \\ "type":"0x2", + \\ "chainId":"0x1"}}} + ; + const subscription = @import("subscription.zig"); + const tx = try subscription.parseTransactionFromNotification(allocator, raw); + defer rpc_transaction_mod.freeRpcTransaction(allocator, tx); + + try std.testing.expectEqual(@as(u8, 2), tx.type_); + try std.testing.expectEqual(@as(u64, 1), tx.nonce); +} + test "parseBlockHeader - basic block" { const allocator = std.testing.allocator; diff --git a/src/root.zig b/src/root.zig index 50c1717..b6580c3 100644 --- a/src/root.zig +++ b/src/root.zig @@ -22,6 +22,7 @@ pub const eip155 = @import("eip155.zig"); // -- Layer 4: Types -- pub const access_list = @import("access_list.zig"); pub const transaction = @import("transaction.zig"); +pub const rpc_transaction = @import("rpc_transaction.zig"); pub const receipt = @import("receipt.zig"); pub const block = @import("block.zig"); pub const blob = @import("blob.zig"); @@ -99,6 +100,7 @@ test { // Layer 4 _ = @import("access_list.zig"); _ = @import("transaction.zig"); + _ = @import("rpc_transaction.zig"); _ = @import("receipt.zig"); _ = @import("block.zig"); _ = @import("blob.zig"); diff --git a/src/rpc_transaction.zig b/src/rpc_transaction.zig new file mode 100644 index 0000000..aaa8d79 --- /dev/null +++ b/src/rpc_transaction.zig @@ -0,0 +1,136 @@ +const std = @import("std"); + +/// A transaction object as returned by Ethereum JSON-RPC methods like +/// `eth_getTransactionByHash`, `eth_getTransactionByBlockHashAndIndex`, and +/// the full-tx variant of `newPendingTransactions` (geth: `eth_subscribe( +/// "newPendingTransactions", true)`). +/// +/// This is distinct from the canonical signing-time `Transaction` union in +/// `transaction.zig`. RPC responses include fields the signing types do not +/// (hash, recovered `from`, optional block-position fields), and gas-pricing +/// fields are sparse depending on the transaction type. +/// +/// Fields whose presence depends on the transaction type: +/// +/// - `gas_price` is set for legacy (type 0) and EIP-2930 (type 1). For +/// EIP-1559 (type 2) and EIP-4844 (type 3) it is the effective gas price +/// when the tx is mined; when pending, nodes return the legacy +/// gasPrice-equivalent computed from the priority fee. +/// - `max_fee_per_gas` and `max_priority_fee_per_gas` are set for EIP-1559 +/// and EIP-4844. +/// - `max_fee_per_blob_gas` is set only for EIP-4844. +/// - `chain_id` is required for typed transactions and present for EIP-155 +/// legacy transactions; null for pre-EIP-155 legacy. +/// - `block_hash`, `block_number`, `transaction_index` are null while the +/// transaction is in the mempool (pending) and set once mined. +/// +/// The fields `access_list` and `blob_versioned_hashes` are intentionally +/// omitted from the v1 parser; both are rare in mempool sniping and add +/// significant parsing surface area. They can be added later without a +/// breaking change. +pub const RpcTransaction = struct { + // ----- Identity & position ----- + hash: [32]u8, + nonce: u64, + block_hash: ?[32]u8, + block_number: ?u64, + transaction_index: ?u32, + + // ----- Parties & value ----- + from: [20]u8, + /// Null for contract creation transactions. + to: ?[20]u8, + value: u256, + + // ----- Gas & pricing ----- + gas: u64, + gas_price: ?u256, + max_fee_per_gas: ?u256, + max_priority_fee_per_gas: ?u256, + max_fee_per_blob_gas: ?u256, + + // ----- Calldata (heap-owned) ----- + /// Caller owns this memory; free with `freeRpcTransaction`. + input: []const u8, + + // ----- Signature ----- + /// EIP-2718 v value: 0/1 for typed transactions, 27/28 for pre-155 + /// legacy, chain_id*2+35 / chain_id*2+36 for EIP-155 legacy. + v: u256, + r: [32]u8, + s: [32]u8, + + // ----- Type & chain ----- + /// 0 = legacy, 1 = EIP-2930, 2 = EIP-1559, 3 = EIP-4844. + type_: u8, + chain_id: ?u64, +}; + +/// Free heap-owned memory inside an RpcTransaction. +pub fn freeRpcTransaction(allocator: std.mem.Allocator, tx: RpcTransaction) void { + allocator.free(tx.input); +} + +// ============================================================================ +// Tests +// ============================================================================ + +test "RpcTransaction struct layout" { + const tx = RpcTransaction{ + .hash = [_]u8{0xaa} ** 32, + .nonce = 5, + .block_hash = null, + .block_number = null, + .transaction_index = null, + .from = [_]u8{0x11} ** 20, + .to = [_]u8{0x22} ** 20, + .value = 1_000_000_000_000_000_000, + .gas = 21_000, + .gas_price = 20_000_000_000, + .max_fee_per_gas = null, + .max_priority_fee_per_gas = null, + .max_fee_per_blob_gas = null, + .input = &.{}, + .v = 27, + .r = [_]u8{0x33} ** 32, + .s = [_]u8{0x44} ** 32, + .type_ = 0, + .chain_id = 1, + }; + + try std.testing.expectEqual(@as(u64, 5), tx.nonce); + try std.testing.expectEqual(@as(u8, 0), tx.type_); + try std.testing.expect(tx.block_hash == null); + try std.testing.expect(tx.gas_price != null); + try std.testing.expect(tx.max_fee_per_gas == null); +} + +test "freeRpcTransaction frees the input slice" { + const allocator = std.testing.allocator; + const input = try allocator.alloc(u8, 4); + @memcpy(input, "abcd"); + const tx = RpcTransaction{ + .hash = [_]u8{0} ** 32, + .nonce = 0, + .block_hash = null, + .block_number = null, + .transaction_index = null, + .from = [_]u8{0} ** 20, + .to = null, + .value = 0, + .gas = 0, + .gas_price = null, + .max_fee_per_gas = null, + .max_priority_fee_per_gas = null, + .max_fee_per_blob_gas = null, + .input = input, + .v = 0, + .r = [_]u8{0} ** 32, + .s = [_]u8{0} ** 32, + .type_ = 2, + .chain_id = null, + }; + // If freeRpcTransaction does not free input, the testing allocator + // would report a leak. + freeRpcTransaction(allocator, tx); +} diff --git a/src/subscription.zig b/src/subscription.zig index 08434f5..e2aff48 100644 --- a/src/subscription.zig +++ b/src/subscription.zig @@ -5,6 +5,7 @@ const block_mod = @import("block.zig"); const receipt_mod = @import("receipt.zig"); const primitives = @import("primitives.zig"); const provider_mod = @import("provider.zig"); +const rpc_tx_mod = @import("rpc_transaction.zig"); /// Types of Ethereum subscriptions available via eth_subscribe. pub const SubscriptionType = enum { @@ -28,11 +29,20 @@ pub const LogSubscriptionParams = struct { topics: ?[]const ?[32]u8 = null, }; +/// Parameters for a `newPendingTransactions` subscription. +pub const PendingTxParams = struct { + /// If true, the node streams full transaction objects instead of just + /// transaction hashes. Maps to the geth-style second parameter: + /// `eth_subscribe("newPendingTransactions", true)`. Some L2s and older + /// nodes ignore the parameter and only ever stream hashes. + full: bool = false, +}; + /// Parameters for an eth_subscribe call. pub const SubscriptionParams = union(enum) { new_heads: void, logs: LogSubscriptionParams, - new_pending_transactions: void, + new_pending_transactions: PendingTxParams, /// Get the subscription type. pub fn subType(self: SubscriptionParams) SubscriptionType { @@ -223,6 +233,21 @@ pub fn parseTxHashFromNotification(allocator: std.mem.Allocator, raw: []const u8 return primitives.hashFromHex(result_val.string) catch error.InvalidNotification; } +/// Parse a full-tx `newPendingTransactions` notification payload into an +/// RpcTransaction. Use this when the subscription was created with +/// `PendingTxParams{ .full = true }`. Caller owns the returned +/// transaction's heap fields; release with `rpc_transaction.freeRpcTransaction`. +pub fn parseTransactionFromNotification(allocator: std.mem.Allocator, raw: []const u8) !rpc_tx_mod.RpcTransaction { + const parsed = std.json.parseFromSlice(std.json.Value, allocator, raw, .{}) catch + return error.InvalidNotification; + defer parsed.deinit(); + + const result_val = getNotificationResult(parsed.value) orelse return error.InvalidNotification; + if (result_val != .object) return error.InvalidNotification; + + return provider_mod.parseSingleTransaction(allocator, result_val.object); +} + // --------------------------------------------------------------------------- // Notification parsing helpers // --------------------------------------------------------------------------- @@ -249,8 +274,12 @@ pub fn buildSubscribeParams(allocator: std.mem.Allocator, params: SubscriptionPa .new_heads => { return std.fmt.allocPrint(allocator, "[\"{s}\"]", .{SubscriptionType.new_heads.toParamString()}) catch return error.OutOfMemory; }, - .new_pending_transactions => { - return std.fmt.allocPrint(allocator, "[\"{s}\"]", .{SubscriptionType.new_pending_transactions.toParamString()}) catch return error.OutOfMemory; + .new_pending_transactions => |pending_params| { + const tag = SubscriptionType.new_pending_transactions.toParamString(); + if (pending_params.full) { + return std.fmt.allocPrint(allocator, "[\"{s}\",true]", .{tag}) catch return error.OutOfMemory; + } + return std.fmt.allocPrint(allocator, "[\"{s}\"]", .{tag}) catch return error.OutOfMemory; }, .logs => |log_params| { return buildLogSubscribeParams(allocator, log_params) catch return error.OutOfMemory; @@ -468,7 +497,7 @@ test "SubscriptionParams subType" { const logs: SubscriptionParams = .{ .logs = .{} }; try std.testing.expectEqual(SubscriptionType.logs, logs.subType()); - const pending: SubscriptionParams = .{ .new_pending_transactions = {} }; + const pending: SubscriptionParams = .{ .new_pending_transactions = .{} }; try std.testing.expectEqual(SubscriptionType.new_pending_transactions, pending.subType()); } @@ -481,15 +510,24 @@ test "buildSubscribeParams - newHeads" { try std.testing.expectEqualStrings("[\"newHeads\"]", result); } -test "buildSubscribeParams - newPendingTransactions" { +test "buildSubscribeParams - newPendingTransactions hashes only" { const allocator = std.testing.allocator; - const params: SubscriptionParams = .{ .new_pending_transactions = {} }; + const params: SubscriptionParams = .{ .new_pending_transactions = .{} }; const result = try buildSubscribeParams(allocator, params); defer allocator.free(result); try std.testing.expectEqualStrings("[\"newPendingTransactions\"]", result); } +test "buildSubscribeParams - newPendingTransactions full" { + const allocator = std.testing.allocator; + const params: SubscriptionParams = .{ .new_pending_transactions = .{ .full = true } }; + const result = try buildSubscribeParams(allocator, params); + defer allocator.free(result); + + try std.testing.expectEqualStrings("[\"newPendingTransactions\",true]", result); +} + test "buildSubscribeParams - logs with address only" { const allocator = std.testing.allocator; const addr = [_]u8{0xde} ** 20; diff --git a/src/ws_client.zig b/src/ws_client.zig index f15d88f..e1700f0 100644 --- a/src/ws_client.zig +++ b/src/ws_client.zig @@ -577,7 +577,7 @@ pub fn applyJitter(base_ms: u64, pct: u8, rng: std.Random) u64 { fn cloneParams(allocator: std.mem.Allocator, params: SubscriptionParams) !SubscriptionParams { return switch (params) { .new_heads => SubscriptionParams{ .new_heads = {} }, - .new_pending_transactions => SubscriptionParams{ .new_pending_transactions = {} }, + .new_pending_transactions => |pp| SubscriptionParams{ .new_pending_transactions = pp }, .logs => |lp| blk: { var owned = LogSubscriptionParams{ .address = lp.address, diff --git a/tests/integration_tests.zig b/tests/integration_tests.zig index 3272052..e290dbc 100644 --- a/tests/integration_tests.zig +++ b/tests/integration_tests.zig @@ -496,3 +496,69 @@ test "WsClient unsubscribe frees handle and removes from registry" { // The registry should be empty; pointer `sub` is freed and must not be // dereferenced after this point. } + +test "WsClient subscribe pending full streams an RpcTransaction" { + if (!isAnvilAvailable()) return; + const allocator = std.testing.allocator; + + // Subscribe on a fresh WsClient before sending the tx so we don't miss + // the notification. + const opts = eth.ws_client.Opts{ .ping_interval_ms = 0 }; + const client = try eth.ws_client.WsClient.connect(allocator, ANVIL_WS_URL, opts); + defer client.deinit(); + + const sub = try client.subscribe(.{ .new_pending_transactions = .{ .full = true } }); + + // Send a transaction via the HTTP wallet so the WsClient is purely a + // reader. Account #0 -> account #1, 0.001 ETH. + var http = eth.http_transport.HttpTransport.init(allocator, ANVIL_URL); + defer http.deinit(); + var provider = eth.provider.Provider.init(allocator, &http); + const private_key = try eth.hex.hexToBytesFixed(32, ACCOUNT_0_KEY_HEX); + var wallet = eth.wallet.Wallet.init(allocator, private_key, &provider); + const recipient = try eth.primitives.addressFromHex(ACCOUNT_1_ADDR_HEX); + const send_value = eth.units.parseEther(0.001) orelse return error.ParseEtherFailed; + const tx_hash = try wallet.sendTransaction(.{ + .to = recipient, + .value = send_value, + }); + + // Drain notifications until we see one matching our sub. Anvil with + // default instamine still emits a pending-tx event before the mine. + // + // Some nodes ignore the `full = true` parameter on + // `newPendingTransactions` and stream tx hashes (JSON strings) instead + // of full objects; in that case parseTransactionFromNotification will + // return error.InvalidNotification and we just skip and keep reading. + var found = false; + var attempts: usize = 0; + while (!found and attempts < 8) : (attempts += 1) { + const ev = client.next() catch |err| switch (err) { + // Some Anvil builds do not emit full pending-tx notifications; + // skip the test in that case rather than reporting failure. + error.Disconnected, error.Closed => return, + else => return err, + }; + defer allocator.free(ev.payload); + if (ev.sub != sub) continue; + + const tx = eth.subscription.parseTransactionFromNotification(allocator, ev.payload) catch |err| switch (err) { + // Hash-only notification on a node that doesn't support `full`; + // skip and keep draining until we see (or stop seeing) a real + // tx object. + error.InvalidNotification => continue, + else => return err, + }; + defer eth.rpc_transaction.freeRpcTransaction(allocator, tx); + + // Anvil may stream txs from previous tests in the same run; only + // accept the one we sent. + if (!std.mem.eql(u8, &tx.hash, &tx_hash)) continue; + + try std.testing.expectEqualSlices(u8, &recipient, &tx.to.?); + try std.testing.expectEqual(@as(u256, send_value), tx.value); + try std.testing.expect(tx.block_hash == null); // pending + found = true; + } + if (!found) return error.NoPendingTxObserved; +}