Source code for airflow.providers.apache.kylin.operators.kylin_cube
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimporttimefromcollections.abcimportSequencefromdatetimeimportdatetimefromtypingimportTYPE_CHECKINGfromkylinpyimportkylinpyfromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportBaseOperatorfromairflow.providers.apache.kylin.hooks.kylinimportKylinHookifTYPE_CHECKING:fromairflow.utils.contextimportContext
class KylinCubeOperator(BaseOperator):
    """
    Submit a Kylin cube request (build/refresh/merge, ...) and optionally track the resulting job.

    For more detailed information see `Apache Kylin <http://kylin.apache.org/>`_

    :param kylin_conn_id: The connection id as configured in Airflow administration.
    :param project: kylin project name; this param overrides the project in kylin_conn_id
    :param cube: kylin cube name
    :param dsn: dsn url of the kylin connection, which overrides kylin_conn_id,
        for example: kylin://ADMIN:KYLIN@sandbox/learn_kylin?timeout=60&is_debug=1
    :param command: kylin command, one of 'build', 'merge', 'refresh', 'delete',
        'build_streaming', 'merge_streaming', 'refresh_streaming', 'disable', 'enable',
        'purge', 'clone', 'drop'.

        - build - use the /kylin/api/cubes/{cubeName}/build rest api with buildType 'BUILD';
          start_time and end_time should be given
        - refresh - use the build rest api with buildType 'REFRESH'
        - merge - use the build rest api with buildType 'MERGE'
        - build_streaming - use the /kylin/api/cubes/{cubeName}/build2 rest api with
          buildType 'BUILD'; offset_start and offset_end should be given
        - refresh_streaming - use the build2 rest api with buildType 'REFRESH'
        - merge_streaming - use the build2 rest api with buildType 'MERGE'
        - delete - delete a segment; segment_name should be given
        - disable - disable cube
        - enable - enable cube
        - purge - purge cube
        - clone - clone cube, the new cube name is {cube_name}_clone
        - drop - drop cube
    :param start_time: build segment start time
    :param end_time: build segment end time
    :param offset_start: streaming build segment start time
    :param offset_end: streaming build segment end time
    :param segment_name: segment name
    :param is_track_job: whether to track the job status. If True, the job is tracked until
        its status is in ("FINISHED", "ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED")
        or the timeout is reached
    :param interval: interval between job status checks, default value is 60s
    :param timeout: timeout value, default value is 1 day, 60 * 60 * 24 s
    :param eager_error_status: job statuses treated as errors; if the job status is in this
        list the task fails. Default value is
        tuple(["ERROR", "DISCARDED", "KILLED", "SUICIDAL", "STOPPED"])
    """

    # NOTE(review): ``execute`` reads ``build_command``, ``jobs_end_status``,
    # ``jobs_error_status`` and the per-instance settings (``command``, ``cube``,
    # ``interval``, ``timeout``, ...) from ``self``. None of them are assigned in the
    # visible part of this class -- presumably they come from class attributes and a
    # constructor defined elsewhere; confirm before relying on this block in isolation.

    def execute(self, context: Context):
        """
        Run the configured command against the cube and, if requested, wait for the job.

        The command is validated against the command list exposed by ``kylinpy``, then
        invoked through ``KylinHook.cube_run``. When ``is_track_job`` is set and the command
        is one of ``build_command``, the job status is polled every ``interval`` seconds
        until it reaches one of ``jobs_end_status``.

        :param context: Airflow task context (not used by this method).
        :return: the response of ``cube_run`` when ``do_xcom_push`` is set, otherwise None.
        :raises AirflowException: if the command is empty or unsupported, if the response
            carries no job id, if tracking exceeds ``timeout``, or if the job status is in
            ``jobs_error_status``.
        """
        hook = KylinHook(kylin_conn_id=self.kylin_conn_id, project=self.project, dsn=self.dsn)
        supported_commands = kylinpy.CubeSource.support_invoke_command

        if not self.command:
            raise AirflowException(f"Kylin:Command {self.command} can not be empty")

        # Lower-case once; every later comparison and the hook call use this form.
        command = self.command.lower()
        if command not in supported_commands:
            raise AirflowException(
                f"Kylin:Command {self.command} can not match kylin command list {supported_commands}"
            )

        def millis_to_datetime(raw):
            # Falsy values (None, "", 0) mean "not provided"; otherwise the value is a
            # millisecond epoch timestamp.
            if not raw:
                return None
            return datetime.fromtimestamp(int(raw) / 1000)

        def to_int_or_none(raw):
            if not raw:
                return None
            return int(raw)

        response = hook.cube_run(
            self.cube,
            command,
            start=millis_to_datetime(self.start_time),
            end=millis_to_datetime(self.end_time),
            name=self.segment_name,
            offset_start=to_int_or_none(self.offset_start),
            offset_end=to_int_or_none(self.offset_end),
        )

        if self.is_track_job and command in self.build_command:
            started_at = time.monotonic()

            job_id = response.get("uuid")
            if job_id is None:
                raise AirflowException("kylin job id is None")
            self.log.info("kylin job id: %s", job_id)

            job_status = None
            while job_status not in self.jobs_end_status:
                # The timeout is checked before sleeping, so at most one extra interval
                # elapses after the deadline has passed.
                elapsed = time.monotonic() - started_at
                if elapsed > self.timeout:
                    raise AirflowException(f"kylin job {job_id} timeout")

                time.sleep(self.interval)

                job_status = hook.get_job_status(job_id)
                self.log.info("Kylin job status is %s ", job_status)
                if job_status in self.jobs_error_status:
                    raise AirflowException(f"Kylin job {job_id} status {job_status} is error ")

        if self.do_xcom_push:
            return response
        return None