Skip to content

Manage workflows

Retrieve workflow

By ID

2.3.1 4.0.0

Retrieve an existing workflow by its ID:

Retrieve workflows by its type
1
2
WorkflowSearchResult result = WorkflowSearchRequest // (1)
    .findById(client, "atlan-snowflake-miner-1714638976"); // (2)
  1. You can search for existing workflows through the WorkflowSearchRequest class.
  2. You can find workflows by their ID using the findById() helper method and providing the ID for one of the packages. In this example, we're retrieving a specific Snowflake miner package. Because this operation will retrieve information from Atlan, you must provide it an AtlanClient through which to connect to the tenant.
Retrieve workflow by its ID
1
2
3
4
5
6
7
from pyatlan.client.atlan import AtlanClient

client = AtlanClient()

result = client.workflow.find_by_id(  # (1)
    id="atlan-snowflake-miner-1714638976"
)
  1. You can find a workflow by its identifier using the find_by_id() method of the workflow client, providing the id for the specific workflow. In this example, we're retrieving the SnowflakeMiner workflow.
Retrieve workflows by its type
1
2
val result = WorkflowSearchRequest // (1)
    .findById(client, "atlan-snowflake-miner-1714638976") // (2)
  1. You can search for existing workflows through the WorkflowSearchRequest class.
  2. You can find workflows by their ID using the findById() helper method and providing the ID for one of the packages. In this example, we're retrieving a specific Snowflake miner package. Because this operation will retrieve information from Atlan, you must provide it an AtlanClient through which to connect to the tenant.
POST /api/service/workflows/indexsearch
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
{
   "from": 0,
   "size": 1,
   "track_total_hits": true,
   "query": {
       "bool": {
       "filter": [
           {
           "nested": {
               "path": "metadata",
               "query": {
               "bool": {
                   "must": [
                   {
                       "term": {
                       "metadata.name.keyword": {
                           "value": "atlan-snowflake-miner-1714638976"
                            // (1)
                       }
                       }
                   }
                   ]
               }
               }
           }
           }
       ]
       }
   },
   "sort": [
       {
       "metadata.creationTimestamp": {
           "order": "desc",
           "nested": {
           "path": "metadata"
           }
       }
       }
   ]
}
  1. You can find a workflow by its identifier. In this example, we're retrieving the SnowflakeMiner workflow.

By type

1.9.5 4.0.0

Retrieve existing workflows by its type:

Retrieve workflows by its type
1
2
List<WorkflowSearchResult> results = WorkflowSearchRequest // (1)
    .findByType(client, SnowflakeMiner.PREFIX, 5); // (2)
  1. You can search for existing workflows through the WorkflowSearchRequest class.
  2. You can find workflows by their type using the findByType() helper method and providing the prefix for one of the packages. In this example, we do so for the SnowflakeMiner. (You can also specify the maximum number of resulting workflows you want to retrieve as results.) Because this operation will retrieve information from Atlan, you must provide it an AtlanClient through which to connect to the tenant.
Retrieve workflows by its type
1
2
3
4
5
6
7
8
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import WorkflowPackage

client = AtlanClient()

results = client.workflow.find_by_type(  # (1)
    prefix=WorkflowPackage.SNOWFLAKE_MINER, max_results=5
)
  1. You can find workflows by their type using the workflow client find_by_type()method and providing the prefix for one of the packages. In this example, we do so for the SnowflakeMiner. (You can also specify the maximum number of resulting workflows you want to retrieve as results.)
Retrieve workflows by its type
1
2
var results = WorkflowSearchRequest // (1)
    .findByType(client, SnowflakeMiner.PREFIX, 5); // (2)
  1. You can search for existing workflows through the WorkflowSearchRequest class.
  2. You can find workflows by their type using the findByType() helper method and providing the prefix for one of the packages. In this example, we do so for the SnowflakeMiner. (You can also specify the maximum number of resulting workflows you want to retrieve as results.) Because this operation will retrieve information from Atlan, you must provide it an AtlanClient through which to connect to the tenant.
