6565# BQ managed functions (@udf) currently only support Python 3.11.
6666_MANAGED_FUNC_PYTHON_VERSION = "python-3.11"
6767
68+ _DEFAULT_FUNCTION_MEMORY_MIB = 1024
69+
6870
6971class FunctionClient :
7072 # Wait time (in seconds) for an IAM binding to take effect after creation.
@@ -402,9 +404,11 @@ def create_cloud_function(
402404 is_row_processor = False ,
403405 vpc_connector = None ,
404406 vpc_connector_egress_settings = "private-ranges-only" ,
405- memory_mib = 1024 ,
407+ memory_mib = None ,
408+ cpus = None ,
406409 ingress_settings = "internal-only" ,
407410 workers = None ,
411+ threads = None ,
408412 concurrency = None ,
409413 ):
410414 """Create a cloud function from the given user defined function."""
@@ -488,6 +492,8 @@ def create_cloud_function(
488492 function .service_config = functions_v2 .ServiceConfig ()
489493 if memory_mib is not None :
490494 function .service_config .available_memory = f"{ memory_mib } Mi"
495+ if cpus is not None :
496+ function .service_config .available_cpu = str (cpus )
491497 if timeout_seconds is not None :
492498 if timeout_seconds > 1200 :
493499 raise bf_formatting .create_exception_with_feedback_link (
@@ -521,10 +527,15 @@ def create_cloud_function(
521527 )
522528 if concurrency :
523529 function .service_config .max_instance_request_concurrency = concurrency
530+
531+ env_vars = {}
524532 if workers :
525- function .service_config .environment_variables = {
526- "WORKERS" : str (workers )
527- }
533+ env_vars ["WORKERS" ] = str (workers )
534+ if threads :
535+ env_vars ["THREADS" ] = str (threads )
536+ if env_vars :
537+ function .service_config .environment_variables = env_vars
538+
528539 if ingress_settings not in _INGRESS_SETTINGS_MAP :
529540 raise bf_formatting .create_exception_with_feedback_link (
530541 ValueError ,
@@ -589,6 +600,7 @@ def provision_bq_remote_function(
589600 cloud_function_vpc_connector ,
590601 cloud_function_vpc_connector_egress_settings ,
591602 cloud_function_memory_mib ,
603+ cloud_function_cpus ,
592604 cloud_function_ingress_settings ,
593605 bq_metadata ,
594606 workers ,
@@ -626,6 +638,18 @@ def provision_bq_remote_function(
626638 )
627639 cf_endpoint = self .get_cloud_function_endpoint (cloud_function_name )
628640
641+ if (cloud_function_cpus is None ) and (cloud_function_memory_mib is None ):
642+ cloud_function_memory_mib = _DEFAULT_FUNCTION_MEMORY_MIB
643+
644+ # assumption is most bigframes functions are cpu bound, single-threaded and many won't release GIL
645+ # therefore, want to allocate a worker for each cpu, and allow a concurrent request per worker
646+ expected_cpus = cloud_function_cpus or _infer_cpus_from_memory (
647+ cloud_function_memory_mib
648+ )
649+ workers = expected_cpus
650+ concurrency = expected_cpus
651+ threads = 2 # (per worker)
652+
629653 # Create the cloud function if it does not exist
630654 if not cf_endpoint :
631655 cf_endpoint = self .create_cloud_function (
@@ -640,8 +664,10 @@ def provision_bq_remote_function(
640664 vpc_connector = cloud_function_vpc_connector ,
641665 vpc_connector_egress_settings = cloud_function_vpc_connector_egress_settings ,
642666 memory_mib = cloud_function_memory_mib ,
667+ cpus = cloud_function_cpus ,
643668 ingress_settings = cloud_function_ingress_settings ,
644669 workers = workers ,
670+ threads = threads ,
645671 concurrency = concurrency ,
646672 )
647673 else :
@@ -708,3 +734,19 @@ def get_remote_function_specs(self, remote_function_name):
708734 # Note: list_routines doesn't make an API request until we iterate on the response object.
709735 pass
710736 return (http_endpoint , bq_connection )
737+
738+
739+ def _infer_cpus_from_memory (memory_mib : int ) -> int :
740+ if memory_mib <= 2048 :
741+ # in actuality, will be 0.583 for 1024mb, 0.33 for 512mb, etc, but we round up to 1
742+ return 1
743+ elif memory_mib <= 8192 :
744+ return 2
745+ elif memory_mib <= 8192 :
746+ return 2
747+ elif memory_mib <= 16384 :
748+ return 4
749+ elif memory_mib <= 32768 :
750+ return 8
751+ else :
752+ raise ValueError ("Cloud run support at most 32768MiB per instance" )
0 commit comments