Commits (41 total; the diff below shows changes from 8 of them)
46e4f57  add batchSize parameter for controlling bulk upload batch size. defau… (unintellisense, Jun 28, 2020)
144f9f9  update readme (unintellisense, Jun 28, 2020)
e3f58b0  Merge pull request #1 from loanpal-engineering/add-batch-size-update (unintellisense, Jun 28, 2020)
bd21e6a  use updated wave-api dependency (unintellisense, Jul 12, 2020)
707921d  null instead of empty for null objects (unintellisense, Jul 14, 2020)
af32243  cast null as null (unintellisense, Jul 14, 2020)
6b4fb6d  use master (unintellisense, Jul 14, 2020)
eda9081  Merge pull request #2 from loanpal-engineering/nulls-not-empty (unintellisense, Jul 14, 2020)
131c4f3  spark 3.0/scala 2.12 compatibility (SKinserLoanpal, Oct 1, 2020)
67ec60a  change to jitpack (SKinserLoanpal, Oct 8, 2020)
b8c10ea  Merge pull request #3 from loanpal-engineering/spark-3.0 (SKinserLoanpal, Oct 8, 2020)
32effa7  swap out spark version (SKinserLoanpal, Oct 8, 2020)
0e62d2a  Merge pull request #4 from loanpal-engineering/spark-3.0 (SKinserLoanpal, Oct 8, 2020)
8f72643  use salesforce magic null string value (SKinserLoanpal, Feb 1, 2021)
e9b423c  maybe empty string is dumb (SKinserLoanpal, Feb 2, 2021)
d934530  make note of reading/writing (SKinserLoanpal, Feb 2, 2021)
dedec34  switch to cast (SKinserLoanpal, Feb 2, 2021)
5f4437f  fix (SKinserLoanpal, Feb 2, 2021)
6770197  a (SKinserLoanpal, Feb 2, 2021)
c1d8e0b  add empty string na (SKinserLoanpal, Feb 2, 2021)
2451cb6  make all null values #N/A aside from dates (SKinserLoanpal, Feb 2, 2021)
7e068e5  fix dumb mistake (SKinserLoanpal, Feb 2, 2021)
03491b1  add special case for booleans (SKinserLoanpal, Feb 2, 2021)
3f9358d  dumb mistake (SKinserLoanpal, Feb 2, 2021)
f7ed7f2  fix data writer (SKinserLoanpal, Feb 2, 2021)
99c125d  fix (SKinserLoanpal, Feb 2, 2021)
4661c15  delete commented code (SKinserLoanpal, Feb 3, 2021)
e226b20  Merge pull request #5 from loanpal-engineering/null-na (SKinserLoanpal, Feb 4, 2021)
b6fe136  add support for max column width to csv parser (#6) (unintellisense, Apr 27, 2022)
7581bf8  shade dependency (SKinserLoanpal, Nov 15, 2022)
6706341  Merge pull request #8 from loanpal-engineering/spark-3.3.0 (SKinserLoanpal, Nov 15, 2022)
b19abd0  support pkChunks with filtering (handling empty batches) (mweldon-loanpal, Mar 17, 2023)
fab2d96  Merge pull request #9 from loanpal-engineering/BI-10792 (unintellisense, Mar 17, 2023)
6a658f1  Create codeowners (gwadley-goodleap, May 24, 2023)
850f035  bulk api 2.0 (SKinserLoanpal, Nov 29, 2023)
e1d7e59  fix (SKinserLoanpal, Jan 11, 2024)
9a04a88  Merge pull request #10 from loanpal-engineering/spark-3.3.0 (SKinserLoanpal, Jan 11, 2024)
7316f09  change force api to 53 (SKinserLoanpal, Jan 16, 2024)
2d727e8  Merge pull request #11 from loanpal-engineering/spark-3.3.0 (SKinserLoanpal, Jan 16, 2024)
8ccd432  fix max columns (SKinserLoanpal, May 23, 2024)
bd9c2fa  Merge pull request #14 from loanpal-engineering/spark-3.3.0 (SKinserLoanpal, May 23, 2024)
2 changes: 2 additions & 0 deletions README.md
@@ -62,6 +62,8 @@ $ bin/spark-shell --packages com.springml:spark-salesforce_2.11:1.1.3
* `timeout`: (Optional) The maximum time spent polling for the completion of bulk query job. This option can only be used when `bulk` is `true`.
* `externalIdFieldName`: (Optional) The name of the field used as the external ID for Salesforce Object. This value is only used when doing an update or upsert. Default "Id".
* `queryAll`: (Optional) Toggle to retrieve deleted and archived records for SOQL queries. Default value is `false`.
+### Options only supported for writing to Salesforce Objects
+* `batchSize`: (Optional) Maximum number of records per batch when performing updates. Defaults to 5000 (note that batches greater than 10000 will result in an error).


### Scala API
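For orientation, here is a minimal write sketch showing where the new `batchSize` option would be passed. The format name and the other option names follow the project's README; the credential and object values are placeholders, not part of this PR.

```scala
// Hypothetical usage of the new batchSize option when writing to Salesforce.
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("sf-write-example").getOrCreate()
val contacts = spark.read.option("header", "true").csv("/tmp/contacts.csv")

contacts.write
  .format("com.springml.spark.salesforce")
  .option("username", "[email protected]")
  .option("password", "password-plus-security-token")
  .option("sfObject", "Contact")
  .option("batchSize", "5000") // new in this PR; values above 10000 fail on the Salesforce side
  .mode(SaveMode.Overwrite)
  .save()
```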
3 changes: 2 additions & 1 deletion build.sbt
@@ -7,11 +7,12 @@ organization := "com.springml"
scalaVersion := "2.11.8"

resolvers += "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"
resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies ++= Seq(
"com.force.api" % "force-wsc" % "40.0.0",
"com.force.api" % "force-partner-api" % "40.0.0",
"com.springml" % "salesforce-wave-api" % "1.0.10",
"com.github.loanpal-engineering" % "salesforce-wave-api" % "eb71436",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@unintellisense Wondering, is this related to this change? Why do you need to change to a fork of the api client?

"org.mockito" % "mockito-core" % "2.0.31-beta"
)

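A note on the new dependency coordinates: JitPack builds artifacts straight from GitHub, so the group id becomes `com.github.<owner>`, the artifact id is the repository name, and the version can be a commit hash, branch, or tag. A minimal consuming build sketch, mirroring the diff above (illustrative only):

```scala
// build.sbt fragment (illustrative): resolving the forked salesforce-wave-api
// from JitPack, pinned to the short commit hash used in this PR.
resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies += "com.github.loanpal-engineering" % "salesforce-wave-api" % "eb71436"
```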
src/main/scala/com/springml/spark/salesforce/DatasetRelation.scala
@@ -116,7 +116,9 @@ case class DatasetRelation(

private def cast(fieldValue: String, toType: DataType,
nullable: Boolean = true, fieldName: String): Any = {
if (fieldValue == "" && nullable && !toType.isInstanceOf[StringType]) {
if (fieldValue == null)
null
else if (fieldValue == "" && nullable && !toType.isInstanceOf[StringType]) {
null
} else {
toType match {
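A self-contained sketch of the null-handling order introduced above, assuming a simplified `cast` that only distinguishes integers from strings; the real method dispatches on the full set of Spark data types:

```scala
// Simplified sketch: a literal null from Salesforce stays null, an empty string
// becomes null only for nullable non-string columns, everything else is converted.
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

def castSketch(fieldValue: String, toType: DataType, nullable: Boolean = true): Any = {
  if (fieldValue == null) null
  else if (fieldValue == "" && nullable && !toType.isInstanceOf[StringType]) null
  else toType match {
    case IntegerType => fieldValue.toInt
    case _           => fieldValue
  }
}

castSketch(null, IntegerType)  // null (new behaviour added in this hunk)
castSketch("", IntegerType)    // null (empty string for a non-string column)
castSketch("", StringType)     // ""   (empty string preserved for string columns)
castSketch("42", IntegerType)  // 42
```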
15 changes: 13 additions & 2 deletions src/main/scala/com/springml/spark/salesforce/DefaultSource.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

import scala.collection.mutable.ListBuffer
+import scala.util.{Failure, Success, Try}

/**
* Default source for Salesforce wave data source.
@@ -123,6 +124,15 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with Cr
val encodeFields = parameters.get("encodeFields")
val monitorJob = parameters.getOrElse("monitorJob", "false")
val externalIdFieldName = parameters.getOrElse("externalIdFieldName", "Id")
+val batchSizeStr = parameters.getOrElse("batchSize", "5000")
+val batchSize = Try(batchSizeStr.toInt) match {
+case Success(v)=> v
+case Failure(e)=> {
+val errorMsg = "batchSize parameter not an integer."
+logger.error(errorMsg)
+throw new Exception(errorMsg)
+}
+}

validateMutualExclusive(datasetName, sfObject, "datasetName", "sfObject")

@@ -141,7 +151,7 @@
} else {
logger.info("Updating Salesforce Object")
updateSalesforceObject(username, password, login, version, sfObject.get, mode,
-flag(upsert, "upsert"), externalIdFieldName, data)
+flag(upsert, "upsert"), externalIdFieldName, batchSize, data)
}

return createReturnRelation(data)
@@ -156,6 +166,7 @@
mode: SaveMode,
upsert: Boolean,
externalIdFieldName: String,
+batchSize: Integer,
data: DataFrame) {

val csvHeader = Utils.csvHeadder(data.schema)
@@ -164,7 +175,7 @@
val repartitionedRDD = Utils.repartition(data.rdd)
logger.info("no of partitions after repartitioning is " + repartitionedRDD.partitions.length)

-val writer = new SFObjectWriter(username, password, login, version, sfObject, mode, upsert, externalIdFieldName, csvHeader)
+val writer = new SFObjectWriter(username, password, login, version, sfObject, mode, upsert, externalIdFieldName, csvHeader, batchSize)
logger.info("Writing data")
val successfulWrite = writer.writeData(repartitionedRDD)
logger.info(s"Writing data was successful was $successfulWrite")
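The README hunk above notes that batches larger than 10000 records fail; the parsing added here only verifies the value is an integer. Below is a hedged sketch of how the option could also be range-checked (the helper name and the explicit bound are assumptions for illustration, not code from this PR):

```scala
// Hypothetical stricter parsing of the batchSize option: reject non-integers and
// values outside Salesforce's per-batch record limit instead of failing later.
import scala.util.{Failure, Success, Try}

def parseBatchSize(parameters: Map[String, String],
                   default: Int = 5000,
                   max: Int = 10000): Int =
  Try(parameters.getOrElse("batchSize", default.toString).toInt) match {
    case Success(v) if v > 0 && v <= max => v
    case Success(v) =>
      throw new IllegalArgumentException(s"batchSize must be between 1 and $max, got $v")
    case Failure(_) =>
      throw new IllegalArgumentException("batchSize parameter not an integer.")
  }
```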
src/main/scala/com/springml/spark/salesforce/SFObjectWriter.scala
@@ -23,20 +23,24 @@ class SFObjectWriter (
val mode: SaveMode,
val upsert: Boolean,
val externalIdFieldName: String,
-val csvHeader: String
+val csvHeader: String,
+val batchSize: Integer
) extends Serializable {

@transient val logger = Logger.getLogger(classOf[SFObjectWriter])

def writeData(rdd: RDD[Row]): Boolean = {
val csvRDD = rdd.map(row => row.toSeq.map(value => Utils.rowValue(value)).mkString(","))

+val partitionCnt = (1 + csvRDD.count() / batchSize).toInt
+val partitionedRDD = csvRDD.repartition(partitionCnt)
Review comment on lines +44 to +45 (@mosche, Sep 15, 2020):
This doesn't seem to be the right place to repartition as it's just leading to a 2nd round of shuffling the data around. Partitioning to control the size of ingest batches is already done in Utils.repartition, so the limit of records per batch should be considered there:
https://github.com/springml/spark-salesforce/pull/59/files#diff-b359f3e710dff2341dbedadb012b9ff4R62-R73

Follow-up comment: A PR for the alternative approach is here: #59

val jobInfo = new JobInfo(WaveAPIConstants.STR_CSV, sfObject, operation(mode, upsert))
jobInfo.setExternalIdFieldName(externalIdFieldName)

val jobId = bulkAPI.createJob(jobInfo).getId

-csvRDD.mapPartitionsWithIndex {
+partitionedRDD.mapPartitionsWithIndex {
case (index, iterator) => {
val records = iterator.toArray.mkString("\n")
var batchInfoId : String = null
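To make the partitioning arithmetic above and the reviewer's objection concrete, here is a small sketch: the first function is the batch-count computation this PR performs just before upload, and the second shows the alternative the comment points at, folding the record limit into the single upstream repartition so the data is only shuffled once. The helper names are illustrative and not the project's actual Utils API.

```scala
// Illustrative only; names like repartitionByRecordLimit are not from the project.
import org.apache.spark.rdd.RDD

// Batch count as computed in this PR: one partition (= one Bulk API batch)
// per batchSize records, e.g. 12000 records with batchSize 5000 => 3 partitions.
def batchCount(recordCount: Long, batchSize: Int): Int =
  (1 + recordCount / batchSize).toInt

// Reviewer's suggestion, sketched: apply the record limit where the data is
// already being repartitioned (Utils.repartition) to avoid a second shuffle.
def repartitionByRecordLimit[T](rdd: RDD[T], batchSize: Int): RDD[T] =
  rdd.repartition(math.max(1, batchCount(rdd.count(), batchSize)))
```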