airflow.models.dagrun

Module Contents

Classes

TISchedulingDecision

Type of return for DagRun.task_instance_scheduling_decisions.

DagRun

Invocation instance of a DAG.

DagRunNote

For storage of arbitrary notes concerning the dagrun instance.

Attributes

CreatedTasks

RUN_ID_REGEX

airflow.models.dagrun.CreatedTasks[source]
airflow.models.dagrun.RUN_ID_REGEX = '^(?:manual|scheduled|dataset_triggered)__(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\+00:00)$'[source]
class airflow.models.dagrun.TISchedulingDecision[source]

Bases: NamedTuple

Type of return for DagRun.task_instance_scheduling_decisions.

tis: list[airflow.models.taskinstance.TaskInstance][source]
schedulable_tis: list[airflow.models.taskinstance.TaskInstance][source]
changed_tis: bool[source]
unfinished_tis: list[airflow.models.taskinstance.TaskInstance][source]
finished_tis: list[airflow.models.taskinstance.TaskInstance][source]
class airflow.models.dagrun.DagRun(dag_id=None, run_id=None, queued_at=NOTSET, execution_date=None, start_date=None, external_trigger=None, conf=None, state=None, run_type=None, dag_hash=None, creating_job_id=None, data_interval=None)[source]

Bases: airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin

Invocation instance of a DAG.

A DAG run can be created by the scheduler (i.e. scheduled runs), or by an external trigger (i.e. manual runs).

property stats_tags: dict[str, str][source]
property logical_date: datetime.datetime[source]
property state[source]
property is_backfill: bool[source]
__tablename__ = 'dag_run'[source]
id[source]
dag_id[source]
queued_at[source]
execution_date[source]
start_date[source]
end_date[source]
run_id[source]
creating_job_id[source]
external_trigger[source]
run_type[source]
conf[source]
data_interval_start[source]
data_interval_end[source]
last_scheduling_decision[source]
dag_hash[source]
log_template_id[source]
updated_at[source]
clear_number[source]
__table_args__ = ()[source]
task_instances[source]
dag_model[source]
dag_run_note[source]
note[source]
DEFAULT_DAGRUNS_TO_EXAMINE[source]
__repr__()[source]

Return repr(self).

validate_run_id(key, run_id)[source]
get_state()[source]
set_state(state)[source]

Change the state of the DagRan.

Changes to attributes are implemented in accordance with the following table (rows represent old states, columns represent new states):

State transition matrix

QUEUED

RUNNING

SUCCESS

FAILED

None

queued_at = timezone.utcnow()

if empty: start_date = timezone.utcnow() end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

QUEUED

queued_at = timezone.utcnow()

if empty: start_date = timezone.utcnow() end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

RUNNING

queued_at = timezone.utcnow() start_date = None end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

SUCCESS

queued_at = timezone.utcnow() start_date = None end_date = None

start_date = timezone.utcnow() end_date = None

FAILED

queued_at = timezone.utcnow() start_date = None end_date = None

start_date = timezone.utcnow() end_date = None

refresh_from_db(session=NEW_SESSION)[source]

Reload the current dagrun from the database.

Parameters

session (sqlalchemy.orm.Session) – database session

classmethod active_runs_of_dags(dag_ids=None, only_running=False, session=NEW_SESSION)[source]

Get the number of active dag runs for each dag.

classmethod next_dagruns_to_examine(state, session, max_number=None)[source]

Return the next DagRuns that the scheduler should attempt to schedule.

This will return zero or more DagRun rows that are row-level-locked with a “SELECT … FOR UPDATE” query, you should ensure that any scheduling decisions are made in a single transaction – as soon as the transaction is committed it will be unlocked.

classmethod find(dag_id=None, run_id=None, execution_date=None, state=None, external_trigger=None, no_backfills=False, run_type=None, session=NEW_SESSION, execution_start_date=None, execution_end_date=None)[source]

Return a set of dag runs for the given search criteria.

