diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index c4fb562..b17caea 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -95,7 +95,7 @@ jobs: - name: Install dependencies for Linux if: matrix.os == 'ubuntu-24.04' - run: sudo apt-get update && sudo apt-get install -y llvm-18-dev libclang-18-dev protobuf-compiler libsasl2-dev librabbitmq-dev + run: sudo apt-get update && sudo apt-get install -y llvm-18-dev libclang-18-dev protobuf-compiler libsasl2-dev librabbitmq-dev librdkafka-dev - name: Install protobuf for Macos if: matrix.os == 'macos-14' @@ -110,7 +110,7 @@ jobs: bcmath, calendar, ctype, dom, exif, gettext, iconv, intl, json, mbstring, mysqli, mysqlnd, opcache, pdo, pdo_mysql, phar, posix, readline, redis, memcached, swoole-${{ matrix.flag.swoole_version }}, xml, xmlreader, xmlwriter, - yaml, zip, mongodb, memcache, amqp + yaml, zip, mongodb, memcache, amqp, rdkafka - name: Setup php-fpm for Linux if: matrix.os == 'ubuntu-24.04' diff --git a/Cargo.lock b/Cargo.lock index cb6a6f8..3243c03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1285,7 +1285,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.53.2", + "windows-targets 0.52.6", ] [[package]] @@ -1695,9 +1695,9 @@ dependencies = [ [[package]] name = "phper" -version = "0.17.5" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a6ded58ef5936b049dcf4b73307889a21b49b9ebd51069f0a554c75275064a7" +checksum = "2f1a07e8df098947d7c0eab4b1e57a0f1689253402b4edd4e3feea509a070858" dependencies = [ "cfg-if", "derive_more", diff --git a/Cargo.toml b/Cargo.toml index 6c646e7..6ff839d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,7 @@ futures-util = "0.3.31" hostname = "0.4.1" libc = "0.2.174" once_cell = "1.21.3" -phper = "0.17.5" +phper = "0.17.3" prost = "0.14.1" rdkafka = { workspace = true, optional = true } skywalking = { version = "0.10.0", features = ["management"] } diff --git a/README.md b/README.md index 7fb66bb..c86f56e 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ SkyWalking PHP Agent requires SkyWalking 8.4+ and PHP 7.2+ * [x] [Memcached](https://www.php.net/manual/en/book.memcached.php) * [x] [phpredis](https://github.com/phpredis/phpredis) * [x] [php-amqp](https://github.com/php-amqp/php-amqp) for Message Queuing Producer - * [ ] [php-rdkafka](https://github.com/arnaud-lb/php-rdkafka) + * [x] [php-rdkafka](https://github.com/php-rdkafka/php-rdkafka) for Message Queuing Producer * [x] [predis](https://github.com/predis/predis) * [x] [php-amqplib](https://github.com/php-amqplib/php-amqplib) for Message Queuing Producer * [x] [MongoDB](https://www.php.net/manual/en/set.mongodb.php) diff --git a/dist-material/LICENSE b/dist-material/LICENSE index 89cfb69..a20ea78 100644 --- a/dist-material/LICENSE +++ b/dist-material/LICENSE @@ -654,7 +654,7 @@ MulanPSL-2.0 licenses The following components are provided under the MulanPSL-2.0 License. See project link for details. The text of each license is also included in licenses/LICENSE-[project].txt. - https://crates.io/crates/phper/0.17.5 0.17.5 MulanPSL-2.0 + https://crates.io/crates/phper/0.17.3 0.17.3 MulanPSL-2.0 https://crates.io/crates/phper-alloc/0.16.3 0.16.3 MulanPSL-2.0 https://crates.io/crates/phper-build/0.15.6 0.15.6 MulanPSL-2.0 https://crates.io/crates/phper-macros/0.15.3 0.15.3 MulanPSL-2.0 diff --git a/docker-compose.yml b/docker-compose.yml index 8f0caa1..b957c20 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -62,6 +62,29 @@ services: timeout: 5s retries: 10 + kafka: + image: apache/kafka:3.9.2 + ports: + - "9092:9092" + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: true + healthcheck: + test: ["CMD", "/opt/kafka/bin/kafka-topics.sh", "--bootstrap-server", "127.0.0.1:9092", "--list"] + interval: 10s + timeout: 5s + retries: 10 + mongo: image: mongo:4.4.10 ports: diff --git a/docs/en/setup/service-agent/php-agent/Supported-list.md b/docs/en/setup/service-agent/php-agent/Supported-list.md index c548229..db5106b 100644 --- a/docs/en/setup/service-agent/php-agent/Supported-list.md +++ b/docs/en/setup/service-agent/php-agent/Supported-list.md @@ -16,6 +16,8 @@ The following plugins provide the distributed tracing capability. * [phpredis](https://github.com/phpredis/phpredis) * [MongoDB](https://www.php.net/manual/en/set.mongodb.php) * [Memcache](https://www.php.net/manual/en/book.memcache.php) +* [php-amqp](https://github.com/php-amqp/php-amqp) for Message Queuing Producer +* [php-rdkafka](https://github.com/php-rdkafka/php-rdkafka) for Message Queuing Producer ## Supported PHP library diff --git a/src/component.rs b/src/component.rs index 59adfff..d979e3c 100644 --- a/src/component.rs +++ b/src/component.rs @@ -25,4 +25,5 @@ pub const COMPONENT_PHP_PREDIS_ID: i32 = 8006; pub const COMPONENT_PHP_MEMCACHED_ID: i32 = 20; pub const COMPONENT_PHP_REDIS_ID: i32 = 7; pub const COMPONENT_AMQP_PRODUCER_ID: i32 = 144; +pub const COMPONENT_KAFKA_PRODUCER_ID: i32 = 40; pub const COMPONENT_MONGODB_ID: i32 = 9; diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs index 9f97e22..a2f9139 100644 --- a/src/plugin/mod.rs +++ b/src/plugin/mod.rs @@ -23,6 +23,7 @@ mod plugin_mysqli; mod plugin_pdo; mod plugin_predis; mod plugin_psr3; +mod plugin_rdkafka; mod plugin_redis; mod plugin_swoole; mod style; @@ -51,6 +52,7 @@ static PLUGINS: Lazy>> = Lazy::new(|| { Box::::default(), Box::::default(), Box::::default(), + Box::::default(), Box::::default(), Box::::default(), ]; diff --git a/src/plugin/plugin_amqp.rs b/src/plugin/plugin_amqp.rs index 61cbced..d770175 100644 --- a/src/plugin/plugin_amqp.rs +++ b/src/plugin/plugin_amqp.rs @@ -16,15 +16,11 @@ use super::{Plugin, log_exception}; use crate::{ component::COMPONENT_AMQP_PRODUCER_ID, - context::{RequestContext, SW_HEADER}, + context::RequestContext, execute::{AfterExecuteHook, BeforeExecuteHook, get_this_mut, validate_num_args}, tag::{TAG_MQ_BROKER, TAG_MQ_QUEUE, TAG_MQ_TOPIC}, }; -use phper::{ - arrays::ZArray, - objects::ZObj, - values::{ExecuteData, ZVal}, -}; +use phper::objects::ZObj; use skywalking::{ proto::v3::SpanLayer, trace::span::{HandleSpanObject, Span}, @@ -101,7 +97,8 @@ impl AmqpPlugin { &routing_key, )?; - Self::inject_sw_header(request_id, execute_data, &peer)?; + // TODO: php-amqp extension call parameter injection is + // difficult, will implement later. Ok(Box::new(span)) }), @@ -149,55 +146,4 @@ impl AmqpPlugin { Ok(span) } - - fn inject_sw_header( - request_id: Option, execute_data: &mut ExecuteData, peer: &str, - ) -> crate::Result<()> { - let sw_header = RequestContext::try_get_sw_header(request_id, peer)?; - - let attributes = Self::ensure_attributes(execute_data)?; - let headers = Self::ensure_headers(attributes)?; - headers.insert(SW_HEADER, sw_header); - - Ok(()) - } - - fn ensure_attributes( - execute_data: &mut ExecuteData, - ) -> crate::Result<&mut phper::arrays::ZArr> { - // php-amqp parses publish() as `s|s!l!a/`, so the 4th `headers` - // argument is only visible after the missing optional parameters are - // materialized and `num_args` reflects that. - execute_data.materialize_missing([ - ZVal::from(()), - ZVal::from(()), - ZVal::from(ZArray::new()), - ])?; - - let attributes = execute_data.get_mut_parameter(3); - if attributes.as_z_arr().is_none() { - *attributes = ZArray::new().into(); - } - - Ok(attributes - .as_mut_z_arr() - .ok_or_else(|| anyhow::anyhow!("attributes isn't array"))?) - } - - fn ensure_headers( - attributes: &mut phper::arrays::ZArr, - ) -> crate::Result<&mut phper::arrays::ZArr> { - let has_headers = attributes - .get("headers") - .and_then(|headers| headers.as_z_arr()) - .is_some(); - if !has_headers { - attributes.insert("headers", ZArray::new()); - } - - Ok(attributes - .get_mut("headers") - .and_then(|headers| headers.as_mut_z_arr()) - .ok_or_else(|| anyhow::anyhow!("headers isn't array"))?) - } } diff --git a/src/plugin/plugin_rdkafka.rs b/src/plugin/plugin_rdkafka.rs new file mode 100644 index 0000000..452bb7c --- /dev/null +++ b/src/plugin/plugin_rdkafka.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::{Plugin, log_exception}; +use crate::{ + component::COMPONENT_KAFKA_PRODUCER_ID, + context::RequestContext, + execute::{AfterExecuteHook, BeforeExecuteHook, get_this_mut, validate_num_args}, + tag::{TAG_MQ_BROKER, TAG_MQ_QUEUE, TAG_MQ_TOPIC}, +}; +use dashmap::DashMap; +use once_cell::sync::Lazy; +use phper::{objects::ZObj, sys}; +use skywalking::{ + proto::v3::SpanLayer, + trace::span::{HandleSpanObject, Span}, +}; + +/// Maps Producer object handle -> broker string. +static PRODUCER_BROKERS: Lazy> = Lazy::new(DashMap::new); + +/// Maps Topic object handle -> broker string. +static TOPIC_BROKERS: Lazy> = Lazy::new(DashMap::new); + +/// Maps object handle -> original dtor. +static DTOR_MAP: Lazy> = Lazy::new(DashMap::new); + +#[derive(Default, Clone)] +pub struct RdkafkaPlugin; + +impl Plugin for RdkafkaPlugin { + fn class_names(&self) -> Option<&'static [&'static str]> { + Some(&["RdKafka", "RdKafka\\ProducerTopic"]) + } + + fn function_name_prefix(&self) -> Option<&'static str> { + None + } + + fn hook( + &self, class_name: Option<&str>, function_name: &str, + ) -> Option<(Box, Box)> { + tracing::debug!(?class_name, function_name, "rdkafka plugin hook called"); + match (class_name, function_name) { + (Some("RdKafka"), "addBrokers") => Some(self.hook_add_brokers()), + (Some("RdKafka"), "newTopic") => Some(self.hook_new_topic()), + (Some("RdKafka\\ProducerTopic"), "producev") => Some(self.hook_producev()), + _ => None, + } + } +} + +impl RdkafkaPlugin { + fn hook_add_brokers(&self) -> (Box, Box) { + ( + Box::new(move |_request_id, execute_data| { + validate_num_args(execute_data, 1)?; + + let this = get_this_mut(execute_data)?; + let handle = this.handle(); + hack_dtor(this, Some(producer_dtor)); + + let broker_list = execute_data.get_parameter(0).expect_str()?.to_owned(); + + PRODUCER_BROKERS.insert(handle, broker_list); + + Ok(Box::new(())) + }), + Box::new(|_, _, _, _| Ok(())), + ) + } + + fn hook_new_topic(&self) -> (Box, Box) { + ( + Box::new(move |_request_id, execute_data| { + let this = get_this_mut(execute_data)?; + + Ok(Box::new(this.handle())) + }), + Box::new(move |_, data, _, return_value| { + let producer_handle = *data.downcast::().unwrap(); + + if let Some(topic) = return_value.as_mut_z_obj() { + let class_name = topic.get_class().get_name().to_str()?; + if class_name == "RdKafka\\ProducerTopic" { + hack_dtor(topic, Some(producer_topic_dtor)); + if let Some(broker) = PRODUCER_BROKERS.get(&producer_handle) { + TOPIC_BROKERS.insert(topic.handle(), broker.clone()); + } + } + } + + Ok(()) + }), + ) + } + + fn hook_producev(&self) -> (Box, Box) { + let class_name = "RdKafka\\ProducerTopic".to_owned(); + let function_name = "producev".to_owned(); + + ( + Box::new(move |request_id, execute_data| { + validate_num_args(execute_data, 1)?; + + let this = get_this_mut(execute_data)?; + + let topic_name = this + .call("getName", []) + .ok() + .and_then(|v| { + v.as_z_str() + .and_then(|s| s.to_str().ok()) + .map(ToOwned::to_owned) + }) + .unwrap_or_default(); + + let topic_handle = this.handle(); + let broker = TOPIC_BROKERS + .get(&topic_handle) + .map(|b| b.clone()) + .unwrap_or_default(); + + let span = Self::create_exit_span( + request_id, + &class_name, + &function_name, + &broker, + &topic_name, + )?; + + // TODO: rdkafka extension call parameter injection is + // difficult, will implement later. + + Ok(Box::new(span)) + }), + Box::new(move |_, span, _, _| { + let mut span = span.downcast::().unwrap(); + log_exception(&mut *span); + Ok(()) + }), + ) + } + + fn create_exit_span( + request_id: Option, class_name: &str, function_name: &str, peer: &str, topic: &str, + ) -> crate::Result { + let mut span = RequestContext::try_with_global_ctx(request_id, |ctx| { + Ok(ctx.create_exit_span(&format!("{}->{}", class_name, function_name), peer)) + })?; + + let span_object = span.span_object_mut(); + span_object.set_span_layer(SpanLayer::Mq); + span_object.component_id = COMPONENT_KAFKA_PRODUCER_ID; + span_object.add_tag(TAG_MQ_BROKER, peer); + span_object.add_tag(TAG_MQ_TOPIC, topic); + span_object.add_tag(TAG_MQ_QUEUE, ""); + + Ok(span) + } +} + +fn hack_dtor(this: &mut ZObj, new_dtor: sys::zend_object_dtor_obj_t) { + assert!(new_dtor.is_some(), "new_dtor should not be null"); + + let handle = this.handle(); + + unsafe { + let ori_dtor = (*(*this.as_mut_ptr()).handlers).dtor_obj; + DTOR_MAP.insert(handle, ori_dtor); + (*((*this.as_mut_ptr()).handlers as *mut sys::zend_object_handlers)).dtor_obj = new_dtor; + } +} + +unsafe extern "C" fn producer_dtor(object: *mut sys::zend_object) { + unsafe { + let handle = ZObj::from_ptr(object).handle(); + PRODUCER_BROKERS.remove(&handle); + if let Some((_, Some(dtor))) = DTOR_MAP.remove(&handle) { + dtor(object); + } + } +} + +unsafe extern "C" fn producer_topic_dtor(object: *mut sys::zend_object) { + unsafe { + let handle = ZObj::from_ptr(object).handle(); + TOPIC_BROKERS.remove(&handle); + if let Some((_, Some(dtor))) = DTOR_MAP.remove(&handle) { + dtor(object); + } + } +} diff --git a/tests/data/expected_context.yaml b/tests/data/expected_context.yaml index 4882270..8146658 100644 --- a/tests/data/expected_context.yaml +++ b/tests/data/expected_context.yaml @@ -15,7 +15,7 @@ segmentItems: - serviceName: skywalking-agent-test-1 - segmentSize: 21 + segmentSize: 22 segments: - segmentId: "not null" spans: @@ -2121,6 +2121,98 @@ segmentItems: - { key: url, value: "http://127.0.0.1:9502/memcache" } - { key: http.method, value: GET } - { key: http.status_code, value: "200" } + - segmentId: "not null" + spans: + - operationName: "RdKafka\\ProducerTopic->producev" + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 40 + isError: false + spanType: Exit + peer: "127.0.0.1:9092" + skipAnalysis: false + tags: + - { key: mq.broker, value: "127.0.0.1:9092" } + - { key: mq.topic, value: test_topic } + - { key: mq.queue, value: "" } + - operationName: "RdKafka\\ProducerTopic->producev" + parentSpanId: 0 + spanId: 2 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 40 + isError: false + spanType: Exit + peer: "127.0.0.1:9092" + skipAnalysis: false + tags: + - { key: mq.broker, value: "127.0.0.1:9092" } + - { key: mq.topic, value: test_topic } + - { key: mq.queue, value: "" } + - operationName: "RdKafka\\ProducerTopic->producev" + parentSpanId: 0 + spanId: 3 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 40 + isError: false + spanType: Exit + peer: "127.0.0.1:9092" + skipAnalysis: false + tags: + - { key: mq.broker, value: "127.0.0.1:9092" } + - { key: mq.topic, value: test_topic } + - { key: mq.queue, value: "" } + - operationName: "RdKafka\\ProducerTopic->producev" + parentSpanId: 0 + spanId: 4 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 40 + isError: false + spanType: Exit + peer: "127.0.0.1:9092" + skipAnalysis: false + tags: + - { key: mq.broker, value: "127.0.0.1:9092" } + - { key: mq.topic, value: test_topic } + - { key: mq.queue, value: "" } + - operationName: "RdKafka\\ProducerTopic->producev" + parentSpanId: 0 + spanId: 5 + spanLayer: MQ + startTime: gt 0 + endTime: gt 0 + componentId: 40 + isError: false + spanType: Exit + peer: "127.0.0.1:9092" + skipAnalysis: false + tags: + - { key: mq.broker, value: "127.0.0.1:9092" } + - { key: mq.topic, value: test_topic } + - { key: mq.queue, value: "" } + - operationName: GET:/kafka.php + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: gt 0 + endTime: gt 0 + componentId: 8001 + isError: false + spanType: Entry + peer: "" + skipAnalysis: false + tags: + - { key: url, value: "http://127.0.0.1:9011/kafka.php" } + - { key: http.method, value: GET } + - { key: http.status_code, value: "200" } logItems: - serviceName: skywalking-agent-test-1 diff --git a/tests/e2e.rs b/tests/e2e.rs index 7b325c6..8d8678a 100644 --- a/tests/e2e.rs +++ b/tests/e2e.rs @@ -63,6 +63,7 @@ async fn run_e2e() { request_fpm_redis().await; request_fpm_rabbitmq().await; request_fpm_amqp().await; + request_fpm_kafka().await; request_fpm_mongodb().await; request_fpm_memcache().await; request_fpm_monolog().await; @@ -161,6 +162,14 @@ async fn request_fpm_amqp() { .await; } +async fn request_fpm_kafka() { + request_common( + HTTP_CLIENT.get(format!("http://{}/kafka.php", PROXY_SERVER_1_ADDRESS)), + "ok", + ) + .await; +} + async fn request_fpm_mongodb() { request_common( HTTP_CLIENT.get(format!("http://{}/mongodb.php", PROXY_SERVER_1_ADDRESS)), diff --git a/tests/php/fpm/amqp.php b/tests/php/fpm/amqp.php index 7c30608..7b206e1 100644 --- a/tests/php/fpm/amqp.php +++ b/tests/php/fpm/amqp.php @@ -82,7 +82,6 @@ function expectEnvelope(AMQPQueue $queue, string $body, callable $assertions): v $exchange->publish('One Arg Message'); expectEnvelope($fanoutQueue, 'One Arg Message', static function (AMQPEnvelope $envelope): void { - assertTrue($envelope->hasHeader('sw8'), 'missing sw8 header for one-arg publish'); }); } @@ -91,7 +90,6 @@ function expectEnvelope(AMQPQueue $queue, string $body, callable $assertions): v $exchange->publish('Two Arg Message', 'queue_test'); expectEnvelope($queue, 'Two Arg Message', static function (AMQPEnvelope $envelope): void { - assertTrue($envelope->hasHeader('sw8'), 'missing sw8 header for two-arg publish'); }); } @@ -101,7 +99,6 @@ function expectEnvelope(AMQPQueue $queue, string $body, callable $assertions): v $exchange->publish('Three Arg Message', 'routing_test', AMQP_NOPARAM); expectEnvelope($queue, 'Three Arg Message', static function (AMQPEnvelope $envelope): void { - assertTrue($envelope->hasHeader('sw8'), 'missing sw8 header for three-arg publish'); }); } @@ -110,7 +107,6 @@ function expectEnvelope(AMQPQueue $queue, string $body, callable $assertions): v $exchange->publish('Four Arg Message', 'queue_test', AMQP_NOPARAM, ['headers' => ['foo' => 'bar']]); expectEnvelope($queue, 'Four Arg Message', static function (AMQPEnvelope $envelope): void { - assertTrue($envelope->hasHeader('sw8'), 'missing sw8 header for four-arg publish'); assertTrue($envelope->hasHeader('foo'), 'missing custom header for four-arg publish'); assertSameValue('bar', $envelope->getHeader('foo'), 'custom header should be preserved'); }); @@ -121,7 +117,6 @@ function expectEnvelope(AMQPQueue $queue, string $body, callable $assertions): v $exchange->publish('Default Exchange Message', 'queue_test', AMQP_NOPARAM); expectEnvelope($queue, 'Default Exchange Message', static function (AMQPEnvelope $envelope): void { - assertTrue($envelope->hasHeader('sw8'), 'missing sw8 header for default exchange publish'); }); } diff --git a/tests/php/fpm/kafka.php b/tests/php/fpm/kafka.php new file mode 100644 index 0000000..29a8f3f --- /dev/null +++ b/tests/php/fpm/kafka.php @@ -0,0 +1,110 @@ +set('bootstrap.servers', '127.0.0.1:9092'); +$conf->set('group.id', 'sw_test_' . uniqid()); +$conf->set('auto.offset.reset', 'earliest'); +$conf->set('enable.auto.commit', 'true'); +$consumer = new RdKafka\KafkaConsumer($conf); +$consumer->subscribe(['test_topic']); + +// Drain leftover messages from previous runs. +while (true) { + $msg = $consumer->consume(1000); + if ($msg === null || $msg->err === RD_KAFKA_RESP_ERR__TIMED_OUT) { + break; + } +} + +// Produce messages. +$conf = new RdKafka\Conf(); +$conf->set('bootstrap.servers', '127.0.0.1:9092'); +$producer = new RdKafka\Producer($conf); +$producer->addBrokers('127.0.0.1:9092'); + +$topic = $producer->newTopic('test_topic'); + +$topic->producev(RD_KAFKA_PARTITION_UA, 0); +$topic->producev(RD_KAFKA_PARTITION_UA, 0, 'Message With Payload'); +$topic->producev(RD_KAFKA_PARTITION_UA, 0, 'Message With Key', 'my_key'); +$topic->producev(RD_KAFKA_PARTITION_UA, 0, 'Message With Headers', 'my_key', ['foo' => 'bar']); +$topic->producev(RD_KAFKA_PARTITION_UA, 0, 'Full Message', 'my_key', ['foo' => 'bar'], 1234567890); + +$producer->flush(10000); + +// Consume and verify new messages. +$received = 0; +$maxMessages = 5; +$maxAttempts = 30; + +for ($attempt = 0; $attempt < $maxAttempts && $received < $maxMessages; $attempt++) { + $msg = $consumer->consume(2000); + + if ($msg === null) { + continue; + } + + if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) { + continue; + } + + if ($msg->err === RD_KAFKA_RESP_ERR__TIMED_OUT) { + continue; + } + + if ($msg->err !== RD_KAFKA_RESP_ERR_NO_ERROR) { + throw new RuntimeException(sprintf('consume error: %s', $msg->errstr())); + } + + $msgIndex = $received + 1; + if ($msgIndex === 1) { + assertSameValue(null, $msg->payload, 'message 1 should have no payload'); + } elseif ($msgIndex === 2) { + assertSameValue('Message With Payload', $msg->payload, 'message 2 payload mismatch'); + } elseif ($msgIndex === 3) { + assertSameValue('Message With Key', $msg->payload, 'message 3 payload mismatch'); + } elseif ($msgIndex === 4) { + assertSameValue('Message With Headers', $msg->payload, 'message 4 payload mismatch'); + assertTrue(isset($msg->headers['foo']), 'message 4 missing header foo'); + assertSameValue('bar', $msg->headers['foo'], 'message 4 header foo value mismatch'); + } elseif ($msgIndex === 5) { + assertSameValue('Full Message', $msg->payload, 'message 5 payload mismatch'); + assertTrue(isset($msg->headers['foo']), 'message 5 missing header foo'); + assertSameValue('bar', $msg->headers['foo'], 'message 5 header foo value mismatch'); + } + + $received++; +} + +assertTrue($received === $maxMessages, sprintf('expected %d messages, got %d', $maxMessages, $received)); + +echo "ok";