Microsoft Graph API Operators
Prerequisite Tasks
To use these operators, you must do a few things:
Create the necessary resources using the Azure Portal or the Azure CLI.
Install API libraries via pip.
pip install 'apache-airflow[azure]'
Detailed information is available in Installation of Airflow®.
MSGraphAsyncOperator
Use the MSGraphAsyncOperator to call the Microsoft Graph API.
Below is an example of using this operator to get a SharePoint site.
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

site_task = MSGraphAsyncOperator(
    task_id="news_site",
    conn_id="msgraph_api",
    url="sites/850v1v.sharepoint.com:/sites/news",
    result_processor=lambda context, response: response["id"].split(",")[1],  # only keep site_id
)
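The `result_processor` receives the task context and the decoded JSON response, and its return value replaces the raw response. The sketch below runs the same logic outside Airflow against a hand-written response; the payload, with an `id` made of a host name and two made-up GUIDs separated by commas, is an illustrative assumption rather than a captured Graph response.

```python
from typing import Any


def extract_site_id(context: dict[str, Any], response: dict[str, Any]) -> str:
    """Mirror of the operator's result_processor: keep the second comma-separated part of "id"."""
    return response["id"].split(",")[1]


# Illustrative response body (assumed shape; the GUIDs are made up).
sample_response = {
    "id": "contoso.sharepoint.com,11111111-1111-1111-1111-111111111111,22222222-2222-2222-2222-222222222222",
    "name": "news",
}

site_id = extract_site_id({}, sample_response)
print(site_id)  # 11111111-1111-1111-1111-111111111111
```

Whatever the processor returns is what the task pushes to XCom, which is why the next example can pull the site ID by task ID.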
Below is an example of using this operator to get the pages of a SharePoint site.
site_pages_task = MSGraphAsyncOperator(
    task_id="news_pages",
    conn_id="msgraph_api",
    api_version="beta",
    url="sites/{{ ti.xcom_pull(task_ids='news_site') }}/pages",
)
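The `url` for this task evaluates to `sites/{{ ti.xcom_pull(task_ids='news_site') }}/pages`, and Airflow renders that Jinja expression just before the task runs, substituting the value `news_site` pushed to XCom. The sketch below imitates only that substitution with a regular expression and a hypothetical `FakeTaskInstance`; it illustrates the data flow and is not Airflow's Jinja-based renderer.

```python
import re


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, holding pre-seeded XCom values."""

    def __init__(self, xcoms: dict[str, str]) -> None:
        self._xcoms = xcoms

    def xcom_pull(self, task_ids: str) -> str:
        return self._xcoms[task_ids]


# Matches {{ ti.xcom_pull(task_ids='<task id>') }} and captures the task id.
XCOM_PULL = re.compile(r"\{\{\s*ti\.xcom_pull\(task_ids='([^']+)'\)\s*\}\}")


def render_url(template: str, ti: FakeTaskInstance) -> str:
    """Replace each xcom_pull expression with the value pulled from the fake instance."""
    return XCOM_PULL.sub(lambda match: ti.xcom_pull(match.group(1)), template)


ti = FakeTaskInstance({"news_site": "11111111-1111-1111-1111-111111111111"})
url = render_url("sites/{{ ti.xcom_pull(task_ids='news_site') }}/pages", ti)
print(url)  # sites/11111111-1111-1111-1111-111111111111/pages
```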
Below is an example of using this operator to get the IDs of Power BI workspaces.
workspaces_task = MSGraphAsyncOperator(
    task_id="workspaces",
    conn_id="powerbi",
    url="myorg/admin/workspaces/modified",
    result_processor=lambda context, response: [workspace["id"] for workspace in response],
)
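Here the response is a list of workspace objects and the processor reduces it to their IDs. A sketch with an assumed, trimmed-down response (only an `id` and a made-up `name` per workspace):

```python
from typing import Any


def workspace_ids(context: dict[str, Any], response: list[dict[str, Any]]) -> list[str]:
    """Keep only the "id" of every workspace in the response."""
    return [workspace["id"] for workspace in response]


# Assumed response shape for illustration; real payloads carry more fields.
sample_response = [
    {"id": "aaaaaaaa-0000-0000-0000-000000000001", "name": "Sales"},
    {"id": "aaaaaaaa-0000-0000-0000-000000000002", "name": "Finance"},
]

ids = workspace_ids({}, sample_response)
print(ids)
```

Because this list becomes the task's XCom return value, the next example can pass it on as `workspaces_task.output`.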
Below is an example of using this operator to request information about Power BI workspaces; the processor keeps only the ID of the resulting scan.
workspaces_info_task = MSGraphAsyncOperator(
    task_id="get_workspace_info",
    conn_id="powerbi",
    url="myorg/admin/workspaces/getInfo",
    method="POST",
    query_parameters={
        "lineage": True,
        "datasourceDetails": True,
        "datasetSchema": True,
        "datasetExpressions": True,
        "getArtifactUsers": True,
    },
    data={"workspaces": workspaces_task.output},
    result_processor=lambda context, response: {"scanId": response["id"]},
)
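`workspaces_task.output` is an XComArg, which Airflow resolves to the list of IDs returned by the `workspaces` task before this request is sent. The sketch below, using made-up IDs and a made-up scan response, shows the resolved request body serialized as JSON and what the `result_processor` keeps. That the hook sends the body as exactly this JSON is an assumption.

```python
import json
from typing import Any

# Assumed output of the "workspaces" task (made-up IDs).
workspace_ids = ["aaaaaaaa-0000-0000-0000-000000000001", "aaaaaaaa-0000-0000-0000-000000000002"]

# Request body once the XComArg has been resolved.
body = {"workspaces": workspace_ids}
print(json.dumps(body))


def keep_scan_id(context: dict[str, Any], response: dict[str, Any]) -> dict[str, str]:
    """Mirror of the operator's result_processor: wrap the scan "id" under "scanId"."""
    return {"scanId": response["id"]}


# Made-up response: the scan id plus a field the processor ignores.
scan = keep_scan_id({}, {"id": "bbbbbbbb-0000-0000-0000-000000000003", "status": "NotStarted"})
print(scan)
```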
Below is an example of using this operator to refresh a Power BI dataset, followed by an MSGraphSensor that waits for that refresh to complete.
from airflow.providers.microsoft.azure.sensors.msgraph import MSGraphSensor

refresh_dataset_task = MSGraphAsyncOperator(
    task_id="refresh_dataset",
    conn_id="powerbi_api",
    url="myorg/groups/{workspaceId}/datasets/{datasetId}/refreshes",
    method="POST",
    path_parameters={
        "workspaceId": "9a7e14c6-9a7d-4b4c-b0f2-799a85e60a51",
        "datasetId": "ffb6096e-d409-4826-aaeb-b5d4b165dc4d",
    },
    data={"type": "full"},  # Needed for enhanced refresh
    result_processor=lambda context, response: response["requestid"],
)

refresh_dataset_history_task = MSGraphSensor(
    task_id="refresh_dataset_history",
    conn_id="powerbi_api",
    url="myorg/groups/{workspaceId}/datasets/{datasetId}/refreshes/{refreshId}",
    path_parameters={
        "workspaceId": "9a7e14c6-9a7d-4b4c-b0f2-799a85e60a51",
        "datasetId": "ffb6096e-d409-4826-aaeb-b5d4b165dc4d",
        "refreshId": refresh_dataset_task.output,
    },
    timeout=350.0,
    event_processor=lambda context, event: event["status"] == "Completed",
)
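Both tasks address the refresh through `{...}` placeholders in `url` that are filled from `path_parameters`, and the sensor keeps waiting until `event_processor` returns `True`. The sketch below approximates both steps in plain Python: `str.format` stands in for the hook's placeholder expansion (the provider's actual mechanism may differ), the `refreshId` value is made up, and so are the polled events.

```python
from typing import Any

URL_TEMPLATE = "myorg/groups/{workspaceId}/datasets/{datasetId}/refreshes/{refreshId}"

path_parameters = {
    "workspaceId": "9a7e14c6-9a7d-4b4c-b0f2-799a85e60a51",
    "datasetId": "ffb6096e-d409-4826-aaeb-b5d4b165dc4d",
    # Made-up value; in the DAG this comes from refresh_dataset_task.output.
    "refreshId": "cccccccc-0000-0000-0000-000000000004",
}

# Approximation of placeholder expansion (assumption, see lead-in).
url = URL_TEMPLATE.format(**path_parameters)
print(url)


def refresh_completed(context: dict[str, Any], event: dict[str, Any]) -> bool:
    """Mirror of the sensor's event_processor: done only when status is "Completed"."""
    return event["status"] == "Completed"


# Made-up sequence of polled events; the sensor would keep waiting on the first.
events = [{"status": "Unknown"}, {"status": "Completed"}]
print([refresh_completed({}, event) for event in events])  # [False, True]
```

With this processor, a refresh that ends in any status other than `"Completed"` never satisfies the sensor, so the task would presumably only stop when its 350-second `timeout` expires.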
Below is an example of using this operator to trigger a data pipeline job for a Fabric item through the Fabric job scheduler API.
from airflow.sdk import Asset

# https://learn.microsoft.com/en-us/rest/api/fabric/core/job-scheduler/create-item-schedule?tabs=HTTP
schedule_datapipeline_task = MSGraphAsyncOperator(
    task_id="schedule_datapipeline",
    conn_id="powerbi",
    method="POST",
    url="workspaces/{workspaceId}/items/{itemId}/jobs/instances",
    path_parameters={
        "workspaceId": "e90b2873-4812-4dfb-9246-593638165644",
        "itemId": "65448530-e5ec-4aeb-a97e-7cebf5d67c18",
    },
    query_parameters={"jobType": "Pipeline"},
    dag=dag,
    outlets=[
        Asset(
            "workspaces/e90b2873-4812-4dfb-9246-593638165644/items/65448530-e5ec-4aeb-a97e-7cebf5d67c18/jobs/instances?jobType=Pipeline"
        )
    ],
)
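The outlet asset URI repeats the request path and query string. The sketch below derives that string from the same parameters with `str.format` and `urllib.parse.urlencode`, so the two cannot drift apart. `build_asset_uri` is a hypothetical helper and is not part of the provider.

```python
from urllib.parse import urlencode

URL_TEMPLATE = "workspaces/{workspaceId}/items/{itemId}/jobs/instances"


def build_asset_uri(path_parameters: dict[str, str], query_parameters: dict[str, str]) -> str:
    """Hypothetical helper: fill the path placeholders and append the query string."""
    return f"{URL_TEMPLATE.format(**path_parameters)}?{urlencode(query_parameters)}"


uri = build_asset_uri(
    {
        "workspaceId": "e90b2873-4812-4dfb-9246-593638165644",
        "itemId": "65448530-e5ec-4aeb-a97e-7cebf5d67c18",
    },
    {"jobType": "Pipeline"},
)
print(uri)
```

In a DAG, the resulting string could be passed to `Asset(...)` in place of the hard-coded literal.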
Reference
For further information, look at: