Skip to content

Manage Airflow assets

Operations on Airflow assets (AirflowDag, AirflowTask).

In general, these should be:

  • Created in top-down order (connection, then AirflowDag, then AirflowTask)
  • Deleted in bottom-up order (tasks, then dags, then connections)1
erDiagram
  Connection ||--o{ AirflowDag : contains
  AirflowDag ||--o{ AirflowTask : contains

Asset structure

Connection

2.1.4 1.11.2

An Airflow connection requires a name and qualifiedName. For creation, specific settings are also required to distinguish it as an Airflow connection rather than another type of connection. In addition, at least one of adminRoles, adminGroups, or adminUsers must be provided.

Create an Airflow connection
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
String adminRoleGuid = RoleCache.getIdForName("$admin"); // (1)
Connection connection = Connection.creator( // (2)
        "airflow-connection", // (3)
        AtlanConnectorType.AIRFLOW, // (4)
        List.of(adminRoleGuid), // (5)
        List.of("group2"), // (6)
        List.of("jsmith")) // (7)
    .build();
AssetMutationResponse response = connection.save(); // (8)
String connectionQualifiedName = response.getCreatedAssets().get(0).getQualifiedName(); // (9)
  1. Retrieve the GUID for the admin role, to use later for defining the roles that can administer the connection.
  2. Build up the minimum request to create a connection.
  3. Provide a human-readable name for your connection, such as production or development.
  4. Set the type of connection to AIRFLOW.
  5. List the workspace roles that should be able to administer the connection (or null if none). All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUID(s) of the workspace role(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  6. List the group names that can administer this connection (or null if none). All users within that group (current and future) will be administrators of the connection. Note that the values here are the name(s) of the group(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  7. List the user names that can administer this connection (or null if none). Note that the values here are the username(s) of the user(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  8. Actually call Atlan to create the connection.
  9. Retrieve the qualifiedName for use in subsequent creation calls. (You'd probably want to do some null checking first.)
Create an Airflow connection
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from pyatlan.cache.role_cache import RoleCache
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Connection, AirflowDag, AirflowTask
from pyatlan.model.enums import AtlanConnectorType

admin_role_guid = RoleCache.get_id_for_name("$admin") # (1)
connection = Connection.creator( # (2)
    name = "airflow-connection", # (3)
    connector_type = AtlanConnectorType.AIRFLOW, # (4)
    admin_roles = [admin_role_guid], # (5)
    admin_groups = ["group2"], # (6)
    admin_users = ["jsmith"] # (7)
)

response = client.asset.save(connection) # (8)
connection_qualified_name = response.assets_created(asset_type=Connection)[0].qualified_name # (9)
  1. Retrieve the GUID for the admin role, to use later for defining the roles that can administer the connection.
  2. Build up the minimum request to create a connection.
  3. Provide a human-readable name for your connection, such as production or development.
  4. Set the type of connection to AIRFLOW.
  5. List the workspace roles that should be able to administer the connection (or None if none). All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUID(s) of the workspace role(s). At least one of admin_roles, admin_groups, or admin_users must be provided.
  6. List the group names that can administer this connection (or None if none). All users within that group (current and future) will be administrators of the connection. Note that the values here are the name(s) of the group(s). At least one of admin_roles, admin_groups, or admin_users must be provided.
  7. List the user names that can administer this connection (or None if none). Note that the values here are the username(s) of the user(s). At least one of admin_roles, admin_groups, or admin_users must be provided.
  8. Actually call Atlan to create the connection.
  9. Retrieve the qualifiedName for use in subsequent creation calls. (You'd probably want to do some None checking first.)
POST /api/meta/entity/bulk
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
  "entities": [
    {
      "typeName": "Connection", // (1)
      "attributes": {
        "name": "airflow-connection", // (2)
        "connectorName": "airflow", // (3)
        "qualifiedName": "default/airflow/123456789", // (4)
        "category": "elt", // (5)
        "adminRoles": [ // (6)
          "e7ae0295-c60a-469a-bd2c-fb903943aa02"
        ],
        "adminGroups": [ // (7)
          "group2"
        ],
        "adminUsers": [ // (8)
          "jsmith"
        ]
      }
    }
  ]
}
  1. The typeName must be exactly Connection.
  2. Human-readable name for your connection, such as production or development.
  3. The connectorName must be exactly airflow.
  4. The qualifiedName should follow the pattern: default/airflow/<epoch>, where <epoch> is the time in milliseconds at which the connection is being created.
  5. The category must be elt.
  6. List any workspace roles that can administer this connection. All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUID(s) of the workspace role(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  7. List any groups that can administer this connection. All users within that group (current and future) will be administrators of the connection. Note that the values here are the name(s) of the group(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  8. List any users that can administer this connection. Note that the values here are the username(s) of the user(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.

Access policies

Atlan creates the policies that grant access to a connection, including the ability to retrieve the connection and to create assets within it, asynchronously. It can take several seconds (even up to approximately 30 seconds) before these are in place after creating the connection.

You may therefore need to wait before you'll be able to create the assets below within the connection.

To confirm access, retrieve the connection after it has been created. The SDKs' retry loops will automatically retry until the connection can be successfully retrieved. At that point, your API token has permission to create the other assets.

Note: if you are reusing an existing connection rather than creating one via your API token, you must give your API token a persona that has access to that connection. Otherwise all attempts to create, read, update, or delete assets within that connection will fail due to a lack of permissions.

AirflowDag

2.1.4 1.11.2

An AirflowDag requires a name and a qualifiedName. For creation, you also need to specify the connectionQualifiedName of the connection for the dag.

Create an Airflow dag
11
12
13
14
15
16
17
AirflowDag airflowDag = AirflowDag.creator( // (1)
        "myAirflowDag", // (2)
        connectionQualifiedName // (3)
    )
    .build();
AssetMutationResponse response = airflowDag.save(); // (4)
airflowDag = response.getResult(airflowDag); // (5)
  1. Build up the minimum request to create a dag.
  2. Provide a human-readable name for your dag.
  3. Provide the qualifiedName of the Airflow connection.
  4. Actually call Atlan to create the dag.
  5. Retrieve the created dag for use in subsequent creation calls. (You'd probably want to do some null checking first.)
Create an Airflow dag
17
18
19
20
21
22
airflow_dag = AirflowDag.creator( # (1)
    name="myAirflowDag", # (2)
    connection_qualified_name=connection_qualified_name # (3)
)
response = client.asset.save(airflow_dag) # (4)
airflow_dag_qualifed_name = response.assets_created(asset_type=AirflowDag)[0].qualified_name # (5)
  1. Build up the minimum request to create a dag.
  2. Provide a human-readable name for your dag.
  3. Provide the qualifiedName of the Airflow connection.
  4. Actually call Atlan to create the dag.
  5. Retrieve the created dag for use in subsequent creation calls. (You'd probably want to do some None checking first.)
POST /api/meta/entity/bulk
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
{
  "entities": [
    {
      "typeName": "AirflowDag", // (1)
      "attributes": {
        "name": "myAirflowDag", // (2)
        "qualifiedName": "default/airflow/123456789/myAirflowDag", // (3)
        "connectionQualifiedName": "default/airflow/123456789", // (4)
        "connectorName": "airflow" // (5)
      }
    }
  ]
}
  1. The typeName must be exactly AirflowDag.
  2. Human-readable name for your dag.
  3. The qualifiedName should follow the pattern: default/airflow/<epoch>/<name>, where default/airflow/<epoch> is the qualifiedName of the connection for this dag and <name> is the unique name for this dag.
  4. The connectionQualifiedName must be the exact qualifiedName of the connection for this dag.
  5. The connectorName must be exactly airflow.

AirflowTask

2.1.4 1.11.2

An AirflowTask requires a name and a qualifiedName. For creation, you also need to specify the airflowDagQualifiedName of the dag that will contain the task.

Create an Airflow task
18
19
20
21
22
23
AirflowTask airflowTask = AirflowTask.creator( // (1)
        "myAirflowTask", // (2)
        airflowDag  // (3)
    )
    .build();
AssetMutationResponse response = airflowTask.save(); // (4)
  1. Build up the minimum request to create a task.
  2. Provide a human-readable name for your task.
  3. Provide the dag for this task. If you did not already have the dag, you could also use AirflowDag.refByGuid() with the GUID of the dag, or AirflowDag.refByQualifiedName() with the qualifiedName of the dag.
  4. Actually call Atlan to create the task.
Create an Airflow task
23
24
25
26
27
airflowTask = AirflowTask.creator( # (1)
    name="myAirflowTask", # (2)
    airflow_dag_qualified_name=airflow_dag_qualifed_name # (3)
)
response = client.asset.save(airflowTask) # (4)
  1. Build up the minimum request to create a task.
  2. Provide a human-readable name for your task.
  3. Provide the qualified_name of the dag for this task.
  4. Actually call Atlan to create the task.
POST /api/meta/entity/bulk
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
{
  "entities": [
    {
      "typeName": "AirflowTask", // (1)
      "attributes": {
        "name": "myAirflowTask", // (2)
        "qualifiedName": "default/airflow/123456789/myAirflowDag/myAirflowTask", // (3)
        "connectionQualifiedName": "default/airflow/123456789", // (4)
        "connectorName": "airflow", // (5)
        "airflowDag": { // (6)
          "typeName": "AirflowDag", // (7)
          "uniqueAttributes": { // (8)
            "qualifiedName": "default/airflow/123456789/myAirflowDag",
          }
        },
        "airflowDagName": "myAirflowDag", // (9)
        "airflowDagQualifiedName": "default/airflow/123456789/myAirflowDag" // (10)
      }
    }
  ]
}
  1. The typeName must be exactly AirflowTask.
  2. Human-readable name for your task.
  3. The qualifiedName should follow the pattern: default/airflow/<epoch>/<dag>/<name>, where default/airflow/<epoch>/<dag> is the qualifiedName of the dag that contains this task and <name> is the unique name for this task.
  4. The connectionQualifiedName must be the exact qualifiedName of the connection for this task.
  5. The connectorName must be exactly airflow.
  6. The dag in which this task exists is embedded in the airflowDag attribute.
  7. The typeName for this embedded reference must be AirflowDag.
  8. To complete the reference, you must include a uniqueAttributes task with the qualifiedName of the dag. Note: the dag must already exist in Atlan before creating the task.
  9. The airflowDagName should be the human-readable name of the dag.
  10. The airflowDagQualifiedName should be the qualifiedName of the dag.

Available relationships

Every level of the Airflow structure is an Asset, and can therefore be related to the following other assets.

erDiagram
  Asset }o--o{ AtlasGlossaryTerm : meanings
  Asset ||--o{ Link : links
  Asset ||--o| Readme : readme
  Asset }o--o{ Process : inputToProcesses
  Asset }o--o{ Process : outputFromProcesses

AtlasGlossaryTerm

A glossary term provides meaning to an asset. The link terms to assets snippet provides more detail on setting this relationship.

A link provides additional context to an asset, by providing a URL to additional information.

Readme

A README provides rich documentation for an asset. The add asset READMEs snippet provides more detail on setting this relationship.

Process

A process provides lineage information for an asset. An asset can be both an input and an output for one or more processes. The lineage snippets provide more detail on creating and working with lineage.


  1. Although if you want to delete everything in a connection, your better avenue is the packaged connection delete utility in the UI.