5353 }
5454)
5555
56+ # BQ managed functions (@udf) currently only support Python 3.11.
57+ _MANAGED_FUNC_PYTHON_VERSION = "python-3.11"
58+
5659
5760class FunctionClient :
5861 # Wait time (in seconds) for an IAM binding to take effect after creation.
@@ -193,11 +196,22 @@ def provision_bq_managed_function(
193196 name ,
194197 packages ,
195198 is_row_processor ,
199+ * ,
200+ capture_references = False ,
196201 ):
197202 """Create a BigQuery managed function."""
198- import cloudpickle
199203
200- pickled = cloudpickle .dumps (func )
204+ # TODO(b/406283812): Expose the capability to pass down
205+ # capture_references=True in the public udf API.
206+ if (
207+ capture_references
208+ and (python_version := _utils .get_python_version ())
209+ != _MANAGED_FUNC_PYTHON_VERSION
210+ ):
211+ raise bf_formatting .create_exception_with_feedback_link (
212+ NotImplementedError ,
213+ f"Capturing references for udf is currently supported only in Python version { _MANAGED_FUNC_PYTHON_VERSION } , you are running { python_version } ." ,
214+ )
201215
202216 # Create BQ managed function.
203217 bq_function_args = []
@@ -209,13 +223,15 @@ def provision_bq_managed_function(
209223 bq_function_args .append (f"{ name_ } { type_ } " )
210224
211225 managed_function_options = {
212- "runtime_version" : _utils . get_python_version () ,
226+ "runtime_version" : _MANAGED_FUNC_PYTHON_VERSION ,
213227 "entry_point" : "bigframes_handler" ,
214228 }
215229
216230 # Augment user package requirements with any internal package
217231 # requirements.
218- packages = _utils ._get_updated_package_requirements (packages , is_row_processor )
232+ packages = _utils ._get_updated_package_requirements (
233+ packages , is_row_processor , capture_references
234+ )
219235 if packages :
220236 managed_function_options ["packages" ] = packages
221237 managed_function_options_str = self ._format_function_options (
@@ -235,20 +251,45 @@ def provision_bq_managed_function(
235251 persistent_func_id = (
236252 f"`{ self ._gcp_project_id } .{ self ._bq_dataset } `.{ bq_function_name } "
237253 )
238- create_function_ddl = textwrap .dedent (
239- f"""
240- CREATE OR REPLACE FUNCTION { persistent_func_id } ({ ',' .join (bq_function_args )} )
241- RETURNS { bq_function_return_type }
242- LANGUAGE python
243- OPTIONS ({ managed_function_options_str } )
244- AS r'''
254+
255+ udf_name = func .__name__
256+ if capture_references :
257+ # This code path ensures that if the udf body contains any
258+ # references to variables and/or imports outside the body, they are
259+ # captured as well.
245260 import cloudpickle
246- udf = cloudpickle.loads({ pickled } )
247- def bigframes_handler(*args):
248- return udf(*args)
249- '''
250- """
251- ).strip ()
261+
262+ pickled = cloudpickle .dumps (func )
263+ udf_code = textwrap .dedent (
264+ f"""
265+ import cloudpickle
266+ { udf_name } = cloudpickle.loads({ pickled } )
267+ """
268+ )
269+ else :
270+ # This code path assumes the udf body is self contained, i.e. there are
271+ # no references to variables or imports outside the body, so the udf's
272+ # own source code (starting at its "def") is embedded directly.
273+ udf_code = textwrap .dedent (inspect .getsource (func ))
274+ udf_code = udf_code [udf_code .index ("def" ) :]
275+
276+ create_function_ddl = (
277+ textwrap .dedent (
278+ f"""
279+ CREATE OR REPLACE FUNCTION { persistent_func_id } ({ ',' .join (bq_function_args )} )
280+ RETURNS { bq_function_return_type }
281+ LANGUAGE python
282+ OPTIONS ({ managed_function_options_str } )
283+ AS r'''
284+ __UDF_PLACE_HOLDER__
285+ def bigframes_handler(*args):
286+ return { udf_name } (*args)
287+ '''
288+ """
289+ )
290+ .strip ()
291+ .replace ("__UDF_PLACE_HOLDER__" , udf_code )
292+ )
252293
253294 self ._ensure_dataset_exists ()
254295 self ._create_bq_function (create_function_ddl )
0 commit comments