Manage lineage

Create lineage between assets

1.4.0 1.0.0

To create lineage between assets, you need to create a Process entity.

Input and output assets must already exist

The assets you reference as the inputs and outputs of the process must already exist before you create the process.

Create lineage between assets
LineageProcess process = LineageProcess.creator( // (1)
        "Source 1, Source 2, Source 3 -> Target 1, Target 2", // (2)
        "default/snowflake/1657025257", // (3)
        "dag_123", // (4)
        List.of( // (5)
            Table.refByGuid("495b1516-aaaf-4390-8cfd-b11ade7a7799"),
            Table.refByGuid("d002dead-1655-4d75-abd6-ad889fa04bd4"),
            Table.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS")),
        List.of( // (6)
            Table.refByGuid("86d9a061-7753-4884-b988-a02d3954bc24"),
            Table.refByQualifiedName("default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS")),
        null) // (7)
    .sql("select * from somewhere;") // (8)
    .sourceURL("https://your.orchestrator/unique/id/123") // (9)
    .build();
AssetMutationResponse response = process.save(); // (10)
assert response.getCreatedAssets().size() == 1; // (11)
assert response.getUpdatedAssets().size() == 5; // (12)
  1. Use the creator() method to initialize the object with all necessary attributes for creating it.
  2. Provide a name for how the process will be shown in the UI.
  3. Provide the qualifiedName of the connection that ran the process.

    Tips for the connection

    The process itself must be created within a connection for both access control and icon labelling. Use a connection qualifiedName that indicates the system that ran the process:

    • You could use the same connection qualifiedName as the source system, if it was the source system "pushing" data to the target(s).
    • You could use the same connection qualifiedName as the target system, if it was the target system "pulling" data from the source(s).
    • You could use a different connection qualifiedName from either source or target, if there is a system in-between doing the processing (for example an ETL engine or orchestrator).
  4. (Optional) Provide the unique ID of the process within that connection. This could be the unique DAG ID for an orchestrator, for example. Since it is optional, you can also send null and the SDK will generate a unique ID for you based on the unique combination of inputs and outputs for the process.

    Use your own ID if you can

    While the SDK can generate this ID for you, since it is based on the unique combination of inputs and outputs the ID can change if those inputs or outputs change. This could result in extra processes in lineage as this process itself changes over time.

    If you use your own ID for the process, the same single process in Atlan will be updated whenever that process changes over time (even if its inputs or outputs change).

  5. Provide the list of inputs to the process. Note that each of these is only a Reference to an asset, not a full asset object. For a reference you only need (in addition to the type of asset) either:

    • its GUID (for the static <Type>.refByGuid() method)
    • its qualifiedName (for the static <Type>.refByQualifiedName() method)
  6. Provide the list of outputs to the process. Note that each of these is again only a Reference to an asset.

  7. (Optional) Provide the parent LineageProcess in which this process ran (for example, if this process is a subprocess of some higher-level process). If this is a top-level process, you can also send null for this parameter (as in this example).
  8. (Optional) You can also add other properties to the lineage process, such as SQL code that runs within the process.
  9. (Optional) You can also provide a link to the process, which will provide a button to click to go to that link from the Atlan UI when viewing the process in Atlan.
  10. Call the save() method to actually create the process.
  11. The response will include that single lineage process asset that was created.
  12. The response will also include the 5 data assets (3 inputs, 2 outputs) that were updated.
Create lineage between assets
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Process, Table

client = AtlanClient()
process = Process.create( # (1) 
    name="Source 1, Source 2, Source 3 -> Target 1, Target 2", # (2)
    connection_qualified_name="default/snowflake/1657025257", # (3)
    process_id="dag_123", # (4)
    inputs=[ # (5)
        Table.ref_by_guid(guid="495b1516-aaaf-4390-8cfd-b11ade7a7799"),
        Table.ref_by_guid(guid="d002dead-1655-4d75-abd6-ad889fa04bd4"),
        Table.ref_by_qualified_name(qualified_name="default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS"),
    ],
    outputs=[ # (6)
        Table.ref_by_guid(guid="86d9a061-7753-4884-b988-a02d3954bc24"),
        Table.ref_by_qualified_name(qualified_name="default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS"),
    ],
) # (7)
process.sql = "select * from somewhere;" # (8)
process.source_url = "https://your.orchestrator/unique/id/123" # (9)
response = client.asset.save(process) # (10)
assert (processes := response.assets_created(Process)) # (11)
assert len(processes) == 1 # (12)
assert (tables := response.assets_updated(Table)) # (13)
assert len(tables) == 5 # (14)
  1. Use the create() method to initialize the object with all necessary attributes for creating it.
  2. Provide a name for how the process will be shown in the UI.
  3. Provide the qualified_name of the connection that ran the process.

    Tips for the connection

    The process itself must be created within a connection for both access control and icon labelling. Use a connection qualified_name that indicates the system that ran the process:

    • You could use the same connection qualified_name as the source system, if it was the source system "pushing" data to the target(s).
    • You could use the same connection qualified_name as the target system, if it was the target system "pulling" data from the source(s).
    • You could use a different connection qualified_name from either source or target, if there is a system in-between doing the processing (for example an ETL engine or orchestrator).
  4. (Optional) Provide the unique ID of the process within that connection. This could be the unique DAG ID for an orchestrator, for example. Since it is optional, you can also leave it out and the SDK will generate a unique ID for you based on the unique combination of inputs and outputs for the process.

    Use your own ID if you can

    While the SDK can generate this ID for you, since it is based on the unique combination of inputs and outputs the ID can change if those inputs or outputs change. This could result in extra processes in lineage as this process itself changes over time.

    If you use your own ID for the process, the same single process in Atlan will be updated whenever that process changes over time (even if its inputs or outputs change).

  5. Provide the list of inputs to the process. Note that each of these is only a Reference to an asset, not a full asset object. For a reference you only need (in addition to the type of asset) either:

    • its GUID (for the ref_by_guid() method)
    • its qualified_name (for the ref_by_qualified_name() method)
  6. Provide the list of outputs to the process. Note that each of these is again only a Reference to an asset.

  7. (Optional) Provide the parent Process in which this process ran (for example, if this process is a subprocess of some higher-level process). If this is a top-level process, you can omit this parameter (as in this example).
  8. (Optional) You can also add other properties to the lineage process, such as SQL code that runs within the process.
  9. (Optional) You can also provide a link to the process, which will provide a button to click to go to that link from the Atlan UI when viewing the process in Atlan.
  10. Call the save() method to actually create the process.
  11. Check that a Process was created.
  12. Check that only 1 Process was created.
  13. Check that tables were updated.
  14. Check that 5 tables (3 inputs, 2 outputs) were updated.
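The advice to use your own ID can be illustrated with a small sketch. This is not the SDK's actual ID generation; `derive_process_id` and the SHA-256 hashing are hypothetical, chosen only to show that any ID derived from the combination of inputs and outputs changes as soon as that combination changes.

```python
import hashlib


def derive_process_id(inputs, outputs):
    """Hypothetical stand-in for an ID derived from inputs and outputs."""
    # Sort both lists so the order of the references does not affect the ID.
    key = "|".join(sorted(inputs)) + "->" + "|".join(sorted(outputs))
    return hashlib.sha256(key.encode("utf-8")).hexdigest()


inputs = ["SOURCE_1", "SOURCE_2", "SOURCE_3"]
outputs = ["TARGET_1", "TARGET_2"]

original_id = derive_process_id(inputs, outputs)

# The same inputs and outputs in a different order give the same ID.
reordered_id = derive_process_id(list(reversed(inputs)), outputs)

# Adding one input gives a different ID, so the changed process
# would appear as an additional process rather than an update.
changed_id = derive_process_id(inputs + ["SOURCE_4"], outputs)
```

A fixed ID such as "dag_123" avoids this: it stays the same however the inputs and outputs evolve.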
POST /api/meta/entity/bulk
{
  "entities": [ // (1)
    {
      "typeName": "Process", // (2)
      "attributes": {
        "name": "Source 1, Source 2, Source 3 -> Target 1, Target 2", // (3)
        "qualifiedName": "default/snowflake/1657025257/dag_123", // (4)
        "inputs": [ // (5)
          {
            "typeName": "Table",
            "guid": "495b1516-aaaf-4390-8cfd-b11ade7a7799"
          },
          {
            "typeName": "Table",
            "guid": "d002dead-1655-4d75-abd6-ad889fa04bd4"
          },
          {
            "typeName": "Table",
            "uniqueAttributes": {
              "qualifiedName": "default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS"
            }
          }
        ],
        "outputs": [ // (6)
          {
            "typeName": "Table",
            "guid": "86d9a061-7753-4884-b988-a02d3954bc24"
          },
          {
            "typeName": "Table",
            "uniqueAttributes": {
              "qualifiedName": "default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS"
            }
          }
        ]
      }
    }
  ]
}
  1. All assets must be wrapped in an entities array.
  2. You must provide the exact type name for a Process asset (case-sensitive).
  3. You must provide a name for the integration process.
  4. You must provide a unique qualifiedName for the integration process (case-sensitive).
  5. You must list all of the input assets to the process. These can be referenced by GUID or by qualifiedName.
  6. You must list all of the output assets from the process. These can also be referenced by either GUID or qualifiedName.
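If you assemble this payload programmatically, a minimal sketch using only the Python standard library might look like the following. The helper names `table_ref` and `build_process_payload` are hypothetical, and composing the qualifiedName as the connection's qualifiedName plus "/" plus your own process ID is an assumption that simply mirrors the example payload above.

```python
import json


def table_ref(guid=None, qualified_name=None):
    """Build a reference to an existing Table, by GUID or by qualifiedName."""
    if guid is not None:
        return {"typeName": "Table", "guid": guid}
    if qualified_name is not None:
        return {
            "typeName": "Table",
            "uniqueAttributes": {"qualifiedName": qualified_name},
        }
    raise ValueError("Provide either guid or qualified_name")


def build_process_payload(name, connection_qualified_name, process_id, inputs, outputs):
    """Wrap a single Process entity in the 'entities' array the endpoint expects."""
    return {
        "entities": [
            {
                "typeName": "Process",
                "attributes": {
                    "name": name,
                    # Assumption: connection qualifiedName + "/" + process ID.
                    "qualifiedName": f"{connection_qualified_name}/{process_id}",
                    "inputs": inputs,
                    "outputs": outputs,
                },
            }
        ]
    }


payload = build_process_payload(
    name="Source 1, Source 2, Source 3 -> Target 1, Target 2",
    connection_qualified_name="default/snowflake/1657025257",
    process_id="dag_123",
    inputs=[
        table_ref(guid="495b1516-aaaf-4390-8cfd-b11ade7a7799"),
        table_ref(guid="d002dead-1655-4d75-abd6-ad889fa04bd4"),
        table_ref(qualified_name="default/snowflake/1657025257/OPS/DEFAULT/RUN_STATS"),
    ],
    outputs=[
        table_ref(guid="86d9a061-7753-4884-b988-a02d3954bc24"),
        table_ref(qualified_name="default/snowflake/1657025257/OPS/DEFAULT/FULL_STATS"),
    ],
)

# Serialize to the JSON body for POST /api/meta/entity/bulk.
body = json.dumps(payload, indent=2)
```

The sketch only builds the request body; sending it (including authentication) is left to whichever HTTP client you use.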

Remove lineage between assets

1.4.0 1.0.0

To remove lineage between assets, you need to delete the Process entity that links them:

Remove lineage between assets
AssetMutationResponse response =
        Asset.purge("b4113341-251b-4adc-81fb-2420501c30e6"); // (1)
Asset deleted = response.getDeletedAssets().get(0); // (2)
LineageProcess process;
if (deleted instanceof LineageProcess) {
    process = (LineageProcess) deleted; // (3)
}
  1. Provide the GUID for the process to the static Asset.purge() method.
  2. The response will include that single process that was purged.
  3. If you want to confirm the details, you'll need to type-check and then cast the generic Asset returned into a LineageProcess.
Remove lineage between assets
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Process

client = AtlanClient()
response = client.asset.purge_by_guid( # (1)
    guid="b4113341-251b-4adc-81fb-2420501c30e6") # (2)
assert (processes := response.assets_deleted(Process)) # (3)
assert len(processes) == 1 # (4)
  1. Invoke the asset.purge_by_guid() method to delete the Process.
  2. Provide the GUID of the process to be purged.
  3. Check that a Process was purged.
  4. Check that only 1 Process was purged.
DELETE /api/meta/entity/bulk?guid=6fa1f0d0-5720-4041-8243-c2a5628b68bf&deleteType=PURGE
// (1)
  1. All of the details are in the request URL; there is no payload for a deletion. The GUID listed in the URL is that of the process itself (not any of its inputs or outputs).
More information

This will irreversibly delete the process, and therefore the lineage it represented. The input and output assets themselves will also be updated, to no longer be linked to the (now non-existent) process. However, the input and output assets themselves will continue to exist in Atlan.
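Because the raw DELETE request carries everything in its URL, building that URL is the only step that needs care. Here is a minimal sketch using the Python standard library; `build_purge_url` and the base URL `https://tenant.example.com` are hypothetical placeholders, not values from Atlan.

```python
from urllib.parse import urlencode


def build_purge_url(base_url, guid):
    """Build the URL that purges the process with the given GUID."""
    # Both details travel as query parameters; the request has no body.
    query = urlencode({"guid": guid, "deleteType": "PURGE"})
    return f"{base_url.rstrip('/')}/api/meta/entity/bulk?{query}"


url = build_purge_url(
    "https://tenant.example.com",  # hypothetical tenant base URL
    "6fa1f0d0-5720-4041-8243-c2a5628b68bf",  # GUID of the process itself
)
```

Since the purge cannot be undone, it may be worth double-checking that the GUID belongs to the process, and not to one of its inputs or outputs, before sending the request.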