From 628d958a621d9d22c6f64e5299e70180413eee5d Mon Sep 17 00:00:00 2001
From: Neng Lu
Date: Wed, 9 Jul 2025 21:09:48 -0700
Subject: [PATCH 1/3] bump version to spark 3.5.6

---
 pom.xml                                                  | 4 ++--
 .../scala/org/apache/spark/sql/pulsar/JSONParser.scala   | 4 ++--
 .../org/apache/spark/sql/pulsar/PulsarProvider.scala     | 7 +++----
 .../scala/org/apache/spark/sql/pulsar/PulsarSinks.scala  | 8 +++-----
 4 files changed, 10 insertions(+), 13 deletions(-)

diff --git a/pom.xml b/pom.xml
index 548397fb..ac950651 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@
   <groupId>io.streamnative.connectors</groupId>
   <artifactId>pulsar-spark-connector_2.13</artifactId>
-  <version>3.4.1-SNAPSHOT</version>
+  <version>3.5.6-SNAPSHOT</version>
   <name>StreamNative :: Pulsar Spark Connector</name>
   <url>https://pulsar.apache.org</url>
   <inceptionYear>2019</inceptionYear>
@@ -70,7 +70,7 @@
    2.13.12
    2.13
    3.2.14
-   3.4.1
+   3.5.6
    2.19.0
    1.18.3
    1.78
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala b/src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala
index 0288d97e..05cb8ebe 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala
@@ -367,7 +367,7 @@ class JacksonRecordParser(schema: DataType, val options: JSONOptions) extends Lo
       case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
         // JSON parser currently doesn't support partial results for corrupted records.
         // For such records, all fields other than the meta fields are set to `null`.
-        throw BadRecordException(() => recordLiteral(record), () => None, e)
+        throw BadRecordException(() => recordLiteral(record), () => Array.empty[InternalRow], e)
       case e: CharConversionException if options.encoding.isEmpty =>
         val msg =
           """JSON parser cannot handle a character in its input.
@@ -375,7 +375,7 @@ class JacksonRecordParser(schema: DataType, val options: JSONOptions) extends Lo
             |""".stripMargin + e.getMessage
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
-        throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+        throw BadRecordException(() => recordLiteral(record), () => Array.empty[InternalRow], wrappedCharException)
     }
   }
 }
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
index 29c00c14..12f44ace 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
@@ -15,14 +15,13 @@ package org.apache.spark.sql.pulsar

 import java.{util => ju}
 import java.util.{Locale, UUID}
-
 import org.apache.pulsar.client.api.MessageId
 import org.apache.pulsar.common.naming.TopicName
-
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.pulsar.PulsarSourceUtils.reportDataLossFunc
@@ -206,7 +205,7 @@ private[pulsar] class PulsarProvider
     val caseInsensitiveParams = validateSinkOptions(parameters)

     val (clientConfig, producerConfig, topic) = prepareConfForProducer(parameters)
-    PulsarSinks.validateQuery(data.schema.toAttributes, topic)
+    PulsarSinks.validateQuery(DataTypeUtils.toAttributes(data.schema), topic)

     PulsarSinks.write(
       sqlContext.sparkSession,
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala
index b0c44d05..e5bd886b 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala
@@ -15,16 +15,14 @@ package org.apache.spark.sql.pulsar

 import java.{util => ju}
 import java.util.concurrent.TimeUnit
-
 import scala.util.control.NonFatal
-
 import org.apache.pulsar.client.api.{Producer, PulsarClientException, Schema}
-
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SparkSession}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.types._
@@ -44,7 +42,7 @@ private[pulsar] class PulsarSink(
   override def toString: String = "PulsarSink"

   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    PulsarSinks.validateQuery(data.schema.toAttributes, topic)
+    PulsarSinks.validateQuery(DataTypeUtils.toAttributes(data.schema), topic)

     if (batchId <= latestBatchId) {
       logInfo(s"Skipping already committed batch $batchId")

From 8df8804dbf699b26eb1047a17f91a805e497e8c4 Mon Sep 17 00:00:00 2001
From: Neng Lu
Date: Thu, 10 Jul 2025 09:31:04 -0700
Subject: [PATCH 2/3] add jackson dependency to address test failure

---
 pom.xml | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/pom.xml b/pom.xml
index ac950651..9f341bc3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,7 @@
    2.19.0
    1.18.3
    1.78
+   <jackson.version>2.15.2</jackson.version>
    3.5.4


@@ -92,6 +93,23 @@



+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+

    <dependency>
      <groupId>org.apache.pulsar</groupId>

From 983ecb8e5027f69f573e017fcccf51dcdd8e85c2 Mon Sep 17 00:00:00 2001
From: Neng Lu
Date: Mon, 25 Aug 2025 15:38:52 -0700
Subject: [PATCH 3/3] fix lint error

---
 src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala | 3 ++-
 .../scala/org/apache/spark/sql/pulsar/PulsarProvider.scala  | 4 +++-
 .../scala/org/apache/spark/sql/pulsar/PulsarSinks.scala     | 6 ++++--
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala b/src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala
index 05cb8ebe..2f175faa 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala
@@ -375,7 +375,8 @@ class JacksonRecordParser(schema: DataType, val options: JSONOptions) extends Lo
             |""".stripMargin + e.getMessage
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
-        throw BadRecordException(() => recordLiteral(record), () => Array.empty[InternalRow], wrappedCharException)
+        throw BadRecordException(() => recordLiteral(record),
+          () => Array.empty[InternalRow], wrappedCharException)
     }
   }
 }
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
index 12f44ace..93ac1616 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
@@ -15,11 +15,13 @@ package org.apache.spark.sql.pulsar

 import java.{util => ju}
 import java.util.{Locale, UUID}
+
 import org.apache.pulsar.client.api.MessageId
 import org.apache.pulsar.common.naming.TopicName
+
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SaveMode, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala
index e5bd886b..a1376e5d 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSinks.scala
@@ -15,11 +15,13 @@ package org.apache.spark.sql.pulsar

 import java.{util => ju}
 import java.util.concurrent.TimeUnit
+
 import scala.util.control.NonFatal
+
 import org.apache.pulsar.client.api.{Producer, PulsarClientException, Schema}
-import org.apache.spark.SparkEnv
+
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils