Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apps/backend/src/modules/chains/chains.module.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Module } from '@nestjs/common';
import { ChainRegistryService } from './chain-registry.service';
import { StellarChainMonitor } from './monitors/stellar.chain-monitor';
import { EthereumChainMonitor } from './monitors/ethereum.chain-monitor';
import { IChainMonitor } from './interfaces/chain-monitor.interface';

/**
Expand All @@ -20,10 +21,9 @@ import { IChainMonitor } from './interfaces/chain-monitor.interface';
useFactory: (): IChainMonitor[] => {
const monitors: IChainMonitor[] = [new StellarChainMonitor()];

// Future chains — uncomment to activate:
// if (process.env.ETHEREUM_RPC_URL) {
// monitors.push(new EthereumChainMonitor(process.env.ETHEREUM_RPC_URL));
// }
if (process.env.ETHEREUM_RPC_URL) {
monitors.push(new EthereumChainMonitor(process.env.ETHEREUM_RPC_URL));
}

return monitors;
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import { EthereumChainMonitor } from './ethereum.chain-monitor';
import { NormalizedChainEvent } from '../interfaces/normalized-chain-event.interface';

// Mock the ethers module so that ethers.JsonRpcProvider returns a controlled stub
const mockGetBlockNumber = jest.fn();
const mockGetBlock = jest.fn();

jest.mock('ethers', () => {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const actual = jest.requireActual('ethers');
const formatEther = actual.formatEther ?? actual.ethers?.formatEther;
return {
__esModule: true,
ethers: {
JsonRpcProvider: jest.fn().mockImplementation(() => ({
getBlockNumber: mockGetBlockNumber,
getBlock: mockGetBlock,
})),
formatEther,
},
};
});

describe('EthereumChainMonitor', () => {
let monitor: EthereumChainMonitor;

beforeEach(() => {
jest.useFakeTimers();
mockGetBlockNumber.mockReset();
mockGetBlock.mockReset();

monitor = new EthereumChainMonitor('http://localhost:8545', 5000);
});

afterEach(() => {
jest.useRealTimers();
});

describe('chainId', () => {
it('returns "ethereum"', () => {
expect(monitor.chainId).toBe('ethereum');
});
});

describe('normalizeEvent', () => {
it('normalizes a native ETH transfer', () => {
const raw: Record<string, unknown> = {
hash: '0xabc123',
from: '0xalice',
to: '0xbob',
value: '1000000000000000000', // 1 ETH
data: '0x',
blockTimestamp: 1700000000,
};

const result: NormalizedChainEvent = monitor.normalizeEvent(raw);

expect(result.chainId).toBe('ethereum');
expect(result.txHash).toBe('0xabc123');
expect(result.from).toBe('0xalice');
expect(result.to).toBe('0xbob');
expect(result.amount).toBe('1.0');
expect(result.asset).toBe('ETH');
expect(result.eventType).toBe('transfer');
});

it('normalizes a contract call with input data', () => {
const raw: Record<string, unknown> = {
hash: '0xdef456',
from: '0xalice',
to: '0xcontract',
value: '0',
data: '0xa9059cbb000000000000000000000000',
blockTimestamp: 1700000000,
};

const result: NormalizedChainEvent = monitor.normalizeEvent(raw);

expect(result.eventType).toBe('contract_call');
expect(result.to).toBe('0xcontract');
expect(result.amount).toBeUndefined();
});

it('normalizes a contract deployment (no "to" field)', () => {
const raw: Record<string, unknown> = {
hash: '0xghi789',
from: '0xdeployer',
data: '0x6080604052',
blockTimestamp: 1700000000,
};

const result: NormalizedChainEvent = monitor.normalizeEvent(raw);

expect(result.eventType).toBe('contract_deploy');
expect(result.to).toBeUndefined();
});

it('handles blockTimestamp as a string', () => {
const raw: Record<string, unknown> = {
hash: '0x123',
from: '0xalice',
data: '0x',
blockTimestamp: '2024-01-01T00:00:00.000Z',
};

const result: NormalizedChainEvent = monitor.normalizeEvent(raw);

expect(result.timestamp).toBe('2024-01-01T00:00:00.000Z');
});

it('handles missing fields gracefully', () => {
const raw: Record<string, unknown> = {};

const result: NormalizedChainEvent = monitor.normalizeEvent(raw);

expect(result.chainId).toBe('ethereum');
expect(result.txHash).toBe('');
expect(result.from).toBe('');
expect(result.to).toBeUndefined();
expect(result.eventType).toBe('contract_deploy');
expect(result.raw).toBe(raw);
});

it('includes the raw event payload', () => {
const raw: Record<string, unknown> = {
hash: '0xfull',
from: '0xalice',
customField: 'extra-data',
};

const result: NormalizedChainEvent = monitor.normalizeEvent(raw);

expect(result.raw).toBe(raw);
expect(result.raw.customField).toBe('extra-data');
});
});

describe('subscribe', () => {
it('polls for new blocks and emits normalized events', async () => {
// First call (initial sync) returns 4, second call (poll) returns 5
mockGetBlockNumber.mockResolvedValueOnce(4).mockResolvedValue(5);
mockGetBlock.mockImplementation((blockNum: number, _includeTxs: boolean) => {
if (blockNum <= 5) {
return Promise.resolve({
number: blockNum,
timestamp: 1700000000 + blockNum,
transactions: [
{
hash: `0xblock${blockNum}tx0`,
from: '0xalice',
to: '0xbob',
value: BigInt('1000000000000000000'),
data: '0x',
},
],
});
}
return Promise.resolve(null);
});

const callback = jest.fn();

// subscribe resolves after the first poll cycle, which includes the callback
await monitor.subscribe(callback);

expect(callback).toHaveBeenCalled();
});

it('prevents duplicate polling when subscribe is called twice', async () => {
mockGetBlockNumber.mockResolvedValue(1);
mockGetBlock.mockResolvedValue(null);

await monitor.subscribe(jest.fn());

// Second subscribe should return early without throwing
await expect(monitor.subscribe(jest.fn())).resolves.toBeUndefined();
});
});

describe('isHealthy', () => {
it('returns true when RPC responds with a block number', async () => {
mockGetBlockNumber.mockResolvedValue(21000000);

const healthy = await monitor.isHealthy();

expect(healthy).toBe(true);
});

it('returns false when RPC throws', async () => {
mockGetBlockNumber.mockRejectedValue(new Error('connection refused'));

const healthy = await monitor.isHealthy();

expect(healthy).toBe(false);
});

it('returns false when block number is negative', async () => {
mockGetBlockNumber.mockResolvedValue(-1);

const healthy = await monitor.isHealthy();

expect(healthy).toBe(false);
});
});
});
171 changes: 171 additions & 0 deletions apps/backend/src/modules/chains/monitors/ethereum.chain-monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import { Injectable, Logger } from '@nestjs/common';
import { ethers } from 'ethers';
import { IChainMonitor } from '../interfaces/chain-monitor.interface';
import { NormalizedChainEvent } from '../interfaces/normalized-chain-event.interface';

/**
* Ethereum chain monitor — implements IChainMonitor for EVM-compatible chains.
*
* Polls the Ethereum JSON-RPC endpoint for new blocks, extracts transactions,
* and normalizes each to NormalizedChainEvent.
*
* Environment variables:
* ETHEREUM_RPC_URL — Ethereum JSON-RPC endpoint (default: http://localhost:8545)
* ETHEREUM_POLL_INTERVAL_MS — polling interval in milliseconds (default: 15000)
*/
@Injectable()
export class EthereumChainMonitor implements IChainMonitor {
readonly chainId = 'ethereum';
private readonly logger = new Logger(EthereumChainMonitor.name);
private readonly provider: ethers.JsonRpcProvider;
private readonly pollIntervalMs: number;
private isPolling = false;

constructor(rpcUrl?: string, pollIntervalMs?: number) {
const url = rpcUrl ?? process.env.ETHEREUM_RPC_URL ?? 'http://localhost:8545';
this.provider = new ethers.JsonRpcProvider(url);
this.pollIntervalMs =
(pollIntervalMs ?? Number(process.env.ETHEREUM_POLL_INTERVAL_MS)) || 15000;
this.logger.log(`EthereumChainMonitor: connecting to ${url}`);
}

/**
* Translate a raw Ethereum transaction into a NormalizedChainEvent.
*
* Handles:
* - Native ETH transfers (value > 0)
* - Contract calls (data field present)
* - Contract creation (to is null)
*/
normalizeEvent(rawEvent: Record<string, unknown>): NormalizedChainEvent {
const from = typeof rawEvent.from === 'string' ? rawEvent.from : '';
const to = typeof rawEvent.to === 'string' ? rawEvent.to : undefined;
const txHash = typeof rawEvent.hash === 'string' ? rawEvent.hash : '';

let eventType = 'transfer';
let amount: string | undefined;
let asset: string | undefined;

// Native ETH value transfer
if (typeof rawEvent.value === 'string' || typeof rawEvent.value === 'bigint') {
const valueWei = BigInt(rawEvent.value as string | bigint);
if (valueWei > 0n) {
amount = ethers.formatEther(valueWei);
asset = 'ETH';
}
}

// Classify event type: contract_deploy > contract_call > transfer
if (to === undefined) {
eventType = 'contract_deploy';
} else if (
rawEvent.data &&
typeof rawEvent.data === 'string' &&
(rawEvent.data as string).length > 2
) {
eventType = 'contract_call';
}

let timestamp: string;
if (typeof rawEvent.blockTimestamp === 'string') {
timestamp = rawEvent.blockTimestamp;
} else if (typeof rawEvent.blockTimestamp === 'number') {
timestamp = new Date((rawEvent.blockTimestamp as number) * 1000).toISOString();
} else {
timestamp = new Date().toISOString();
}

return {
timestamp,
chainId: this.chainId,
eventType,
txHash,
from,
to,
amount,
asset,
raw: rawEvent,
};
}

async subscribe(onEvent: (event: NormalizedChainEvent) => void): Promise<void> {
if (this.isPolling) {
this.logger.warn('EthereumChainMonitor: already polling, skipping duplicate subscribe');
return;
}

this.isPolling = true;
this.logger.log('EthereumChainMonitor: starting block polling');

let lastBlock = await this.provider.getBlockNumber().catch(() => undefined);

const poll = async () => {
try {
const currentBlock = await this.provider.getBlockNumber();

if (lastBlock === undefined) {
lastBlock = currentBlock;
}

// Fetch new blocks since last poll
const fromBlock = lastBlock + 1;
if (fromBlock <= currentBlock) {
for (let blockNum = fromBlock; blockNum <= currentBlock; blockNum++) {
try {
const block = await this.provider.getBlock(blockNum, true);
if (!block?.transactions) continue;

const txs = Array.isArray(block.transactions) ? block.transactions : [];

for (const tx of txs) {
if (typeof tx === 'object' && tx !== null) {
// Access getter properties from the TransactionResponse.
const txObj = tx as ethers.TransactionResponse;
const txPlain: Record<string, unknown> = {
hash: txObj.hash,
from: txObj.from,
to: txObj.to,
value: typeof txObj.value === 'bigint' ? txObj.value.toString() : txObj.value,
data: txObj.data,
};

const rawTx: Record<string, unknown> = {
...txPlain,
blockTimestamp:
typeof block.timestamp === 'number' ? block.timestamp : undefined,
blockNumber: block.number,
};

const normalized = this.normalizeEvent(rawTx);
onEvent(normalized);
}
}
} catch (blockError) {
this.logger.warn(
`EthereumChainMonitor: error fetching block ${blockNum}: ${String(blockError)}`,
);
}
}

lastBlock = currentBlock;
}
} catch (error) {
this.logger.warn(`EthereumChainMonitor: poll error: ${String(error)}`);
}

setTimeout(() => void poll(), this.pollIntervalMs);
};

await poll();
}

async isHealthy(): Promise<boolean> {
try {
const blockNumber = await this.provider.getBlockNumber();
return typeof blockNumber === 'number' && blockNumber >= 0;
} catch (error) {
this.logger.warn(`EthereumChainMonitor: health check failed: ${String(error)}`);
return false;
}
}
}
Loading