Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:

- name: Install dependencies for Linux
if: matrix.os == 'ubuntu-24.04'
run: sudo apt-get update && sudo apt-get install -y llvm-18-dev libclang-18-dev protobuf-compiler libsasl2-dev librabbitmq-dev
run: sudo apt-get update && sudo apt-get install -y llvm-18-dev libclang-18-dev protobuf-compiler libsasl2-dev librabbitmq-dev librdkafka-dev

- name: Install protobuf for Macos
if: matrix.os == 'macos-14'
Expand All @@ -110,7 +110,7 @@ jobs:
bcmath, calendar, ctype, dom, exif, gettext, iconv, intl, json, mbstring,
mysqli, mysqlnd, opcache, pdo, pdo_mysql, phar, posix, readline, redis,
memcached, swoole-${{ matrix.flag.swoole_version }}, xml, xmlreader, xmlwriter,
yaml, zip, mongodb, memcache, amqp
yaml, zip, mongodb, memcache, amqp, rdkafka

- name: Setup php-fpm for Linux
if: matrix.os == 'ubuntu-24.04'
Expand Down
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ futures-util = "0.3.31"
hostname = "0.4.1"
libc = "0.2.174"
once_cell = "1.21.3"
phper = "0.17.5"
phper = "0.17.3"
prost = "0.14.1"
rdkafka = { workspace = true, optional = true }
skywalking = { version = "0.10.0", features = ["management"] }
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ SkyWalking PHP Agent requires SkyWalking 8.4+ and PHP 7.2+
* [x] [Memcached](https://www.php.net/manual/en/book.memcached.php)
* [x] [phpredis](https://github.com/phpredis/phpredis)
* [x] [php-amqp](https://github.com/php-amqp/php-amqp) for Message Queuing Producer
* [ ] [php-rdkafka](https://github.com/arnaud-lb/php-rdkafka)
* [x] [php-rdkafka](https://github.com/php-rdkafka/php-rdkafka) for Message Queuing Producer
* [x] [predis](https://github.com/predis/predis)
* [x] [php-amqplib](https://github.com/php-amqplib/php-amqplib) for Message Queuing Producer
* [x] [MongoDB](https://www.php.net/manual/en/set.mongodb.php)
Expand Down
2 changes: 1 addition & 1 deletion dist-material/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ MulanPSL-2.0 licenses
The following components are provided under the MulanPSL-2.0 License. See project link for details.
The text of each license is also included in licenses/LICENSE-[project].txt.

https://crates.io/crates/phper/0.17.5 0.17.5 MulanPSL-2.0
https://crates.io/crates/phper/0.17.3 0.17.3 MulanPSL-2.0
https://crates.io/crates/phper-alloc/0.16.3 0.16.3 MulanPSL-2.0
https://crates.io/crates/phper-build/0.15.6 0.15.6 MulanPSL-2.0
https://crates.io/crates/phper-macros/0.15.3 0.15.3 MulanPSL-2.0
Expand Down
23 changes: 23 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,29 @@ services:
timeout: 5s
retries: 10

kafka:
image: apache/kafka:3.9.2
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
healthcheck:
test: ["CMD", "/opt/kafka/bin/kafka-topics.sh", "--bootstrap-server", "127.0.0.1:9092", "--list"]
interval: 10s
timeout: 5s
retries: 10

mongo:
image: mongo:4.4.10
ports:
Expand Down
2 changes: 2 additions & 0 deletions docs/en/setup/service-agent/php-agent/Supported-list.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The following plugins provide the distributed tracing capability.
* [phpredis](https://github.com/phpredis/phpredis)
* [MongoDB](https://www.php.net/manual/en/set.mongodb.php)
* [Memcache](https://www.php.net/manual/en/book.memcache.php)
* [php-amqp](https://github.com/php-amqp/php-amqp) for Message Queuing Producer
* [php-rdkafka](https://github.com/php-rdkafka/php-rdkafka) for Message Queuing Producer

## Supported PHP library

Expand Down
1 change: 1 addition & 0 deletions src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ pub const COMPONENT_PHP_PREDIS_ID: i32 = 8006;
pub const COMPONENT_PHP_MEMCACHED_ID: i32 = 20;
pub const COMPONENT_PHP_REDIS_ID: i32 = 7;
pub const COMPONENT_AMQP_PRODUCER_ID: i32 = 144;
pub const COMPONENT_KAFKA_PRODUCER_ID: i32 = 40;
pub const COMPONENT_MONGODB_ID: i32 = 9;
2 changes: 2 additions & 0 deletions src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod plugin_mysqli;
mod plugin_pdo;
mod plugin_predis;
mod plugin_psr3;
mod plugin_rdkafka;
mod plugin_redis;
mod plugin_swoole;
mod style;
Expand Down Expand Up @@ -51,6 +52,7 @@ static PLUGINS: Lazy<Vec<Box<DynPlugin>>> = Lazy::new(|| {
Box::<plugin_redis::RedisPlugin>::default(),
Box::<plugin_amqp::AmqpPlugin>::default(),
Box::<plugin_amqplib::AmqplibPlugin>::default(),
Box::<plugin_rdkafka::RdkafkaPlugin>::default(),
Box::<plugin_mongodb::MongodbPlugin>::default(),
Box::<plugin_memcache::MemcachePlugin>::default(),
];
Expand Down
62 changes: 4 additions & 58 deletions src/plugin/plugin_amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@
use super::{Plugin, log_exception};
use crate::{
component::COMPONENT_AMQP_PRODUCER_ID,
context::{RequestContext, SW_HEADER},
context::RequestContext,
execute::{AfterExecuteHook, BeforeExecuteHook, get_this_mut, validate_num_args},
tag::{TAG_MQ_BROKER, TAG_MQ_QUEUE, TAG_MQ_TOPIC},
};
use phper::{
arrays::ZArray,
objects::ZObj,
values::{ExecuteData, ZVal},
};
use phper::objects::ZObj;
use skywalking::{
proto::v3::SpanLayer,
trace::span::{HandleSpanObject, Span},
Expand Down Expand Up @@ -101,7 +97,8 @@ impl AmqpPlugin {
&routing_key,
)?;

Self::inject_sw_header(request_id, execute_data, &peer)?;
// TODO: php-amqp extension call parameter injection is
// difficult, will implement later.

Ok(Box::new(span))
}),
Expand Down Expand Up @@ -149,55 +146,4 @@ impl AmqpPlugin {

Ok(span)
}

fn inject_sw_header(
request_id: Option<i64>, execute_data: &mut ExecuteData, peer: &str,
) -> crate::Result<()> {
let sw_header = RequestContext::try_get_sw_header(request_id, peer)?;

let attributes = Self::ensure_attributes(execute_data)?;
let headers = Self::ensure_headers(attributes)?;
headers.insert(SW_HEADER, sw_header);

Ok(())
}

fn ensure_attributes(
execute_data: &mut ExecuteData,
) -> crate::Result<&mut phper::arrays::ZArr> {
// php-amqp parses publish() as `s|s!l!a/`, so the 4th `headers`
// argument is only visible after the missing optional parameters are
// materialized and `num_args` reflects that.
execute_data.materialize_missing([
ZVal::from(()),
ZVal::from(()),
ZVal::from(ZArray::new()),
])?;

let attributes = execute_data.get_mut_parameter(3);
if attributes.as_z_arr().is_none() {
*attributes = ZArray::new().into();
}

Ok(attributes
.as_mut_z_arr()
.ok_or_else(|| anyhow::anyhow!("attributes isn't array"))?)
}

fn ensure_headers(
attributes: &mut phper::arrays::ZArr,
) -> crate::Result<&mut phper::arrays::ZArr> {
let has_headers = attributes
.get("headers")
.and_then(|headers| headers.as_z_arr())
.is_some();
if !has_headers {
attributes.insert("headers", ZArray::new());
}

Ok(attributes
.get_mut("headers")
.and_then(|headers| headers.as_mut_z_arr())
.ok_or_else(|| anyhow::anyhow!("headers isn't array"))?)
}
}
Loading
Loading