2626#include  <sys/stat.h> 
2727#include  <unistd.h> 
2828
29+ #include  "access/htup_details.h" 
2930#include  "access/sysattr.h" 
3031#include  "access/reloptions.h" 
3132#if  PG_VERSION_NUM  >= 120000 
4849#include  "parser/parsetree.h" 
4950#include  "storage/ipc.h" 
5051#include  "utils/builtins.h" 
52+ #include  "utils/datum.h" 
5153#include  "utils/guc.h" 
5254#include  "utils/lsyscache.h" 
5355#include  "utils/memutils.h" 
56+ #include  "utils/syscache.h" 
5457
5558/* Declarations for dynamic loading */ 
5659PG_MODULE_MAGIC ;
@@ -215,6 +218,7 @@ static int interactive_timeout = INTERACTIVE_TIMEOUT;
215218static  void  mysql_error_print (MYSQL  * conn );
216219static  void  mysql_stmt_error_print (MySQLFdwExecState  * festate ,
217220								   const  char  * msg );
221+ static  List  * getUpdateTargetAttrs (RangeTblEntry  * rte );
218222
219223/* 
220224 * mysql_load_library function dynamically load the mysql's library 
@@ -1186,11 +1190,32 @@ mysqlPlanForeignModify(PlannerInfo *root,
11861190				(errcode (ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION ),
11871191				 errmsg ("first column of remote table must be unique for INSERT/UPDATE/DELETE operation" )));
11881192
1189- 	if  (operation  ==  CMD_INSERT )
1193+ 	/* 
1194+ 	 * In an INSERT, we transmit all columns that are defined in the foreign 
1195+ 	 * table.  In an UPDATE, if there are BEFORE ROW UPDATE triggers on the 
1196+ 	 * foreign table, we transmit all columns like INSERT; else we transmit 
1197+ 	 * only columns that were explicitly targets of the UPDATE, so as to avoid 
1198+ 	 * unnecessary data transmission.  (We can't do that for INSERT since we 
1199+ 	 * would miss sending default values for columns not listed in the source 
1200+ 	 * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since 
1201+ 	 * those triggers might change values for non-target columns, in which 
1202+ 	 * case we would miss sending changed values for those columns.) 
1203+ 	 */ 
1204+ 	if  (operation  ==  CMD_INSERT  || 
1205+ 		(operation  ==  CMD_UPDATE  && 
1206+ 		 rel -> trigdesc  && 
1207+ 		 rel -> trigdesc -> trig_update_before_row ))
11901208	{
11911209		TupleDesc 	tupdesc  =  RelationGetDescr (rel );
11921210		int 			attnum ;
11931211
1212+ 		/* 
1213+ 		 * If it is an UPDATE operation, check for row identifier column in 
1214+ 		 * target attribute list by calling getUpdateTargetAttrs(). 
1215+ 		 */ 
1216+ 		if  (operation  ==  CMD_UPDATE )
1217+ 			getUpdateTargetAttrs (rte );
1218+ 
11941219		for  (attnum  =  1 ; attnum  <= tupdesc -> natts ; attnum ++ )
11951220		{
11961221			Form_pg_attribute  attr  =  TupleDescAttr (tupdesc , attnum  -  1 );
@@ -1201,27 +1226,7 @@ mysqlPlanForeignModify(PlannerInfo *root,
12011226	}
12021227	else  if  (operation  ==  CMD_UPDATE )
12031228	{
1204- #if  PG_VERSION_NUM  >= 90500 
1205- 		Bitmapset   * tmpset  =  bms_copy (rte -> updatedCols );
1206- #else 
1207- 		Bitmapset   * tmpset  =  bms_copy (rte -> modifiedCols );
1208- #endif 
1209- 		AttrNumber 	col ;
1210- 
1211- 		while  ((col  =  bms_first_member (tmpset )) >= 0 )
1212- 		{
1213- 			col  +=  FirstLowInvalidHeapAttributeNumber ;
1214- 			if  (col  <= InvalidAttrNumber )	/* shouldn't happen */ 
1215- 				elog (ERROR , "system-column update is not supported" );
1216- 
1217- 			/* 
1218- 			 * We also disallow updates to the first column 
1219- 			 */ 
1220- 			if  (col  ==  1 )		/* shouldn't happen */ 
1221- 				elog (ERROR , "row identifier column update is not supported" );
1222- 
1223- 			targetAttrs  =  lappend_int (targetAttrs , col );
1224- 		}
1229+ 		targetAttrs  =  getUpdateTargetAttrs (rte );
12251230		/* We also want the rowid column to be available for the update */ 
12261231		targetAttrs  =  lcons_int (1 , targetAttrs );
12271232	}
@@ -1437,6 +1442,10 @@ mysqlExecForeignUpdate(EState *estate,
14371442	Datum 		value ;
14381443	int 			n_params ;
14391444	bool 	   * isnull ;
1445+ 	Datum 		new_value ;
1446+ 	HeapTuple 	tuple ;
1447+ 	Form_pg_attribute  attr ;
1448+ 	bool 		found_row_id_col  =  false;
14401449
14411450	n_params  =  list_length (fmstate -> retrieved_attrs );
14421451
@@ -1449,9 +1458,16 @@ mysqlExecForeignUpdate(EState *estate,
14491458		int 			attnum  =  lfirst_int (lc );
14501459		Oid 			type ;
14511460
1452- 		/* first attribute cannot be in target list attribute */ 
1461+ 		/* 
1462+ 		 * The first attribute cannot be in the target list attribute.  Set the 
1463+ 		 * found_row_id_col to true once we find it so that we can fetch the 
1464+ 		 * value later. 
1465+ 		 */ 
14531466		if  (attnum  ==  1 )
1467+ 		{
1468+ 			found_row_id_col  =  true;
14541469			continue ;
1470+ 		}
14551471
14561472		type  =  TupleDescAttr (slot -> tts_tupleDescriptor , attnum  -  1 )-> atttypid ;
14571473		value  =  slot_getattr (slot , attnum , (bool  * ) (& isnull [bindnum ]));
@@ -1461,9 +1477,62 @@ mysqlExecForeignUpdate(EState *estate,
14611477		bindnum ++ ;
14621478	}
14631479
1464- 	/* Get the id that was passed up as a resjunk column */ 
1480+ 	/* 
1481+ 	 * Since we add a row identifier column in the target list always, so 
1482+ 	 * found_row_id_col flag should be true. 
1483+ 	 */ 
1484+ 	if  (!found_row_id_col )
1485+ 		elog (ERROR , "missing row identifier column value in UPDATE" );
1486+ 
1487+ 	new_value  =  slot_getattr (slot , 1 , & is_null );
1488+ 
1489+ 	/* 
1490+ 	 * Get the row identifier column value that was passed up as a resjunk 
1491+ 	 * column and compare that value with the new value to identify if that 
1492+ 	 * value is changed. 
1493+ 	 */ 
14651494	value  =  ExecGetJunkAttribute (planSlot , 1 , & is_null );
1466- 	typeoid  =  get_atttype (foreignTableId , 1 );
1495+ 
1496+ 	tuple  =  SearchSysCache2 (ATTNUM ,
1497+ 							ObjectIdGetDatum (foreignTableId ),
1498+ 							Int16GetDatum (1 ));
1499+ 	if  (!HeapTupleIsValid (tuple ))
1500+ 		elog (ERROR , "cache lookup failed for attribute %d of relation %u" ,
1501+ 			 1 , foreignTableId );
1502+ 
1503+ 	attr  =  (Form_pg_attribute ) GETSTRUCT (tuple );
1504+ 	typeoid  =  attr -> atttypid ;
1505+ 
1506+ 	if  (DatumGetPointer (new_value ) !=  NULL  &&  DatumGetPointer (value ) !=  NULL )
1507+ 	{
1508+ 		Datum 		n_value  =  new_value ;
1509+ 		Datum  		o_value  =  value ;
1510+ 
1511+ 		/* If the attribute type is varlena then need to detoast the datums. */ 
1512+ 		if  (attr -> attlen  ==  -1 )
1513+ 		{
1514+ 			n_value  =  PointerGetDatum (PG_DETOAST_DATUM (new_value ));
1515+ 			o_value  =  PointerGetDatum (PG_DETOAST_DATUM (value ));
1516+ 		}
1517+ 
1518+ 		if  (!datumIsEqual (o_value , n_value , attr -> attbyval , attr -> attlen ))
1519+ 			ereport (ERROR ,
1520+ 					(errcode (ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION ),
1521+ 					 errmsg ("row identifier column update is not supported" )));
1522+ 
1523+ 		/* Free memory if it's a copy made above */ 
1524+ 		if  (DatumGetPointer (n_value ) !=  DatumGetPointer (new_value ))
1525+ 			pfree (DatumGetPointer (n_value ));
1526+ 		if  (DatumGetPointer (o_value ) !=  DatumGetPointer (value ))
1527+ 			pfree (DatumGetPointer (o_value ));
1528+ 	}
1529+ 	else  if  (!(DatumGetPointer (new_value ) ==  NULL  && 
1530+ 			   DatumGetPointer (value ) ==  NULL ))
1531+ 		ereport (ERROR ,
1532+ 				(errcode (ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION ),
1533+ 				 errmsg ("row identifier column update is not supported" )));
1534+ 
1535+ 	ReleaseSysCache (tuple );
14671536
14681537	/* Bind qual */ 
14691538	mysql_bind_sql_var (typeoid , bindnum , value , mysql_bind_buffer , & is_null );
@@ -2009,3 +2078,37 @@ mysql_stmt_error_print(MySQLFdwExecState *festate, const char *msg)
20092078			break ;
20102079	}
20112080}
2081+ 
2082+ /* 
2083+  * getUpdateTargetAttrs 
2084+  * 		Returns the list of attribute numbers of the columns being updated. 
2085+  */ 
2086+ static  List  * 
2087+ getUpdateTargetAttrs (RangeTblEntry  * rte )
2088+ {
2089+ 	List 	   * targetAttrs  =  NIL ;
2090+ 
2091+ #if  PG_VERSION_NUM  >= 90500 
2092+ 	Bitmapset   * tmpset  =  bms_copy (rte -> updatedCols );
2093+ #else 
2094+ 	Bitmapset   * tmpset  =  bms_copy (rte -> modifiedCols );
2095+ #endif 
2096+ 	AttrNumber 	col ;
2097+ 
2098+ 	while  ((col  =  bms_first_member (tmpset )) >= 0 )
2099+ 	{
2100+ 		col  +=  FirstLowInvalidHeapAttributeNumber ;
2101+ 		if  (col  <= InvalidAttrNumber )	/* shouldn't happen */ 
2102+ 			elog (ERROR , "system-column update is not supported" );
2103+ 
2104+ 		/* We also disallow updates to the first column */ 
2105+ 		if  (col  ==  1 )
2106+ 			ereport (ERROR ,
2107+ 				(errcode (ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION ),
2108+ 				 errmsg ("row identifier column update is not supported" )));
2109+ 
2110+ 		targetAttrs  =  lappend_int (targetAttrs , col );
2111+ 	}
2112+ 
2113+ 	return  targetAttrs ;
2114+ }
0 commit comments