POST /api/service/workflows/indexsearch
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
{
   "from": 0,
   "size": 5, // (1)
   "track_total_hits": true,
   "query": {
       "bool": {
       "filter": [
           {
           "nested": {
               "path": "metadata",
               "query": {
               "regexp": {
                   "metadata.name.keyword": {
                   "value": "atlan[-]snowflake[-]miner[-][0-9]{10}"
                   } // (2)
               }
               }
           }
           }
       ]
       }
   },
   "sort": [
       {
       "metadata.creationTimestamp": {
           "order": "desc",
           "nested": {
           "path": "metadata"
           }
       }
       }
   ]
}
  1. Specify the maximum number of resulting workflows you want to retrieve as results.
  2. In this example, we do so for the SnowflakeMiner with regexp: atlan[-]snowflake[-]miner[-][0-9]{10}.

Retrieve all workflow credentials

2.7.0

To retrive all workflow credentials for example, for Snowflake:

Coming soon

Retrieve all workflow credentials
1
2
3
4
5
6
7
8
9
from pyatlan.client.atlan import AtlanClient

client = AtlanClient()

response = client.credentials.get_all( #(1)
    filter={"name": "atlan-snowflake-17891"},
    limit=5,             
    offset=1           
)
  1. To retrieve workflow credentials using the get_all() method. When run without any parameters, it returns all existing records. You can also use following optional parameters to filter, limit, or paginate through the results:

    • (Optional) filter : filters records based on specific key-value criteria, such as {"name": "atlan-snowflake-178691"} to retrieve credentials where workflow name contains atlan-snowflake-178691.
    • (Optional) limit : restricts the maximum number of records returned in a single call, for example, limit=5 retrieves up to 5 records only.
    • (Optional) offset : skips a specified number of records before starting retrieval, such as offset=10 to skip the first 10 records and retrieve from the 11th onward.

Coming soon

GET api/service/credentials?filter=%7B%22name%22%3A%22atlan-snowflake-17891%22%7D&limit=1&offset=1
1
//(1)
  1. All details are in the URL itself.

    URL-encoded filter

    Note that the filter is URL-encoded. Decoded it would be: {"name":"atlan-snowflake-17891"}

Update workflow source credentials

1.8.4 4.0.0

To update workflow source credentials for example, for Snowflake:

Update workflow source credentials
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Credential snowflakeCredential = client.credentials.get( // (1)
    "972a87c1-28d7-8bf2-896d-ea5bd3e9c691"
    ).toCredential()
    .authType("basic") // (2)
    .username("username") // (3)
    .password("password")
    .extra("role", "role-here")
    .extra("warehouse", "warehouse-here")
    .build() // (4)

CredentialResponse response = snowflakeCredential.update(client) // (5)
  1. You can retrieve the workflow credential object by providing its GUID.
  2. You must specify the authentication type of the credential.
  3. You must provide the sensitive details such as the username, password, and extra when updating credentials. This behavior aligns with the Atlan workflow config update UI.
  4. Build the minimal Credential object.
  5. Now, use the update() method of the Credential object to update this new credentials in Atlan after initially testing it for successful validation. Because this operation will update details in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
Update workflow source credentials
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from pyatlan.client.atlan import AtlanClient

client = AtlanClient()

snowflake_credential = client.credentials.get(
    guid="972a87c1-28d7-8bf2-896d-ea5bd3e9c691"
).to_credential() # (1)

# Basic Authentication
snowflake_credential.auth_type = "basic" # (2)
snowflake_credential.username = "username" # (3)
snowflake_credential.password = "password"
snowflake_credential.extras = {
    "role": "role-here",
    "warehouse": "warehouse-here",
}

response = client.credentials.test_and_update( # (4)
    credential=snowflake_credential
)
  1. You can retrieve the workflow credential object by providing its GUID.
  2. You must specify the authentication type of the credential.
  3. You must provide the sensitive details such as the username, password, and extras when updating credentials. This behavior aligns with the Atlan workflow config update UI.
  4. Now, pass the credential object to the test_and_update() method to update this new credentials in Atlan after initially testing it to confirm its successful validation.
