Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public Destination createDestination(ConnectionContext context, ActiveMQDestinat
if (destination.isQueue()) {
if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
if (tempDest.getConnectionId() == null) {
throw new IllegalArgumentException("Temporary queue must have a connectionId");
}
Queue queue = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination);
queue.initialize();
Expand All @@ -88,8 +91,11 @@ public Destination createDestination(ConnectionContext context, ActiveMQDestinat
return queue;
}
} else if (destination.isTemporary()) {

Topic topic = new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
if (tempDest.getConnectionId() == null) {
throw new IllegalArgumentException("Temporary topic must have a connectionId");
}
Topic topic = new TempTopic(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination);
topic.initialize();
return topic;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;

public interface TempDestination extends Destination {

void setAllowTempDestinationStealing(boolean allowTempDestinationStealing);

boolean isAllowTempDestinationStealing();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region;

import jakarta.jms.InvalidDestinationException;
import java.io.IOException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
Expand All @@ -33,10 +34,12 @@
*
*
*/
public class TempQueue extends Queue{
public class TempQueue extends Queue implements TempDestination{

private static final Logger LOG = LoggerFactory.getLogger(TempQueue.class);

private final ActiveMQTempDestination tempDest;

private boolean allowTempDestinationStealing = false;

/**
* @param brokerService
Expand Down Expand Up @@ -65,6 +68,11 @@ public void initialize() throws Exception {

@Override
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
final String connectionId = sub.getConsumerInfo().getConsumerId().getConnectionId();
if (!isAllowTempDestinationStealing() && !tempDest.getConnectionId().equals(connectionId)) {
throw new InvalidDestinationException("Subscribing to a temporary queue created by another connection is not permitted");
}

// Only consumers on the same connection can consume from
// the temporary destination
// However, we could have failed over - and we do this
Expand Down Expand Up @@ -97,4 +105,14 @@ public void dispose(ConnectionContext context) throws IOException {
}
super.dispose(context);
}

@Override
public boolean isAllowTempDestinationStealing() {
return allowTempDestinationStealing;
}

@Override
public void setAllowTempDestinationStealing(boolean allowTempDestinationStealing) {
this.allowTempDestinationStealing = allowTempDestinationStealing;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region;

import jakarta.jms.InvalidDestinationException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
Expand All @@ -30,8 +31,11 @@
*
*
*/
public class TempTopic extends Topic implements Task{
public class TempTopic extends Topic implements Task, TempDestination {

private final ActiveMQTempDestination tempDest;
private boolean allowTempDestinationStealing = false;

/**
* @param brokerService
* @param destination
Expand All @@ -50,6 +54,11 @@ public TempTopic(BrokerService brokerService,
}

public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
final String connectionId = sub.getConsumerInfo().getConsumerId().getConnectionId();
if (!isAllowTempDestinationStealing() && !tempDest.getConnectionId().equals(connectionId)) {
throw new InvalidDestinationException("Subscribing to a temporary topic created by another connection is not permitted");
}

// Only consumers on the same connection can consume from
// the temporary destination
// However, we could have failed over - and we do this
Expand All @@ -70,4 +79,14 @@ public void addSubscription(ConnectionContext context, Subscription sub) throws

public void initialize() {
}

@Override
public boolean isAllowTempDestinationStealing() {
return allowTempDestinationStealing;
}

@Override
public void setAllowTempDestinationStealing(boolean allowTempDestinationStealing) {
this.allowTempDestinationStealing = allowTempDestinationStealing;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.activemq.broker.region.QueueBrowserSubscription;
import org.apache.activemq.broker.region.QueueSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TempDestination;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
Expand Down Expand Up @@ -108,6 +109,7 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean useTopicSubscriptionInflightStats = true;
private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437]
private boolean advancedMessageStatisticsEnabled = false; // [AMQ-8463]
private boolean allowTempDestinationStealing = false;

/*
* percentage of in-flight messages above which optimize message store is disabled
Expand Down Expand Up @@ -314,6 +316,12 @@ public void baseUpdate(BaseDestination destination, Set<String> includedProperti
if (isUpdate("advancedMessageStatisticsEnabled", includedProperties)) {
destination.setAdvancedMessageStatisticsEnabled(isAdvancedMessageStatisticsEnabled());
}
if (destination instanceof TempDestination) {
if (isUpdate("allowTempDestinationStealing", includedProperties)) {
((TempDestination) destination).setAllowTempDestinationStealing(
isAllowTempDestinationStealing());
}
}
}

public void baseConfiguration(Broker broker, BaseDestination destination) {
Expand Down Expand Up @@ -1200,4 +1208,12 @@ public boolean isAdvancedMessageStatisticsEnabled() {
public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled) {
this.advancedMessageStatisticsEnabled = advancedMessageStatisticsEnabled;
}

public boolean isAllowTempDestinationStealing() {
return allowTempDestinationStealing;
}

public void setAllowTempDestinationStealing(boolean allowTempDestinationStealing) {
this.allowTempDestinationStealing = allowTempDestinationStealing;
}
}
Loading
Loading