Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
}
1 change: 1 addition & 0 deletions .github/workflows/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ PostCommit Jobs run in a schedule against master branch and generally do not get
| [ PostCommit Java ValidatesRunner Dataflow Streaming Engine ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.yml) | N/A |`beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json`| [![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.yml?query=event%3Aschedule) |
| [ PostCommit Python Portable Flink ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Portable_Flink.yml) | N/A |`beam_PostCommit_Python_Portable_Flink.json`| [![.github/workflows/beam_PostCommit_Python_Portable_Flink.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Portable_Flink.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Portable_Flink.yml?query=event%3Aschedule) |
| [ PostCommit Python Xlang IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml) | N/A |`beam_PostCommit_Python_Xlang_IO_Direct.json`| [![.github/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml?query=event%3Aschedule) |
| [ PostCommit Python Xlang Messaging Direct ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml) | N/A |`beam_PostCommit_Python_Xlang_Messaging_Direct.json`| [![.github/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml?query=event%3Aschedule) |

### PerformanceTests and Benchmark Jobs

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: PostCommit Python Xlang Messaging Direct

# Triggers: a cron schedule, pull requests that touch one of the trigger
# files listed below, and manual dispatch.
on:
  schedule:
    - cron: '45 5/6 * * *'
  pull_request_target:
    paths:
      - 'release/trigger_all_tests.json'
      - '.github/trigger_files/beam_PostCommit_Python_Xlang_Messaging_Direct.json'
  workflow_dispatch:

# Setting explicit permissions for the action to avoid the default
# permissions, which are `write-all` in case of pull_request_target event.
permissions:
  actions: write
  pull-requests: write
  checks: write
  contents: read
  deployments: read
  id-token: none
  issues: write
  discussions: read
  packages: read
  pages: read
  repository-projects: read
  security-events: read
  statuses: read

# This allows a subsequently queued workflow run to interrupt previous runs.
concurrency:
  group: '${{ github.workflow }} @ ${{ github.event.pull_request.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
  cancel-in-progress: true

# Credentials are read from repository secrets and exposed to every step.
env:
  DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

jobs:
  beam_PostCommit_Python_Xlang_Messaging_Direct:
    # Scheduled runs are limited to the apache/beam repository itself.
    if: |
      github.event_name == 'workflow_dispatch' ||
      github.event_name == 'pull_request_target' ||
      (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
      github.event.comment.body == 'Run Python_Xlang_Messaging_Direct PostCommit'
    runs-on:
      - self-hosted
      - ubuntu-24.04
      - main
    timeout-minutes: 100
    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
    strategy:
      matrix:
        job_name:
          - "beam_PostCommit_Python_Xlang_Messaging_Direct"
        job_phrase:
          - "Run Python_Xlang_Messaging_Direct PostCommit"
    steps:
      - uses: actions/checkout@v6
      - name: Setup repository
        uses: ./.github/actions/setup-action
        with:
          comment_phrase: ${{ matrix.job_phrase }}
          github_token: ${{ secrets.GITHUB_TOKEN }}
          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
      - name: Setup environment
        uses: ./.github/actions/setup-environment-action
        with:
          # Literal block scalar: the versions stay strings, so 3.10 is not
          # read as the float 3.1.
          python-version: |
            3.10
            3.14
      - name: run PostCommit Python Xlang Messaging Direct script
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          gradle-command: :sdks:python:test-suites:direct:messagingCrossLanguagePostCommit
          arguments: -PuseWheelDistribution
      # The raw pytest XML reports are only uploaded when the job failed.
      - name: Archive Python Test Results
        uses: actions/upload-artifact@v7
        if: failure()
        with:
          name: Python Test Results
          path: '**/pytest*.xml'
      # Results are published regardless of the outcome of earlier steps.
      - name: Publish Python Test Results
        uses: EnricoMi/publish-unit-test-result-action@v2
        if: always()
        with:
          commit: '${{ env.prsha || env.GITHUB_SHA }}'
          comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }}
          files: '**/pytest*.xml'
          large_files: true
259 changes: 259 additions & 0 deletions sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Integration tests for the cross-language MQTT IO transforms
(ReadFromMqtt / WriteToMqtt), served by the messaging expansion service.

Runs against an MQTT broker (Eclipse Mosquitto) started once per test class
via testcontainers. MqttIO reads are unbounded (streaming), so the end-to-end
read/write test runs on the Prism portable streaming runner -- the legacy
DirectRunner cannot execute an unbounded read (see the
MqttReadSchemaTransformProvider description).
"""

import logging
import threading
import time
import unittest

import pytest

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.typehints.row_type import RowTypeConstraint

# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
from apache_beam.io import ReadFromMqtt
from apache_beam.io import WriteToMqtt
except ImportError:
ReadFromMqtt = None
WriteToMqtt = None

try:
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
except ImportError:
DockerContainer = None

NUM_RECORDS = 3
BYTES_ROW = RowTypeConstraint.from_fields([('bytes', bytes)])


@pytest.mark.uses_messaging_java_expansion_service
@unittest.skipIf(
DockerContainer is None, 'testcontainers package is not installed')
@unittest.skipIf(
ReadFromMqtt is None or WriteToMqtt is None,
'MQTT cross-language wrappers are not generated')
@unittest.skipIf(
TestPipeline().get_pipeline_options().view_as(StandardOptions).runner
is None,
'Do not run this test on precommit suites.')
@unittest.skipIf(
'Dataflow' in (
TestPipeline().get_pipeline_options().view_as(StandardOptions).runner or
''),
'The testcontainers broker is not reachable from Dataflow workers; '
'a Dataflow variant would need a remotely hosted MQTT broker.')
class CrossLanguageMqttIOTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
  """Starts one shared MQTT broker and records its URI in ``cls.server_uri``.

  Spinning the broker up and tearing it down is expensive, so a single
  instance serves the whole class; each test uses its own topic(s).
  """
  cls.start_mqtt_container(retries=3)
  # 1883 is the container port exposed by start_mqtt_container; ask the
  # container which host/port it is reachable on from the test process.
  cls.server_uri = 'tcp://%s:%s' % (
      cls.broker.get_container_host_ip(), cls.broker.get_exposed_port(1883))

@classmethod
def tearDownClass(cls):
  """Stops the shared MQTT broker container on a best-effort basis.

  Stopping the container sometimes raises ReadTimeout. That must not fail
  the test run, so the error is logged -- together with its traceback, so
  the cause stays diagnosable -- and not re-raised.
  """
  try:
    cls.broker.stop()
  except Exception:  # pylint: disable=broad-except
    # logging.exception logs at ERROR level and appends the active traceback.
    logging.exception('Could not stop the MQTT broker container.')

# Creating a container with testcontainers sometimes raises ReadTimeout
# error, so retry a couple of times.
@classmethod
def start_mqtt_container(cls, retries):
  """Starts the Mosquitto broker container, retrying when startup fails.

  On success the running container is available as ``cls.broker``.

  Args:
    retries: maximum number of start attempts.

  Raises:
    Exception: the error of the last attempt, once every attempt has failed.
  """
  for attempt in range(retries):
    # Track the container of *this* attempt locally so the cleanup below
    # only ever touches a container that was actually created.
    broker = None
    try:
      # /mosquitto-no-auth.conf ships with the image and enables an
      # anonymous listener on port 1883.
      broker = DockerContainer('eclipse-mosquitto:2').with_command(
          'mosquitto -c /mosquitto-no-auth.conf').with_exposed_ports(1883)
      cls.broker = broker
      broker.start()
      wait_for_logs(broker, 'mosquitto version .* running', timeout=30)
      return
    except Exception:  # pylint: disable=broad-except
      # If start() succeeded but a later step (e.g. wait_for_logs) failed,
      # stop the partially started container so the next retry / the raised
      # error does not leak a running Docker container.
      if broker is not None:
        try:
          broker.stop()
        except Exception:  # pylint: disable=broad-except
          # Cleanup stays best-effort, but leave a trace of the failure.
          logging.warning(
              'Could not stop the partially started MQTT broker container.',
              exc_info=True)
      if attempt == retries - 1:
        logging.error('Unable to initialize the MQTT broker container.')
        # Bare raise re-raises the startup error with its original traceback.
        raise
      logging.warning(
          'MQTT broker container start attempt %d of %d failed; retrying.',
          attempt + 1,
          retries,
          exc_info=True)
Comment thread
tkaymak marked this conversation as resolved.

def _connection_configuration(self, topic, client_id):
return {
'server_uri': self.server_uri, 'topic': topic, 'client_id': client_id
}

def test_xlang_mqtt_write(self):
  """Writes NUM_RECORDS payloads with WriteToMqtt and verifies delivery.

  A mosquitto_sub process inside the broker container listens on the topic
  while a bounded pipeline publishes to it; the payloads it prints must
  match the payloads that were written.
  """
  topic = 'xlang-mqtt-write-topic'
  expected_payloads = [b'msg-%d' % i for i in range(NUM_RECORDS)]
  subscriber_result = {}

  def subscribe():
    # mosquitto_sub exits after receiving NUM_RECORDS messages (-C) or
    # after the timeout (-W), printing one payload per line.
    command = [
        'mosquitto_sub',
        '-t',
        topic,
        '-q',
        '1',
        '-C',
        str(NUM_RECORDS),
        '-W',
        '120'
    ]
    exit_code, output = self.broker.get_wrapped_container().exec_run(command)
    subscriber_result['exit_code'] = exit_code
    subscriber_result['output'] = output

  listener = threading.Thread(target=subscribe, daemon=True)
  listener.start()
  # MQTT delivers only to clients that are already subscribed, so give the
  # subscriber time to connect before publishing.
  time.sleep(5)

  with TestPipeline() as pipeline:
    pipeline.not_use_test_runner_api = True
    rows = (
        pipeline
        | 'CreatePayloads' >> beam.Create(expected_payloads)
        | 'ToRow' >> beam.Map(lambda payload: beam.Row(bytes=payload)).
        with_output_types(BYTES_ROW))
    _ = rows | 'WriteToMqtt' >> WriteToMqtt(
        connection_configuration=self._connection_configuration(
            topic, 'xlang-mqtt-write'))

  # The join timeout is longer than mosquitto_sub's own -W timeout.
  listener.join(timeout=150)
  self.assertEqual(subscriber_result.get('exit_code'), 0)
  received = sorted(subscriber_result.get('output', b'').split())
  self.assertEqual(sorted(expected_payloads), received)

def test_xlang_mqtt_read_write_streaming(self):
"""Exercises ReadFromMqtt and WriteToMqtt end to end on the Prism portable
streaming runner. MqttIO read is unbounded, which the legacy DirectRunner
cannot execute, so this is the single read test: an unbounded ReadFromMqtt
on a source topic feeds a WriteToMqtt on a sink topic, the result is
observed with a mosquitto_sub subscriber on the sink topic, and the
(never-terminating) pipeline is then cancelled.

MQTT does not retain regular messages, so the reader must already be
subscribed when messages are published -- a Kafka-style sequential
write-then-read would read nothing. A background publisher therefore feeds
the source topic continuously while the streaming pipeline runs.
"""
source_topic = 'xlang-mqtt-streaming-source'
sink_topic = 'xlang-mqtt-streaming-sink'
stop_publishing = threading.Event()
subscriber_result = {}

def publish_loop():
  """Publishes 'msg-<n>' to the source topic until stop_publishing is set.

  Runs mosquitto_pub inside the broker container roughly twice a second.
  A failed publish is logged and the loop keeps going, so the subscriber
  timeout is not the only trace of a broken publisher.
  """
  container = self.broker.get_wrapped_container()
  sequence = 0
  while not stop_publishing.is_set():
    exit_code, output = container.exec_run([
        'mosquitto_pub',
        '-t',
        source_topic,
        '-m',
        'msg-%d' % sequence,
        '-q',
        '1'
    ])
    if exit_code != 0:
      logging.warning(
          'mosquitto_pub exited with code %s: %s', exit_code, output)
    sequence += 1
    # Event.wait paces the loop like sleep(0.5) but returns immediately once
    # the stop is requested, so joining the publisher does not lag behind.
    stop_publishing.wait(0.5)

def subscribe():
  """Collects messages from the sink topic into subscriber_result.

  Runs mosquitto_sub inside the broker container and stores its exit code
  and raw output under the 'exit_code' and 'output' keys.
  """
  # -C: stop after NUM_RECORDS messages; -W: timeout after 180 seconds.
  command = [
      'mosquitto_sub',
      '-t',
      sink_topic,
      '-q',
      '1',
      '-C',
      str(NUM_RECORDS),
      '-W',
      '180'
  ]
  exec_result = self.broker.get_wrapped_container().exec_run(command)
  subscriber_result['exit_code'], subscriber_result['output'] = exec_result

publisher = threading.Thread(target=publish_loop, daemon=True)
subscriber = threading.Thread(target=subscribe, daemon=True)
publisher.start()
subscriber.start()

options = PipelineOptions([

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests should use TestPipeline. I'm not sure how the coding agent ends up with two read tests, while one test is sufficient (mqttio read is a streaming IO).

If the typical test doesn't work it's some issue in that test, and should be fixed there, do not create a new one.

If we follow other xlang tests, a single test runs two pipelines, first write then read, and checks the results.

@tkaymak tkaymak Jun 22, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it took me some time to get back to this @Abacn
I removed the duplicate read test and there is now a single read test (test_xlang_mqtt_read_write_streaming), and it uses TestPipeline. Thank you for the catch, and sorry, I don't know why I did not see this in the first place.

One constraint worth flagging on the "single test runs two pipelines, first write then read" pattern: unlike Kafka, MQTT does not retain regular messages, so a sequential write-then-read reads nothing.
By the time the read pipeline subscribes, the published messages are already gone. The reader has to be subscribed before anything is published. Since MqttIO read is also unbounded (streaming), the legacy DirectRunner cannot run it at all, at least this is my understanding.

So the read test is structured as an end-to-end streaming pipeline on Prism (ReadFromMqtt(source) -> WriteToMqtt(sink)), with a background publisher feeding the source while the pipeline runs and a mosquitto_sub subscriber verifying the sink. That exercises both transforms and checks results in one test, while respecting the no-retention semantics. The standalone test_xlang_mqtt_write (bounded Create -> WriteToMqtt, verified by a subscriber) is kept since it gives unambiguous, fast write-only coverage on the default runner. If you would prefer strictly one test, I can fold it in.

'--runner=PrismRunner',
'--environment_type=LOOPBACK',
'--streaming',
])
p = TestPipeline(options=options)
p.not_use_test_runner_api = True
_ = (
p
| 'ReadFromMqtt' >> ReadFromMqtt(
connection_configuration=self._connection_configuration(
source_topic, 'xlang-mqtt-streaming-read'))
| 'Passthrough' >> beam.Map(
lambda row: beam.Row(bytes=row.bytes)).with_output_types(BYTES_ROW)
| 'WriteToMqtt' >> WriteToMqtt(
connection_configuration=self._connection_configuration(
sink_topic, 'xlang-mqtt-streaming-write')))
result = p.run()
try:
# The subscriber exits once NUM_RECORDS messages flowed through the
# streaming pipeline (or fails the assertions below on its timeout).
subscriber.join(timeout=200)
finally:
stop_publishing.set()
publisher.join()
try:
result.cancel()
except Exception: # pylint: disable=broad-except
# The unbounded pipeline never finishes on its own; cancellation
# after the assertion data was collected is best-effort.
logging.warning('Ignoring error while cancelling the pipeline.')

self.assertEqual(subscriber_result.get('exit_code'), 0)
payloads = subscriber_result.get('output', b'').split()
self.assertEqual(NUM_RECORDS, len(payloads))
for payload in payloads:
self.assertTrue(
payload.startswith(b'msg-'), 'Unexpected payload: %s' % payload)


if __name__ == '__main__':
  # When executed as a script: raise root-logger verbosity to INFO, then
  # hand control to the unittest runner.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()
1 change: 1 addition & 0 deletions sdks/python/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ markers =
uses_java_expansion_service: collect Cross Language Java transforms test runs
uses_python_expansion_service: collect Cross Language Python transforms test runs
uses_io_java_expansion_service: collect Cross Language IO Java transform test runs (with Kafka bootstrap server)
uses_messaging_java_expansion_service: collect Cross Language Messaging IO Java transform test runs
xlang_wrapper_generation: collect tests that validate Cross Language wrapper generation
uses_transform_service: collect Cross Language test runs that use the Transform Service
xlang_sql_expansion_service: collect for Cross Language with SQL expansion service test runs
Expand Down
6 changes: 6 additions & 0 deletions sdks/python/test-suites/direct/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ task ioCrossLanguagePostCommit {
}
}

task messagingCrossLanguagePostCommit {
  // Depend on the messagingCrossLanguagePythonUsingJava task of the per-version
  // project for every entry returned by
  // getVersionsAsList('cross_language_validates_py_versions').
  getVersionsAsList('cross_language_validates_py_versions').each { pyVersion ->
    dependsOn.add(":sdks:python:test-suites:direct:py${getVersionSuffix(pyVersion)}:messagingCrossLanguagePythonUsingJava")
  }
}

task crossLanguageWrapperValidationPreCommit {
// Different python versions may output types that look different and lead to
// false failures. To be consistent, we test on the lowest version only
Expand Down
Loading
Loading