@@ -72,6 +72,10 @@ class SQLServerBulkJdbcOptions(val params: CaseInsensitiveMap[String])
   val schemaCheckEnabled =
     params.getOrElse("schemaCheckEnabled", "true").toBoolean
 
+  // user-provided, comma-separated column names to match against the dataframe
+  val columnsToWrite =
+    params.getOrElse("columnsToWrite", "").toString
+
   // Not a feature
   // Only used for internally testing data idempotency
   val testDataIdempotency =
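Editorial note: a minimal sketch of how a caller might exercise the new option. The format name and option keys match this connector, but the URL, table, credentials, and column list are placeholders, and `spark`/`df` are assumed to already exist:

```scala
// Hypothetical usage of the new columnsToWrite option (placeholders throughout).
df.write
  .format("com.microsoft.sqlserver.jdbc.spark")
  .mode("append")
  .option("url", "jdbc:sqlserver://myserver:1433;databaseName=mydb")
  .option("dbtable", "dbo.Events")
  .option("user", "<user>")
  .option("password", "<password>")
  .option("columnsToWrite", "id,name,ts") // comma-separated, parsed as shown above
  .save()
```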
@@ -180,46 +180,31 @@ object BulkCopyUtils extends Logging {
   }
 
   /**
-   * getComputedCols
-   * utility function to get computed columns.
-   * Use computed column names to exclude computed column when matching schema.
+   * getAutoCols
+   * utility function to get auto-generated columns.
+   * Use auto-generated column names to exclude them when matching schema.
    */
-  private[spark] def getComputedCols(
+  private[spark] def getAutoCols(
       conn: Connection,
       table: String): List[String] = {
-    val queryStr = s"SELECT name FROM sys.computed_columns WHERE object_id = OBJECT_ID('${table}');"
-    val computedColRs = conn.createStatement.executeQuery(queryStr)
-    val computedCols = ListBuffer[String]()
-    while (computedColRs.next()) {
-      val colName = computedColRs.getString("name")
-      computedCols.append(colName)
+    // auto cols are the union of computed columns, generated-always columns, and node/edge table auto columns
+    val queryStr = s"""SELECT name
+                  FROM sys.columns
+                  WHERE object_id = OBJECT_ID('${table}')
+                  AND (is_computed = 1 -- computed column
+                       OR generated_always_type > 0 -- generated always / temporal table
+                       OR (is_hidden = 0 AND graph_type = 2)) -- graph table
+                  """
+
+    val autoColRs = conn.createStatement.executeQuery(queryStr)
+    val autoCols = ListBuffer[String]()
+    while (autoColRs.next()) {
+      val colName = autoColRs.getString("name")
+      autoCols.append(colName)
     }
-    computedCols.toList
+    autoCols.toList
   }
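Editorial note: the rewritten query folds three kinds of auto-generated columns (computed, GENERATED ALWAYS, and graph node/edge internal columns) into a single sys.columns lookup. A standalone sketch of the same lookup over plain JDBC, in case it helps to test the predicate outside the connector; the connection string and table name are placeholders, and the query text simply mirrors the diff above:

```scala
import java.sql.DriverManager

// Sketch only: connection details and table name are placeholders.
object AutoColsProbe {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:sqlserver://myserver:1433;databaseName=mydb;user=<user>;password=<pw>")
    val table = "dbo.Events"
    val queryStr =
      s"""SELECT name
         |FROM sys.columns
         |WHERE object_id = OBJECT_ID('$table')
         |AND (is_computed = 1
         |     OR generated_always_type > 0
         |     OR (is_hidden = 0 AND graph_type = 2))""".stripMargin
    val rs = conn.createStatement.executeQuery(queryStr)
    while (rs.next()) {
      // each name printed here is a column the bulk writer will skip
      println(rs.getString("name"))
    }
    conn.close()
  }
}
```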

-  /**
-   * dfComputedColCount
-   * utility function to get number of computed columns in dataframe.
-   * Use number of computed columns in dataframe to get number of non computed column in df,
-   * and compare with the number of non computed column in sql table
-   */
-  private[spark] def dfComputedColCount(
-      dfColNames: List[String],
-      computedCols: List[String],
-      dfColCaseMap: Map[String, String],
-      isCaseSensitive: Boolean): Int = {
-    var dfComputedColCt = 0
-    for (j <- 0 to computedCols.length-1) {
-      if (isCaseSensitive && dfColNames.contains(computedCols(j)) ||
-          !isCaseSensitive && dfColCaseMap.contains(computedCols(j).toLowerCase())
-          && dfColCaseMap(computedCols(j).toLowerCase()) == computedCols(j)) {
-        dfComputedColCt += 1
-      }
-    }
-    dfComputedColCt
-  }


   /**
    * getColMetadataMap
    * Utility function convert result set meta data to array.
@@ -263,7 +248,7 @@ object BulkCopyUtils extends Logging {
     val colMetaData = {
       if(checkSchema) {
         checkExTableType(conn, options)
-        matchSchemas(conn, options.dbtable, df, rs, options.url, isCaseSensitive, options.schemaCheckEnabled)
+        matchSchemas(conn, options.dbtable, df, rs, options.url, isCaseSensitive, options.schemaCheckEnabled, options.columnsToWrite)
       } else {
         defaultColMetadataMap(rs.getMetaData())
       }
@@ -289,6 +274,7 @@
    * @param url: String,
    * @param isCaseSensitive: Boolean
    * @param strictSchemaCheck: Boolean
+   * @param columnsToWrite: String
    */
   private[spark] def matchSchemas(
       conn: Connection,
@@ -297,39 +283,42 @@
       rs: ResultSet,
       url: String,
       isCaseSensitive: Boolean,
-      strictSchemaCheck: Boolean): Array[ColumnMetadata]= {
+      strictSchemaCheck: Boolean,
+      columnsToWrite: String): Array[ColumnMetadata]= {
     val dfColCaseMap = (df.schema.fieldNames.map(item => item.toLowerCase)
       zip df.schema.fieldNames.toList).toMap
     val dfCols = df.schema
 
     val tableCols = getSchema(rs, JdbcDialects.get(url))
-    val computedCols = getComputedCols(conn, dbtable)
+    val autoCols = getAutoCols(conn, dbtable)
+
+    val columnsToWriteSet = columnsToWrite.split(",").toSet
+    logDebug(s"columnsToWrite: $columnsToWriteSet")
 
     val prefix = "Spark Dataframe and SQL Server table have differing"

-    if (computedCols.length == 0) {
-      assertIfCheckEnabled(dfCols.length == tableCols.length, strictSchemaCheck,
-        s"${prefix} numbers of columns")
-    } else if (strictSchemaCheck) {
-      val dfColNames = df.schema.fieldNames.toList
-      val dfComputedColCt = dfComputedColCount(dfColNames, computedCols, dfColCaseMap, isCaseSensitive)
-      // if df has computed column(s), check column length using non computed column in df and table.
-      // non computed column number in df: dfCols.length - dfComputedColCt
-      // non computed column number in table: tableCols.length - computedCols.length
-      assertIfCheckEnabled(dfCols.length-dfComputedColCt == tableCols.length-computedCols.length, strictSchemaCheck,
-        s"${prefix} numbers of columns")
-    }
+    // auto columns should not exist in df
+    assertIfCheckEnabled(dfCols.length + autoCols.length == tableCols.length, strictSchemaCheck,
+      s"${prefix} numbers of columns")

[Comment from Collaborator]: Should we check this only when columnsToWriteSet.isEmpty?

-    val result = new Array[ColumnMetadata](tableCols.length - computedCols.length)
+    // if columnsToWrite is provided by the user, use it for metadata mapping; if not, use the sql table
+    val result = if (columnsToWrite == "") {

[Comment from Collaborator]: Use columnsToWriteSet.isEmpty as you've already converted the string.

+      new Array[ColumnMetadata](tableCols.length - autoCols.length)
+    } else {
+      new Array[ColumnMetadata](columnsToWriteSet.size)
+    }
+    var nonAutoColIndex = 0

     for (i <- 0 to tableCols.length-1) {
       val tableColName = tableCols(i).name
       var dfFieldIndex = -1
-      // set dfFieldIndex = -1 for all computed columns to skip ColumnMetadata
-      if (computedCols.contains(tableColName)) {
-        logDebug(s"skipping computed col index $i col name $tableColName dfFieldIndex $dfFieldIndex")
+      if (!columnsToWriteSet.isEmpty && !columnsToWriteSet.contains(tableColName)) {

[Comment from Collaborator]: I think when columnsToWrite is provided, we should completely ignore auto columns...

+        // if columnsToWrite is provided and this column name is not in it, skip column mapping and ColumnMetadata
+        logDebug(s"skipping col index $i col name $tableColName, not in user-provided columnsToWrite list")
+      } else if (autoCols.contains(tableColName)) {
+        // auto-generated column: skip column mapping and ColumnMetadata
+        logDebug(s"skipping auto generated col index $i col name $tableColName dfFieldIndex $dfFieldIndex")
       }else{
         var dfColName:String = ""
         if (isCaseSensitive) {
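Editorial note: both review comments above touch the same detail. `columnsToWrite.split(",").toSet` on an unset option does not yield an empty set, because `"".split(",")` returns `Array("")` in Scala (as in Java), so the set becomes `Set("")`. A hedged sketch of one way the parsing and sizing could be revised so the reviewer's `columnsToWriteSet.isEmpty` check behaves as intended; this is not part of the PR, and `tableCols`, `autoCols`, and `ColumnMetadata` are the names from the diff above:

```scala
// Hypothetical revision (not in this PR): trim fragments and drop empties so
// an unset columnsToWrite yields a genuinely empty set.
val columnsToWriteSet: Set[String] =
  columnsToWrite.split(",").map(_.trim).filter(_.nonEmpty).toSet

// With the guard above, emptiness now means "option not provided",
// matching the reviewer's suggested check.
val result =
  if (columnsToWriteSet.isEmpty) new Array[ColumnMetadata](tableCols.length - autoCols.length)
  else new Array[ColumnMetadata](columnsToWriteSet.size)
```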