Skip to content
This repository was archived by the owner on Jan 19, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions spring-cloud-gcp-pubsub/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
Expand Down Expand Up @@ -33,15 +34,17 @@
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-core</artifactId>
</dependency>

<!-- Tests -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-core</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Copyright 2017 original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gcp.pubsub;

import java.util.List;

import com.google.cloud.pubsub.v1.PagedResponseWrappers;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.common.collect.Lists;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;

import org.springframework.cloud.gcp.core.GcpProjectIdProvider;
import org.springframework.util.Assert;

/**
* Pub/Sub admin utility that creates new topics and subscriptions on Google Cloud Pub/Sub.
*
* @author João André Martins
*/
public class PubsubAdmin {

private final String projectId;

private TopicAdminClient topicAdminClient;

private SubscriptionAdminClient subscriptionAdminClient;

/** Default inspired in the subscription creation web UI. */
private static final int DEFAULT_ACK_DEADLINE = 10;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to expose it as a configuration property?
Something like @Value("${spring.cloud.gcp.pubsub.defaultAckDeadline:10}").

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The createSubscription() API lets you specify an ackDeadline parameter. I didn't add the value here because the user can do that by themselves in another class (i.e., they can get it from the properties or any other source).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that the user can call the version of the method that takes ackDeadline but being able to configure the default externally could still be useful. Hardcoded defaults seem to be a bit contrary to Spring's philosophy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we should have setDefaultAckDeadline().
Like we have KafkaTemplate.setDefautTopic() or RabbitTemplate.setReplyTimeout()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defaultAckDeadline is now configurable through properties.
Does that address your concerns?


public PubsubAdmin(GcpProjectIdProvider projectIdProvider, TopicAdminClient topicAdminClient,
SubscriptionAdminClient subscriptionAdminClient) {
Assert.notNull(projectIdProvider, "The project ID provider can't be null.");
Assert.notNull(topicAdminClient, "The topic administration client can't be null");
Assert.notNull(subscriptionAdminClient,
"The subscription administration client can't be null");

this.projectId = projectIdProvider.getProjectId();
Assert.hasText(this.projectId, "The project ID can't be null or empty.");
this.topicAdminClient = topicAdminClient;
this.subscriptionAdminClient = subscriptionAdminClient;
}

/**
* Creates a new topic on Google Cloud Pub/Sub.
*
* @param topicName the name for the new topic
* @return the created topic
*/
public Topic createTopic(String topicName) {
Assert.hasText(topicName, "No topic name was specified.");

return this.topicAdminClient.createTopic(TopicName.create(this.projectId, topicName));
}

/**
* Deletes a topic from Google Cloud Pub/Sub.
*
* @param topicName the name of the topic to be deleted
*/
public void deleteTopic(String topicName) {
Assert.hasText(topicName, "No topic name was specified.");

this.topicAdminClient.deleteTopic(TopicName.create(this.projectId, topicName));
}

/**
* Returns every topic in a project.
*
* <p>If there are multiple pages, they will all be merged into the same result.
*/
public List<Topic> listTopics() {
PagedResponseWrappers.ListTopicsPagedResponse topicListPage =
this.topicAdminClient.listTopics(ProjectName.create(this.projectId));

return Lists.newArrayList(topicListPage.iterateAll());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the local variable topicListPage seems to imply that it only contains a single page of results. Are you sure that iterateAll() actually return all results rather than a single page? If so, maybe rename topicListPage to just topicsList.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, my comment about the variable name still applies I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would make sense for a local variable containing the result of iterateAll() to be called topicsList, not the topicAdminClient.listTopics() result. Also, the object type is ListTopicsPagedResponse, which also hints to it being a page, rather than the whole thing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"paged response" does not necessarily imply "page". You can also call it verbosely listTopicsPagedResponse. In any case, it's so minor, it's not worth discussing further. :)

}

/**
* Creates a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @return the created subscription
*/
public Subscription createSubscription(String subscriptionName, String topicName) {
return createSubscription(subscriptionName, topicName, null, null);
}

/**
* Creates a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @param ackDeadline deadline in seconds before a message is resent. If not provided, set to
* default of 10 seconds
* @return the created subscription
*/
public Subscription createSubscription(String subscriptionName, String topicName,
Integer ackDeadline) {
return createSubscription(subscriptionName, topicName, ackDeadline, null);
}

/**
* Creates a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @param pushEndpoint URL of the service receiving the push messages. If not provided, uses
* message pulling by default
* @return the created subscription
*/
public Subscription createSubscription(String subscriptionName, String topicName,
String pushEndpoint) {
return createSubscription(subscriptionName, topicName, null, pushEndpoint);
}

