Skip to content

Add Spring Pulsar transaction support #40189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.thread.Threading;
import org.springframework.boot.util.LambdaSafe;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment;
Expand All @@ -57,6 +59,8 @@
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
import org.springframework.pulsar.transaction.PulsarAwareTransactionManager;
import org.springframework.pulsar.transaction.PulsarTransactionManager;

/**
* {@link EnableAutoConfiguration Auto-configuration} for Apache Pulsar.
Expand Down Expand Up @@ -126,8 +130,11 @@ private void applyProducerBuilderCustomizers(List<ProducerBuilderCustomizer<?>>
PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
ObjectProvider<ProducerInterceptor> producerInterceptors, SchemaResolver schemaResolver,
TopicResolver topicResolver) {
return new PulsarTemplate<>(pulsarProducerFactory, producerInterceptors.orderedStream().toList(),
schemaResolver, topicResolver, this.properties.getTemplate().isObservationsEnabled());
PulsarTemplate<?> template = new PulsarTemplate<>(pulsarProducerFactory,
producerInterceptors.orderedStream().toList(), schemaResolver, topicResolver,
this.properties.getTemplate().isObservationsEnabled());
this.propertiesMapper.customizeTemplate(template);
return template;
}

@Bean
Expand All @@ -142,6 +149,13 @@ DefaultPulsarConsumerFactory<?> pulsarConsumerFactory(PulsarClient pulsarClient,
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
}

@Bean
@ConditionalOnMissingBean
@Conditional(TransactionsEnabledCondition.class)
public PulsarAwareTransactionManager pulsarTransactionManager(PulsarClient pulsarClient) {
return new PulsarTransactionManager(pulsarClient);
}

@SuppressWarnings("unchecked")
private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>> customizers,
ConsumerBuilder<?> builder) {
Expand All @@ -153,13 +167,15 @@ private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>>
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
TopicResolver topicResolver, Environment environment) {
TopicResolver topicResolver, ObjectProvider<PulsarAwareTransactionManager> pulsarTransactionManager,
Environment environment) {
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
if (Threading.VIRTUAL.isActive(environment)) {
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-"));
}
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
this.propertiesMapper.customizeContainerProperties(containerProperties);
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
}
Expand Down Expand Up @@ -203,4 +219,26 @@ static class EnablePulsarConfiguration {

}

/**
* Custom condition that is true when transactions have been enabled in the template
* and/or in the listener.
*/
static final class TransactionsEnabledCondition extends AnyNestedCondition {

TransactionsEnabledCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}

@ConditionalOnProperty(prefix = "spring.pulsar.template.transaction", name = "enabled", havingValue = "true")
static class TemplateTransactionEnabledCondition {

}

