
Commit 03708f3

bump version to spark 3.5.6 (#187)

* bump version to spark 3.5.6
* add jackson dependency to address test failure
* fix lint error

1 parent 87d447a

File tree

pom.xml
src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala
src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala

4 files changed: +27 -7 lines

pom.xml

Lines changed: 20 additions & 2 deletions

@@ -24,7 +24,7 @@
 
     <groupId>io.streamnative.connectors</groupId>
     <artifactId>pulsar-spark-connector_2.13</artifactId>
-    <version>3.4.1-SNAPSHOT</version>
+    <version>3.5.6-SNAPSHOT</version>
     <name>StreamNative :: Pulsar Spark Connector</name>
     <url>https://pulsar.apache.org</url>
     <inceptionYear>2019</inceptionYear>
@@ -70,10 +70,11 @@
         <scala.version>2.13.12</scala.version>
         <scala.binary.version>2.13</scala.binary.version>
         <scalatest.version>3.2.14</scalatest.version>
-        <spark.version>3.4.1</spark.version>
+        <spark.version>3.5.6</spark.version>
         <commons-io.version>2.19.0</commons-io.version>
         <testcontainers.version>1.18.3</testcontainers.version>
         <bouncycastle.version>1.78</bouncycastle.version>
+        <jackson.version>2.15.2</jackson.version>
 
         <!-- plugin dependencies -->
         <maven.version>3.5.4</maven.version>
@@ -92,6 +93,23 @@
 
     <!-- dependencies for all modules -->
     <dependencies>
+        <!-- Jackson dependencies - ensure version consistency for tests -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
         <!-- pulsar dependency -->
         <dependency>
             <groupId>org.apache.pulsar</groupId>
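
Note: the three Jackson artifacts are pinned to a single ${jackson.version} (2.15.2, the Jackson line Spark 3.5.x is built against) so that jackson-core, jackson-databind, and jackson-annotations cannot drift apart on the test classpath; mixed Jackson versions are a common source of runtime failures. A minimal sketch of a version-alignment check, runnable with only jackson-core and jackson-databind on the classpath (JacksonVersionCheck is an illustrative name, not part of this commit):

    import com.fasterxml.jackson.core.json.PackageVersion
    import com.fasterxml.jackson.databind.ObjectMapper

    object JacksonVersionCheck {
      def main(args: Array[String]): Unit = {
        // Version reported by the jackson-core jar on the classpath.
        val core = PackageVersion.VERSION.toString
        // Version reported by the jackson-databind jar.
        val databind = new ObjectMapper().version().toString
        println(s"jackson-core=$core jackson-databind=$databind")
        // Fail fast on skew instead of debugging it inside a Spark test run.
        require(core == databind, s"Jackson version skew: $core vs $databind")
      }
    }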

src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala

Lines changed: 3 additions & 2 deletions

@@ -367,15 +367,16 @@ class JacksonRecordParser(schema: DataType, val options: JSONOptions) extends Lo
       case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
         // JSON parser currently doesn't support partial results for corrupted records.
         // For such records, all fields other than the meta fields are set to `null`.
-        throw BadRecordException(() => recordLiteral(record), () => None, e)
+        throw BadRecordException(() => recordLiteral(record), () => Array.empty[InternalRow], e)
       case e: CharConversionException if options.encoding.isEmpty =>
         val msg =
           """JSON parser cannot handle a character in its input.
             |Specifying encoding as an input option explicitly might help to resolve the issue.
             |""".stripMargin + e.getMessage
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
-        throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+        throw BadRecordException(() => recordLiteral(record),
+          () => Array.empty[InternalRow], wrappedCharException)
     }
   }
 }
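
Note: this change tracks a catalyst API difference between Spark 3.4 and 3.5: the partial-results callback of BadRecordException now returns an Array[InternalRow] (empty for "no partial result") instead of an Option[InternalRow] (None). A minimal sketch of the new shape, assuming the Spark 3.5 signature; failOnCorruptRecord and its recordLiteral parameter are illustrative stand-ins for the parser's own helpers:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    import org.apache.spark.sql.catalyst.util.BadRecordException

    // recordLiteral renders the raw record for error reporting, as in the
    // parser above; it is assumed here rather than defined.
    def failOnCorruptRecord(recordLiteral: () => UnsafeRow, cause: Throwable): Nothing =
      // Spark 3.4.x: throw BadRecordException(recordLiteral, () => None, cause)
      // Spark 3.5.x: partial results are an array, and empty means none.
      throw BadRecordException(recordLiteral, () => Array.empty[InternalRow], cause)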

src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala

Lines changed: 2 additions & 1 deletion

@@ -23,6 +23,7 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.pulsar.PulsarSourceUtils.reportDataLossFunc
@@ -206,7 +207,7 @@ private[pulsar] class PulsarProvider
     val caseInsensitiveParams = validateSinkOptions(parameters)
 
     val (clientConfig, producerConfig, topic) = prepareConfForProducer(parameters)
-    PulsarSinks.validateQuery(data.schema.toAttributes, topic)
+    PulsarSinks.validateQuery(DataTypeUtils.toAttributes(data.schema), topic)
 
     PulsarSinks.write(
       sqlContext.sparkSession,
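
Note: in Spark 3.5 the toAttributes helper is no longer available on StructType itself (StructType moved into the new sql-api module) and lives on org.apache.spark.sql.catalyst.types.DataTypeUtils instead, so call sites now pass the schema explicitly; the same substitution appears in PulsarSinks.scala below. A minimal standalone sketch of the migration (ToAttributesMigration is an illustrative name):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.types.DataTypeUtils
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object ToAttributesMigration {
      def main(args: Array[String]): Unit = {
        val schema = StructType(Seq(StructField("value", StringType)))
        // Spark 3.4.x: val attrs = schema.toAttributes
        // Spark 3.5.x: the helper lives on DataTypeUtils instead.
        val attrs: Seq[AttributeReference] = DataTypeUtils.toAttributes(schema)
        attrs.foreach(a => println(s"${a.name}: ${a.dataType.simpleString}"))
      }
    }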

src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala

Lines changed: 2 additions & 2 deletions

@@ -20,11 +20,11 @@ import scala.util.control.NonFatal
 
 import org.apache.pulsar.client.api.{Producer, PulsarClientException, Schema}
 
-import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.types._
@@ -44,7 +44,7 @@ private[pulsar] class PulsarSink(
   override def toString: String = "PulsarSink"
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    PulsarSinks.validateQuery(data.schema.toAttributes, topic)
+    PulsarSinks.validateQuery(DataTypeUtils.toAttributes(data.schema), topic)
 
     if (batchId <= latestBatchId) {
       logInfo(s"Skipping already committed batch $batchId")
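
Note: batch writes reach validateQuery through the PulsarProvider path patched above, while streaming writes reach it through PulsarSink.addBatch here, so both entry points pick up the DataTypeUtils change. A hedged end-to-end sketch of the batch path against a local broker; the option keys (service.url, topic) are assumed from the connector's README, and the URL and topic name are placeholders:

    import org.apache.spark.sql.SparkSession

    object PulsarSinkSmokeTest {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("pulsar-sink-smoke")
          .getOrCreate()
        import spark.implicits._

        // The connector validates this schema's attributes (validateQuery)
        // before producing records to the topic.
        Seq("hello", "pulsar").toDF("value")
          .write
          .format("pulsar")
          .option("service.url", "pulsar://localhost:6650")
          .option("topic", "spark-sink-demo")
          .save()

        spark.stop()
      }
    }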
