## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.fromtypingimportDict,Iterable,Optional,TuplefromsqlalchemyimportColumn,Integer,String,Text,funcfromsqlalchemy.orm.sessionimportSessionfromairflow.exceptionsimportAirflowExceptionfromairflow.models.baseimportBasefromairflow.ti_deps.dependencies_statesimportEXECUTION_STATESfromairflow.typing_compatimportTypedDictfromairflow.utils.sessionimportprovide_sessionfromairflow.utils.sqlalchemyimportnowait,with_row_locksfromairflow.utils.stateimportState
class PoolStats(TypedDict):
    """Dictionary containing Pool Stats"""

    # Keys are the ones ``slots_stats`` passes to the constructor and indexes afterwards.
    # Slot capacity of the pool; ``slots_stats`` stores ``float('inf')`` here when the
    # configured slot count is -1.
    total: int
    # Slots summed over task instances whose state is "running".
    running: int
    # Slots summed over task instances whose state is "queued".
    queued: int
    # ``total - running - queued`` as computed by ``slots_stats``.
    open: int
def get_pool(pool_name, session: Session = None):
    """
    Look up a single Pool by its name.

    :param pool_name: The pool name of the Pool to get.
    :param session: SQLAlchemy ORM Session
    :return: the first Pool row whose ``pool`` column equals ``pool_name``
        (whatever ``Query.first()`` yields for that filter)
    """
    name_matches = Pool.pool == pool_name
    candidates = session.query(Pool).filter(name_matches)
    return candidates.first()
@staticmethod
@provide_session
def get_default_pool(session: Session = None):
    """
    Fetch the Pool registered under ``Pool.DEFAULT_POOL_NAME``.

    Thin convenience wrapper that delegates to ``Pool.get_pool``.

    :param session: SQLAlchemy ORM Session
    :return: the pool object
    """
    default_name = Pool.DEFAULT_POOL_NAME
    default_pool = Pool.get_pool(default_name, session=session)
    return default_pool
@staticmethod
@provide_session
def slots_stats(
    *,
    lock_rows: bool = False,
    session: Session = None,
) -> Dict[str, PoolStats]:
    """
    Get Pool stats (Number of Running, Queued, Open & Total tasks)

    If ``lock_rows`` is True, and the database engine in use supports the ``NOWAIT`` syntax, then a
    non-blocking lock will be attempted -- if the lock is not available then SQLAlchemy will throw an
    OperationalError.

    :param lock_rows: Should we attempt to obtain a row-level lock on all the Pool rows returned
    :param session: SQLAlchemy ORM Session
    :return: mapping of pool name to its ``PoolStats``. A pool configured with ``-1`` slots is
        reported with ``total`` (and therefore ``open``) equal to ``float('inf')``.
    :raises AirflowException: if the task instance query yields a state other than
        ``"running"`` or ``"queued"`` for a known pool
    """
    from airflow.models.taskinstance import TaskInstance  # Avoid circular import

    pools: Dict[str, PoolStats] = {}

    query = session.query(Pool.pool, Pool.slots)

    if lock_rows:
        query = with_row_locks(query, session=session, **nowait(session))

    pool_rows: Iterable[Tuple[str, int]] = query.all()
    for (pool_name, total_slots) in pool_rows:
        # -1 means infinite; normalise it here so the arithmetic below needs no special case.
        if total_slots == -1:
            total_slots = float('inf')  # type: ignore
        pools[pool_name] = PoolStats(total=total_slots, running=0, queued=0, open=0)

    state_count_by_pool = (
        session.query(TaskInstance.pool, TaskInstance.state, func.sum(TaskInstance.pool_slots))
        .filter(TaskInstance.state.in_(list(EXECUTION_STATES)))
        .group_by(TaskInstance.pool, TaskInstance.state)
    ).all()

    # calculate queued and running metrics
    for (pool_name, state, count) in state_count_by_pool:
        # Some databases return decimal.Decimal here.
        count = int(count)

        stats_dict: Optional[PoolStats] = pools.get(pool_name)
        if stats_dict is None:
            # Task instances that reference a pool with no row in the Pool table are ignored.
            continue
        # TypedDict key must be a string literal, so we use if-statements to set value
        if state == "running":
            stats_dict["running"] = count
        elif state == "queued":
            stats_dict["queued"] = count
        else:
            raise AirflowException(f"Unexpected state. Expected values: {EXECUTION_STATES}.")

    # calculate open metric
    # ``total`` can never be -1 at this point (it was replaced by infinity above), and
    # ``inf - running - queued`` is still ``inf``, so one formula covers every pool.
    for pool_stats in pools.values():
        pool_stats["open"] = pool_stats["total"] - pool_stats["running"] - pool_stats["queued"]

    return pools
def to_json(self):
    """
    Serialise this Pool into a plain dictionary.

    :return: dict with the ``id``, ``pool``, ``slots`` and ``description`` attributes
    """
    exported_fields = ('id', 'pool', 'slots', 'description')
    return {field: getattr(self, field) for field in exported_fields}

@provide_session
def occupied_slots(self, session: Session):
    """
    Count the slots currently taken in this pool by task instances whose
    state is one of ``EXECUTION_STATES``.

    :param session: SQLAlchemy ORM Session
    :return: the used number of slots
    """
    from airflow.models.taskinstance import TaskInstance  # Avoid circular import

    in_this_pool = TaskInstance.pool == self.pool
    is_executing = TaskInstance.state.in_(list(EXECUTION_STATES))
    slot_sum = session.query(func.sum(TaskInstance.pool_slots)).filter(in_this_pool, is_executing).scalar()
    # A falsy aggregate (nothing to sum) is reported as zero.
    return int(slot_sum or 0)

@provide_session
def running_slots(self, session: Session):
    """
    Count the slots currently taken in this pool by running task instances.

    :param session: SQLAlchemy ORM Session
    :return: the used number of slots
    """
    from airflow.models.taskinstance import TaskInstance  # Avoid circular import

    in_this_pool = TaskInstance.pool == self.pool
    is_running = TaskInstance.state == State.RUNNING
    slot_sum = session.query(func.sum(TaskInstance.pool_slots)).filter(in_this_pool, is_running).scalar()
    # A falsy aggregate (nothing to sum) is reported as zero.
    return int(slot_sum or 0)

@provide_session
def queued_slots(self, session: Session):
    """
    Count the slots currently taken in this pool by queued task instances.

    :param session: SQLAlchemy ORM Session
    :return: the used number of slots
    """
    from airflow.models.taskinstance import TaskInstance  # Avoid circular import

    in_this_pool = TaskInstance.pool == self.pool
    is_queued = TaskInstance.state == State.QUEUED
    slot_sum = session.query(func.sum(TaskInstance.pool_slots)).filter(in_this_pool, is_queued).scalar()
    # A falsy aggregate (nothing to sum) is reported as zero.
    return int(slot_sum or 0)

@provide_session
def open_slots(self, session: Session) -> float:
    """
    Compute how many slots are still free in this pool.

    :param session: SQLAlchemy ORM Session
    :return: ``float('inf')`` when ``slots`` is -1, otherwise ``slots`` minus the
        occupied slots
    """
    # A slot count of -1 is treated as an unlimited pool.
    if self.slots == -1:
        return float('inf')
    return self.slots - self.occupied_slots(session)