Update workflow source credentials
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
val snowflakeCredential = client.credentials.get( // (1)
    "972a87c1-28d7-8bf2-896d-ea5bd3e9c691"
    ).toCredential()
    .authType("basic") // (2)
    .username("username") // (3)
    .password("password")
    .extra("role", "role-here")
    .extra("warehouse", "warehouse-here")
    .build() // (4)

val response = snowflakeCredential.update() // (5)
  1. You can retrieve the workflow credential object by providing its GUID.
  2. You must specify the authentication type of the credential.
  3. You must provide the sensitive details such as the username, password, and extra when updating credentials. This behavior aligns with the Atlan workflow config update UI.
  4. Build the minimal Credential object.
  5. Now, use the update() method of the Credential object to update this new credentials in Atlan after initially testing it for successful validation. Because this operation will update details in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
GET /api/service/credentials/972a87c1-28d7-8bf2-896d-ea5bd3e9c691
1
// (1)
  1. You can retrieve the workflow credential object by providing its GUID.
POST /api/service/credentials/972a87c1-28d7-8bf2-896d-ea5bd3e9c691/test
1
// (1)
  1. You can also test the existing credential authentication by providing its GUID.
POST /api/service/credentials/test
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
   "name": "default-snowflake-1735595539-0", // (1)
   "host": "test.snowflake.com",
   "port": 443,
   "authType": "basic",
   "connectorType": "jdbc",
   "username": "test-username", // (2)
   "password": "test-password",
   "extra": {
       "role": "test-role",
       "warehouse": "test-warehouse",
   },
   "connectorConfigName": "atlan-connectors-snowflake"
} 
  1. This example demonstrates how to test & update the source credentials for the Snowflake crawler (basic authentication).

  2. You can update the following credentials fields:

    • username: update with the new username.
    • password: update with the new password.
    • role: update with the new role.
    • warehouse: update with the new warehouse.

Update workflow configuration

2.3.1

To update workflow configuration for example, for Snowflake:

Coming soon

Update workflow configuration
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from pyatlan.client.atlan import AtlanClient

client = AtlanClient()

result = client.workflow.find_by_id(  # (1)
    id="atlan-snowflake-1714638976"
)

workflow_task = result.source.spec.templates[0].dag.tasks[0]
workflow_params = workflow_task.arguments.parameters # (2)

for option in workflow_params:
    if option.name == "enable-lineage": # (3)
        option.value = True

response = client.workflow.update(workflow=result.to_workflow()) # (4)
  1. You can find a workflow by its identifier using the find_by_id() method of the workflow client, providing the id for the specific workflow. In this example, we're retrieving the Snowflake workflow for an update.
  2. Retrieve the workflow template and specific task that you need to update.
  3. Update the specific workflow parameter. In this example, we're enabling lineage for the Snowflake workflow.
  4. Convert the workflow search result object to a workflow object and pass that to the update() method to actually perform the workflow update in Atlan.

Coming soon

Retrieve workflow run

By ID

2.4.2

Retrieve an existing workflow run by its ID:

Coming soon

Retrieve workflow run by its ID
1
2
3
4
5
6
7
from pyatlan.client.atlan import AtlanClient

client = AtlanClient()

result = client.workflow.find_run_by_id(  # (1)
    id="atlan-snowflake-miner-1714638976-mzdza"
)
  1. You can find a workflow run by its identifier using the find_run_by_id() method of the workflow client, providing the id for the specific workflow run. In this example, we're retrieving the existing SnowflakeMiner workflow run.

Coming soon

