Skip to content

Commit 1cdccef

Browse files
authored
deps: update dependency com.google.cloud:google-cloud-pubsublite to v1.15.10 (#716)
* deps: update dependency com.google.cloud:google-cloud-pubsublite to v1.15.10 * Update pom.xml * deps: update dependency com.google.cloud:google-cloud-pubsublite to v1.15.10 * Deps: update samples and snippets * Fix PSL errors from update * Add undeclared dependencies * Add version for flogger * Fix lint
1 parent 510dd08 commit 1cdccef

File tree

10 files changed

+35
-22
lines changed

10 files changed

+35
-22
lines changed

pom.xml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<parent>
44
<groupId>com.google.cloud</groupId>
55
<artifactId>google-cloud-pubsublite-parent</artifactId>
6-
<version>1.9.4</version>
6+
<version>1.15.10</version>
77
</parent>
88
<modelVersion>4.0.0</modelVersion>
99
<artifactId>pubsublite-spark-sql-streaming</artifactId>
@@ -144,6 +144,15 @@
144144
<groupId>com.google.api</groupId>
145145
<artifactId>gax</artifactId>
146146
</dependency>
147+
<dependency>
148+
<groupId>com.google.api</groupId>
149+
<artifactId>gax-grpc</artifactId>
150+
</dependency>
151+
<dependency>
152+
<groupId>com.google.flogger</groupId>
153+
<artifactId>flogger</artifactId>
154+
<version>0.8</version>
155+
</dependency>
147156
<dependency>
148157
<groupId>com.google.auto.value</groupId>
149158
<artifactId>auto-value-annotations</artifactId>

samples/snapshot/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
<dependency>
4545
<groupId>com.google.cloud</groupId>
4646
<artifactId>google-cloud-pubsublite</artifactId>
47-
<version>1.9.4</version>
47+
<version>1.15.10</version>
4848
</dependency>
4949
<dependency>
5050
<groupId>com.google.cloud</groupId>

samples/snippets/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
<dependency>
4545
<groupId>com.google.cloud</groupId>
4646
<artifactId>google-cloud-pubsublite</artifactId>
47-
<version>1.9.4</version>
47+
<version>1.15.10</version>
4848
</dependency>
4949
<dependency>
5050
<groupId>com.google.cloud</groupId>

src/main/java/com/google/cloud/pubsublite/spark/PslContinuousPartitionReader.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,10 @@ public boolean next() {
5656
subscriber.onData().get();
5757
// since next() will not be called concurrently, we are sure that the message
5858
// is available to this thread.
59-
Optional<SequencedMessage> msg = subscriber.messageIfAvailable();
60-
checkState(msg.isPresent());
61-
currentMsg = msg.get();
59+
Optional<com.google.cloud.pubsublite.proto.SequencedMessage> proto_msg =
60+
subscriber.messageIfAvailable();
61+
checkState(proto_msg.isPresent());
62+
currentMsg = SequencedMessage.fromProto(proto_msg.get());
6263
currentOffset =
6364
SparkPartitionOffset.builder()
6465
.partition(currentOffset.partition())

src/main/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReader.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ public boolean next() {
5757
if (batchFulfilled) {
5858
return false;
5959
}
60-
Optional<SequencedMessage> msg;
60+
Optional<com.google.cloud.pubsublite.proto.SequencedMessage> proto_msg;
6161
while (true) {
6262
try {
6363
subscriber.onData().get(SUBSCRIBER_PULL_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
64-
msg = subscriber.messageIfAvailable();
64+
proto_msg = subscriber.messageIfAvailable();
6565
break;
6666
} catch (TimeoutException e) {
6767
log.atWarning().log(
@@ -76,8 +76,8 @@ public boolean next() {
7676
}
7777
// since next() is only called on one thread at a time, we are sure that the message is
7878
// available to this thread.
79-
checkState(msg.isPresent());
80-
currentMsg = msg.get();
79+
checkState(proto_msg.isPresent());
80+
currentMsg = SequencedMessage.fromProto(proto_msg.get());
8181
if (currentMsg.offset().value() == endOffset.offset()) {
8282
// this is the last msg for the batch.
8383
batchFulfilled = true;

src/main/java/com/google/cloud/pubsublite/spark/PslSparkUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.pubsublite.Partition;
2525
import com.google.cloud.pubsublite.SequencedMessage;
2626
import com.google.cloud.pubsublite.SubscriptionPath;
27+
import com.google.cloud.pubsublite.proto.PubSubMessage;
2728
import com.google.common.annotations.VisibleForTesting;
2829
import com.google.common.collect.ImmutableListMultimap;
2930
import com.google.common.collect.ListMultimap;
@@ -116,7 +117,7 @@ private static <T> void extractVal(
116117
}
117118

118119
@SuppressWarnings("CheckReturnValue")
119-
public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
120+
public static PubSubMessage toPubSubMessage(StructType inputSchema, InternalRow row) {
120121
Message.Builder builder = Message.builder();
121122
extractVal(
122123
inputSchema,
@@ -159,7 +160,7 @@ public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
159160
}));
160161
builder.setAttributes(attributeMapBuilder.build());
161162
});
162-
return builder.build();
163+
return builder.build().toProto();
163164
}
164165

165166
/**

src/main/java/com/google/cloud/pubsublite/spark/internal/PartitionSubscriberFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import com.google.api.gax.rpc.ApiException;
2020
import com.google.cloud.pubsublite.Offset;
2121
import com.google.cloud.pubsublite.Partition;
22-
import com.google.cloud.pubsublite.SequencedMessage;
2322
import com.google.cloud.pubsublite.internal.wire.Subscriber;
23+
import com.google.cloud.pubsublite.proto.SequencedMessage;
2424
import java.io.Serializable;
2525
import java.util.List;
2626
import java.util.function.Consumer;

src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,15 @@ public void testPartitionReader() throws Exception {
6868

6969
// Multiple get w/o next will return same msg.
7070
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
71-
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1));
71+
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1.toProto()));
7272
assertThat(reader.next()).isTrue();
7373
verifyInternalRow(reader.get(), 10L);
7474
verifyInternalRow(reader.get(), 10L);
7575
assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(10L);
7676

7777
// Next will advance to next msg.
7878
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
79-
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2));
79+
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2.toProto()));
8080
assertThat(reader.next()).isTrue();
8181
verifyInternalRow(reader.get(), 13L);
8282
assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(13L);

src/test/java/com/google/cloud/pubsublite/spark/PslMicroBatchInputPartitionReaderTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,14 @@ public void testPartitionReader() throws Exception {
7272

7373
// Multiple get w/o next will return same msg.
7474
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
75-
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1));
75+
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1.toProto()));
7676
assertThat(reader.next()).isTrue();
7777
verifyInternalRow(reader.get(), 10L);
7878
verifyInternalRow(reader.get(), 10L);
7979

8080
// Next will advance to next msg which is also the last msg in the batch.
8181
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
82-
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2));
82+
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2.toProto()));
8383
assertThat(reader.next()).isTrue();
8484
verifyInternalRow(reader.get(), 14L);
8585

@@ -96,14 +96,14 @@ public void testPartitionReaderNewMessageExceedsRange() throws Exception {
9696

9797
// Multiple get w/o next will return same msg.
9898
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
99-
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1));
99+
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1.toProto()));
100100
assertThat(reader.next()).isTrue();
101101
verifyInternalRow(reader.get(), 10L);
102102
verifyInternalRow(reader.get(), 10L);
103103

104104
// Next will advance to next msg, and recognize it's out of the batch range.
105105
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
106-
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2));
106+
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2.toProto()));
107107
assertThat(reader.next()).isFalse();
108108
}
109109
}

src/test/java/com/google/cloud/pubsublite/spark/PslSparkUtilsTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.cloud.pubsublite.Partition;
2626
import com.google.cloud.pubsublite.SequencedMessage;
2727
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
28+
import com.google.cloud.pubsublite.proto.PubSubMessage;
2829
import com.google.common.collect.ImmutableListMultimap;
2930
import com.google.common.collect.ImmutableMap;
3031
import com.google.protobuf.ByteString;
@@ -151,12 +152,13 @@ public void testToPubSubMessage() {
151152
new StructField("random_extra_field", DataTypes.BinaryType, false, Metadata.empty())
152153
});
153154

154-
assertThat(message).isEqualTo(PslSparkUtils.toPubSubMessage(structType, row));
155+
assertThat(message.toProto()).isEqualTo(PslSparkUtils.toPubSubMessage(structType, row));
155156
}
156157

157158
@Test
158159
public void testToPubSubMessageLongForEventTimestamp() {
159-
Message expectedMsg = Message.builder().setEventTime(Timestamps.fromMicros(100000L)).build();
160+
PubSubMessage expectedMsg =
161+
Message.builder().setEventTime(Timestamps.fromMicros(100000L)).build().toProto();
160162

161163
StructType structType =
162164
new StructType(
@@ -166,7 +168,7 @@ public void testToPubSubMessageLongForEventTimestamp() {
166168
List<Object> list = Collections.singletonList(100000L);
167169
InternalRow row = InternalRow.apply(asScalaBufferConverter(list).asScala());
168170

169-
Message message = PslSparkUtils.toPubSubMessage(structType, row);
171+
PubSubMessage message = PslSparkUtils.toPubSubMessage(structType, row);
170172
assertThat(message).isEqualTo(expectedMsg);
171173
}
172174

0 commit comments

Comments
 (0)