diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java index 6c146f0c8b5..3c7737f0fb9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java @@ -76,6 +76,9 @@ public Destination createDestination(ConnectionContext context, ActiveMQDestinat if (destination.isQueue()) { if (destination.isTemporary()) { final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; + if (tempDest.getConnectionId() == null) { + throw new IllegalArgumentException("Temporary queue must have a connectionId"); + } Queue queue = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory); configureQueue(queue, destination); queue.initialize(); @@ -88,8 +91,11 @@ public Destination createDestination(ConnectionContext context, ActiveMQDestinat return queue; } } else if (destination.isTemporary()) { - - Topic topic = new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory); + final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; + if (tempDest.getConnectionId() == null) { + throw new IllegalArgumentException("Temporary topic must have a connectionId"); + } + Topic topic = new TempTopic(brokerService, destination, null, destinationStatistics, taskRunnerFactory); configureTopic(topic, destination); topic.initialize(); return topic; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempDestination.java new file mode 100644 index 00000000000..715323ca957 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempDestination.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +public interface TempDestination extends Destination { + + void setAllowTempDestinationStealing(boolean allowTempDestinationStealing); + + boolean isAllowTempDestinationStealing(); +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java index 17eeb056472..ebaf331e19f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region; +import jakarta.jms.InvalidDestinationException; import java.io.IOException; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -33,10 +34,12 @@ * * */ -public class TempQueue extends Queue{ +public class TempQueue extends Queue implements TempDestination{ + private static final Logger LOG = LoggerFactory.getLogger(TempQueue.class); + private final ActiveMQTempDestination tempDest; - + private boolean allowTempDestinationStealing = false; /** * @param brokerService @@ -65,6 +68,11 @@ public void initialize() throws Exception { @Override public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { + final String connectionId = sub.getConsumerInfo().getConsumerId().getConnectionId(); + if (!isAllowTempDestinationStealing() && !tempDest.getConnectionId().equals(connectionId)) { + throw new InvalidDestinationException("Subscribing to a temporary queue created by another connection is not permitted"); + } + // Only consumers on the same connection can consume from // the temporary destination // However, we could have failed over - and we do this @@ -97,4 +105,14 @@ public void dispose(ConnectionContext context) throws IOException { } super.dispose(context); } + + @Override + public boolean isAllowTempDestinationStealing() { + return allowTempDestinationStealing; + } + + @Override + public void setAllowTempDestinationStealing(boolean allowTempDestinationStealing) { + this.allowTempDestinationStealing = allowTempDestinationStealing; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempTopic.java index 9bf4658e93e..85cb8c5a0be 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempTopic.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region; +import jakarta.jms.InvalidDestinationException; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -30,8 +31,11 @@ * * */ -public class TempTopic extends Topic implements Task{ +public class TempTopic extends Topic implements Task, TempDestination { + private final ActiveMQTempDestination tempDest; + private boolean allowTempDestinationStealing = false; + /** * @param brokerService * @param destination @@ -50,6 +54,11 @@ public TempTopic(BrokerService brokerService, } public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { + final String connectionId = sub.getConsumerInfo().getConsumerId().getConnectionId(); + if (!isAllowTempDestinationStealing() && !tempDest.getConnectionId().equals(connectionId)) { + throw new InvalidDestinationException("Subscribing to a temporary topic created by another connection is not permitted"); + } + // Only consumers on the same connection can consume from // the temporary destination // However, we could have failed over - and we do this @@ -70,4 +79,14 @@ public void addSubscription(ConnectionContext context, Subscription sub) throws public void initialize() { } + + @Override + public boolean isAllowTempDestinationStealing() { + return allowTempDestinationStealing; + } + + @Override + public void setAllowTempDestinationStealing(boolean allowTempDestinationStealing) { + this.allowTempDestinationStealing = allowTempDestinationStealing; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 8ae052574bc..235ab7d5dc0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -27,6 +27,7 @@ import org.apache.activemq.broker.region.QueueBrowserSubscription; import org.apache.activemq.broker.region.QueueSubscription; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.TempDestination; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; @@ -108,6 +109,7 @@ public class PolicyEntry extends DestinationMapEntry { private boolean useTopicSubscriptionInflightStats = true; private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437] private boolean advancedMessageStatisticsEnabled = false; // [AMQ-8463] + private boolean allowTempDestinationStealing = false; /* * percentage of in-flight messages above which optimize message store is disabled @@ -314,6 +316,12 @@ public void baseUpdate(BaseDestination destination, Set includedProperti if (isUpdate("advancedMessageStatisticsEnabled", includedProperties)) { destination.setAdvancedMessageStatisticsEnabled(isAdvancedMessageStatisticsEnabled()); } + if (destination instanceof TempDestination) { + if (isUpdate("allowTempDestinationStealing", includedProperties)) { + ((TempDestination) destination).setAllowTempDestinationStealing( + isAllowTempDestinationStealing()); + } + } } public void baseConfiguration(Broker broker, BaseDestination destination) { @@ -1200,4 +1208,12 @@ public boolean isAdvancedMessageStatisticsEnabled() { public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) { this.advancedMessageStatisticsEnabled = advancedMessageStatisticsEnabled; } + + public boolean isAllowTempDestinationStealing() { + return allowTempDestinationStealing; + } + + public void setAllowTempDestinationStealing(boolean allowTempDestinationStealing) { + this.allowTempDestinationStealing = allowTempDestinationStealing; + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java index 1bf1b6d340a..99f82b4c604 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java @@ -16,6 +16,13 @@ */ package org.apache.activemq; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; @@ -29,6 +36,7 @@ import jakarta.jms.BytesMessage; import jakarta.jms.Connection; import jakarta.jms.DeliveryMode; +import jakarta.jms.Destination; import jakarta.jms.InvalidDestinationException; import jakarta.jms.JMSException; import jakarta.jms.Message; @@ -38,38 +46,79 @@ import jakarta.jms.Session; import jakarta.jms.TemporaryQueue; import jakarta.jms.TextMessage; - -import junit.framework.TestCase; - +import java.util.Arrays; +import java.util.Collection; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.Response; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.vm.VMTransport; import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @version */ -public class JmsTempDestinationTest extends TestCase { +@RunWith(Parameterized.class) +public class JmsTempDestinationTest { + + @Parameters(name="allowTempDestinationStealing={0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {false}, + {true}, + }); + } private static final Logger LOG = LoggerFactory.getLogger(JmsTempDestinationTest.class); private Connection connection; private ActiveMQConnectionFactory factory; - protected List connections = Collections.synchronizedList(new ArrayList()); + protected List connections = Collections.synchronizedList(new ArrayList<>()); + private BrokerService brokerService; + private final boolean allowTempDestinationStealing; + + public JmsTempDestinationTest(boolean allowTempDestinationStealing) { + this.allowTempDestinationStealing = allowTempDestinationStealing; + } - @Override - protected void setUp() throws Exception { - factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + + PolicyEntry tempQueueEntry = new PolicyEntry(); + tempQueueEntry.setTempQueue(true); + tempQueueEntry.setAllowTempDestinationStealing(allowTempDestinationStealing); + PolicyEntry tempTopicEntry = new PolicyEntry(); + tempTopicEntry.setTempTopic(true); + tempTopicEntry.setAllowTempDestinationStealing(allowTempDestinationStealing); + + PolicyMap pMap = new PolicyMap(); + pMap.setPolicyEntries(List.of(tempQueueEntry, tempTopicEntry)); + + brokerService.setDestinationPolicy(pMap); + brokerService.start(); + brokerService.waitUntilStarted(); + + factory = new ActiveMQConnectionFactory("vm://localhost"); factory.setAlwaysSyncSend(true); connection = factory.createConnection(); connections.add(connection); } - /** - * @see junit.framework.TestCase#tearDown() - */ - @Override - protected void tearDown() throws Exception { + @After + public void tearDown() throws Exception { for (Iterator iter = connections.iterator(); iter.hasNext();) { Connection conn = iter.next(); try { @@ -78,6 +127,8 @@ protected void tearDown() throws Exception { } iter.remove(); } + brokerService.stop(); + brokerService.waitUntilStopped(); } /** @@ -85,6 +136,7 @@ protected void tearDown() throws Exception { * * @throws JMSException */ + @Test public void testTempDestOnlyConsumedByLocalConn() throws JMSException { connection.start(); @@ -101,11 +153,12 @@ public void testTempDestOnlyConsumedByLocalConn() throws JMSException { Session otherSession = otherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryQueue otherQueue = otherSession.createTemporaryQueue(); MessageConsumer consumer = otherSession.createConsumer(otherQueue); - Message msg = consumer.receive(3000); + Message msg = consumer.receive(2000); assertNull(msg); // should throw InvalidDestinationException when consuming a temp - // destination from another connection + // destination from another connection. + // Note that this check is done in the client side try { consumer = otherSession.createConsumer(queue); fail("Send should fail since temp destination should be used from another connection"); @@ -115,17 +168,66 @@ public void testTempDestOnlyConsumedByLocalConn() throws JMSException { // should be able to consume temp destination from the same connection consumer = tempSession.createConsumer(queue); - msg = consumer.receive(3000); + msg = consumer.receive(2000); assertNotNull(msg); + } + // Test broker checks and enforces allowTempDestinationStealing flag + @Test + public void testAllowTempDestStealingQueue() throws Exception { + testAllowTempDestStealing(false, allowTempDestinationStealing); } + // Test broker checks and enforces allowTempDestinationStealing flag + @Test + public void testAllowTempDestStealingTopic() throws Exception { + testAllowTempDestStealing(true, allowTempDestinationStealing); + } + + // Test broker checks and enforces allowTempDestinationStealing flag + private void testAllowTempDestStealing(boolean topic, boolean tempDestStealing) throws Exception { + connection.start(); + + // create a temporary queue on the first connection + Session tempSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination tempDest = topic ? tempSession.createTemporaryTopic() : + tempSession.createTemporaryQueue(); + + // Create another connection/session + ActiveMQConnection otherConnection = (ActiveMQConnection) factory.createConnection(); + connections.add(otherConnection); + ActiveMQSession otherSession = (ActiveMQSession) otherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Send a direct ConsumerInfo to bypass the client check that would normally block this + // This will try and subscribe the second connection to the first connections + // temporary dest + ConsumerInfo info = new ConsumerInfo(otherSession.getNextConsumerId()); + info.setClientId(otherSession.connection.getClientID()); + info.setDestination(ActiveMQMessageTransformation.transformDestination(tempDest)); + Object result = otherConnection.getTransport().request(info, 1000); + + // The broker should allow because allowTempDestinationStealing = true + if (tempDestStealing) { + assertTrue(result instanceof Response); + assertFalse(((Response) result).isException()); + } else { + // The broker should throw an error because allowTempDestinationStealing = false + assertTrue(result instanceof ExceptionResponse); + assertTrue(((Response) result).isException()); + assertTrue(((ExceptionResponse) result).getException() instanceof InvalidDestinationException); + assertTrue(((ExceptionResponse) result).getException().getMessage() + .contains("created by another connection is not permitted")); + } + } + + /** * Make sure that a temp queue does not drop message if there is an active * consumers. * * @throws JMSException */ + @Test public void testTempQueueHoldsMessagesWithConsumers() throws JMSException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createTemporaryQueue(); @@ -140,7 +242,8 @@ public void testTempQueueHoldsMessagesWithConsumers() throws JMSException { Message message2 = consumer.receive(1000); assertNotNull(message2); assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage); - assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage)message2).getText().equals(message.getText())); + assertEquals("Expected message to be a '" + message.getText() + "'", + ((TextMessage) message2).getText(), message.getText()); } /** @@ -149,6 +252,7 @@ public void testTempQueueHoldsMessagesWithConsumers() throws JMSException { * * @throws JMSException */ + @Test public void testTempQueueHoldsMessagesWithoutConsumers() throws JMSException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -160,10 +264,11 @@ public void testTempQueueHoldsMessagesWithoutConsumers() throws JMSException { connection.start(); MessageConsumer consumer = session.createConsumer(queue); - Message message2 = consumer.receive(3000); + Message message2 = consumer.receive(1000); assertNotNull(message2); assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage); - assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage)message2).getText().equals(message.getText())); + assertEquals("Expected message to be a '" + message.getText() + "'", + ((TextMessage) message2).getText(), message.getText()); } @@ -172,11 +277,12 @@ public void testTempQueueHoldsMessagesWithoutConsumers() throws JMSException { * * @throws JMSException */ + @Test public void testTmpQueueWorksUnderLoad() throws JMSException { int count = 500; int dataSize = 1024; - ArrayList list = new ArrayList(count); + ArrayList list = new ArrayList<>(count); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createTemporaryQueue(); MessageProducer producer = session.createProducer(queue); @@ -195,9 +301,9 @@ public void testTmpQueueWorksUnderLoad() throws JMSException { MessageConsumer consumer = session.createConsumer(queue); for (int i = 0; i < count; i++) { Message message2 = consumer.receive(2000); - assertTrue(message2 != null); + assertNotNull(message2); assertEquals(i, message2.getIntProperty("c")); - assertTrue(message2.equals(list.get(i))); + assertEquals(message2, list.get(i)); } } @@ -209,7 +315,10 @@ public void testTmpQueueWorksUnderLoad() throws JMSException { * @throws InterruptedException * @throws URISyntaxException */ + @Test public void testPublishFailsForClosedConnection() throws Exception { + // This test is slow and we only need to run this test with the default + Assume.assumeFalse(allowTempDestinationStealing); Connection tempConnection = factory.createConnection(); connections.add(tempConnection); @@ -252,7 +361,10 @@ public void testPublishFailsForClosedConnection() throws Exception { * @throws JMSException * @throws InterruptedException */ + @Test public void testPublishFailsForDestroyedTempDestination() throws Exception { + // This test is slow and we only need to run this test with the default + Assume.assumeFalse(allowTempDestinationStealing); Connection tempConnection = factory.createConnection(); connections.add(tempConnection); @@ -293,6 +405,7 @@ public void testPublishFailsForDestroyedTempDestination() throws Exception { * * @throws JMSException */ + @Test public void testDeleteDestinationWithSubscribersFails() throws JMSException { Connection connection = factory.createConnection(); connections.add(connection); @@ -313,7 +426,11 @@ public void testDeleteDestinationWithSubscribersFails() throws JMSException { } } + @Test public void testSlowConsumerDoesNotBlockFastTempUsers() throws Exception { + // This test is slow and we only need to run this test with the default + Assume.assumeFalse(allowTempDestinationStealing); + ActiveMQConnectionFactory advisoryConnFactory = new ActiveMQConnectionFactory("vm://localhost?asyncQueueDepth=20"); Connection connection = advisoryConnFactory.createConnection(); connections.add(connection);