Source code for airflow.example_dags.tutorial_objectstorage

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

# [START tutorial]
# [START import_module]
import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath

# [END import_module]

[docs]API = "https://opendata.fmi.fi/timeseries"
[docs]aq_fields = { "fmisid": "int32", "time": "datetime64[ns]", "AQINDEX_PT1H_avg": "float64", "PM10_PT1H_avg": "float64", "PM25_PT1H_avg": "float64", "O3_PT1H_avg": "float64", "CO_PT1H_avg": "float64", "SO2_PT1H_avg": "float64", "NO2_PT1H_avg": "float64", "TRSC_PT1H_avg": "float64", }
# [START create_object_storage_path]
[docs]base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")
# [END create_object_storage_path] @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], )
[docs]def tutorial_objectstorage(): """ ### Object Storage Tutorial Documentation This is a tutorial DAG to showcase the usage of the Object Storage API. Documentation that goes along with the Airflow Object Storage tutorial is located [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html) """ # [START get_air_quality_data] @task def get_air_quality_data(**kwargs) -> ObjectStoragePath: """ #### Get Air Quality Data This task gets air quality data from the Finnish Meteorological Institute's open data API. The data is saved as parquet. """ import pandas as pd execution_date = kwargs["logical_date"] start_time = kwargs["data_interval_start"] params = { "format": "json", "precision": "double", "groupareas": "0", "producer": "airquality_urban", "area": "Uusimaa", "param": ",".join(aq_fields.keys()), "starttime": start_time.isoformat(timespec="seconds"), "endtime": execution_date.isoformat(timespec="seconds"), "tz": "UTC", } response = requests.get(API, params=params) response.raise_for_status() # ensure the bucket exists base.mkdir(exist_ok=True) formatted_date = execution_date.format("YYYYMMDD") path = base / f"air_quality_{formatted_date}.parquet" df = pd.DataFrame(response.json()).astype(aq_fields) with path.open("wb") as file: df.to_parquet(file) return path # [END get_air_quality_data] # [START analyze] @task def analyze(path: ObjectStoragePath, **kwargs): """ #### Analyze This task analyzes the air quality data, prints the results """ import duckdb conn = duckdb.connect(database=":memory:") conn.register_filesystem(path.fs) conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')") df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf() print(df2.head()) # [END analyze] # [START main_flow] obj_path = get_air_quality_data() analyze(obj_path)
# [END main_flow] # [START dag_invocation] tutorial_objectstorage() # [END dag_invocation] # [END tutorial]

Was this entry helpful?