/**
* Creates a new subscription on Google Cloud Pub/Sub.
*
* @param subscriptionName the name of the new subscription
* @param topicName the name of the topic being subscribed to
* @param ackDeadline deadline in seconds before a message is resent. If not provided, set to
* default of 10 seconds
* @param pushEndpoint URL of the service receiving the push messages. If not provided, uses
* message pulling by default
* @return the created subscription
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to have JavaDocs for all the public API, even if it is copy/paste.

Thanks

public Subscription createSubscription(String subscriptionName, String topicName,
Integer ackDeadline, String pushEndpoint) {
Assert.hasText(subscriptionName, "No subscription name was specified.");
Assert.hasText(topicName, "No topic name was specified.");

int finalAckDeadline = DEFAULT_ACK_DEADLINE;
if (ackDeadline != null) {
Assert.isTrue(ackDeadline >= 0,
"The acknowledgement deadline value can't be negative.");
finalAckDeadline = ackDeadline;
}

PushConfig.Builder pushConfigBuilder = PushConfig.newBuilder();
if (pushEndpoint != null) {
pushConfigBuilder.setPushEndpoint(pushEndpoint);
}

return this.subscriptionAdminClient.createSubscription(
SubscriptionName.create(this.projectId, subscriptionName),
TopicName.create(this.projectId, topicName),
pushConfigBuilder.build(),
finalAckDeadline);
}

/**
* Deletes a subscription from Google Cloud Pub/Sub.
*
* @param subscriptionName
*/
public void deleteSubscription(String subscriptionName) {
Assert.hasText(subscriptionName, "No subscription name was specified");

this.subscriptionAdminClient.deleteSubscription(
SubscriptionName.create(this.projectId, subscriptionName));
}

/**
* Returns every subscription in a project.
*
* <p>If there are multiple pages, they will all be merged into the same result.
*/
public List<Subscription> listSubscriptions() {
PagedResponseWrappers.ListSubscriptionsPagedResponse subscriptionsPage =
this.subscriptionAdminClient.listSubscriptions(ProjectName.create(this.projectId));

return Lists.newArrayList(subscriptionsPage.iterateAll());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2017 original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gcp.pubsub;

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

/**
* @author João André Martins
*/
@RunWith(MockitoJUnitRunner.class)
public class PubsubAdminTests {

@Mock
private TopicAdminClient mockTopicAdminClient;

@Mock
private SubscriptionAdminClient mockSubscriptionAdminClient;

@Test(expected = IllegalArgumentException.class)
public void testNewPubsubAdmin_nullProjectProvider() {
new PubsubAdmin(null, this.mockTopicAdminClient, this.mockSubscriptionAdminClient);
}

@Test(expected = IllegalArgumentException.class)
public void testNewPubsubAdmin_nullTopicAdminClient() {
new PubsubAdmin(() -> "test-project", null, this.mockSubscriptionAdminClient);
}

@Test(expected = IllegalArgumentException.class)
public void testNewPubsubAdmin_nullSubscriptionAdminClient() {
new PubsubAdmin(() -> "test-project", this.mockTopicAdminClient, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

package org.springframework.cloud.gcp.pubsub.autoconfig;

import java.io.IOException;
import java.util.concurrent.Executors;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ExecutorProvider;
import com.google.api.gax.grpc.FixedExecutorProvider;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;

import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -31,6 +34,8 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.gcp.core.GcpProjectIdProvider;
import org.springframework.cloud.gcp.core.autoconfig.GcpContextAutoConfiguration;
import org.springframework.cloud.gcp.pubsub.PubsubAdmin;
import org.springframework.cloud.gcp.pubsub.core.PubsubException;
import org.springframework.cloud.gcp.pubsub.core.PubsubTemplate;
import org.springframework.cloud.gcp.pubsub.support.DefaultPublisherFactory;
import org.springframework.cloud.gcp.pubsub.support.DefaultSubscriberFactory;
Expand Down Expand Up @@ -112,4 +117,16 @@ public PublisherFactory defaultPublisherFactory(GcpProjectIdProvider projectIdPr
return new DefaultPublisherFactory(projectIdProvider, subscriberProvider, channelProvider,
credentialsProvider);
}

@Bean
@ConditionalOnMissingBean
public PubsubAdmin pubsubAdmin(GcpProjectIdProvider projectIdProvider) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, the real auto-config is missed.
That is what I mean by the comment here: GoogleCloudPlatform/java-docs-samples#755 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, not sure what you mean with the autoconfig is missed?

I removed @IntegrationComponentScan and @ComponentScan and PubsubAdmin is correctly autowired. You were right that I didn't need them, they must had been there from previous experiments...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Sorry. I thought the PubsubAdmin is in its own auto-config class.
But that is good to hear that everything is fine there already.
Since that is a sample it would be good to keep it as simple as possible to avoid unexpected questions in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally agreed. It was exactly that kind of feedback I was seeking from you in the sample PR!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Sorry. I thought the PubsubAdmin is in its own auto-config class.
But that is good to hear that everything is fine there already.
Since that is a sample it would be good to keep it as simple as possible to avoid unexpected questions in the future.

try {
return new PubsubAdmin(projectIdProvider, TopicAdminClient.create(),
SubscriptionAdminClient.create());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

???
I asked before security.
How all these .create() understand with what GC user they work?

Why this default, auto-configured PubsubAdmin doesn't depend on the CredentialsProvider?

That was really my first concern when I saw the same void create() in the PubsubAdmin ctors before.

Thanks

}
catch (IOException ioe) {
throw new PubsubException("An error occurred while creating PubsubAdmin.", ioe);
}
}
}