POST /api/service/runs/indexsearch
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
   "from": 0,
   "size": 1,
   "track_total_hits": true,
   "query": {
       "bool": {
       "filter": [
           {
           "term": {
               "_id": {
               "value": "atlan-snowflake-miner-1714638976-mzdza"
               } // (1)
           }
           }
       ]
       }
   },
   "sort": [
       {
       "metadata.creationTimestamp": {
           "order": "desc",
           "nested": {
           "path": "metadata"
           }
       }
       }
   ]
}
  1. You can find a workflow run by its identifier. In this example, we're retrieving the existing SnowflakeMiner workflow run.

Retrieve all workflow runs

2.1.8

By their phase:

To retrieve all existing workflow runs based on their phase, such as Succeeded, Running, Failed, etc

Coming soon

Retrieve all workflow runs by their phase
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.enums import AtlanWorkflowPhase

client = AtlanClient()

response = client.workflow.get_runs(
    workflow_name="atlan-snowflake-miner-1714638976",
    workflow_phase=AtlanWorkflowPhase.RUNNING,
    from_=0,
    size=100,
) # (1)
  1. To retrieve all existing workflow runs based on their phase, you need to specify:

    • name of the workflow as displayed in the UI, eg: atlan-snowflake-miner-1714638976.
    • phase of the given workflow (e.g: Succeeded, Running, Failed, etc)
    • starting index of the search results (default: 0).
    • maximum number of search results to return (default: 100).

Coming soon

POST /api/service/runs/indexsearch
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
{
   "from": 0, // (1)
   "size": 100, // (2)
   "track_total_hits": true,
   "query": {
       "bool": {
       "must": [
           {
           "nested": {
               "path": "spec",
               "query": {
               "term": {
                   "spec.workflowTemplateRef.name.keyword": {
                   "value": "atlan-snowflake-miner-1714638976"
                   } // (3)
               }
               }
           }
           }
       ],
       "filter": [
           {
           "term": {
               "status.phase.keyword": {
               "value": "Succeeded"
               } // (4)
           }
           }
       ]
       }
   },
   "sort": [
       {
       "metadata.creationTimestamp": {
           "order": "desc",
           "nested": {
           "path": "metadata"
           }
       }
       }
   ]
}
  1. Starting index of the search results (default: 0).
  2. Maximum number of search results to return (default: 100).
  3. Name of the workflow as displayed in the UI, eg: atlan-snowflake-miner-1714638976.
  4. Phase of the given workflow (e.g: Succeeded, Running, Failed, etc)

Stop a running workflow

2.1.8

To stop a running workflow:

Coming soon

Retrieve all workflow runs by their phase
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from pyatlan.client.atlan import AtlanClient

client = AtlanClient()

runs = client.workflow.get_runs(
    workflow_name="atlan-snowflake-miner-1714638976",
    workflow_phase=AtlanWorkflowPhase.RUNNING,
) # (1)

response = client.workflow.stop(
    workflow_run_id=runs[0].id
) # (2)
  1. First, retrieve all existing running workflows.
  2. From the list of existing running workflows, provide the identifier of the specific workflow run to the client.workflow.stop() method, e.g: atlan-snowflake-miner-1714638976-9wfxz.

Coming soon

Delete a workflow

2.1.8 4.0.0

To delete a workflow:

Delete a workflow
1
2
3
client.workflows.archive(
    "atlan-snowflake-miner-1714638976"
); // (1)
  1. To delete an existing workflow, specify the name of the workflow as displayed in the UI (e.g: atlan-snowflake-miner-1714638976).
Delete a workflow
1
2
3
4
5
6
7
from pyatlan.client.atlan import AtlanClient

client = AtlanClient()

client.workflow.delete(
    workflow_name="atlan-snowflake-miner-1714638976"
) # (1)
  1. To delete an existing workflow, specify:

    • name of the workflow as displayed in the UI (e.g: atlan-snowflake-miner-1714638976).
Delete a workflow
1
2
3
client.workflows.archive(
    "atlan-snowflake-miner-1714638976"
) // (1)
  1. To delete an existing workflow, specify the name of the workflow as displayed in the UI (e.g: atlan-snowflake-miner-1714638976).