diff --git a/spring-cloud-gcp-pubsub/pom.xml b/spring-cloud-gcp-pubsub/pom.xml index 99e0eb7fb2..f8101e3a0e 100644 --- a/spring-cloud-gcp-pubsub/pom.xml +++ b/spring-cloud-gcp-pubsub/pom.xml @@ -1,5 +1,6 @@ - 4.0.0 @@ -33,15 +34,17 @@ io.projectreactor reactor-core + + org.springframework.cloud + spring-cloud-gcp-core + + + org.springframework spring-test test - - org.springframework.cloud - spring-cloud-gcp-core - diff --git a/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/PubsubAdmin.java b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/PubsubAdmin.java new file mode 100644 index 0000000000..cb351875a8 --- /dev/null +++ b/spring-cloud-gcp-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/PubsubAdmin.java @@ -0,0 +1,236 @@ +/* + * Copyright 2017 original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.gcp.pubsub; + +import java.io.IOException; +import java.util.List; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.cloud.pubsub.v1.PagedResponseWrappers; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminSettings; +import com.google.common.collect.Lists; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; + +import org.springframework.cloud.gcp.core.GcpProjectIdProvider; +import org.springframework.util.Assert; + +/** + * Pub/Sub admin utility that creates new topics and subscriptions on Google Cloud Pub/Sub. + * + * @author João André Martins + */ +public class PubsubAdmin { + + private final String projectId; + + private final TopicAdminClient topicAdminClient; + + private final SubscriptionAdminClient subscriptionAdminClient; + + /** Default inspired in the subscription creation web UI. */ + private int defaultAckDeadline = 10; + + /** + * This constructor instantiates TopicAdminClient and SubscriptionAdminClient with all their + * defaults and the provided credentials provider. + */ + public PubsubAdmin(GcpProjectIdProvider projectIdProvider, + CredentialsProvider credentialsProvider) throws IOException { + this(projectIdProvider, + TopicAdminClient.create( + TopicAdminSettings.defaultBuilder() + .setCredentialsProvider(credentialsProvider) + .build()), + SubscriptionAdminClient.create( + SubscriptionAdminSettings.defaultBuilder() + .setCredentialsProvider(credentialsProvider) + .build())); + } + + public PubsubAdmin(GcpProjectIdProvider projectIdProvider, TopicAdminClient topicAdminClient, + SubscriptionAdminClient subscriptionAdminClient) { + Assert.notNull(projectIdProvider, "The project ID provider can't be null."); + Assert.notNull(topicAdminClient, "The topic administration client can't be null"); + Assert.notNull(subscriptionAdminClient, + "The subscription administration client can't be null"); + + this.projectId = projectIdProvider.getProjectId(); + Assert.hasText(this.projectId, "The project ID can't be null or empty."); + this.topicAdminClient = topicAdminClient; + this.subscriptionAdminClient = subscriptionAdminClient; + } + + /** + * Create a new topic on Google Cloud Pub/Sub. + * + * @param topicName the name for the new topic + * @return the created topic + */ + public Topic createTopic(String topicName) { + Assert.hasText(topicName, "No topic name was specified."); + + return this.topicAdminClient.createTopic(TopicName.create(this.projectId, topicName)); + } + + /** + * Delete a topic from Google Cloud Pub/Sub. + * + * @param topicName the name of the topic to be deleted + */ + public void deleteTopic(String topicName) { + Assert.hasText(topicName, "No topic name was specified."); + + this.topicAdminClient.deleteTopic(TopicName.create(this.projectId, topicName)); + } + + /** + * Return every topic in a project. + * + *

If there are multiple pages, they will all be merged into the same result. + */ + public List listTopics() { + PagedResponseWrappers.ListTopicsPagedResponse topicListPage = + this.topicAdminClient.listTopics(ProjectName.create(this.projectId)); + + return Lists.newArrayList(topicListPage.iterateAll()); + } + + /** + * Create a new subscription on Google Cloud Pub/Sub. + * + * @param subscriptionName the name of the new subscription + * @param topicName the name of the topic being subscribed to + * @return the created subscription + */ + public Subscription createSubscription(String subscriptionName, String topicName) { + return createSubscription(subscriptionName, topicName, null, null); + } + + /** + * Create a new subscription on Google Cloud Pub/Sub. + * + * @param subscriptionName the name of the new subscription + * @param topicName the name of the topic being subscribed to + * @param ackDeadline deadline in seconds before a message is resent. If not provided, set to + * default of 10 seconds + * @return the created subscription + */ + public Subscription createSubscription(String subscriptionName, String topicName, + Integer ackDeadline) { + return createSubscription(subscriptionName, topicName, ackDeadline, null); + } + + /** + * Create a new subscription on Google Cloud Pub/Sub. + * + * @param subscriptionName the name of the new subscription + * @param topicName the name of the topic being subscribed to + * @param pushEndpoint URL of the service receiving the push messages. If not provided, uses + * message pulling by default + * @return the created subscription + */ + public Subscription createSubscription(String subscriptionName, String topicName, + String pushEndpoint) { + return createSubscription(subscriptionName, topicName, null, pushEndpoint); + } + + /** + * Create a new subscription on Google Cloud Pub/Sub. + * + * @param subscriptionName the name of the new subscription + * @param topicName the name of the topic being subscribed to + * @param ackDeadline deadline in seconds before a message is resent. If not provided, set to + * default of 10 seconds + * @param pushEndpoint URL of the service receiving the push messages. If not provided, uses + * message pulling by default + * @return the created subscription + */ + public Subscription createSubscription(String subscriptionName, String topicName, + Integer ackDeadline, String pushEndpoint) { + Assert.hasText(subscriptionName, "No subscription name was specified."); + Assert.hasText(topicName, "No topic name was specified."); + + int finalAckDeadline = this.defaultAckDeadline; + if (ackDeadline != null) { + Assert.isTrue(ackDeadline >= 0, + "The acknowledgement deadline value can't be negative."); + finalAckDeadline = ackDeadline; + } + + PushConfig.Builder pushConfigBuilder = PushConfig.newBuilder(); + if (pushEndpoint != null) { + pushConfigBuilder.setPushEndpoint(pushEndpoint); + } + + return this.subscriptionAdminClient.createSubscription( + SubscriptionName.create(this.projectId, subscriptionName), + TopicName.create(this.projectId, topicName), + pushConfigBuilder.build(), + finalAckDeadline); + } + + /** + * Delete a subscription from Google Cloud Pub/Sub. + * + * @param subscriptionName + */ + public void deleteSubscription(String subscriptionName) { + Assert.hasText(subscriptionName, "No subscription name was specified"); + + this.subscriptionAdminClient.deleteSubscription( + SubscriptionName.create(this.projectId, subscriptionName)); + } + + /** + * Return every subscription in a project. + * + *

If there are multiple pages, they will all be merged into the same result. + */ + public List listSubscriptions() { + PagedResponseWrappers.ListSubscriptionsPagedResponse subscriptionsPage = + this.subscriptionAdminClient.listSubscriptions(ProjectName.create(this.projectId)); + + return Lists.newArrayList(subscriptionsPage.iterateAll()); + } + + /** + * @return the default acknowledgement deadline value in seconds + */ + public int getDefaultAckDeadline() { + return this.defaultAckDeadline; + } + + /** + * Set the default acknowledgement deadline value. + * + * @param defaultAckDeadline default acknowledgement deadline value in seconds + */ + public void setDefaultAckDeadline(int defaultAckDeadline) { + Assert.isTrue(defaultAckDeadline >= 0, + "The acknowledgement deadline value can't be negative."); + + this.defaultAckDeadline = defaultAckDeadline; + } +} diff --git a/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/PubsubAdminTests.java b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/PubsubAdminTests.java new file mode 100644 index 0000000000..b57fef19ef --- /dev/null +++ b/spring-cloud-gcp-pubsub/src/test/java/org/springframework/cloud/gcp/pubsub/PubsubAdminTests.java @@ -0,0 +1,52 @@ +/* + * Copyright 2017 original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.gcp.pubsub; + +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +/** + * @author João André Martins + */ +@RunWith(MockitoJUnitRunner.class) +public class PubsubAdminTests { + + @Mock + private TopicAdminClient mockTopicAdminClient; + + @Mock + private SubscriptionAdminClient mockSubscriptionAdminClient; + + @Test(expected = IllegalArgumentException.class) + public void testNewPubsubAdmin_nullProjectProvider() { + new PubsubAdmin(null, this.mockTopicAdminClient, this.mockSubscriptionAdminClient); + } + + @Test(expected = IllegalArgumentException.class) + public void testNewPubsubAdmin_nullTopicAdminClient() { + new PubsubAdmin(() -> "test-project", null, this.mockSubscriptionAdminClient); + } + + @Test(expected = IllegalArgumentException.class) + public void testNewPubsubAdmin_nullSubscriptionAdminClient() { + new PubsubAdmin(() -> "test-project", this.mockTopicAdminClient, null); + } +} diff --git a/spring-cloud-gcp-starters/spring-cloud-gcp-starter-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/autoconfig/GcpPubsubAutoConfiguration.java b/spring-cloud-gcp-starters/spring-cloud-gcp-starter-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/autoconfig/GcpPubsubAutoConfiguration.java index 8c45eee18c..08860cf61e 100644 --- a/spring-cloud-gcp-starters/spring-cloud-gcp-starter-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/autoconfig/GcpPubsubAutoConfiguration.java +++ b/spring-cloud-gcp-starters/spring-cloud-gcp-starter-pubsub/src/main/java/org/springframework/cloud/gcp/pubsub/autoconfig/GcpPubsubAutoConfiguration.java @@ -16,13 +16,16 @@ package org.springframework.cloud.gcp.pubsub.autoconfig; +import java.io.IOException; import java.util.concurrent.Executors; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.grpc.ChannelProvider; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.FixedExecutorProvider; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; +import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.cloud.pubsub.v1.TopicAdminSettings; import org.springframework.beans.factory.annotation.Qualifier; @@ -31,6 +34,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.cloud.gcp.core.GcpProjectIdProvider; import org.springframework.cloud.gcp.core.autoconfig.GcpContextAutoConfiguration; +import org.springframework.cloud.gcp.pubsub.PubsubAdmin; +import org.springframework.cloud.gcp.pubsub.core.PubsubException; import org.springframework.cloud.gcp.pubsub.core.PubsubTemplate; import org.springframework.cloud.gcp.pubsub.support.DefaultPublisherFactory; import org.springframework.cloud.gcp.pubsub.support.DefaultSubscriberFactory; @@ -112,4 +117,42 @@ public PublisherFactory defaultPublisherFactory(GcpProjectIdProvider projectIdPr return new DefaultPublisherFactory(projectIdProvider, subscriberProvider, channelProvider, credentialsProvider); } + + @Bean + @ConditionalOnMissingBean + public PubsubAdmin pubsubAdmin(GcpProjectIdProvider projectIdProvider, + TopicAdminClient topicAdminClient, + SubscriptionAdminClient subscriptionAdminClient) { + return new PubsubAdmin(projectIdProvider, topicAdminClient, subscriptionAdminClient); + } + + @Bean + @ConditionalOnMissingBean + public TopicAdminClient topicAdminClient(CredentialsProvider credentialsProvider) { + try { + return TopicAdminClient.create( + TopicAdminSettings.defaultBuilder() + .setCredentialsProvider(credentialsProvider) + .build()); + } + catch (IOException ioe) { + throw new PubsubException("An error occurred while creating TopicAdminClient.", ioe); + } + } + + @Bean + @ConditionalOnMissingBean + public SubscriptionAdminClient subscriptionAdminClient( + CredentialsProvider credentialsProvider) { + try { + return SubscriptionAdminClient.create( + SubscriptionAdminSettings.defaultBuilder() + .setCredentialsProvider(credentialsProvider) + .build()); + } + catch (IOException ioe) { + throw new PubsubException("An error occurred while creating SubscriptionAdminClient.", + ioe); + } + } }