diff --git a/apps/backend/src/modules/chains/chains.module.ts b/apps/backend/src/modules/chains/chains.module.ts index 4547ee6..05065bd 100644 --- a/apps/backend/src/modules/chains/chains.module.ts +++ b/apps/backend/src/modules/chains/chains.module.ts @@ -1,6 +1,7 @@ import { Module } from '@nestjs/common'; import { ChainRegistryService } from './chain-registry.service'; import { StellarChainMonitor } from './monitors/stellar.chain-monitor'; +import { EthereumChainMonitor } from './monitors/ethereum.chain-monitor'; import { IChainMonitor } from './interfaces/chain-monitor.interface'; /** @@ -20,10 +21,9 @@ import { IChainMonitor } from './interfaces/chain-monitor.interface'; useFactory: (): IChainMonitor[] => { const monitors: IChainMonitor[] = [new StellarChainMonitor()]; - // Future chains — uncomment to activate: - // if (process.env.ETHEREUM_RPC_URL) { - // monitors.push(new EthereumChainMonitor(process.env.ETHEREUM_RPC_URL)); - // } + if (process.env.ETHEREUM_RPC_URL) { + monitors.push(new EthereumChainMonitor(process.env.ETHEREUM_RPC_URL)); + } return monitors; }, diff --git a/apps/backend/src/modules/chains/monitors/ethereum.chain-monitor.spec.ts b/apps/backend/src/modules/chains/monitors/ethereum.chain-monitor.spec.ts new file mode 100644 index 0000000..a4539e9 --- /dev/null +++ b/apps/backend/src/modules/chains/monitors/ethereum.chain-monitor.spec.ts @@ -0,0 +1,205 @@ +import { EthereumChainMonitor } from './ethereum.chain-monitor'; +import { NormalizedChainEvent } from '../interfaces/normalized-chain-event.interface'; + +// Mock the ethers module so that ethers.JsonRpcProvider returns a controlled stub +const mockGetBlockNumber = jest.fn(); +const mockGetBlock = jest.fn(); + +jest.mock('ethers', () => { + // eslint-disable-next-line @typescript-eslint/no-var-requires + const actual = jest.requireActual('ethers'); + const formatEther = actual.formatEther ?? actual.ethers?.formatEther; + return { + __esModule: true, + ethers: { + JsonRpcProvider: jest.fn().mockImplementation(() => ({ + getBlockNumber: mockGetBlockNumber, + getBlock: mockGetBlock, + })), + formatEther, + }, + }; +}); + +describe('EthereumChainMonitor', () => { + let monitor: EthereumChainMonitor; + + beforeEach(() => { + jest.useFakeTimers(); + mockGetBlockNumber.mockReset(); + mockGetBlock.mockReset(); + + monitor = new EthereumChainMonitor('http://localhost:8545', 5000); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + describe('chainId', () => { + it('returns "ethereum"', () => { + expect(monitor.chainId).toBe('ethereum'); + }); + }); + + describe('normalizeEvent', () => { + it('normalizes a native ETH transfer', () => { + const raw: Record = { + hash: '0xabc123', + from: '0xalice', + to: '0xbob', + value: '1000000000000000000', // 1 ETH + data: '0x', + blockTimestamp: 1700000000, + }; + + const result: NormalizedChainEvent = monitor.normalizeEvent(raw); + + expect(result.chainId).toBe('ethereum'); + expect(result.txHash).toBe('0xabc123'); + expect(result.from).toBe('0xalice'); + expect(result.to).toBe('0xbob'); + expect(result.amount).toBe('1.0'); + expect(result.asset).toBe('ETH'); + expect(result.eventType).toBe('transfer'); + }); + + it('normalizes a contract call with input data', () => { + const raw: Record = { + hash: '0xdef456', + from: '0xalice', + to: '0xcontract', + value: '0', + data: '0xa9059cbb000000000000000000000000', + blockTimestamp: 1700000000, + }; + + const result: NormalizedChainEvent = monitor.normalizeEvent(raw); + + expect(result.eventType).toBe('contract_call'); + expect(result.to).toBe('0xcontract'); + expect(result.amount).toBeUndefined(); + }); + + it('normalizes a contract deployment (no "to" field)', () => { + const raw: Record = { + hash: '0xghi789', + from: '0xdeployer', + data: '0x6080604052', + blockTimestamp: 1700000000, + }; + + const result: NormalizedChainEvent = monitor.normalizeEvent(raw); + + expect(result.eventType).toBe('contract_deploy'); + expect(result.to).toBeUndefined(); + }); + + it('handles blockTimestamp as a string', () => { + const raw: Record = { + hash: '0x123', + from: '0xalice', + data: '0x', + blockTimestamp: '2024-01-01T00:00:00.000Z', + }; + + const result: NormalizedChainEvent = monitor.normalizeEvent(raw); + + expect(result.timestamp).toBe('2024-01-01T00:00:00.000Z'); + }); + + it('handles missing fields gracefully', () => { + const raw: Record = {}; + + const result: NormalizedChainEvent = monitor.normalizeEvent(raw); + + expect(result.chainId).toBe('ethereum'); + expect(result.txHash).toBe(''); + expect(result.from).toBe(''); + expect(result.to).toBeUndefined(); + expect(result.eventType).toBe('contract_deploy'); + expect(result.raw).toBe(raw); + }); + + it('includes the raw event payload', () => { + const raw: Record = { + hash: '0xfull', + from: '0xalice', + customField: 'extra-data', + }; + + const result: NormalizedChainEvent = monitor.normalizeEvent(raw); + + expect(result.raw).toBe(raw); + expect(result.raw.customField).toBe('extra-data'); + }); + }); + + describe('subscribe', () => { + it('polls for new blocks and emits normalized events', async () => { + // First call (initial sync) returns 4, second call (poll) returns 5 + mockGetBlockNumber.mockResolvedValueOnce(4).mockResolvedValue(5); + mockGetBlock.mockImplementation((blockNum: number, _includeTxs: boolean) => { + if (blockNum <= 5) { + return Promise.resolve({ + number: blockNum, + timestamp: 1700000000 + blockNum, + transactions: [ + { + hash: `0xblock${blockNum}tx0`, + from: '0xalice', + to: '0xbob', + value: BigInt('1000000000000000000'), + data: '0x', + }, + ], + }); + } + return Promise.resolve(null); + }); + + const callback = jest.fn(); + + // subscribe resolves after the first poll cycle, which includes the callback + await monitor.subscribe(callback); + + expect(callback).toHaveBeenCalled(); + }); + + it('prevents duplicate polling when subscribe is called twice', async () => { + mockGetBlockNumber.mockResolvedValue(1); + mockGetBlock.mockResolvedValue(null); + + await monitor.subscribe(jest.fn()); + + // Second subscribe should return early without throwing + await expect(monitor.subscribe(jest.fn())).resolves.toBeUndefined(); + }); + }); + + describe('isHealthy', () => { + it('returns true when RPC responds with a block number', async () => { + mockGetBlockNumber.mockResolvedValue(21000000); + + const healthy = await monitor.isHealthy(); + + expect(healthy).toBe(true); + }); + + it('returns false when RPC throws', async () => { + mockGetBlockNumber.mockRejectedValue(new Error('connection refused')); + + const healthy = await monitor.isHealthy(); + + expect(healthy).toBe(false); + }); + + it('returns false when block number is negative', async () => { + mockGetBlockNumber.mockResolvedValue(-1); + + const healthy = await monitor.isHealthy(); + + expect(healthy).toBe(false); + }); + }); +}); diff --git a/apps/backend/src/modules/chains/monitors/ethereum.chain-monitor.ts b/apps/backend/src/modules/chains/monitors/ethereum.chain-monitor.ts new file mode 100644 index 0000000..7c0f804 --- /dev/null +++ b/apps/backend/src/modules/chains/monitors/ethereum.chain-monitor.ts @@ -0,0 +1,171 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { ethers } from 'ethers'; +import { IChainMonitor } from '../interfaces/chain-monitor.interface'; +import { NormalizedChainEvent } from '../interfaces/normalized-chain-event.interface'; + +/** + * Ethereum chain monitor — implements IChainMonitor for EVM-compatible chains. + * + * Polls the Ethereum JSON-RPC endpoint for new blocks, extracts transactions, + * and normalizes each to NormalizedChainEvent. + * + * Environment variables: + * ETHEREUM_RPC_URL — Ethereum JSON-RPC endpoint (default: http://localhost:8545) + * ETHEREUM_POLL_INTERVAL_MS — polling interval in milliseconds (default: 15000) + */ +@Injectable() +export class EthereumChainMonitor implements IChainMonitor { + readonly chainId = 'ethereum'; + private readonly logger = new Logger(EthereumChainMonitor.name); + private readonly provider: ethers.JsonRpcProvider; + private readonly pollIntervalMs: number; + private isPolling = false; + + constructor(rpcUrl?: string, pollIntervalMs?: number) { + const url = rpcUrl ?? process.env.ETHEREUM_RPC_URL ?? 'http://localhost:8545'; + this.provider = new ethers.JsonRpcProvider(url); + this.pollIntervalMs = + (pollIntervalMs ?? Number(process.env.ETHEREUM_POLL_INTERVAL_MS)) || 15000; + this.logger.log(`EthereumChainMonitor: connecting to ${url}`); + } + + /** + * Translate a raw Ethereum transaction into a NormalizedChainEvent. + * + * Handles: + * - Native ETH transfers (value > 0) + * - Contract calls (data field present) + * - Contract creation (to is null) + */ + normalizeEvent(rawEvent: Record): NormalizedChainEvent { + const from = typeof rawEvent.from === 'string' ? rawEvent.from : ''; + const to = typeof rawEvent.to === 'string' ? rawEvent.to : undefined; + const txHash = typeof rawEvent.hash === 'string' ? rawEvent.hash : ''; + + let eventType = 'transfer'; + let amount: string | undefined; + let asset: string | undefined; + + // Native ETH value transfer + if (typeof rawEvent.value === 'string' || typeof rawEvent.value === 'bigint') { + const valueWei = BigInt(rawEvent.value as string | bigint); + if (valueWei > 0n) { + amount = ethers.formatEther(valueWei); + asset = 'ETH'; + } + } + + // Classify event type: contract_deploy > contract_call > transfer + if (to === undefined) { + eventType = 'contract_deploy'; + } else if ( + rawEvent.data && + typeof rawEvent.data === 'string' && + (rawEvent.data as string).length > 2 + ) { + eventType = 'contract_call'; + } + + let timestamp: string; + if (typeof rawEvent.blockTimestamp === 'string') { + timestamp = rawEvent.blockTimestamp; + } else if (typeof rawEvent.blockTimestamp === 'number') { + timestamp = new Date((rawEvent.blockTimestamp as number) * 1000).toISOString(); + } else { + timestamp = new Date().toISOString(); + } + + return { + timestamp, + chainId: this.chainId, + eventType, + txHash, + from, + to, + amount, + asset, + raw: rawEvent, + }; + } + + async subscribe(onEvent: (event: NormalizedChainEvent) => void): Promise { + if (this.isPolling) { + this.logger.warn('EthereumChainMonitor: already polling, skipping duplicate subscribe'); + return; + } + + this.isPolling = true; + this.logger.log('EthereumChainMonitor: starting block polling'); + + let lastBlock = await this.provider.getBlockNumber().catch(() => undefined); + + const poll = async () => { + try { + const currentBlock = await this.provider.getBlockNumber(); + + if (lastBlock === undefined) { + lastBlock = currentBlock; + } + + // Fetch new blocks since last poll + const fromBlock = lastBlock + 1; + if (fromBlock <= currentBlock) { + for (let blockNum = fromBlock; blockNum <= currentBlock; blockNum++) { + try { + const block = await this.provider.getBlock(blockNum, true); + if (!block?.transactions) continue; + + const txs = Array.isArray(block.transactions) ? block.transactions : []; + + for (const tx of txs) { + if (typeof tx === 'object' && tx !== null) { + // Access getter properties from the TransactionResponse. + const txObj = tx as ethers.TransactionResponse; + const txPlain: Record = { + hash: txObj.hash, + from: txObj.from, + to: txObj.to, + value: typeof txObj.value === 'bigint' ? txObj.value.toString() : txObj.value, + data: txObj.data, + }; + + const rawTx: Record = { + ...txPlain, + blockTimestamp: + typeof block.timestamp === 'number' ? block.timestamp : undefined, + blockNumber: block.number, + }; + + const normalized = this.normalizeEvent(rawTx); + onEvent(normalized); + } + } + } catch (blockError) { + this.logger.warn( + `EthereumChainMonitor: error fetching block ${blockNum}: ${String(blockError)}`, + ); + } + } + + lastBlock = currentBlock; + } + } catch (error) { + this.logger.warn(`EthereumChainMonitor: poll error: ${String(error)}`); + } + + setTimeout(() => void poll(), this.pollIntervalMs); + }; + + await poll(); + } + + async isHealthy(): Promise { + try { + const blockNumber = await this.provider.getBlockNumber(); + return typeof blockNumber === 'number' && blockNumber >= 0; + } catch (error) { + this.logger.warn(`EthereumChainMonitor: health check failed: ${String(error)}`); + return false; + } + } +}