Parameters
  • dag_id (str | list[str] | None) – the dag_id or list of dag_id to find dag runs for

  • run_id (Iterable[str] | None) – defines the run id for this dag run

  • run_type (airflow.utils.types.DagRunType | None) – type of DagRun

  • execution_date (datetime.datetime | Iterable[datetime.datetime] | None) – the execution date

  • state (airflow.utils.state.DagRunState | None) – the state of the dag run

  • external_trigger (bool | None) – whether this dag run is externally triggered

  • no_backfills (bool) – return no backfills (True), return all (False). Defaults to False

  • session (sqlalchemy.orm.Session) – database session

  • execution_start_date (datetime.datetime | None) – dag run that was executed from this date

  • execution_end_date (datetime.datetime | None) – dag run that was executed until this date

classmethod find_duplicate(dag_id, run_id, execution_date, session=NEW_SESSION)[source]

Return an existing run for the DAG with a specific run_id or execution_date.

None is returned if no such DAG run is found.

Parameters
static generate_run_id(run_type, execution_date)[source]

Generate Run ID based on Run Type and Execution Date.

static fetch_task_instances(dag_id=None, run_id=None, task_ids=None, state=None, session=NEW_SESSION)[source]

Return the task instances for this dag run.

get_task_instances(state=None, session=NEW_SESSION)[source]

Returns the task instances for this dag run.

Redirect to DagRun.fetch_task_instances method. Keep this method because it is widely used across the code.

get_task_instance(task_id, session=NEW_SESSION, *, map_index=-1)[source]

Return the task instance specified by task_id for this dag run.

Parameters
static fetch_task_instance(dag_id, dag_run_id, task_id, session=NEW_SESSION, map_index=-1)[source]

Returns the task instance specified by task_id for this dag run.

Parameters
  • dag_id (str) – the DAG id

  • dag_run_id (str) – the DAG run id

  • task_id (str) – the task id

  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session

get_dag()[source]

Return the Dag associated with this DagRun.

Returns

DAG

Return type

airflow.models.dag.DAG

static get_previous_dagrun(dag_run, state=None, session=NEW_SESSION)[source]

Return the previous DagRun, if there is one.

Parameters
static get_previous_scheduled_dagrun(dag_run_id, session=NEW_SESSION)[source]

Return the previous SCHEDULED DagRun, if there is one.

Parameters
update_state(session=NEW_SESSION, execute_callbacks=True)[source]

Determine the overall state of the DagRun based on the state of its TaskInstances.

Parameters
  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session

  • execute_callbacks (bool) – Should dag callbacks (success/failure, SLA etc.) be invoked directly (default: true) or recorded as a pending request in the returned_callback property

Returns

Tuple containing tis that can be scheduled in the current loop & returned_callback that needs to be executed

Return type

tuple[list[airflow.models.taskinstance.TaskInstance], airflow.callbacks.callback_requests.DagCallbackRequest | None]

task_instance_scheduling_decisions(session=NEW_SESSION)[source]
notify_dagrun_state_changed(msg='')[source]
verify_integrity(*, session=NEW_SESSION)[source]

Verify the DagRun by checking for removed tasks or tasks that are not in the database yet.

It will set state to removed or add the task if required.

Missing_indexes

A dictionary of task vs indexes that are missing.

Parameters

session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session

classmethod get_latest_runs(session=NEW_SESSION)[source]

Return the latest DagRun for each DAG.

schedule_tis(schedulable_tis, session=NEW_SESSION, max_tis_per_query=None)[source]

Set the given task instances in to the scheduled state.

Each element of schedulable_tis should have its task attribute already set.

Any EmptyOperator without callbacks or outlets is instead set straight to the success state.

All the TIs should belong to this DagRun, but this code is in the hot-path, this is not checked – it is the caller’s responsibility to call this function only with TIs from a single dag run.

get_log_template(*, session=NEW_SESSION)[source]
get_log_filename_template(*, session=NEW_SESSION)[source]
class airflow.models.dagrun.DagRunNote(content, user_id=None)[source]

Bases: airflow.models.base.Base

For storage of arbitrary notes concerning the dagrun instance.

__tablename__ = 'dag_run_note'[source]
user_id[source]
dag_run_id[source]
content[source]
created_at[source]
updated_at[source]
dag_run[source]
__table_args__ = ()[source]
__repr__()[source]

Return repr(self).

Was this entry helpful?