diff --git a/README.md b/README.md index d2e93c9..71b6be0 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,21 @@ Resolvers can be combined with any adapter. Implementing the `Resolver` interfac Adapters are responsible only for receiving and returning raw packets. They call back into the server with the payload, source IP, and port so your resolver logic stays isolated. +## PROXY protocol + +When the DNS server sits behind a proxy or load balancer that speaks the [HAProxy PROXY protocol](https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt) (AWS NLB, HAProxy, nginx, Envoy, etc.), enable it on the `Server` so resolver callbacks see the real client address instead of the proxy's. Both v1 (text) and v2 (binary) headers are parsed, and the feature applies to both UDP datagrams and TCP connections. + +```php +$server = new Server($adapter, $resolver); +$server->setProxyProtocol(enabled: true); +$server->start(); +``` + +Detection is per-connection (TCP) or per-datagram (UDP): traffic that begins with a PROXY v1/v2 signature is parsed and the real client address is passed to the resolver; traffic without a signature is handled as a direct DNS request. This keeps health checks and direct clients working during rollouts. + +> [!WARNING] +> Only enable PROXY protocol on listeners that are exclusively reachable by trusted proxies. An untrusted client can forge a PROXY header to spoof its source address. Bind to an internal network interface or enforce network-level ACLs. + ## DNS client The bundled client can query any DNS server and returns fully decoded messages. diff --git a/src/DNS/Adapter.php b/src/DNS/Adapter.php index 86e5eb0..fc9db10 100644 --- a/src/DNS/Adapter.php +++ b/src/DNS/Adapter.php @@ -2,10 +2,36 @@ namespace Utopia\DNS; +/** + * Transport adapter contract. + * + * Adapters own the wire — UDP sockets, TCP sockets, connection framing, + * and PROXY protocol preamble handling. From the {@see Server}'s + * perspective, DNS messages appear as generic "request with a client + * address"; the adapter hides everything below. + */ abstract class Adapter { /** - * Worker start + * Whether incoming traffic may be prefixed with a PROXY protocol + * preamble. When enabled, the adapter strips the preamble (if + * present) and reports the real client address through the + * {@see onMessage()} callback. + */ + protected bool $enableProxyProtocol = false; + + public function setProxyProtocol(bool $enabled): void + { + $this->enableProxyProtocol = $enabled; + } + + public function hasProxyProtocol(): bool + { + return $this->enableProxyProtocol; + } + + /** + * Worker start callback. Invoked once per worker. * * @param callable(int $workerId): void $callback * @phpstan-param callable(int $workerId): void $callback @@ -13,22 +39,30 @@ abstract class Adapter abstract public function onWorkerStart(callable $callback): void; /** - * Packet handler + * Register the DNS message handler. + * + * Invoked once per complete DNS message, regardless of transport. + * The adapter has already stripped any PROXY preamble and extracted + * the framed message; $ip/$port reflect the real client address. + * + * The callback returns the response bytes to send back to the + * client, or an empty string to suppress the response. + * + * $maxResponseSize is the maximum response size appropriate for the + * transport (UDP: 512 per RFC 1035 unless EDNS0; TCP: 65535). * * @param callable(string $buffer, string $ip, int $port, ?int $maxResponseSize): string $callback - * @phpstan-param callable(string $buffer, string $ip, int $port, ?int $maxResponseSize):string $callback + * @phpstan-param callable(string $buffer, string $ip, int $port, ?int $maxResponseSize): string $callback */ - abstract public function onPacket(callable $callback): void; + abstract public function onMessage(callable $callback): void; /** - * Start the DNS server + * Start the DNS server. */ abstract public function start(): void; /** - * Get the name of the adapter - * - * @return string + * Get the name of the adapter. */ abstract public function getName(): string; } diff --git a/src/DNS/Adapter/Composite.php b/src/DNS/Adapter/Composite.php new file mode 100644 index 0000000..fe195ec --- /dev/null +++ b/src/DNS/Adapter/Composite.php @@ -0,0 +1,69 @@ + */ + protected array $adapters; + + public function __construct(Adapter ...$adapters) + { + $this->adapters = array_values($adapters); + } + + public function setProxyProtocol(bool $enabled): void + { + parent::setProxyProtocol($enabled); + foreach ($this->adapters as $adapter) { + $adapter->setProxyProtocol($enabled); + } + } + + public function onWorkerStart(callable $callback): void + { + foreach ($this->adapters as $adapter) { + $adapter->onWorkerStart($callback); + } + } + + public function onMessage(callable $callback): void + { + foreach ($this->adapters as $adapter) { + $adapter->onMessage($callback); + } + } + + public function start(): void + { + foreach ($this->adapters as $adapter) { + $adapter->start(); + } + } + + public function getName(): string + { + $names = array_map(fn (Adapter $a) => $a->getName(), $this->adapters); + return 'composite(' . implode('+', $names) . ')'; + } + + /** + * @return list + */ + public function getAdapters(): array + { + return $this->adapters; + } +} diff --git a/src/DNS/Adapter/Native.php b/src/DNS/Adapter/Native.php deleted file mode 100644 index 5635996..0000000 --- a/src/DNS/Adapter/Native.php +++ /dev/null @@ -1,347 +0,0 @@ - */ - protected array $tcpClients = []; - - /** @var array */ - protected array $tcpBuffers = []; - - /** @var array Track last activity time per TCP client for idle timeout */ - protected array $tcpLastActivity = []; - - /** @var callable(string $buffer, string $ip, int $port, ?int $maxResponseSize): string */ - protected mixed $onPacket; - - /** @var list */ - protected array $onWorkerStart = []; - - /** - * @param string $host Host to bind to - * @param int $port Port to listen on - * @param bool $enableTcp Enable TCP support (RFC 5966) - * @param int $maxTcpClients Maximum concurrent TCP clients - * @param int $maxTcpBufferSize Maximum buffer size per TCP client - * @param int $maxTcpFrameSize Maximum DNS message size over TCP - * @param int $tcpIdleTimeout Seconds before idle TCP connections are closed (RFC 7766) - */ - public function __construct( - protected string $host = '0.0.0.0', - protected int $port = 8053, - protected bool $enableTcp = true, - protected int $maxTcpClients = 100, - protected int $maxTcpBufferSize = 16384, - protected int $maxTcpFrameSize = 65535, - protected int $tcpIdleTimeout = 30 - ) { - - $server = \socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); - if (!$server) { - throw new Exception('Could not start server.'); - } - $this->udpServer = $server; - - if ($this->enableTcp) { - $tcp = \socket_create(AF_INET, SOCK_STREAM, SOL_TCP); - if (!$tcp) { - throw new Exception('Could not start TCP server.'); - } - - socket_set_option($tcp, SOL_SOCKET, SO_REUSEADDR, 1); - $this->tcpServer = $tcp; - } - } - - /** - * Worker start callback - * - * @param callable(int $workerId): void $callback - * @phpstan-param callable(int $workerId): void $callback - */ - public function onWorkerStart(callable $callback): void - { - $this->onWorkerStart[] = $callback; - } - - /** - * @param callable $callback - * @phpstan-param callable(string $buffer, string $ip, int $port, ?int $maxResponseSize):string $callback - */ - public function onPacket(callable $callback): void - { - $this->onPacket = $callback; - } - - /** - * Start the DNS server - */ - public function start(): void - { - if (socket_bind($this->udpServer, $this->host, $this->port) == false) { - throw new Exception('Could not bind server to a server.'); - } - - if ($this->tcpServer) { - if (socket_bind($this->tcpServer, $this->host, $this->port) == false) { - throw new Exception('Could not bind TCP server.'); - } - - if (socket_listen($this->tcpServer, 128) == false) { - throw new Exception('Could not listen on TCP server.'); - } - - socket_set_nonblock($this->tcpServer); - } - - foreach ($this->onWorkerStart as $callback) { - \call_user_func($callback, 0); - } - - /** @phpstan-ignore-next-line */ - while (1) { - // RFC 7766 Section 6.2.3: Close idle TCP connections - $this->closeIdleTcpClients(); - - $readSockets = [$this->udpServer]; - - if ($this->tcpServer) { - $readSockets[] = $this->tcpServer; - } - - foreach ($this->tcpClients as $client) { - $readSockets[] = $client; - } - - $write = []; - $except = []; - - // Use 1 second timeout for socket_select to periodically check idle connections - $changed = socket_select($readSockets, $write, $except, 1); - - if ($changed === false || $changed === 0) { - continue; - } - - foreach ($readSockets as $socket) { - if ($socket === $this->udpServer) { - $buf = ''; - $ip = ''; - $port = 0; - $len = socket_recvfrom($this->udpServer, $buf, 1024 * 4, 0, $ip, $port); - - if ($len > 0 && is_string($buf) && is_string($ip) && is_int($port)) { - $answer = call_user_func($this->onPacket, $buf, $ip, $port, 512); - - if ($answer !== '') { - socket_sendto($this->udpServer, $answer, strlen($answer), 0, $ip, $port); - } - } - - continue; - } - - if ($this->tcpServer !== null && $socket === $this->tcpServer) { - $client = @socket_accept($this->tcpServer); - - if ($client instanceof Socket) { - if (count($this->tcpClients) >= $this->maxTcpClients) { - @socket_close($client); - continue; - } - - if (@socket_set_nonblock($client) === false) { - @socket_close($client); - continue; - } - - socket_set_option($client, SOL_SOCKET, SO_KEEPALIVE, 1); - socket_set_option($client, SOL_SOCKET, SO_RCVTIMEO, ['sec' => 5, 'usec' => 0]); - socket_set_option($client, SOL_SOCKET, SO_SNDTIMEO, ['sec' => 5, 'usec' => 0]); - - $id = spl_object_id($client); - $this->tcpClients[$id] = $client; - $this->tcpBuffers[$id] = ''; - $this->tcpLastActivity[$id] = time(); - } - - continue; - } - - // Remaining readable sockets are TCP clients. - $this->handleTcpClient($socket); - } - } - } - - /** - * Get the name of the adapter - * - * @return string - */ - public function getName(): string - { - return 'native'; - } - - protected function handleTcpClient(Socket $client): void - { - $clientId = spl_object_id($client); - - $chunk = @socket_read($client, 8192, PHP_BINARY_READ); - - if ($chunk === '' || $chunk === false) { - $error = socket_last_error($client); - - if ($chunk === '' || !in_array($error, [SOCKET_EAGAIN, SOCKET_EWOULDBLOCK], true)) { - $this->closeTcpClient($client); - } - - return; - } - - // Update activity timestamp for idle timeout tracking - $this->tcpLastActivity[$clientId] = time(); - - $currentBufferSize = strlen($this->tcpBuffers[$clientId] ?? ''); - $chunkSize = strlen($chunk); - - if ($currentBufferSize + $chunkSize > $this->maxTcpBufferSize) { - printf("TCP buffer size limit exceeded for client %d\n", $clientId); - $this->closeTcpClient($client); - return; - } - - $this->tcpBuffers[$clientId] = ($this->tcpBuffers[$clientId] ?? '') . $chunk; - - while (strlen($this->tcpBuffers[$clientId]) >= 2) { - $unpacked = unpack('n', substr($this->tcpBuffers[$clientId], 0, 2)); - $payloadLength = (is_array($unpacked) && array_key_exists(1, $unpacked) && is_int($unpacked[1])) ? $unpacked[1] : 0; - - // Close connection for invalid zero-length payloads - if ($payloadLength === 0) { - $this->closeTcpClient($client); - return; - } - - // DNS TCP messages have a 2-byte length prefix (max 65535), but we enforce - // a stricter limit to prevent memory exhaustion from malicious clients - if ($payloadLength > $this->maxTcpFrameSize) { - printf("Invalid TCP frame size %d for client %d\n", $payloadLength, $clientId); - $this->closeTcpClient($client); - return; - } - - if (strlen($this->tcpBuffers[$clientId]) < ($payloadLength + 2)) { - return; - } - - $message = substr($this->tcpBuffers[$clientId], 2, $payloadLength); - $this->tcpBuffers[$clientId] = substr($this->tcpBuffers[$clientId], $payloadLength + 2); - - $ip = ''; - $port = 0; - socket_getpeername($client, $ip, $port); - - if (is_string($ip) && is_int($port)) { - $answer = call_user_func($this->onPacket, $message, $ip, $port, self::MAX_TCP_MESSAGE_SIZE); - - if ($answer !== '') { - $this->sendTcpResponse($client, $answer); - } - } - } - } - - /** - * Send a TCP DNS response with length prefix. - * - * Per RFC 1035 Section 4.2.2, TCP messages use a 2-byte length prefix. - * This limits maximum message size to 65535 bytes. Oversized responses - * are rejected to prevent silent data corruption from integer overflow. - */ - protected function sendTcpResponse(Socket $client, string $payload): void - { - $payloadLength = strlen($payload); - - // RFC 1035: TCP uses 2-byte length prefix, max 65535 bytes - if ($payloadLength > self::MAX_TCP_MESSAGE_SIZE) { - // This should not happen if truncation is working correctly - // Log and close connection rather than send corrupted data - printf( - "TCP response too large (%d bytes > %d max), dropping\n", - $payloadLength, - self::MAX_TCP_MESSAGE_SIZE - ); - $this->closeTcpClient($client); - return; - } - - $frame = pack('n', $payloadLength) . $payload; - $total = strlen($frame); - $sent = 0; - - while ($sent < $total) { - $written = @socket_write($client, substr($frame, $sent)); - - if ($written === false) { - $error = socket_last_error($client); - - if (in_array($error, [SOCKET_EAGAIN, SOCKET_EWOULDBLOCK], true)) { - socket_clear_error($client); - usleep(1000); - continue; - } - - $this->closeTcpClient($client); - return; - } - - $sent += $written; - } - } - - /** - * Close idle TCP connections per RFC 7766 Section 6.2.3 - * - * Servers should close idle connections to free resources. - * This prevents resource exhaustion from slow or abandoned clients. - */ - protected function closeIdleTcpClients(): void - { - $now = time(); - - foreach ($this->tcpClients as $id => $client) { - $lastActivity = $this->tcpLastActivity[$id] ?? 0; - - if (($now - $lastActivity) > $this->tcpIdleTimeout) { - $this->closeTcpClient($client); - } - } - } - - protected function closeTcpClient(Socket $client): void - { - $id = spl_object_id($client); - - unset($this->tcpClients[$id], $this->tcpBuffers[$id], $this->tcpLastActivity[$id]); - - @socket_close($client); - } -} diff --git a/src/DNS/Adapter/NativeTcp.php b/src/DNS/Adapter/NativeTcp.php new file mode 100644 index 0000000..867da9a --- /dev/null +++ b/src/DNS/Adapter/NativeTcp.php @@ -0,0 +1,279 @@ + Active client sockets keyed by spl_object_id. */ + protected array $clients = []; + + /** @var array Per-client message stream. */ + protected array $streams = []; + + /** @var array Last-activity timestamp for idle-timeout enforcement. */ + protected array $lastActivity = []; + + protected ?Socket $server = null; + + /** @var callable(string $buffer, string $ip, int $port, ?int $maxResponseSize): string */ + protected mixed $onMessage; + + /** @var list */ + protected array $onWorkerStartCallbacks = []; + + /** + * @param string $host Host to bind to + * @param int $port Port to listen on + * @param int $maxClients Maximum concurrent TCP clients + * @param int $idleTimeout Seconds before idle TCP connections are closed (RFC 7766) + */ + public function __construct( + protected string $host = '0.0.0.0', + protected int $port = 8053, + protected int $maxClients = 100, + protected int $idleTimeout = 30, + ) { + $socket = \socket_create(AF_INET, SOCK_STREAM, SOL_TCP); + if (!$socket) { + throw new Exception('Could not create TCP socket.'); + } + + socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1); + $this->server = $socket; + } + + public function onWorkerStart(callable $callback): void + { + $this->onWorkerStartCallbacks[] = $callback; + } + + public function onMessage(callable $callback): void + { + $this->onMessage = $callback; + } + + public function start(): void + { + if ($this->server === null) { + throw new Exception('TCP server socket is not available.'); + } + + if (socket_bind($this->server, $this->host, $this->port) === false) { + throw new Exception('Could not bind TCP server.'); + } + + if (socket_listen($this->server, 128) === false) { + throw new Exception('Could not listen on TCP server.'); + } + + socket_set_nonblock($this->server); + + foreach ($this->onWorkerStartCallbacks as $callback) { + \call_user_func($callback, 0); + } + + /** @phpstan-ignore-next-line */ + while (1) { + $this->closeIdleClients(); + + $readSockets = [$this->server]; + foreach ($this->clients as $client) { + $readSockets[] = $client; + } + + $write = []; + $except = []; + + // 1s timeout keeps the idle sweep responsive. + $changed = socket_select($readSockets, $write, $except, 1); + if ($changed === false || $changed === 0) { + continue; + } + + foreach ($readSockets as $sock) { + if ($sock === $this->server) { + $this->acceptClient(); + continue; + } + + $this->readClient($sock); + } + } + } + + public function getName(): string + { + return 'native-tcp'; + } + + public function getServerSocket(): ?Socket + { + return $this->server; + } + + /** + * @return list + */ + public function getClientSockets(): array + { + return array_values($this->clients); + } + + protected function acceptClient(): void + { + if ($this->server === null) { + return; + } + + $client = @socket_accept($this->server); + + if (!$client instanceof Socket) { + return; + } + + if (count($this->clients) >= $this->maxClients) { + @socket_close($client); + return; + } + + if (@socket_set_nonblock($client) === false) { + @socket_close($client); + return; + } + + socket_set_option($client, SOL_SOCKET, SO_KEEPALIVE, 1); + socket_set_option($client, SOL_SOCKET, SO_RCVTIMEO, ['sec' => 5, 'usec' => 0]); + socket_set_option($client, SOL_SOCKET, SO_SNDTIMEO, ['sec' => 5, 'usec' => 0]); + + $peerIp = ''; + $peerPort = 0; + socket_getpeername($client, $peerIp, $peerPort); + + $fd = spl_object_id($client); + $this->clients[$fd] = $client; + $this->lastActivity[$fd] = time(); + $this->streams[$fd] = new TcpMessageStream( + peerIp: is_string($peerIp) ? $peerIp : '', + peerPort: is_int($peerPort) ? $peerPort : 0, + enableProxyProtocol: $this->enableProxyProtocol, + ); + } + + public function readClient(Socket $client): void + { + $fd = spl_object_id($client); + + $chunk = @socket_read($client, 8192, PHP_BINARY_READ); + + if ($chunk === '' || $chunk === false) { + $error = socket_last_error($client); + + if ($chunk === '' || !in_array($error, [SOCKET_EAGAIN, SOCKET_EWOULDBLOCK], true)) { + $this->closeClient($client); + } + + return; + } + + $this->lastActivity[$fd] = time(); + + $stream = $this->streams[$fd] ?? null; + if ($stream === null) { + $this->closeClient($client); + return; + } + + try { + foreach ($stream->feed($chunk) as [$message, $ip, $port]) { + $response = \call_user_func($this->onMessage, $message, $ip, $port, TcpMessageStream::MAX_MESSAGE_SIZE); + + if ($response !== '') { + $this->writeFramed($client, $response); + } + } + } catch (ProxyDecodingException | MessageDecodingException) { + $this->closeClient($client); + } + } + + protected function writeFramed(Socket $client, string $response): void + { + $length = strlen($response); + + if ($length > TcpMessageStream::MAX_MESSAGE_SIZE) { + // Truncation should have been applied upstream; oversize payloads + // would silently corrupt framing via the 2-byte length prefix. + $this->closeClient($client); + return; + } + + $frame = pack('n', $length) . $response; + $total = strlen($frame); + $sent = 0; + + while ($sent < $total) { + $written = @socket_write($client, substr($frame, $sent)); + + if ($written === false) { + $error = socket_last_error($client); + + if (in_array($error, [SOCKET_EAGAIN, SOCKET_EWOULDBLOCK], true)) { + socket_clear_error($client); + usleep(1000); + continue; + } + + $this->closeClient($client); + return; + } + + $sent += $written; + } + } + + /** + * Close idle TCP connections per RFC 7766 § 6.2.3. + */ + protected function closeIdleClients(): void + { + $now = time(); + + foreach ($this->clients as $fd => $client) { + $last = $this->lastActivity[$fd] ?? 0; + + if (($now - $last) > $this->idleTimeout) { + $this->closeClient($client); + } + } + } + + protected function closeClient(Socket $client): void + { + $fd = spl_object_id($client); + + if (!isset($this->clients[$fd])) { + return; + } + + unset($this->clients[$fd], $this->lastActivity[$fd], $this->streams[$fd]); + + @socket_close($client); + } +} diff --git a/src/DNS/Adapter/NativeUdp.php b/src/DNS/Adapter/NativeUdp.php new file mode 100644 index 0000000..6fadf0b --- /dev/null +++ b/src/DNS/Adapter/NativeUdp.php @@ -0,0 +1,130 @@ + */ + protected array $onWorkerStartCallbacks = []; + + public function __construct( + protected string $host = '0.0.0.0', + protected int $port = 8053, + ) { + $socket = \socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); + if (!$socket) { + throw new Exception('Could not create UDP socket.'); + } + $this->socket = $socket; + } + + public function onWorkerStart(callable $callback): void + { + $this->onWorkerStartCallbacks[] = $callback; + } + + public function onMessage(callable $callback): void + { + $this->onMessage = $callback; + } + + public function start(): void + { + if (socket_bind($this->socket, $this->host, $this->port) === false) { + throw new Exception('Could not bind UDP server.'); + } + + foreach ($this->onWorkerStartCallbacks as $callback) { + \call_user_func($callback, 0); + } + + /** @phpstan-ignore-next-line */ + while (1) { + $readSockets = [$this->socket]; + $write = []; + $except = []; + + $changed = socket_select($readSockets, $write, $except, null); + if ($changed === false || $changed === 0) { + continue; + } + + $this->handleReadable(); + } + } + + public function getName(): string + { + return 'native-udp'; + } + + /** + * Non-blocking single iteration — useful for composite adapters that + * run their own select loop over the sockets exposed via + * {@see getSocket()}. + */ + public function handleReadable(): void + { + $buf = ''; + $ip = ''; + $port = 0; + $len = socket_recvfrom($this->socket, $buf, 1024 * 4, 0, $ip, $port); + + if ($len === false || $len <= 0 || !is_string($buf) || !is_string($ip) || !is_int($port)) { + return; + } + + // Reply goes back to the actual UDP peer, not the PROXY-declared source. + $replyIp = $ip; + $replyPort = $port; + + if ($this->enableProxyProtocol) { + try { + $header = ProxyProtocolStream::unwrapDatagram($buf); + } catch (ProxyDecodingException) { + return; + } + + if ($header !== null && $header->sourceAddress !== null && $header->sourcePort !== null) { + $ip = $header->sourceAddress; + $port = $header->sourcePort; + } + } + + $response = \call_user_func($this->onMessage, $buf, $ip, $port, self::UDP_MAX_MESSAGE_SIZE); + + if ($response !== '') { + socket_sendto($this->socket, $response, strlen($response), 0, $replyIp, $replyPort); + } + } + + public function getSocket(): Socket + { + return $this->socket; + } +} diff --git a/src/DNS/Adapter/Swoole.php b/src/DNS/Adapter/Swoole.php deleted file mode 100644 index a2c8ce4..0000000 --- a/src/DNS/Adapter/Swoole.php +++ /dev/null @@ -1,131 +0,0 @@ -server = new Server($this->host, $this->port, SWOOLE_PROCESS, SWOOLE_SOCK_UDP); - $this->server->set([ - 'worker_num' => $this->numWorkers, - 'max_coroutine' => $this->maxCoroutines, - ]); - - if ($this->enableTcp) { - $port = $this->server->addListener($this->host, $this->port, SWOOLE_SOCK_TCP); - - if ($port instanceof Port) { - $this->tcpPort = $port; - $this->tcpPort->set([ - 'open_length_check' => true, - 'package_length_type' => 'n', - 'package_length_offset' => 0, - 'package_body_offset' => 2, - 'package_max_length' => 65537, - ]); - } - } - } - - /** - * Worker start callback - * - * @param callable(int $workerId): void $callback - */ - public function onWorkerStart(callable $callback): void - { - $this->server->on('WorkerStart', function ($server, $workerId) use ($callback) { - if (is_int($workerId)) { - \call_user_func($callback, $workerId); - } - }); - } - - /** - * @param callable $callback - * @phpstan-param callable(string $buffer, string $ip, int $port, ?int $maxResponseSize):string $callback - */ - public function onPacket(callable $callback): void - { - $this->onPacket = $callback; - - // UDP handler - enforces 512-byte limit per RFC 1035 - $this->server->on('Packet', function ($server, $data, $clientInfo) { - if (!is_string($data) || !is_array($clientInfo)) { - return; - } - - $ip = is_string($clientInfo['address'] ?? null) ? $clientInfo['address'] : ''; - $port = is_int($clientInfo['port'] ?? null) ? $clientInfo['port'] : 0; - - $response = \call_user_func($this->onPacket, $data, $ip, $port, 512); - - if ($response !== '' && $server instanceof Server) { - $server->sendto($ip, $port, $response); - } - }); - - // TCP handler - supports larger responses with length-prefixed framing per RFC 5966 - if ($this->tcpPort instanceof Port) { - $this->tcpPort->on('Receive', function (Server $server, int $fd, int $reactorId, string $data) { - $info = $server->getClientInfo($fd, $reactorId); - if (!is_array($info)) { - return; - } - - $payload = substr($data, 2); // strip 2-byte length prefix - $ip = is_string($info['remote_ip'] ?? null) ? $info['remote_ip'] : ''; - $port = is_int($info['remote_port'] ?? null) ? $info['remote_port'] : 0; - - $response = \call_user_func($this->onPacket, $payload, $ip, $port, self::MAX_TCP_MESSAGE_SIZE); - - if ($response !== '') { - $server->send($fd, pack('n', strlen($response)) . $response); - } - }); - } - } - - /** - * Start the DNS server - */ - public function start(): void - { - Runtime::enableCoroutine(); - $this->server->start(); - } - - /** - * Get the name of the adapter - * - * @return string - */ - public function getName(): string - { - return 'swoole'; - } -} diff --git a/src/DNS/Adapter/SwooleTcp.php b/src/DNS/Adapter/SwooleTcp.php new file mode 100644 index 0000000..749fae2 --- /dev/null +++ b/src/DNS/Adapter/SwooleTcp.php @@ -0,0 +1,157 @@ + Per-fd message stream. */ + protected array $streams = []; + + /** @var callable(string $buffer, string $ip, int $port, ?int $maxResponseSize): string */ + protected mixed $onMessage; + + public function __construct( + protected string $host = '0.0.0.0', + protected int $tcpPort = 53, + protected int $numWorkers = 1, + protected int $maxCoroutines = 3000, + ?Server $server = null, + ) { + if ($server === null) { + $this->server = new Server($this->host, $this->tcpPort, SWOOLE_PROCESS, SWOOLE_SOCK_TCP); + $this->server->set([ + 'worker_num' => $this->numWorkers, + 'max_coroutine' => $this->maxCoroutines, + ]); + $this->port = $this->server; + $this->owned = true; + } else { + $this->server = $server; + $listener = $server->addListener($this->host, $this->tcpPort, SWOOLE_SOCK_TCP); + if (!$listener instanceof Port) { + throw new \RuntimeException('Could not add TCP listener to Swoole server.'); + } + $this->port = $listener; + $this->owned = false; + } + + $this->port->set([ + 'open_length_check' => false, + ]); + } + + public function onWorkerStart(callable $callback): void + { + $this->server->on('WorkerStart', function ($server, $workerId) use ($callback) { + if (is_int($workerId)) { + \call_user_func($callback, $workerId); + } + }); + } + + public function onMessage(callable $callback): void + { + $this->onMessage = $callback; + + $this->port->on('Connect', function (Server $server, int $fd) { + $info = $server->getClientInfo($fd); + $ip = is_array($info) && is_string($info['remote_ip'] ?? null) ? $info['remote_ip'] : ''; + $port = is_array($info) && is_int($info['remote_port'] ?? null) ? $info['remote_port'] : 0; + + $this->streams[$fd] = new TcpMessageStream( + peerIp: $ip, + peerPort: $port, + enableProxyProtocol: $this->enableProxyProtocol, + ); + }); + + $this->port->on('Close', function (Server $server, int $fd) { + unset($this->streams[$fd]); + }); + + $this->port->on('Receive', function (Server $server, int $fd, int $reactorId, string $data) { + $stream = $this->streams[$fd] ?? null; + + if ($stream === null) { + $info = $server->getClientInfo($fd, $reactorId); + $ip = is_array($info) && is_string($info['remote_ip'] ?? null) ? $info['remote_ip'] : ''; + $port = is_array($info) && is_int($info['remote_port'] ?? null) ? $info['remote_port'] : 0; + + $stream = new TcpMessageStream( + peerIp: $ip, + peerPort: $port, + enableProxyProtocol: $this->enableProxyProtocol, + ); + $this->streams[$fd] = $stream; + } + + try { + foreach ($stream->feed($data) as [$message, $ip, $port]) { + $response = \call_user_func( + $this->onMessage, + $message, + $ip, + $port, + TcpMessageStream::MAX_MESSAGE_SIZE, + ); + + if ($response !== '') { + if (strlen($response) > TcpMessageStream::MAX_MESSAGE_SIZE) { + $server->close($fd); + return; + } + $server->send($fd, pack('n', strlen($response)) . $response); + } + } + } catch (ProxyDecodingException | MessageDecodingException) { + $server->close($fd); + } + }); + } + + public function start(): void + { + if ($this->owned) { + Runtime::enableCoroutine(); + $this->server->start(); + } + } + + public function getName(): string + { + return 'swoole-tcp'; + } + + public function getServer(): Server + { + return $this->server; + } +} diff --git a/src/DNS/Adapter/SwooleUdp.php b/src/DNS/Adapter/SwooleUdp.php new file mode 100644 index 0000000..56e7caa --- /dev/null +++ b/src/DNS/Adapter/SwooleUdp.php @@ -0,0 +1,114 @@ +server = new Server($this->host, $this->port, SWOOLE_PROCESS, SWOOLE_SOCK_UDP); + $this->server->set([ + 'worker_num' => $this->numWorkers, + 'max_coroutine' => $this->maxCoroutines, + ]); + $this->owned = true; + } else { + $this->server = $server; + $this->owned = false; + } + } + + public function onWorkerStart(callable $callback): void + { + $this->server->on('WorkerStart', function ($server, $workerId) use ($callback) { + if (is_int($workerId)) { + \call_user_func($callback, $workerId); + } + }); + } + + public function onMessage(callable $callback): void + { + $this->onMessage = $callback; + + $this->server->on('Packet', function ($server, $data, $clientInfo) { + if (!is_string($data) || !is_array($clientInfo)) { + return; + } + + $peerIp = is_string($clientInfo['address'] ?? null) ? $clientInfo['address'] : ''; + $peerPort = is_int($clientInfo['port'] ?? null) ? $clientInfo['port'] : 0; + $ip = $peerIp; + $port = $peerPort; + $payload = $data; + + if ($this->enableProxyProtocol) { + try { + $header = ProxyProtocolStream::unwrapDatagram($payload); + } catch (ProxyDecodingException) { + return; + } + + if ($header !== null && $header->sourceAddress !== null && $header->sourcePort !== null) { + $ip = $header->sourceAddress; + $port = $header->sourcePort; + } + } + + $response = \call_user_func($this->onMessage, $payload, $ip, $port, self::UDP_MAX_MESSAGE_SIZE); + + if ($response !== '' && $server instanceof Server) { + // Reply goes back to the actual UDP peer (the proxy), not the parsed client. + $server->sendto($peerIp, $peerPort, $response); + } + }); + } + + public function start(): void + { + if ($this->owned) { + Runtime::enableCoroutine(); + $this->server->start(); + } + } + + public function getName(): string + { + return 'swoole-udp'; + } + + public function getServer(): Server + { + return $this->server; + } +} diff --git a/src/DNS/Exception/ProxyProtocol/DecodingException.php b/src/DNS/Exception/ProxyProtocol/DecodingException.php new file mode 100644 index 0000000..b3a0b35 --- /dev/null +++ b/src/DNS/Exception/ProxyProtocol/DecodingException.php @@ -0,0 +1,10 @@ += self::V1_MAX_LENGTH) { + throw new DecodingException('PROXY v1 header missing CRLF within 107 bytes.'); + } + return null; + } + + $lineLength = $terminator + 2; + + if ($lineLength > self::V1_MAX_LENGTH) { + throw new DecodingException('PROXY v1 header exceeds 107 bytes.'); + } + + $line = \substr($buffer, 0, $terminator); + $parts = \explode(' ', $line); + + if ($parts[0] !== 'PROXY') { + throw new DecodingException('PROXY v1 header missing PROXY token.'); + } + + $proto = $parts[1] ?? ''; + + // Per spec: receivers MUST ignore everything past UNKNOWN on the line. + if ($proto === 'UNKNOWN') { + return new self( + version: self::VERSION_1, + isLocal: false, + family: self::FAMILY_UNKNOWN, + sourceAddress: null, + sourcePort: null, + destinationAddress: null, + destinationPort: null, + bytesConsumed: $lineLength, + ); + } + + if ($proto !== 'TCP4' && $proto !== 'TCP6') { + throw new DecodingException('PROXY v1 header has unsupported protocol: ' . $proto); + } + + if (\count($parts) !== 6) { + throw new DecodingException('PROXY v1 header is malformed.'); + } + + [, , $srcAddr, $dstAddr, $srcPort, $dstPort] = $parts; + + $ipFlag = $proto === 'TCP4' ? \FILTER_FLAG_IPV4 : \FILTER_FLAG_IPV6; + + if (\filter_var($srcAddr, \FILTER_VALIDATE_IP, $ipFlag) === false) { + throw new DecodingException('PROXY v1 invalid source address: ' . $srcAddr); + } + + if (\filter_var($dstAddr, \FILTER_VALIDATE_IP, $ipFlag) === false) { + throw new DecodingException('PROXY v1 invalid destination address: ' . $dstAddr); + } + + return new self( + version: self::VERSION_1, + isLocal: false, + family: $proto, + sourceAddress: $srcAddr, + sourcePort: self::decodeV1Port($srcPort, 'source'), + destinationAddress: $dstAddr, + destinationPort: self::decodeV1Port($dstPort, 'destination'), + bytesConsumed: $lineLength, + ); + } + + private static function decodeV1Port(string $value, string $label): int + { + if (!\ctype_digit($value) || ($value[0] === '0' && $value !== '0')) { + throw new DecodingException('PROXY v1 invalid ' . $label . ' port: ' . $value); + } + + $port = (int) $value; + + if ($port > 65535) { + throw new DecodingException('PROXY v1 ' . $label . ' port out of range: ' . $value); + } + + return $port; + } + + private static function decodeV2(string $buffer): ?self + { + if (\strlen($buffer) < self::V2_HEADER_LENGTH) { + return null; + } + + // Single unpack call: ver/cmd byte, family/transport byte, 2-byte length. + $fields = \unpack('CverCmd/CfamTrans/nlength', $buffer, self::V2_SIGNATURE_LENGTH); + + if ($fields === false + || !\is_int($fields['verCmd'] ?? null) + || !\is_int($fields['famTrans'] ?? null) + || !\is_int($fields['length'] ?? null) + ) { + throw new DecodingException('PROXY v2 header could not be unpacked.'); + } + + $verCmd = $fields['verCmd']; + $famTrans = $fields['famTrans']; + $payloadLength = $fields['length']; + + if (($verCmd & 0xF0) !== 0x20) { + throw new DecodingException('PROXY v2 header has invalid version.'); + } + + $command = $verCmd & 0x0F; + + if ($command !== self::COMMAND_LOCAL && $command !== self::COMMAND_PROXY) { + throw new DecodingException('PROXY v2 header has invalid command: ' . $command); + } + + $totalLength = self::V2_HEADER_LENGTH + $payloadLength; + + if (\strlen($buffer) < $totalLength) { + return null; + } + + if ($command === self::COMMAND_LOCAL) { + return new self( + version: self::VERSION_2, + isLocal: true, + family: self::FAMILY_UNKNOWN, + sourceAddress: null, + sourcePort: null, + destinationAddress: null, + destinationPort: null, + bytesConsumed: $totalLength, + ); + } + + $addressFamily = ($famTrans & 0xF0) >> 4; + $transport = $famTrans & 0x0F; + + if ($transport !== self::TRANSPORT_STREAM && $transport !== self::TRANSPORT_DGRAM) { + return self::opaqueV2($totalLength); + } + + return match ($addressFamily) { + self::ADDRESS_FAMILY_INET => self::decodeV2Inet( + $buffer, + $transport, + $totalLength, + self::V2_INET_PAYLOAD_LENGTH, + 4, + ), + self::ADDRESS_FAMILY_INET6 => self::decodeV2Inet( + $buffer, + $transport, + $totalLength, + self::V2_INET6_PAYLOAD_LENGTH, + 16, + ), + self::ADDRESS_FAMILY_UNIX => self::decodeV2Unix( + $buffer, + $payloadLength, + $totalLength, + ), + default => self::opaqueV2($totalLength), + }; + } + + private static function decodeV2Inet(string $buffer, int $transport, int $totalLength, int $minPayload, int $addrSize): self + { + if ($totalLength - self::V2_HEADER_LENGTH < $minPayload) { + throw new DecodingException('PROXY v2 INET payload too short for declared family.'); + } + + $offset = self::V2_HEADER_LENGTH; + + $srcRaw = \substr($buffer, $offset, $addrSize); + $dstRaw = \substr($buffer, $offset + $addrSize, $addrSize); + $srcAddr = \inet_ntop($srcRaw); + $dstAddr = \inet_ntop($dstRaw); + + if ($srcAddr === false || $dstAddr === false) { + throw new DecodingException('PROXY v2 INET address could not be decoded.'); + } + + $ports = \unpack('nsrc/ndst', $buffer, $offset + ($addrSize * 2)); + + if ($ports === false || !\is_int($ports['src'] ?? null) || !\is_int($ports['dst'] ?? null)) { + throw new DecodingException('PROXY v2 INET ports could not be decoded.'); + } + + $isInet6 = $addrSize === 16; + + if ($transport === self::TRANSPORT_STREAM) { + $family = $isInet6 ? self::FAMILY_TCP6 : self::FAMILY_TCP4; + } else { + $family = $isInet6 ? self::FAMILY_UDP6 : self::FAMILY_UDP4; + } + + return new self( + version: self::VERSION_2, + isLocal: false, + family: $family, + sourceAddress: $srcAddr, + sourcePort: $ports['src'], + destinationAddress: $dstAddr, + destinationPort: $ports['dst'], + bytesConsumed: $totalLength, + ); + } + + private static function decodeV2Unix(string $buffer, int $payloadLength, int $totalLength): self + { + if ($payloadLength < self::V2_UNIX_PAYLOAD_LENGTH) { + throw new DecodingException('PROXY v2 UNIX payload too short.'); + } + + $offset = self::V2_HEADER_LENGTH; + $src = \rtrim(\substr($buffer, $offset, 108), "\x00"); + $dst = \rtrim(\substr($buffer, $offset + 108, 108), "\x00"); + + return new self( + version: self::VERSION_2, + isLocal: false, + family: self::FAMILY_UNIX, + sourceAddress: $src !== '' ? $src : null, + sourcePort: null, + destinationAddress: $dst !== '' ? $dst : null, + destinationPort: null, + bytesConsumed: $totalLength, + ); + } + + /** + * v2 headers with an unknown family or transport are passed through as + * opaque frames: we still advance past them so downstream parsing lines + * up, but the address info is discarded. + */ + private static function opaqueV2(int $totalLength): self + { + return new self( + version: self::VERSION_2, + isLocal: false, + family: self::FAMILY_UNKNOWN, + sourceAddress: null, + sourcePort: null, + destinationAddress: null, + destinationPort: null, + bytesConsumed: $totalLength, + ); + } +} diff --git a/src/DNS/ProxyProtocolStream.php b/src/DNS/ProxyProtocolStream.php new file mode 100644 index 0000000..b351fd8 --- /dev/null +++ b/src/DNS/ProxyProtocolStream.php @@ -0,0 +1,132 @@ +state !== self::STATE_UNRESOLVED) { + return $this->state; + } + + $version = ProxyProtocol::detect($buffer); + + if ($version === null) { + return self::STATE_UNRESOLVED; + } + + if ($version === 0) { + $this->state = self::STATE_DIRECT; + return $this->state; + } + + $header = ProxyProtocol::decode($buffer); + + if ($header === null) { + return self::STATE_UNRESOLVED; + } + + $this->header = $header; + $buffer = \substr($buffer, $header->bytesConsumed); + $this->state = self::STATE_PROXIED; + + return $this->state; + } + + public function state(): int + { + return $this->state; + } + + public function header(): ?ProxyProtocol + { + return $this->header; + } + + public function isResolved(): bool + { + return $this->state !== self::STATE_UNRESOLVED; + } + + public function isProxied(): bool + { + return $this->state === self::STATE_PROXIED; + } + + /** + * Strip a PROXY preamble from a single UDP datagram. + * + * Returns the parsed header when a preamble was present and consumed + * (the buffer is stripped in place). Returns null when the datagram + * does not start with a PROXY signature — callers should treat it as a + * direct datagram. Throws when the datagram starts with a signature + * but the preamble is malformed or incomplete; callers should drop the + * datagram in that case (unlike TCP, UDP has no "wait for more"). + * + * @throws DecodingException + */ + public static function unwrapDatagram(string &$buffer): ?ProxyProtocol + { + $version = ProxyProtocol::detect($buffer); + + if ($version === 0) { + return null; + } + + if ($version === null) { + throw new DecodingException('PROXY datagram is too short to classify.'); + } + + $header = ProxyProtocol::decode($buffer); + + if ($header === null) { + throw new DecodingException('PROXY datagram preamble is incomplete.'); + } + + $buffer = \substr($buffer, $header->bytesConsumed); + + return $header; + } +} diff --git a/src/DNS/Server.php b/src/DNS/Server.php index 5345baa..d34cb7c 100644 --- a/src/DNS/Server.php +++ b/src/DNS/Server.php @@ -67,9 +67,6 @@ class Server protected bool $debug = false; - /** - * Telemetry metrics - */ protected ?Histogram $duration = null; protected ?Counter $queriesTotal = null; protected ?Counter $responsesTotal = null; @@ -81,11 +78,6 @@ public function __construct(Adapter $adapter, Resolver $resolver) $this->setTelemetry(new NoTelemetry()); } - /** - * Set telemetry adapter - * - * @param Telemetry $telemetry - */ public function setTelemetry(Telemetry $telemetry): void { $this->duration = $telemetry->createHistogram( @@ -95,7 +87,6 @@ public function setTelemetry(Telemetry $telemetry): void ['ExplicitBucketBoundaries' => [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1]] ); - // Initialize additional telemetry metrics $this->queriesTotal = $telemetry->createCounter('dns.queries.total'); $this->responsesTotal = $telemetry->createCounter('dns.responses.total'); } @@ -128,12 +119,6 @@ public function onWorkerStart(callable $handler): self return $this; } - /** - * Set Debug Mode - * - * @param bool $status - * @return self - */ public function setDebug(bool $status): self { $this->debug = $status; @@ -141,11 +126,20 @@ public function setDebug(bool $status): self } /** - * Handle Error + * Expect a PROXY protocol (v1 or v2) preamble on each UDP datagram and + * TCP connection. Traffic without a preamble is still handled as direct + * DNS, so health checks and direct clients keep working. * - * @param Throwable $error - * @return void + * Only enable when the listener is reachable solely from trusted + * proxies — untrusted clients can forge a PROXY preamble to spoof their + * source address. */ + public function setProxyProtocol(bool $enabled): self + { + $this->adapter->setProxyProtocol($enabled); + return $this; + } + protected function handleError(Throwable $error): void { foreach ($this->errors as $handler) { @@ -154,16 +148,14 @@ protected function handleError(Throwable $error): void } /** - * Handle packet - * - * @param string $buffer - * @param string $ip - * @param int $port - * @param int|null $maxResponseSize + * Handle a complete DNS message. * - * @return string + * Called once per decoded query, regardless of transport. $ip and + * $port are the real client address (already resolved from PROXY + * protocol when enabled). Subclasses override this hook to customize + * message handling; the default runs decode → resolve → encode. */ - protected function onPacket(string $buffer, string $ip, int $port, ?int $maxResponseSize = null): string + protected function onMessage(string $buffer, string $ip, int $port, ?int $maxResponseSize = null): string { $span = Span::init('dns.packet'); $span->set('client.ip', $ip); @@ -172,7 +164,6 @@ protected function onPacket(string $buffer, string $ip, int $port, ?int $maxResp $response = null; try { - // 1. Parse Message. $decodeStart = microtime(true); try { $query = Message::decode($buffer); @@ -196,7 +187,6 @@ protected function onPacket(string $buffer, string $ip, int $port, ?int $maxResp $span->set('dns.duration.decode', $decodeDuration); // RFC 1035: Only OPCODE 0 (QUERY) is supported - // Return NOTIMP for other opcodes (IQUERY=1 is obsolete, STATUS=2, others reserved) if ($query->header->opcode !== 0) { $response = Message::response( $query->header, @@ -224,7 +214,6 @@ protected function onPacket(string $buffer, string $ip, int $port, ?int $maxResp 'type' => $question->type ?? null, ]); - // 2. Resolve query $resolveStart = microtime(true); try { $response = $this->resolver->resolve($query); @@ -246,7 +235,6 @@ protected function onPacket(string $buffer, string $ip, int $port, ?int $maxResp ]); $span->set('dns.duration.resolve', $resolveDuration); - // 3. Encode response $encodeStart = microtime(true); try { return $response->encode($maxResponseSize); @@ -288,8 +276,7 @@ protected function onPacket(string $buffer, string $ip, int $port, ?int $maxResp public function start(): void { try { - $onPacket = $this->onPacket(...); - $this->adapter->onPacket($onPacket); + $this->adapter->onMessage($this->onMessage(...)); $this->adapter->start(); } catch (Throwable $error) { $this->handleError($error); diff --git a/src/DNS/TcpMessageStream.php b/src/DNS/TcpMessageStream.php new file mode 100644 index 0000000..71cd17b --- /dev/null +++ b/src/DNS/TcpMessageStream.php @@ -0,0 +1,117 @@ +proxyStream = $enableProxyProtocol ? new ProxyProtocolStream() : null; + } + + /** + * Feed newly-received bytes and iterate over any complete DNS + * messages that are now available. + * + * Each yielded value is a tuple [message bytes, peer ip, peer port]. + * The peer fields reflect the current resolved address — for a + * PROXY-enabled stream they switch from the transport peer to the + * PROXY-declared source after the preamble is consumed. + * + * @return Generator + * + * @throws ProxyDecodingException if the PROXY preamble is malformed. + * @throws MessageDecodingException if framing is invalid or the buffer overflows. + */ + public function feed(string $bytes): Generator + { + $this->buffer .= $bytes; + + if (\strlen($this->buffer) > self::MAX_BUFFER_SIZE) { + throw new MessageDecodingException('TCP buffer exceeded maximum size.'); + } + + if ($this->proxyStream !== null && $this->proxyStream->state() === ProxyProtocolStream::STATE_UNRESOLVED) { + $state = $this->proxyStream->resolve($this->buffer); + + if ($state === ProxyProtocolStream::STATE_UNRESOLVED) { + return; + } + + $header = $this->proxyStream->header(); + if ($header !== null && $header->sourceAddress !== null && $header->sourcePort !== null) { + $this->peerIp = $header->sourceAddress; + $this->peerPort = $header->sourcePort; + } + } + + while (\strlen($this->buffer) >= 2) { + $unpacked = \unpack('n', \substr($this->buffer, 0, 2)); + $frameLength = (\is_array($unpacked) && \is_int($unpacked[1] ?? null)) ? $unpacked[1] : 0; + + if ($frameLength === 0) { + throw new MessageDecodingException('TCP frame announced zero length.'); + } + + if ($frameLength > self::MAX_MESSAGE_SIZE) { + throw new MessageDecodingException("TCP frame length {$frameLength} exceeds maximum."); + } + + if (\strlen($this->buffer) < $frameLength + 2) { + return; + } + + $message = \substr($this->buffer, 2, $frameLength); + $this->buffer = \substr($this->buffer, $frameLength + 2); + + yield [$message, $this->peerIp, $this->peerPort]; + } + } + + public function peerIp(): string + { + return $this->peerIp; + } + + public function peerPort(): int + { + return $this->peerPort; + } +} diff --git a/tests/resources/server.php b/tests/resources/server.php index 0e15140..5501186 100644 --- a/tests/resources/server.php +++ b/tests/resources/server.php @@ -3,7 +3,8 @@ require __DIR__ . '/../../vendor/autoload.php'; use Utopia\DNS\Server; -use Utopia\DNS\Adapter\Swoole; +use Utopia\DNS\Adapter\SwooleUdp; +use Utopia\DNS\Adapter\SwooleTcp; use Utopia\DNS\Message; use Utopia\DNS\Message\Record; use Utopia\DNS\Resolver; @@ -22,7 +23,12 @@ Span::addExporter(new Exporter\Stdout()); $port = (int) (getenv('PORT') ?: 5300); -$server = new Swoole('0.0.0.0', $port); + +// Share one Swoole\Server between the UDP adapter and the TCP adapter so +// both listeners run under a single event loop on the same port. +$udpAdapter = new SwooleUdp('0.0.0.0', $port); +$tcpAdapter = new SwooleTcp('0.0.0.0', $port, server: $udpAdapter->getServer()); +$adapter = new \Utopia\DNS\Adapter\Composite($udpAdapter, $tcpAdapter); $records = [ // Single A @@ -108,7 +114,7 @@ public function getName(): string } }; -$dns = new Server($server, $multiZoneResolver); +$dns = new Server($adapter, $multiZoneResolver); $dns->setDebug(false); $dns->onWorkerStart(function (Server $server, int $workerId) { diff --git a/tests/unit/DNS/ProxyProtocolStreamTest.php b/tests/unit/DNS/ProxyProtocolStreamTest.php new file mode 100644 index 0000000..df42121 --- /dev/null +++ b/tests/unit/DNS/ProxyProtocolStreamTest.php @@ -0,0 +1,272 @@ +assertFalse($stream->isResolved()); + $this->assertFalse($stream->isProxied()); + $this->assertNull($stream->header()); + $this->assertSame(ProxyProtocolStream::STATE_UNRESOLVED, $stream->state()); + } + + public function testResolveWithDnsLikeBufferTransitionsToDirect(): void + { + $stream = new ProxyProtocolStream(); + $buffer = "\x12\x34\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00"; + $original = $buffer; + + $state = $stream->resolve($buffer); + + $this->assertSame(ProxyProtocolStream::STATE_DIRECT, $state); + $this->assertTrue($stream->isResolved()); + $this->assertFalse($stream->isProxied()); + $this->assertNull($stream->header()); + $this->assertSame($original, $buffer, 'DIRECT state must not modify buffer'); + } + + public function testResolveWithCompleteV1PreambleConsumesAndResolves(): void + { + $stream = new ProxyProtocolStream(); + $preamble = "PROXY TCP4 1.2.3.4 5.6.7.8 111 222\r\n"; + $payload = "\x00\x0aHELLODNS!"; + $buffer = $preamble . $payload; + + $state = $stream->resolve($buffer); + + $this->assertSame(ProxyProtocolStream::STATE_PROXIED, $state); + $this->assertTrue($stream->isResolved()); + $this->assertTrue($stream->isProxied()); + $this->assertSame($payload, $buffer, 'preamble bytes should be stripped from buffer'); + + $header = $stream->header(); + $this->assertNotNull($header); + $this->assertSame('1.2.3.4', $header->sourceAddress); + $this->assertSame(111, $header->sourcePort); + } + + public function testResolveWithCompleteV2PreambleConsumesAndResolves(): void + { + $stream = new ProxyProtocolStream(); + $addrPayload = inet_pton('10.0.0.1') . inet_pton('10.0.0.2') . pack('nn', 5000, 53); + $preamble = ProxyProtocol::V2_SIGNATURE + . chr(0x21) + . chr(0x11) + . pack('n', strlen($addrPayload)) + . $addrPayload; + $payload = "REMAINING"; + $buffer = $preamble . $payload; + + $state = $stream->resolve($buffer); + + $this->assertSame(ProxyProtocolStream::STATE_PROXIED, $state); + $this->assertSame($payload, $buffer); + $this->assertNotNull($stream->header()); + $this->assertSame('10.0.0.1', $stream->header()->sourceAddress); + } + + public function testResolveWithPartialPreambleStaysUnresolved(): void + { + $stream = new ProxyProtocolStream(); + $buffer = 'PROXY TCP4 1.2.3.4 5.6.7.8'; + + $state = $stream->resolve($buffer); + + $this->assertSame(ProxyProtocolStream::STATE_UNRESOLVED, $state); + $this->assertFalse($stream->isResolved()); + $this->assertSame('PROXY TCP4 1.2.3.4 5.6.7.8', $buffer, 'unresolved must not modify buffer'); + } + + public function testResolveIsIdempotentAfterDirectResolution(): void + { + $stream = new ProxyProtocolStream(); + $buffer = "\x00\x01\x00\x00"; + $stream->resolve($buffer); + + // Subsequent calls with different buffers should return the cached state. + $newBuffer = 'anything'; + $state = $stream->resolve($newBuffer); + $this->assertSame(ProxyProtocolStream::STATE_DIRECT, $state); + $this->assertSame('anything', $newBuffer); + } + + public function testResolveIsIdempotentAfterProxiedResolution(): void + { + $stream = new ProxyProtocolStream(); + $buffer = "PROXY TCP4 1.2.3.4 5.6.7.8 1 2\r\nHELLO"; + $stream->resolve($buffer); + $this->assertSame('HELLO', $buffer); + + // A second call must not strip more bytes. + $state = $stream->resolve($buffer); + $this->assertSame(ProxyProtocolStream::STATE_PROXIED, $state); + $this->assertSame('HELLO', $buffer); + } + + public function testResolveChunkedV1AcrossManyCalls(): void + { + $full = "PROXY TCP4 192.168.1.1 10.0.0.1 56324 443\r\n"; + $trailing = "\x00\x04TAIL"; + + $stream = new ProxyProtocolStream(); + $buffer = ''; + + for ($i = 0; $i < strlen($full) - 1; $i++) { + $buffer .= $full[$i]; + $this->assertSame( + ProxyProtocolStream::STATE_UNRESOLVED, + $stream->resolve($buffer), + "Expected unresolved at byte {$i}" + ); + } + + // Deliver the last preamble byte plus the trailing data in one chunk. + $buffer .= $full[strlen($full) - 1] . $trailing; + $state = $stream->resolve($buffer); + + $this->assertSame(ProxyProtocolStream::STATE_PROXIED, $state); + $this->assertSame($trailing, $buffer); + $this->assertSame('192.168.1.1', $stream->header()?->sourceAddress); + } + + public function testResolveChunkedV2AcrossManyCalls(): void + { + $payload = inet_pton('1.2.3.4') . inet_pton('5.6.7.8') . pack('nn', 1, 2); + $full = ProxyProtocol::V2_SIGNATURE + . chr(0x21) + . chr(0x11) + . pack('n', strlen($payload)) + . $payload; + + $stream = new ProxyProtocolStream(); + $buffer = ''; + + for ($i = 0; $i < strlen($full) - 1; $i++) { + $buffer .= $full[$i]; + $this->assertSame( + ProxyProtocolStream::STATE_UNRESOLVED, + $stream->resolve($buffer), + "Expected unresolved at byte {$i}" + ); + } + + $buffer .= $full[strlen($full) - 1]; + $this->assertSame(ProxyProtocolStream::STATE_PROXIED, $stream->resolve($buffer)); + $this->assertSame('', $buffer); + } + + public function testResolveThrowsOnMalformedPreamble(): void + { + $stream = new ProxyProtocolStream(); + $buffer = "PROXY TCP4 not-an-ip 5.6.7.8 1 2\r\n"; + + $this->expectException(DecodingException::class); + $stream->resolve($buffer); + } + + public function testResolveThrowOnMalformedLeavesStreamUnresolved(): void + { + $stream = new ProxyProtocolStream(); + $buffer = "PROXY TCP4 not-an-ip 5.6.7.8 1 2\r\n"; + + try { + $stream->resolve($buffer); + $this->fail('Expected DecodingException'); + } catch (DecodingException) { + // Expected. + } + + // Stream stays unresolved so callers can close the connection. + $this->assertFalse($stream->isResolved()); + } + + // --------------------------------------------------------------------- + // unwrapDatagram() (stateless UDP use) + // --------------------------------------------------------------------- + + public function testUnwrapDatagramReturnsNullForDirectDns(): void + { + $buffer = "\x12\x34\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00"; + $original = $buffer; + + $header = ProxyProtocolStream::unwrapDatagram($buffer); + + $this->assertNull($header); + $this->assertSame($original, $buffer, 'direct datagram must not be modified'); + } + + public function testUnwrapDatagramStripsCompleteV1Preamble(): void + { + $preamble = "PROXY TCP4 1.2.3.4 5.6.7.8 1 2\r\n"; + $payload = "\x12\x34\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00"; + $buffer = $preamble . $payload; + + $header = ProxyProtocolStream::unwrapDatagram($buffer); + + $this->assertNotNull($header); + $this->assertSame('1.2.3.4', $header->sourceAddress); + $this->assertSame($payload, $buffer); + } + + public function testUnwrapDatagramStripsCompleteV2Preamble(): void + { + $addrPayload = inet_pton('10.0.0.1') . inet_pton('10.0.0.2') . pack('nn', 53, 5353); + $preamble = ProxyProtocol::V2_SIGNATURE + . chr(0x21) + . chr(0x12) // UDP4 + . pack('n', strlen($addrPayload)) + . $addrPayload; + $payload = 'DNSDATA'; + $buffer = $preamble . $payload; + + $header = ProxyProtocolStream::unwrapDatagram($buffer); + + $this->assertNotNull($header); + $this->assertSame(ProxyProtocol::FAMILY_UDP4, $header->family); + $this->assertSame($payload, $buffer); + } + + public function testUnwrapDatagramThrowsOnIncompleteDatagram(): void + { + // Datagram begins with 'P' but is too short to be a complete PROXY signature. + $buffer = 'PRO'; + + $this->expectException(DecodingException::class); + ProxyProtocolStream::unwrapDatagram($buffer); + } + + public function testUnwrapDatagramThrowsOnIncompleteV2Datagram(): void + { + // Datagram starts with v2 signature but payload is truncated. + $addrPayload = inet_pton('1.2.3.4') . inet_pton('5.6.7.8') . pack('nn', 1, 2); + $preamble = ProxyProtocol::V2_SIGNATURE + . chr(0x21) + . chr(0x11) + . pack('n', strlen($addrPayload)) + . substr($addrPayload, 0, -2); // chop off part of the ports + + $this->expectException(DecodingException::class); + ProxyProtocolStream::unwrapDatagram($preamble); + } + + public function testUnwrapDatagramThrowsOnMalformedPreamble(): void + { + $buffer = "PROXY TCP4 not-an-ip 5.6.7.8 1 2\r\nDNSDATA"; + + $this->expectException(DecodingException::class); + ProxyProtocolStream::unwrapDatagram($buffer); + } +} diff --git a/tests/unit/DNS/ProxyProtocolTest.php b/tests/unit/DNS/ProxyProtocolTest.php new file mode 100644 index 0000000..f15e8b1 --- /dev/null +++ b/tests/unit/DNS/ProxyProtocolTest.php @@ -0,0 +1,559 @@ +assertNull(ProxyProtocol::detect('')); + } + + public function testDetectFullV1Match(): void + { + $this->assertSame( + ProxyProtocol::VERSION_1, + ProxyProtocol::detect("PROXY TCP4 1.2.3.4 5.6.7.8 1111 2222\r\n") + ); + } + + public function testDetectFullV2Match(): void + { + $header = ProxyProtocol::V2_SIGNATURE . "\x21\x11\x00\x0C"; + $this->assertSame(ProxyProtocol::VERSION_2, ProxyProtocol::detect($header)); + } + + /** @return iterable */ + public static function partialV1Provider(): iterable + { + yield 'single P' => ['P']; + yield 'two chars' => ['PR']; + yield 'three chars' => ['PRO']; + yield 'four chars' => ['PROX']; + yield 'five chars' => ['PROXY']; + } + + #[DataProvider('partialV1Provider')] + public function testDetectPartialV1ReturnsNull(string $buffer): void + { + $this->assertNull(ProxyProtocol::detect($buffer)); + } + + /** @return iterable */ + public static function partialV2Provider(): iterable + { + for ($i = 1; $i < ProxyProtocol::V2_SIGNATURE_LENGTH; $i++) { + yield "first {$i} bytes" => [substr(ProxyProtocol::V2_SIGNATURE, 0, $i)]; + } + } + + #[DataProvider('partialV2Provider')] + public function testDetectPartialV2ReturnsNull(string $buffer): void + { + $this->assertNull(ProxyProtocol::detect($buffer)); + } + + public function testDetectRejectsDnsQueryHeader(): void + { + // Standard DNS query: ID 0x1234, flags 0x0100, qd=1, everything else 0. + $dnsPacket = "\x12\x34\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00"; + $this->assertSame(0, ProxyProtocol::detect($dnsPacket)); + } + + public function testDetectRejectsCrPrefixButNotV2(): void + { + // Second byte differs from 0x0A. + $this->assertSame(0, ProxyProtocol::detect("\r\xFF\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00")); + } + + public function testDetectRejectsPPrefixButNotV1(): void + { + // Starts with 'P' but not "PROXY ". + $this->assertSame(0, ProxyProtocol::detect("PATH /\r\n")); + } + + public function testDetectRejectsNonPnonCrFirstByte(): void + { + $this->assertSame(0, ProxyProtocol::detect("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")); + } + + /** @return iterable */ + public static function detectBoundaryProvider(): iterable + { + yield 'v1 exact prefix' => [ProxyProtocol::VERSION_1, 'PROXY ']; + yield 'v1 prefix + CRLF' => [ProxyProtocol::VERSION_1, "PROXY \r\n"]; // Malformed body but prefix matches + yield 'v2 exact signature' => [ProxyProtocol::VERSION_2, ProxyProtocol::V2_SIGNATURE]; + yield 'v2 signature + junk' => [ProxyProtocol::VERSION_2, ProxyProtocol::V2_SIGNATURE . "\x00\x00\x00\x00"]; + } + + #[DataProvider('detectBoundaryProvider')] + public function testDetectBoundary(?int $expected, string $buffer): void + { + $this->assertSame($expected, ProxyProtocol::detect($buffer)); + } + + // --------------------------------------------------------------------- + // decode() — v1 + // --------------------------------------------------------------------- + + public function testDecodeV1Tcp4(): void + { + $header = "PROXY TCP4 192.168.1.1 10.0.0.1 56324 443\r\n"; + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertSame(ProxyProtocol::VERSION_1, $result->version); + $this->assertFalse($result->isLocal); + $this->assertSame(ProxyProtocol::FAMILY_TCP4, $result->family); + $this->assertSame('192.168.1.1', $result->sourceAddress); + $this->assertSame('10.0.0.1', $result->destinationAddress); + $this->assertSame(56324, $result->sourcePort); + $this->assertSame(443, $result->destinationPort); + $this->assertSame(strlen($header), $result->bytesConsumed); + } + + public function testDecodeV1Tcp6(): void + { + $header = "PROXY TCP6 2001:db8::1 2001:db8::2 65535 53\r\n"; + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertSame(ProxyProtocol::FAMILY_TCP6, $result->family); + $this->assertSame('2001:db8::1', $result->sourceAddress); + $this->assertSame('2001:db8::2', $result->destinationAddress); + $this->assertSame(65535, $result->sourcePort); + $this->assertSame(53, $result->destinationPort); + } + + /** @return iterable */ + public static function v1PortBoundaryProvider(): iterable + { + yield 'zero src, zero dst' => ["PROXY TCP4 1.2.3.4 5.6.7.8 0 0\r\n", 0, 0]; + yield 'max src, min dst' => ["PROXY TCP4 1.2.3.4 5.6.7.8 65535 1\r\n", 65535, 1]; + yield 'min src, max dst' => ["PROXY TCP4 1.2.3.4 5.6.7.8 1 65535\r\n", 1, 65535]; + yield 'both middle' => ["PROXY TCP4 1.2.3.4 5.6.7.8 8080 8081\r\n", 8080, 8081]; + } + + #[DataProvider('v1PortBoundaryProvider')] + public function testDecodeV1PortBoundaries(string $header, int $expectedSrc, int $expectedDst): void + { + $result = ProxyProtocol::decode($header); + $this->assertNotNull($result); + $this->assertSame($expectedSrc, $result->sourcePort); + $this->assertSame($expectedDst, $result->destinationPort); + } + + public function testDecodeV1Unknown(): void + { + $header = "PROXY UNKNOWN\r\n"; + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertSame(ProxyProtocol::FAMILY_UNKNOWN, $result->family); + $this->assertNull($result->sourceAddress); + $this->assertNull($result->sourcePort); + $this->assertNull($result->destinationAddress); + $this->assertNull($result->destinationPort); + $this->assertSame(strlen($header), $result->bytesConsumed); + } + + public function testDecodeV1UnknownIgnoresTrailingTokens(): void + { + // Per spec: receivers MUST ignore everything past UNKNOWN on the line. + $header = "PROXY UNKNOWN ff:ff::1 aa:aa::2 1234 5678\r\n"; + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertSame(ProxyProtocol::FAMILY_UNKNOWN, $result->family); + $this->assertSame(strlen($header), $result->bytesConsumed); + } + + public function testDecodeV1ReturnsNullWhenCrlfMissing(): void + { + $this->assertNull(ProxyProtocol::decode('PROXY TCP4 1.2.3.4 5.6.7.8 1 2')); + } + + public function testDecodeV1DoesNotConsumeTrailingData(): void + { + // Preamble is followed by a 4-byte payload; bytesConsumed must cover only the preamble. + $preamble = "PROXY TCP4 1.2.3.4 5.6.7.8 1 2\r\n"; + $payload = "\x00\x0eABCD"; + $result = ProxyProtocol::decode($preamble . $payload); + $this->assertNotNull($result); + $this->assertSame(strlen($preamble), $result->bytesConsumed); + } + + /** @return iterable */ + public static function malformedV1Provider(): iterable + { + yield 'non-digit port' => ["PROXY TCP4 1.2.3.4 5.6.7.8 abc 80\r\n"]; + yield 'leading zero port' => ["PROXY TCP4 1.2.3.4 5.6.7.8 080 81\r\n"]; + yield 'negative port' => ["PROXY TCP4 1.2.3.4 5.6.7.8 -1 80\r\n"]; + yield 'port out of range' => ["PROXY TCP4 1.2.3.4 5.6.7.8 70000 80\r\n"]; + yield 'invalid IPv4' => ["PROXY TCP4 999.999.999.999 10.0.0.1 1 2\r\n"]; + yield 'invalid IPv6' => ["PROXY TCP6 zz::1 2001:db8::2 1 2\r\n"]; + yield 'IPv4 under TCP6' => ["PROXY TCP6 1.2.3.4 5.6.7.8 1 2\r\n"]; + yield 'IPv6 under TCP4' => ["PROXY TCP4 2001:db8::1 10.0.0.1 1 2\r\n"]; + yield 'missing destination address' => ["PROXY TCP4 1.2.3.4 1 2\r\n"]; + yield 'extra token' => ["PROXY TCP4 1.2.3.4 5.6.7.8 1 2 3\r\n"]; + yield 'unsupported protocol' => ["PROXY HTTP 1.2.3.4 5.6.7.8 1 2\r\n"]; + yield 'double space' => ["PROXY TCP4 1.2.3.4 5.6.7.8 1 2\r\n"]; + yield 'empty port' => ["PROXY TCP4 1.2.3.4 5.6.7.8 2\r\n"]; + } + + #[DataProvider('malformedV1Provider')] + public function testDecodeV1MalformedThrows(string $header): void + { + $this->expectException(DecodingException::class); + ProxyProtocol::decode($header); + } + + public function testDecodeV1ThrowsWhenTooLong(): void + { + // 120+ bytes with no CRLF — exceeds the 107-byte cap. + $buffer = 'PROXY TCP4 ' . str_repeat('a', 120); + $this->expectException(DecodingException::class); + ProxyProtocol::decode($buffer); + } + + public function testDecodeV1ThrowsWhenLineLongerThan107Bytes(): void + { + // CRLF is present, but line length exceeds spec maximum. + $longAddress = str_repeat('a', 100); + $header = "PROXY TCP4 {$longAddress} 5.6.7.8 1 2\r\n"; + $this->assertGreaterThan(ProxyProtocol::V1_MAX_LENGTH, strlen($header)); + + $this->expectException(DecodingException::class); + ProxyProtocol::decode($header); + } + + // --------------------------------------------------------------------- + // decode() — v2 + // --------------------------------------------------------------------- + + private static function buildV2(int $verCmd, int $famTrans, string $payload): string + { + return ProxyProtocol::V2_SIGNATURE + . chr($verCmd) + . chr($famTrans) + . pack('n', strlen($payload)) + . $payload; + } + + public function testDecodeV2Inet4Tcp(): void + { + $payload = inet_pton('192.168.1.1') . inet_pton('10.0.0.1') . pack('nn', 56324, 443); + $header = self::buildV2(0x21, 0x11, $payload); + + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertSame(ProxyProtocol::VERSION_2, $result->version); + $this->assertFalse($result->isLocal); + $this->assertSame(ProxyProtocol::FAMILY_TCP4, $result->family); + $this->assertSame('192.168.1.1', $result->sourceAddress); + $this->assertSame('10.0.0.1', $result->destinationAddress); + $this->assertSame(56324, $result->sourcePort); + $this->assertSame(443, $result->destinationPort); + $this->assertSame(strlen($header), $result->bytesConsumed); + } + + public function testDecodeV2Inet4Udp(): void + { + $payload = inet_pton('127.0.0.1') . inet_pton('127.0.0.2') . pack('nn', 0, 65535); + $header = self::buildV2(0x21, 0x12, $payload); + + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertSame(ProxyProtocol::FAMILY_UDP4, $result->family); + $this->assertSame(0, $result->sourcePort); + $this->assertSame(65535, $result->destinationPort); + } + + public function testDecodeV2Inet6Tcp(): void + { + $payload = inet_pton('fe80::1') . inet_pton('fe80::2') . pack('nn', 11111, 22222); + $header = self::buildV2(0x21, 0x21, $payload); + + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertSame(ProxyProtocol::FAMILY_TCP6, $result->family); + $this->assertSame('fe80::1', $result->sourceAddress); + $this->assertSame('fe80::2', $result->destinationAddress); + } + + public function testDecodeV2Inet6Udp(): void + { + $payload = inet_pton('2001:db8::1') . inet_pton('2001:db8::2') . pack('nn', 53, 5353); + $header = self::buildV2(0x21, 0x22, $payload); + + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertSame(ProxyProtocol::FAMILY_UDP6, $result->family); + $this->assertSame('2001:db8::1', $result->sourceAddress); + $this->assertSame('2001:db8::2', $result->destinationAddress); + } + + public function testDecodeV2Local(): void + { + // LOCAL commands may carry any payload (health checks). We still + // honour payload length, skip the body and return bytesConsumed. + $payload = str_repeat("\x00", 12); + $header = self::buildV2(0x20, 0x00, $payload); + + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertTrue($result->isLocal); + $this->assertNull($result->sourceAddress); + $this->assertSame(strlen($header), $result->bytesConsumed); + } + + public function testDecodeV2LocalEmptyPayload(): void + { + $header = self::buildV2(0x20, 0x00, ''); + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertTrue($result->isLocal); + $this->assertSame(ProxyProtocol::V2_HEADER_LENGTH, $result->bytesConsumed); + } + + public function testDecodeV2Unix(): void + { + $srcPath = str_pad('/var/run/src.sock', 108, "\x00"); + $dstPath = str_pad('/var/run/dst.sock', 108, "\x00"); + $payload = $srcPath . $dstPath; + $header = self::buildV2(0x21, 0x31, $payload); + + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertSame(ProxyProtocol::FAMILY_UNIX, $result->family); + $this->assertSame('/var/run/src.sock', $result->sourceAddress); + $this->assertSame('/var/run/dst.sock', $result->destinationAddress); + $this->assertNull($result->sourcePort); + $this->assertNull($result->destinationPort); + } + + public function testDecodeV2UnixEmptyPaths(): void + { + $payload = str_repeat("\x00", 216); + $header = self::buildV2(0x21, 0x31, $payload); + + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertNull($result->sourceAddress); + $this->assertNull($result->destinationAddress); + } + + /** @return iterable */ + public static function v2UnknownFamilyTransportProvider(): iterable + { + yield 'UNSPEC family, STREAM transport' => [0x01]; + yield 'INET family, UNSPEC transport' => [0x10]; + yield 'INET family, reserved transport 3' => [0x13]; + yield 'reserved family 4, UNSPEC transport' => [0x40]; + yield 'reserved family 0xF, DGRAM transport' => [0xF2]; + } + + #[DataProvider('v2UnknownFamilyTransportProvider')] + public function testDecodeV2UnknownFamilyOrTransportIsOpaque(int $famTrans): void + { + $payload = str_repeat("\x00", 12); + $header = self::buildV2(0x21, $famTrans, $payload); + + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertSame(ProxyProtocol::FAMILY_UNKNOWN, $result->family); + $this->assertFalse($result->isLocal); + $this->assertSame(strlen($header), $result->bytesConsumed); + } + + public function testDecodeV2WithTlvSuffixRespectsLength(): void + { + $addrPayload = inet_pton('192.168.1.1') . inet_pton('10.0.0.1') . pack('nn', 1, 2); + $tlv = "\x03\x00\x04ABCD"; + $payload = $addrPayload . $tlv; + $header = self::buildV2(0x21, 0x11, $payload); + + $result = ProxyProtocol::decode($header); + + $this->assertNotNull($result); + $this->assertSame('192.168.1.1', $result->sourceAddress); + $this->assertSame(strlen($header), $result->bytesConsumed); + } + + /** @return iterable */ + public static function v2IncompleteProvider(): iterable + { + $payload = inet_pton('1.2.3.4') . inet_pton('5.6.7.8') . pack('nn', 1, 2); + $full = ProxyProtocol::V2_SIGNATURE . chr(0x21) . chr(0x11) . pack('n', strlen($payload)) . $payload; + + yield 'header only, missing length bytes' => [substr($full, 0, 14)]; + yield 'header + half length' => [substr($full, 0, 15)]; + yield 'truncated address payload' => [substr($full, 0, strlen($full) - 1)]; + yield 'signature only' => [ProxyProtocol::V2_SIGNATURE]; + yield 'missing final byte' => [substr($full, 0, strlen($full) - 1)]; + } + + #[DataProvider('v2IncompleteProvider')] + public function testDecodeV2ReturnsNullWhenIncomplete(string $buffer): void + { + $this->assertNull(ProxyProtocol::decode($buffer)); + } + + public function testDecodeV2InvalidVersionThrows(): void + { + $header = self::buildV2(0x31, 0x11, ''); + $this->expectException(DecodingException::class); + ProxyProtocol::decode($header); + } + + public function testDecodeV2InvalidCommandThrows(): void + { + // Version 2 (high nibble 0x2), command 0x5 (invalid). + $header = self::buildV2(0x25, 0x11, ''); + $this->expectException(DecodingException::class); + ProxyProtocol::decode($header); + } + + public function testDecodeV2InetPayloadTooShortThrows(): void + { + // Declared INET+STREAM but only 4 bytes of payload — not enough. + $header = self::buildV2(0x21, 0x11, str_repeat("\x00", 4)); + $this->expectException(DecodingException::class); + ProxyProtocol::decode($header); + } + + public function testDecodeV2UnixPayloadTooShortThrows(): void + { + $header = self::buildV2(0x21, 0x31, str_repeat("\x00", 100)); + $this->expectException(DecodingException::class); + ProxyProtocol::decode($header); + } + + public function testDecodeRejectsBufferNotStartingWithProxySignature(): void + { + $this->expectException(DecodingException::class); + ProxyProtocol::decode("HELLO WORLD\r\n"); + } + + public function testDecodeReturnsNullForEmptyBuffer(): void + { + $this->assertNull(ProxyProtocol::decode('')); + } + + // --------------------------------------------------------------------- + // Streaming / chunked reads + // --------------------------------------------------------------------- + + public function testDecodeV1StreamingProducesNullUntilCrlfSeen(): void + { + $full = "PROXY TCP4 1.2.3.4 5.6.7.8 1 2\r\n"; + $accumulator = ''; + + for ($i = 0; $i < strlen($full) - 1; $i++) { + $accumulator .= $full[$i]; + $this->assertNull( + ProxyProtocol::decode($accumulator), + "Expected null at byte {$i}" + ); + } + + $accumulator .= $full[strlen($full) - 1]; + $this->assertNotNull(ProxyProtocol::decode($accumulator)); + } + + public function testDecodeV2StreamingProducesNullUntilTotalLength(): void + { + $payload = inet_pton('1.2.3.4') . inet_pton('5.6.7.8') . pack('nn', 1, 2); + $full = self::buildV2(0x21, 0x11, $payload); + + for ($i = 1; $i < strlen($full); $i++) { + $this->assertNull( + ProxyProtocol::decode(substr($full, 0, $i)), + "Expected null at partial length {$i}" + ); + } + + $this->assertNotNull(ProxyProtocol::decode($full)); + } + + // --------------------------------------------------------------------- + // Fuzz: random inputs never crash; they either decode, return null, or throw. + // --------------------------------------------------------------------- + + public function testFuzzRandomBuffersDoNotCrash(): void + { + for ($i = 0; $i < 200; $i++) { + $length = random_int(0, 128); + $buffer = $length > 0 ? random_bytes($length) : ''; + + try { + ProxyProtocol::decode($buffer); + } catch (DecodingException) { + // Expected for malformed input. + } + + // Reaching this point means the parser did not crash on the buffer. + $this->addToAssertionCount(1); + } + } + + public function testFuzzRandomV2LikeBuffersDoNotCrash(): void + { + for ($i = 0; $i < 100; $i++) { + $addrLength = random_int(0, 64); + $payload = $addrLength > 0 ? random_bytes($addrLength) : ''; + $verCmd = random_int(0, 255); + $famTrans = random_int(0, 255); + $buffer = self::buildV2($verCmd, $famTrans, $payload); + + try { + ProxyProtocol::decode($buffer); + } catch (DecodingException) { + // Expected for malformed input. + } + + // Reaching this point means the parser did not crash on the buffer. + $this->addToAssertionCount(1); + } + } + + // --------------------------------------------------------------------- + // Value object invariants + // --------------------------------------------------------------------- + + public function testProxyProtocolIsReadonly(): void + { + $instance = new ProxyProtocol( + version: ProxyProtocol::VERSION_1, + isLocal: false, + family: ProxyProtocol::FAMILY_TCP4, + sourceAddress: '1.2.3.4', + sourcePort: 80, + destinationAddress: '5.6.7.8', + destinationPort: 443, + bytesConsumed: 40, + ); + + $this->expectException(\Error::class); + /** @phpstan-ignore-next-line intentionally asserting readonly enforcement */ + $instance->version = ProxyProtocol::VERSION_2; + } +} diff --git a/tests/unit/DNS/ServerTest.php b/tests/unit/DNS/ServerTest.php new file mode 100644 index 0000000..5232fb7 --- /dev/null +++ b/tests/unit/DNS/ServerTest.php @@ -0,0 +1,203 @@ +start(); + + $query = $this->buildQuery('example.com'); + $response = $adapter->deliver($query, '203.0.113.9', 9876); + + $decoded = Message::decode($response); + $this->assertCount(1, $decoded->answers); + $this->assertSame('example.com', $decoded->answers[0]->name); + } + + public function testPeerAddressIsForwardedToResolverHook(): void + { + $adapter = new FakeAdapter(); + $resolver = new EchoResolver(); + $server = new RecordingServer($adapter, $resolver); + $server->start(); + + $query = $this->buildQuery('example.com'); + $adapter->deliver($query, '203.0.113.9', 9876); + + $this->assertSame('203.0.113.9', $server->lastIp); + $this->assertSame(9876, $server->lastPort); + } + + public function testMalformedMessageReturnsFormerr(): void + { + $adapter = new FakeAdapter(); + $resolver = new EchoResolver(); + $server = new Server($adapter, $resolver); + $errors = []; + $server->error(function (\Throwable $e) use (&$errors) { + $errors[] = $e; + }); + $server->start(); + + // Minimal valid header (12 bytes) but with question count > 0 and no question section. + $malformed = "\x12\x34\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00"; + $response = $adapter->deliver($malformed, '127.0.0.1', 1234); + + $decoded = Message::decode($response); + $this->assertSame(Message::RCODE_FORMERR, $decoded->header->responseCode); + } + + public function testNonQueryOpcodeReturnsNotimp(): void + { + $adapter = new FakeAdapter(); + $resolver = new EchoResolver(); + $server = new Server($adapter, $resolver); + $server->start(); + + // Opcode 2 (STATUS) in flags field. + $packet = "\x12\x34\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00"; + $response = $adapter->deliver($packet, '127.0.0.1', 1234); + + $decoded = Message::decode($response); + $this->assertSame(Message::RCODE_NOTIMP, $decoded->header->responseCode); + } + + public function testResolverExceptionReturnsServfail(): void + { + $adapter = new FakeAdapter(); + $resolver = new ThrowingResolver(); + $server = new Server($adapter, $resolver); + $errors = []; + $server->error(function (\Throwable $e) use (&$errors) { + $errors[] = $e; + }); + $server->start(); + + $query = $this->buildQuery('example.com'); + $response = $adapter->deliver($query, '127.0.0.1', 1234); + + $decoded = Message::decode($response); + $this->assertSame(Message::RCODE_SERVFAIL, $decoded->header->responseCode); + $this->assertCount(1, $errors); + } + + public function testSetProxyProtocolPropagatesToAdapter(): void + { + $adapter = new FakeAdapter(); + $resolver = new EchoResolver(); + $server = new Server($adapter, $resolver); + + $this->assertFalse($adapter->hasProxyProtocol()); + $server->setProxyProtocol(true); + $this->assertTrue($adapter->hasProxyProtocol()); + $server->setProxyProtocol(false); + $this->assertFalse($adapter->hasProxyProtocol()); + } + + private function buildQuery(string $name): string + { + return Message::query(new Question($name, Record::TYPE_A))->encode(); + } +} + +class RecordingServer extends Server +{ + public ?string $lastIp = null; + + public ?int $lastPort = null; + + protected function onMessage(string $buffer, string $ip, int $port, ?int $maxResponseSize = null): string + { + $this->lastIp = $ip; + $this->lastPort = $port; + return parent::onMessage($buffer, $ip, $port, $maxResponseSize); + } +} + +final class EchoResolver implements Resolver +{ + public function getName(): string + { + return 'echo'; + } + + public function resolve(Message $query): Message + { + return Message::response( + header: $query->header, + responseCode: Message::RCODE_NOERROR, + questions: $query->questions, + answers: [ + new Record( + name: $query->questions[0]->name, + type: Record::TYPE_A, + rdata: '127.0.0.1', + ttl: 60, + ), + ], + authoritative: true, + ); + } +} + +final class ThrowingResolver implements Resolver +{ + public function getName(): string + { + return 'throwing'; + } + + public function resolve(Message $query): Message + { + throw new \RuntimeException('resolver failed'); + } +} + +/** + * Minimal transport-neutral adapter for Server unit tests: call + * {@see deliver()} to simulate a message arriving on the wire and + * receive the response the Server would send back. + */ +final class FakeAdapter extends Adapter +{ + /** @var callable(string, string, int, ?int): string */ + private $onMessage; + + public function onWorkerStart(callable $callback): void + { + // Not used in unit tests. + } + + public function onMessage(callable $callback): void + { + $this->onMessage = $callback; + } + + public function start(): void + { + // Tests drive the adapter via deliver(); no loop needed. + } + + public function getName(): string + { + return 'fake'; + } + + public function deliver(string $buffer, string $ip, int $port, ?int $maxResponseSize = 512): string + { + return \call_user_func($this->onMessage, $buffer, $ip, $port, $maxResponseSize); + } +} diff --git a/tests/unit/DNS/TcpMessageStreamTest.php b/tests/unit/DNS/TcpMessageStreamTest.php new file mode 100644 index 0000000..c4f32e8 --- /dev/null +++ b/tests/unit/DNS/TcpMessageStreamTest.php @@ -0,0 +1,178 @@ +feed($frame), preserve_keys: false); + + $this->assertCount(1, $results); + $this->assertSame([$message, '198.51.100.10', 40000], $results[0]); + } + + public function testFramesSplitAcrossChunksAccumulate(): void + { + $stream = new TcpMessageStream('203.0.113.1', 1234); + $message = str_repeat('A', 40); + $frame = pack('n', strlen($message)) . $message; + + $pieces = str_split($frame, 5); + $collected = []; + + foreach ($pieces as $i => $piece) { + foreach ($stream->feed($piece) as $result) { + $collected[] = $result; + } + if ($i < count($pieces) - 1) { + $this->assertSame([], $collected, "should not emit before frame complete (piece {$i})"); + } + } + + $this->assertCount(1, $collected); + $this->assertSame($message, $collected[0][0]); + } + + public function testMultipleFramesInSingleFeedEmitMultipleMessages(): void + { + $stream = new TcpMessageStream('1.2.3.4', 53); + $payload = ''; + foreach (['ONE', 'TWO', 'THREE'] as $m) { + $payload .= pack('n', strlen($m)) . $m; + } + + $results = iterator_to_array($stream->feed($payload), preserve_keys: false); + + $this->assertCount(3, $results); + $this->assertSame('ONE', $results[0][0]); + $this->assertSame('TWO', $results[1][0]); + $this->assertSame('THREE', $results[2][0]); + } + + public function testZeroLengthFrameThrows(): void + { + $stream = new TcpMessageStream('1.2.3.4', 53); + + $this->expectException(MessageDecodingException::class); + foreach ($stream->feed(pack('n', 0)) as $_) { + // consume generator + } + } + + public function testOversizeFrameThrows(): void + { + $stream = new TcpMessageStream('1.2.3.4', 53); + + $this->expectException(MessageDecodingException::class); + foreach ($stream->feed(pack('n', TcpMessageStream::MAX_MESSAGE_SIZE + 1)) as $_) { + } + } + + public function testBufferOverflowThrows(): void + { + $stream = new TcpMessageStream('1.2.3.4', 53, enableProxyProtocol: true); + + // PROXY preamble is pending (not resolved), so no framing runs. Feed + // more bytes than the buffer cap — should reject. + $this->expectException(MessageDecodingException::class); + foreach ($stream->feed(str_repeat('x', TcpMessageStream::MAX_BUFFER_SIZE + 1)) as $_) { + } + } + + public function testProxyV1PreambleUpdatesPeerAddress(): void + { + $stream = new TcpMessageStream('10.0.0.254', 443, enableProxyProtocol: true); + + $preamble = "PROXY TCP4 192.0.2.10 10.0.0.1 55000 443\r\n"; + $message = 'DNS-PAYLOAD'; + $frame = pack('n', strlen($message)) . $message; + + $results = iterator_to_array($stream->feed($preamble . $frame), preserve_keys: false); + + $this->assertCount(1, $results); + $this->assertSame($message, $results[0][0]); + $this->assertSame('192.0.2.10', $results[0][1]); + $this->assertSame(55000, $results[0][2]); + } + + public function testProxyV2PreambleUpdatesPeerAddress(): void + { + $stream = new TcpMessageStream('10.0.0.254', 0, enableProxyProtocol: true); + + $addrPayload = inet_pton('198.51.100.5') . inet_pton('10.0.0.1') . pack('nn', 11000, 53); + $preamble = ProxyProtocol::V2_SIGNATURE + . chr(0x21) + . chr(0x11) + . pack('n', strlen($addrPayload)) + . $addrPayload; + + $message = 'DNS-MESSAGE'; + $frame = pack('n', strlen($message)) . $message; + + $results = iterator_to_array($stream->feed($preamble . $frame), preserve_keys: false); + + $this->assertCount(1, $results); + $this->assertSame('198.51.100.5', $results[0][1]); + } + + public function testDirectConnectionWorksWhenProxyEnabled(): void + { + $stream = new TcpMessageStream('203.0.113.20', 4000, enableProxyProtocol: true); + + $message = 'DNS-PAYLOAD'; + $frame = pack('n', strlen($message)) . $message; + + $results = iterator_to_array($stream->feed($frame), preserve_keys: false); + + $this->assertCount(1, $results); + $this->assertSame('203.0.113.20', $results[0][1]); + } + + public function testMalformedProxyPreambleThrows(): void + { + $stream = new TcpMessageStream('10.0.0.254', 0, enableProxyProtocol: true); + + $this->expectException(ProxyDecodingException::class); + foreach ($stream->feed("PROXY TCP4 bogus 10.0.0.1 1 2\r\nrest") as $_) { + } + } + + public function testPeerAddressAccessors(): void + { + $stream = new TcpMessageStream('1.2.3.4', 5678); + + $this->assertSame('1.2.3.4', $stream->peerIp()); + $this->assertSame(5678, $stream->peerPort()); + } + + public function testPartialPreambleKeepsBufferForNextCall(): void + { + $stream = new TcpMessageStream('10.0.0.254', 0, enableProxyProtocol: true); + + $preamble = "PROXY TCP4 192.0.2.10 10.0.0.1 55000 443\r\n"; + $message = 'DNS'; + $frame = pack('n', strlen($message)) . $message; + + $pieces = str_split($preamble . $frame, 4); + $collected = []; + foreach ($pieces as $piece) { + foreach ($stream->feed($piece) as $result) { + $collected[] = $result; + } + } + + $this->assertCount(1, $collected); + $this->assertSame('192.0.2.10', $collected[0][1]); + } +}