@ConditionalOnProperty(prefix = "spring.pulsar.listener.transaction", name = "enabled", havingValue = "true")
static class ListenerTransactionEnabledCondition {

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -103,6 +104,14 @@ public Template getTemplate() {
return this.template;
}

/**
* Whether transactions are enabled for either the template or the listener.
* @return whether transactions are enabled for either the template or the listener
*/
boolean isTransactionEnabled() {
return this.template.getTransaction().isEnabled() || this.listener.getTransaction().isEnabled();
}

public static class Client {

/**
Expand Down Expand Up @@ -763,6 +772,11 @@ public static class Listener {
*/
private boolean observationEnabled;

/**
* Transaction settings.
*/
private final Transaction transaction = new ListenerTransaction();

public SchemaType getSchemaType() {
return this.schemaType;
}
Expand All @@ -779,6 +793,10 @@ public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

public Transaction getTransaction() {
return this.transaction;
}

}

public static class Reader {
Expand Down Expand Up @@ -858,6 +876,11 @@ public static class Template {
*/
private boolean observationsEnabled;

/**
* Transaction settings.
*/
private final Transaction transaction = new TemplateTransaction();

public boolean isObservationsEnabled() {
return this.observationsEnabled;
}
Expand All @@ -866,6 +889,89 @@ public void setObservationsEnabled(boolean observationsEnabled) {
this.observationsEnabled = observationsEnabled;
}

public Transaction getTransaction() {
return this.transaction;
}

}

public abstract static class Transaction {

/**
* Whether transactions are enabled for the component.
*/
private boolean enabled;

/**
* Whether the component requires transactions.
*/
private boolean required;

/**
* Duration representing the transaction timeout - null to use default timeout of
* the underlying transaction system, or none if timeouts are not supported.
*/
private Duration timeout;

public boolean isEnabled() {
return this.enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public boolean isRequired() {
return this.required;
}

public void setRequired(boolean required) {
this.required = required;
}

public Duration getTimeout() {
return this.timeout;
}

public void setTimeout(Duration timeout) {
this.timeout = timeout;
}

void validate() {
if (this.required && !this.enabled) {
String requiredProp = "%s.required".formatted(this.propertyPath());
String enabledProp = "%s.enabled".formatted(this.propertyPath());
throw new InvalidConfigurationPropertyValueException(requiredProp, this.required,
"Transactions must be enabled in order to be required. "
+ "Either set %s to 'true' or make transactions optional by setting %s to 'false'"
.formatted(enabledProp, requiredProp));
}
}

/**
* Gets the property path that the transaction properties are mapped to.
* @return the property path that the transaction properties are mapped to
*/
protected abstract String propertyPath();

}

static class TemplateTransaction extends Transaction {

@Override
protected String propertyPath() {
return "spring.pulsar.template.transaction";
}

}

static class ListenerTransaction extends Transaction {

@Override
protected String propertyPath() {
return "spring.pulsar.listener.transaction";
}

}

public static class Authentication {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl;

import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
import org.springframework.util.StringUtils;
Expand All @@ -64,6 +65,9 @@ void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
if (this.properties.isTransactionEnabled()) {
clientBuilder.enableTransaction(true);
}
customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication);
customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties,
connectionDetails);
Expand Down Expand Up @@ -157,6 +161,15 @@ <T> void customizeProducerBuilder(ProducerBuilder<T> producerBuilder) {
map.from(properties::getAccessMode).to(producerBuilder::accessMode);
}

<T> void customizeTemplate(PulsarTemplate<T> template) {
PulsarProperties.Transaction properties = this.properties.getTemplate().getTransaction();
properties.validate();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::isEnabled).to(template.transactions()::setEnabled);
map.from(properties::isRequired).to(template.transactions()::setRequired);
map.from(properties::getTimeout).to(template.transactions()::setTimeout);
}

<T> void customizeConsumerBuilder(ConsumerBuilder<T> consumerBuilder) {
PulsarProperties.Consumer properties = this.properties.getConsumer();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
Expand All @@ -183,6 +196,7 @@ private void customizeConsumerBuilderSubscription(ConsumerBuilder<?> consumerBui
void customizeContainerProperties(PulsarContainerProperties containerProperties) {
customizePulsarContainerConsumerSubscriptionProperties(containerProperties);
customizePulsarContainerListenerProperties(containerProperties);
customizePulsarContainerTransactionProperties(containerProperties);
}

private void customizePulsarContainerConsumerSubscriptionProperties(PulsarContainerProperties containerProperties) {
Expand All @@ -198,6 +212,15 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie
map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled);
}

private void customizePulsarContainerTransactionProperties(PulsarContainerProperties containerProperties) {
PulsarProperties.Transaction properties = this.properties.getListener().getTransaction();
properties.validate();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::isEnabled).to(containerProperties.transactions()::setEnabled);
map.from(properties::isRequired).to(containerProperties.transactions()::setRequired);
map.from(properties::getTimeout).to(containerProperties.transactions()::setTimeout);
}

<T> void customizeReaderBuilder(ReaderBuilder<T> readerBuilder) {
PulsarProperties.Reader properties = this.properties.getReader();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
Expand Down
Loading