
Manage Kafka assets

In general, these should be:

  • Created in top-down order (connection, then KafkaTopic, then KafkaConsumerGroup)
  • Deleted in bottom-up order (consumer groups, then topics, then connections)[1]
erDiagram
  Connection ||--o{ KafkaTopic : contains
  KafkaTopic ||--o{ KafkaConsumerGroup : contains
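The ordering rule above can be expressed as a small sorting helper. This is a minimal sketch and is not part of any Atlan SDK. The names HIERARCHY_DEPTH, creation_order and deletion_order are hypothetical, and assets are represented as plain (type_name, name) pairs rather than SDK objects.

```python
# Hypothetical depth map for the Kafka hierarchy on this page;
# an illustration only, not part of any Atlan SDK.
HIERARCHY_DEPTH = {
    "Connection": 0,
    "KafkaTopic": 1,
    "KafkaConsumerGroup": 2,
}


def creation_order(assets):
    """Return (type_name, name) pairs parents-first: connection, topics, consumer groups."""
    return sorted(assets, key=lambda asset: HIERARCHY_DEPTH[asset[0]])


def deletion_order(assets):
    """Return (type_name, name) pairs children-first: consumer groups, topics, connection."""
    return sorted(assets, key=lambda asset: HIERARCHY_DEPTH[asset[0]], reverse=True)


if __name__ == "__main__":
    pending = [
        ("KafkaConsumerGroup", "myKafkaConsumerGroup"),
        ("Connection", "kafka-connection"),
        ("KafkaTopic", "myKafkaTopic"),
    ]
    print(creation_order(pending))
    print(deletion_order(pending))
```

Because sorted() is stable, assets at the same level keep the order in which you listed them.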

Asset structure

Connection

2.1.4 4.0.0

A Kafka connection requires a name and qualifiedName. For creation, specific settings are also required to distinguish it as a Kafka connection rather than another type of connection. In addition, at least one of adminRoles, adminGroups, or adminUsers must be provided.
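If you assemble connection requests yourself, you may want to check the admin requirement before calling Atlan. The helper below is a hypothetical client-side pre-check (check_connection_admins is not an SDK function); the SDKs and the API may apply their own validation.

```python
from typing import Optional, Sequence


def check_connection_admins(
    admin_roles: Optional[Sequence[str]] = None,
    admin_groups: Optional[Sequence[str]] = None,
    admin_users: Optional[Sequence[str]] = None,
) -> None:
    """Raise ValueError unless at least one admin role, group, or user is given.

    Hypothetical helper: None and empty lists both count as "not provided".
    """
    if not (admin_roles or admin_groups or admin_users):
        raise ValueError(
            "At least one of adminRoles, adminGroups, or adminUsers must be provided."
        )


if __name__ == "__main__":
    check_connection_admins(admin_groups=["group2"])  # passes silently
```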

Create a Kafka connection (Java)
String adminRoleGuid = client.getRoleCache().getIdForName("$admin"); // (1)
Connection connection = Connection.creator( // (2)
        "kafka-connection", // (3)
        AtlanConnectorType.KAFKA, // (4)
        List.of(adminRoleGuid), // (5)
        List.of("group2"), // (6)
        List.of("jsmith")) // (7)
    .build();
AssetMutationResponse response = connection.save(client); // (8)
String connectionQualifiedName = response.getCreatedAssets().get(0).getQualifiedName(); // (9)
  1. Retrieve the GUID for the admin role, to use later for defining the roles that can administer the connection.
  2. Build up the minimum request to create a connection.
  3. Provide a human-readable name for your connection, such as production or development.
  4. Set the type of connection to KAFKA.
  5. List the workspace roles that should be able to administer the connection (or null if none). All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUID(s) of the workspace role(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  6. List the group names that can administer this connection (or null if none). All users within that group (current and future) will be administrators of the connection. Note that the values here are the name(s) of the group(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  7. List the user names that can administer this connection (or null if none). Note that the values here are the username(s) of the user(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  8. Actually call Atlan to create the connection. Because this operation will persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
  9. Retrieve the qualifiedName for use in subsequent creation calls. (You'd probably want to do some null checking first.)
Create a Kafka connection (Python)
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Connection, KafkaTopic, KafkaConsumerGroup
from pyatlan.model.enums import AtlanConnectorType

client = AtlanClient()  # reads ATLAN_BASE_URL and ATLAN_API_KEY from the environment
admin_role_guid = client.role_cache.get_id_for_name("$admin") # (1)
connection = Connection.creator( # (2)
    name="kafka-connection", # (3)
    connector_type=AtlanConnectorType.KAFKA, # (4)
    admin_roles=[admin_role_guid], # (5)
    admin_groups=["group2"], # (6)
    admin_users=["jsmith"] # (7)
)

response = client.asset.save(connection) # (8)
connection_qualified_name = response.assets_created(asset_type=Connection)[0].qualified_name # (9)
  1. Retrieve the GUID for the admin role, to use later for defining the roles that can administer the connection.
  2. Build up the minimum request to create a connection.
  3. Provide a human-readable name for your connection, such as production or development.
  4. Set the type of connection to KAFKA.
  5. List the workspace roles that should be able to administer the connection (or None if none). All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUID(s) of the workspace role(s). At least one of admin_roles, admin_groups, or admin_users must be provided.
  6. List the group names that can administer this connection (or None if none). All users within that group (current and future) will be administrators of the connection. Note that the values here are the name(s) of the group(s). At least one of admin_roles, admin_groups, or admin_users must be provided.
  7. List the user names that can administer this connection (or None if none). Note that the values here are the username(s) of the user(s). At least one of admin_roles, admin_groups, or admin_users must be provided.
  8. Actually call Atlan to create the connection.
  9. Retrieve the qualifiedName for use in subsequent creation calls. (You'd probably want to do some None checking first.)
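The None checking suggested in annotation 9 could look like the following. This is a sketch that assumes the created assets expose a qualified_name attribute, as in the snippet above; first_qualified_name is a hypothetical helper, not part of pyatlan.

```python
from typing import Any, Optional, Sequence


def first_qualified_name(created: Optional[Sequence[Any]]) -> str:
    """Return the qualified_name of the first created asset, or fail loudly.

    `created` is whatever list of created assets your call returned
    (hypothetical helper; it only relies on a qualified_name attribute).
    """
    if not created:
        raise RuntimeError("No assets of the requested type were created.")
    qualified_name = getattr(created[0], "qualified_name", None)
    if not qualified_name:
        raise RuntimeError("The first created asset has no qualified_name.")
    return qualified_name
```

For example: connection_qualified_name = first_qualified_name(response.assets_created(asset_type=Connection)). Failing with an explicit error here is easier to diagnose than an IndexError or a None that only surfaces in a later creation call.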
Create a Kafka connection (Kotlin)
val adminRoleGuid = client.roleCache.getIdForName("\$admin") // (1)
val connection = Connection.creator( // (2)
        "kafka-connection", // (3)
        AtlanConnectorType.KAFKA, // (4)
        listOf(adminRoleGuid), // (5)
        listOf("group2"), // (6)
        listOf("jsmith")) // (7)
    .build()
var response = connection.save(client) // (8)
val connectionQualifiedName = response.createdAssets[0].qualifiedName // (9)
  1. Retrieve the GUID for the admin role, to use later for defining the roles that can administer the connection.
  2. Build up the minimum request to create a connection.
  3. Provide a human-readable name for your connection, such as production or development.
  4. Set the type of connection to KAFKA.
  5. List the workspace roles that should be able to administer the connection (or null if none). All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUID(s) of the workspace role(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  6. List the group names that can administer this connection (or null if none). All users within that group (current and future) will be administrators of the connection. Note that the values here are the name(s) of the group(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  7. List the user names that can administer this connection (or null if none). Note that the values here are the username(s) of the user(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  8. Actually call Atlan to create the connection. Because this operation will persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
  9. Retrieve the qualifiedName for use in subsequent creation calls. (You'd probably want to do some null checking first.)
POST /api/meta/entity/bulk
{
  "entities": [
    {
      "typeName": "Connection", // (1)
      "attributes": {
        "name": "kafka-connection", // (2)
        "connectorName": "kafka", // (3)
        "qualifiedName": "default/kafka/123456789", // (4)
        "category": "eventbus", // (5)
        "adminRoles": [ // (6)
          "e7ae0295-c60a-469a-bd2c-fb903943aa02"
        ],
        "adminGroups": [ // (7)
          "group2"
        ],
        "adminUsers": [ // (8)
          "jsmith"
        ]
      }
    }
  ]
}
  1. The typeName must be exactly Connection.
  2. Human-readable name for your connection, such as production or development.
  3. The connectorName must be exactly kafka.
  4. The qualifiedName should follow the pattern: default/kafka/<epoch>, where <epoch> is the time in milliseconds at which the connection is being created.
  5. The category must be eventbus.
  6. List any workspace roles that can administer this connection. All users with that workspace role (current and future) will be administrators of the connection. Note that the values here need to be the GUID(s) of the workspace role(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  7. List any groups that can administer this connection. All users within that group (current and future) will be administrators of the connection. Note that the values here are the name(s) of the group(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
  8. List any users that can administer this connection. Note that the values here are the username(s) of the user(s). At least one of adminRoles, adminGroups, or adminUsers must be provided.
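The REST payload above can also be generated programmatically. This minimal sketch only builds the request body for POST /api/meta/entity/bulk. It does not send it, so authentication and the tenant URL are left out. The function name kafka_connection_payload is hypothetical, the epoch is taken in milliseconds as the pattern above describes, and empty admin lists are omitted from the body.

```python
import json
import time
from typing import List, Optional


def kafka_connection_payload(
    name: str,
    admin_roles: Optional[List[str]] = None,
    admin_groups: Optional[List[str]] = None,
    admin_users: Optional[List[str]] = None,
    epoch_ms: Optional[int] = None,
) -> dict:
    """Build (but do not send) the entity/bulk body for one Kafka connection."""
    if not (admin_roles or admin_groups or admin_users):
        raise ValueError(
            "At least one of adminRoles, adminGroups, or adminUsers must be provided."
        )
    if epoch_ms is None:
        # Creation time in milliseconds, per the default/kafka/<epoch> pattern.
        epoch_ms = int(time.time() * 1000)
    attributes = {
        "name": name,
        "connectorName": "kafka",
        "qualifiedName": f"default/kafka/{epoch_ms}",
        "category": "eventbus",
    }
    # Only include the admin lists that were actually provided.
    if admin_roles:
        attributes["adminRoles"] = list(admin_roles)
    if admin_groups:
        attributes["adminGroups"] = list(admin_groups)
    if admin_users:
        attributes["adminUsers"] = list(admin_users)
    return {"entities": [{"typeName": "Connection", "attributes": attributes}]}


if __name__ == "__main__":
    body = kafka_connection_payload(
        "kafka-connection", admin_groups=["group2"], admin_users=["jsmith"], epoch_ms=123456789
    )
    print(json.dumps(body, indent=2))
```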

Access policies

Atlan creates the policies that grant access to a connection (including the ability to retrieve the connection and to create assets within it) asynchronously. After the connection is created, it can take several seconds, and up to approximately 30 seconds, before these policies are in place.

You may therefore need to wait before you can create the assets below within the connection.

To confirm access, retrieve the connection after it has been created. The SDKs' retry loops will automatically retry until the connection can be successfully retrieved. At that point, your API token has permission to create the other assets.
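The SDKs already retry for you, but if you call the REST API directly you may want an equivalent loop. The sketch below polls a fetch callable with exponential backoff until it returns a value or a timeout passes. Everything here is an assumption for illustration: wait_until_retrievable and fetch are hypothetical, fetch should return None (or raise one of the exception types in retry_on) while the connection cannot yet be retrieved, and the 60-second default timeout is chosen only to comfortably cover the roughly 30 seconds mentioned above.

```python
import time
from typing import Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


def wait_until_retrievable(
    fetch: Callable[[], Optional[T]],
    timeout_s: float = 60.0,
    initial_delay_s: float = 1.0,
    max_delay_s: float = 8.0,
    retry_on: Tuple[Type[BaseException], ...] = (),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch() until it returns something other than None.

    Delays double after each miss (1, 2, 4, 8, 8, ... seconds by default).
    Exceptions listed in retry_on are treated as "not yet"; any other
    exception propagates immediately.
    """
    deadline = time.monotonic() + timeout_s
    delay = initial_delay_s
    while True:
        try:
            result = fetch()
        except retry_on:
            result = None
        if result is not None:
            return result
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"Still not retrievable after {timeout_s} seconds")
        sleep(delay)
        delay = min(delay * 2, max_delay_s)
```

The sleep parameter exists so the loop can be exercised without real waiting; in normal use the default time.sleep applies.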

Note: if you are reusing an existing connection rather than creating one via your API token, you must give your API token a persona that has access to that connection. Otherwise all attempts to create, read, update, or delete assets within that connection will fail due to a lack of permissions.

KafkaTopic

2.2.2 4.0.0

A KafkaTopic requires a name and a qualifiedName. For creation, you also need to specify the connectionQualifiedName of the connection for the topic.

Create a Kafka topic (Java)
KafkaTopic kafkaTopic = KafkaTopic.creator( // (1)
        "myKafkaTopic", // (2)
        connectionQualifiedName) // (3)
    .build();
response = kafkaTopic.save(client); // (4)
kafkaTopic = response.getResult(kafkaTopic); // (5)
  1. Build up the minimum request to create a topic.
  2. Provide a human-readable name for your topic.
  3. Provide the qualifiedName of the Kafka connection.
  4. Actually call Atlan to create the topic. Because this operation will persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
  5. Retrieve the created topic for use in subsequent creation calls.
Create a Kafka topic (Python)
kafka_topic = KafkaTopic.creator( # (1)
    name="myKafkaTopic", # (2)
    connection_qualified_name=connection_qualified_name # (3)
)
response = client.asset.save(kafka_topic) # (4)
kafka_topic_qualified_name = response.assets_created(asset_type=KafkaTopic)[0].qualified_name # (5)
  1. Build up the minimum request to create a topic.
  2. Provide a human-readable name for your topic.
  3. Provide the qualifiedName of the Kafka connection.
  4. Actually call Atlan to create the topic.
  5. Retrieve the qualifiedName of the created topic for use in subsequent creation calls. (You'd probably want to do some None checking first.)
Create a Kafka topic (Kotlin)
var kafkaTopic = KafkaTopic.creator( // (1)
        "myKafkaTopic", // (2)
        connectionQualifiedName) // (3)
    .build()
response = kafkaTopic.save(client) // (4)
kafkaTopic = response.getResult(kafkaTopic) // (5)
  1. Build up the minimum request to create a topic.
  2. Provide a human-readable name for your topic.
  3. Provide the qualifiedName of the Kafka connection.
  4. Actually call Atlan to create the topic. Because this operation will persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
  5. Retrieve the created topic for use in subsequent creation calls.
POST /api/meta/entity/bulk
{
  "entities": [
    {
      "typeName": "KafkaTopic", // (1)
      "attributes": {
        "name": "myKafkaTopic", // (2)
        "qualifiedName": "default/kafka/123456789/topic/myKafkaTopic", // (3)
        "connectionQualifiedName": "default/kafka/123456789", // (4)
        "connectorName": "kafka" // (5)
      }
    }
  ]
}
  1. The typeName must be exactly KafkaTopic.
  2. Human-readable name for your topic.
  3. The qualifiedName should follow the pattern: default/kafka/<epoch>/topic/<name>, where default/kafka/<epoch> is the qualifiedName of the connection for this topic and <name> is the unique name for this topic.
  4. The connectionQualifiedName must be the exact qualifiedName of the connection for this topic.
  5. The connectorName must be exactly kafka.
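The topic's qualifiedName is derived from the connection's qualifiedName, so the REST body can be built from just the topic name and the connection. This is a minimal sketch with a hypothetical function name. It assumes the topic's display name is also its unique <name>, as in the example above, and only checks that the connection qualifiedName starts with default/kafka/.

```python
import json


def kafka_topic_payload(name: str, connection_qualified_name: str) -> dict:
    """Build (but do not send) the entity/bulk body for one KafkaTopic."""
    if not connection_qualified_name.startswith("default/kafka/"):
        raise ValueError(f"Not a Kafka connection qualifiedName: {connection_qualified_name}")
    return {
        "entities": [
            {
                "typeName": "KafkaTopic",
                "attributes": {
                    "name": name,
                    # Assumption: the display name doubles as the unique <name>.
                    "qualifiedName": f"{connection_qualified_name}/topic/{name}",
                    "connectionQualifiedName": connection_qualified_name,
                    "connectorName": "kafka",
                },
            }
        ]
    }


if __name__ == "__main__":
    print(json.dumps(kafka_topic_payload("myKafkaTopic", "default/kafka/123456789"), indent=2))
```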

KafkaConsumerGroup

2.2.2 4.0.0

A KafkaConsumerGroup requires a name and a qualifiedName. For creation, you also need to specify the topics that contain the consumer group, as kafkaTopicQualifiedNames (the list of those topics' qualifiedNames).
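Because a topic's qualifiedName embeds its connection's qualifiedName, the connectionQualifiedName a consumer group needs can be recovered from its topics. The helper below is a hypothetical sketch: it splits each name at /topic/ and assumes, purely as an illustration, that all topics of one consumer group belong to the same connection.

```python
from typing import Iterable


def connection_for_topics(topic_qualified_names: Iterable[str]) -> str:
    """Derive the single connection qualifiedName shared by some topic qualifiedNames."""
    connections = set()
    for qualified_name in topic_qualified_names:
        # Split "default/kafka/<epoch>/topic/<name>" at the first "/topic/".
        connection, separator, topic_name = qualified_name.partition("/topic/")
        if not separator or not topic_name:
            raise ValueError(f"Not a Kafka topic qualifiedName: {qualified_name}")
        connections.add(connection)
    if not connections:
        raise ValueError("At least one topic qualifiedName is required.")
    if len(connections) > 1:
        # Assumption: all topics of one consumer group live in the same connection.
        raise ValueError(f"Topics span several connections: {sorted(connections)}")
    return connections.pop()
```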

Create a Kafka consumer group (Java)
KafkaConsumerGroup consumerGroup = KafkaConsumerGroup.creatorObj( // (1)
        "myKafkaConsumerGroup", // (2)
        List.of(kafkaTopic)) // (3)
    .build();
response = consumerGroup.save(client); // (4)
  1. Build up the minimum request to create a consumer group.
  2. Provide a human-readable name for your consumer group.
  3. Provide the list of topics for this consumer group.
  4. Actually call Atlan to create the consumer group. Because this operation will persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
Create a Kafka consumer group (Python)
consumer_group = KafkaConsumerGroup.creator( # (1)
    name="myKafkaConsumerGroup", # (2)
    kafka_topic_qualified_names=[kafka_topic_qualified_name], # (3)
)
response = client.asset.save(consumer_group) # (4)
  1. Build up the minimum request to create a consumer group.
  2. Provide a human-readable name for your consumer group.
  3. Provide the list of qualified_names of the topics for this consumer group.
  4. Actually call Atlan to create the consumer group.
Create a Kafka consumer group (Kotlin)
val consumerGroup = KafkaConsumerGroup.creatorObj( // (1)
        "myKafkaConsumerGroup", // (2)
        listOf(kafkaTopic)) // (3)
    .build()
response = consumerGroup.save(client) // (4)
  1. Build up the minimum request to create a consumer group.
  2. Provide a human-readable name for your consumer group.
  3. Provide the list of topics for this consumer group.
  4. Actually call Atlan to create the consumer group. Because this operation will persist the asset in Atlan, you must provide it an AtlanClient through which to connect to the tenant.
POST /api/meta/entity/bulk
{
  "entities": [
    {
      "typeName": "KafkaConsumerGroup", // (1)
      "attributes": {
        "name": "myKafkaConsumerGroup", // (2)
        "qualifiedName": "default/kafka/123456789/consumer-group/myKafkaConsumerGroup", // (3)
        "connectionQualifiedName": "default/kafka/123456789", // (4)
        "connectorName": "kafka", // (5)
        "kafkaTopics": [{ // (6)
          "typeName": "KafkaTopic", // (7)
          "uniqueAttributes": { // (8)
            "qualifiedName": "default/kafka/123456789/topic/myKafkaTopic"
          }
        }],
        "kafkaTopicQualifiedNames": ["default/kafka/123456789/topic/myKafkaTopic"] // (9)
      }
    }
  ]
}
  1. The typeName must be exactly KafkaConsumerGroup.
  2. Human-readable name for your consumer group.
  3. The qualifiedName should follow the pattern: default/kafka/<epoch>/consumer-group/<name>, where default/kafka/<epoch> is the qualifiedName of the connection that contains this consumer group and <name> is the unique name for this consumer group.
  4. The connectionQualifiedName must be the exact qualifiedName of the connection for this consumer group.
  5. The connectorName must be exactly kafka.
  6. The list of topics in which this consumer group exists is embedded in the kafkaTopics attribute.
  7. The typeName for this embedded reference must be KafkaTopic.
  8. To complete the reference, you must include a uniqueAttributes with the qualifiedName of the topic. Note: the topic must already exist in Atlan before creating the consumer group.
  9. The qualifiedNames of the topics for this consumer group, as a plain list of strings.
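The consumer-group body lists each topic twice: once as a uniqueAttributes reference in kafkaTopics and once as a plain string in kafkaTopicQualifiedNames. The minimal sketch below builds both from one list so they cannot drift apart. kafka_consumer_group_payload is a hypothetical name, the consumer group's display name is assumed to double as its unique <name>, and the request is only built, not sent. Serializing with json.dumps also yields strictly valid JSON, without the annotation comments shown above.

```python
import json
from typing import List


def kafka_consumer_group_payload(
    name: str,
    connection_qualified_name: str,
    topic_qualified_names: List[str],
) -> dict:
    """Build (but do not send) the entity/bulk body for one KafkaConsumerGroup."""
    if not topic_qualified_names:
        raise ValueError("A consumer group needs at least one topic qualifiedName.")
    return {
        "entities": [
            {
                "typeName": "KafkaConsumerGroup",
                "attributes": {
                    "name": name,
                    # Assumption: the display name doubles as the unique <name>.
                    "qualifiedName": f"{connection_qualified_name}/consumer-group/{name}",
                    "connectionQualifiedName": connection_qualified_name,
                    "connectorName": "kafka",
                    # Embedded references; each topic must already exist in Atlan.
                    "kafkaTopics": [
                        {"typeName": "KafkaTopic", "uniqueAttributes": {"qualifiedName": qn}}
                        for qn in topic_qualified_names
                    ],
                    # The same topics as plain strings, built from the same list.
                    "kafkaTopicQualifiedNames": list(topic_qualified_names),
                },
            }
        ]
    }


if __name__ == "__main__":
    body = kafka_consumer_group_payload(
        "myKafkaConsumerGroup",
        "default/kafka/123456789",
        ["default/kafka/123456789/topic/myKafkaTopic"],
    )
    print(json.dumps(body, indent=2))
```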

Available relationships

Every level of the Kafka structure is an Asset, and can therefore be related to the following other assets.

erDiagram
  Asset }o--o{ AtlasGlossaryTerm : meanings
  Asset ||--o{ Link : links
  Asset ||--o| Readme : readme
  Asset }o--o{ Process : inputToProcesses
  Asset }o--o{ Process : outputFromProcesses

AtlasGlossaryTerm

A glossary term provides meaning to an asset. The link terms to assets snippet provides more detail on setting this relationship.

Link

A link provides additional context to an asset by providing a URL to additional information.

Readme

A README provides rich documentation for an asset. The add asset READMEs snippet provides more detail on setting this relationship.

Process

A process provides lineage information for an asset. An asset can be both an input and an output for one or more processes. The lineage snippets provide more detail on creating and working with lineage.
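The inputToProcesses and outputFromProcesses relationships are two views of the same process edges. The sketch below uses a hypothetical ProcessSketch dataclass (not the Atlan Process type) to show how one set of processes yields both views, and how a single asset, such as a Kafka topic, can be the output of one process and the input to another.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Tuple


@dataclass
class ProcessSketch:
    """Hypothetical stand-in for a lineage process: the assets it reads and writes."""

    name: str
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)


def index_lineage(
    processes: Iterable[ProcessSketch],
) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
    """Return (inputToProcesses, outputFromProcesses), keyed by asset qualifiedName."""
    input_to: Dict[str, List[str]] = defaultdict(list)
    output_from: Dict[str, List[str]] = defaultdict(list)
    for process in processes:
        for asset in process.inputs:
            input_to[asset].append(process.name)
        for asset in process.outputs:
            output_from[asset].append(process.name)
    return dict(input_to), dict(output_from)
```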


  [1] That said, if you want to delete everything in a connection, the packaged connection delete utility in the UI is the better option.