-
Notifications
You must be signed in to change notification settings - Fork 124
Expand file tree
/
Copy pathtexera_run_python_worker.py
More file actions
87 lines (76 loc) · 2.76 KB
/
texera_run_python_worker.py
File metadata and controls
87 lines (76 loc) · 2.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import sys
from loguru import logger
from core.python_worker import PythonWorker
from core.storage.storage_config import StorageConfig
def init_loguru_logger(stream_log_level) -> None:
    """
    Reset loguru's global logger so that it writes only to stderr.

    loguru starts out with its own default stderr handler. Rather than edit
    that handler in place, every registered handler is removed and a single
    stderr sink is added back at the requested level.

    :param stream_log_level: minimum level of messages written to stderr
    :return: None
    """
    logger.remove()  # drop all existing handlers, loguru's default one included
    stderr_sink = sys.stderr
    logger.add(stderr_sink, level=stream_log_level)
if __name__ == "__main__":
    # The launcher passes every setting as a positional argument, in the order
    # unpacked below; sys.argv[0] is the script path and is discarded.
    # NOTE(review): database and S3 credentials arrive on the command line, where
    # other local users may be able to read them (e.g. via the process list);
    # consider passing them through environment variables or a file instead.
    try:
        (
            _,
            worker_id,
            output_port,
            logger_level,
            r_path,
            iceberg_catalog_type,
            iceberg_postgres_catalog_uri_without_scheme,
            iceberg_postgres_catalog_username,
            iceberg_postgres_catalog_password,
            iceberg_rest_catalog_uri,
            iceberg_rest_catalog_warehouse_name,
            iceberg_table_namespace,
            iceberg_file_storage_directory_path,
            iceberg_table_commit_batch_size,
            s3_endpoint,
            s3_region,
            s3_auth_username,
            s3_auth_password,
        ) = sys.argv
    except ValueError as err:
        # A bare unpacking error does not say which program failed or how many
        # arguments it received. Report the count (both numbers include the
        # script path) but never the values, since some of them are secrets.
        raise SystemExit(
            f"{sys.argv[0]}: invalid command line "
            f"({len(sys.argv)} argv entries, counting the script path): {err}"
        ) from err

    # Configure logging first so any later log output honors the requested level.
    init_loguru_logger(logger_level)

    StorageConfig.initialize(
        iceberg_catalog_type,
        iceberg_postgres_catalog_uri_without_scheme,
        iceberg_postgres_catalog_username,
        iceberg_postgres_catalog_password,
        iceberg_rest_catalog_uri,
        iceberg_rest_catalog_warehouse_name,
        iceberg_table_namespace,
        iceberg_file_storage_directory_path,
        iceberg_table_commit_batch_size,
        s3_endpoint,
        s3_region,
        s3_auth_username,
        s3_auth_password,
    )

    # Set the R_HOME environment variable for R-UDF usage. An empty r_path
    # leaves the inherited environment untouched. This is done before the worker
    # starts, presumably so R is locatable by the time any R bridge loads.
    if r_path:
        import os

        os.environ["R_HOME"] = r_path

    PythonWorker(
        worker_id=worker_id, host="localhost", output_port=int(output_port)
    ).run()