OpenLineage¶
Out of the box¶
Atlan supports integrated lineage through the OpenLineage standard out of the box for a number of sources:
If you want to integrate lineage from any of these tools, simply follow the linked instructions.
Specification¶
On the other hand, if you want to add lineage support to some other tooling, you can do so by following the OpenLineage standard's specification.
Also available via SDKs
We are working on exposing some simplified ways to integrate via OpenLineage using our SDKs as well.
To integrate via OpenLineage, you need to adhere to three main points:
Format¶
The format of payloads you send containing lineage information must match the OpenLineage standard. Specifically, they must minimally contain:
- A job — a process that consumes or produces datasets.
- A run — an instance of a job that represents one of its occurrences in time.
- At least one of the payloads for a given run should contain input and output datasets (the sources and targets of the lineage).
- All payloads are wrapped up into an event.
They will look something like this:
{
  "eventTime": "2024-07-01T08:23:37.491542Z", // (1)
  "producer": "https://github.com/some/example", // (2)
  "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
  "eventType": "START", // (3)
  "job": {
    "namespace": "ol-spark", // (4)
    "name": "test-job-006", // (5)
    "facets": {}
  },
  "run": {
    "runId": "eefd52c3-5871-4f0e-8ff5-237e9a6efb53", // (6)
    "facets": {}
  },
  "inputs": [
    {
      "namespace": "snowflake://abc123.snowflakecomputing.com", // (7)
      "name": "RAW.WIDEWORLDIMPORTERS_SALESFORCE.ORG_EMAIL_ADDRESS_SECURITY", // (8)
      "facets": {}
    }
  ],
  "outputs": [ // (9)
    {
      "namespace": "snowflake://abc123.snowflakecomputing.com",
      "name": "ANALYTICS.WIDE_WORLD_IMPORTERS.new view",
      "facets": {
        "columnLineage": { // (10)
          "_producer": "https://github.com/atlanhq/atlan-java",
          "_schemaURL": "https://openlineage.io/spec/facets/1-1-0/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet",
          "fields": {
            "StockItemID": { // (11)
              "inputFields": [
                {
                  "namespace": "snowflake://abc123.snowflakecomputing.com",
                  "name": "RAW.WIDEWORLDIMPORTERS_SALESFORCE.ORG_EMAIL_ADDRESS_SECURITY",
                  "field": "ID" // (12)
                },
                {
                  "namespace": "snowflake://abc123.snowflakecomputing.com",
                  "name": "RAW.WIDEWORLDIMPORTERS_SALESFORCE.ORG_EMAIL_ADDRESS_SECURITY",
                  "field": "PARENT_ID"
                }
              ]
            },
            "TargetStockLevel": {
              "inputFields": [
                {
                  "namespace": "snowflake://abc123.snowflakecomputing.com",
                  "name": "RAW.WIDEWORLDIMPORTERS_SALESFORCE.ORG_EMAIL_ADDRESS_SECURITY",
                  "field": "SYSTEM_MODSTAMP"
                }
              ]
            }
          }
        }
      }
    }
  ]
}
1. The time at which the event occurred, in ISO-8601 format.
2. A unique URI of what was responsible for triggering the event, for example a specific piece of code.
3. The type of the event (e.g. `START` vs `COMPLETE`).
4. The name of the connection in which this lineage process should exist.
5. A unique name for the lineage process. This acts as an idempotent business key: the first time it is used, a lineage process will be created. Any subsequent use of the same job name will cause a new run for that same job to be tracked.
6. A unique identifier for the run of the job this event relates to. This must be kept constant between events for the same run (for example, the same `runId` should be used for both a `START` and a `COMPLETE` event to show when a job run has started and when it has completed).
7. Inputs (sources) for the data lineage. The `namespace` of a dataset should follow the source-specific naming standards of OpenLineage.
8. The `name` of a dataset should use a `.`-qualified form. For example, a table should be `DATABASE_NAME.SCHEMA_NAME.TABLE_NAME`.
9. Outputs (targets) for the data lineage. The `namespace` and `name` should follow the same conventions as the inputs.
10. If you want to track column-level lineage, note that this is only specified on the target (outputs) end of the lineage.
11. Each field used as a key in the `fields` object is the name of a field (column) in the output dataset.
12. The `inputFields` list within it then defines all input (source) fields that map to this output field in column-level lineage.
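To make this concrete, here is a minimal sketch in Python that assembles the same `START` event, including a trimmed-down `columnLineage` facet, and sends it over HTTP. The endpoint URL and API token are placeholder assumptions rather than documented Atlan values; consult your tenant's configuration for the real ones.

# A minimal sketch of building and emitting the START event shown above.
# OPENLINEAGE_URL and API_TOKEN are placeholder assumptions, not a
# documented Atlan endpoint.
import uuid
from datetime import datetime, timezone

import requests

OPENLINEAGE_URL = "https://tenant.atlan.com/path/to/openlineage"  # placeholder
API_TOKEN = "..."  # placeholder

SOURCE = "snowflake://abc123.snowflakecomputing.com"
INPUT_TABLE = "RAW.WIDEWORLDIMPORTERS_SALESFORCE.ORG_EMAIL_ADDRESS_SECURITY"

run_id = str(uuid.uuid4())  # keep constant across all events for this run

start_event = {
    "eventTime": datetime.now(timezone.utc).isoformat(),  # ISO-8601
    "producer": "https://github.com/some/example",
    "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
    "eventType": "START",
    "job": {"namespace": "ol-spark", "name": "test-job-006", "facets": {}},
    "run": {"runId": run_id, "facets": {}},
    "inputs": [{"namespace": SOURCE, "name": INPUT_TABLE, "facets": {}}],
    "outputs": [
        {
            "namespace": SOURCE,
            "name": "ANALYTICS.WIDE_WORLD_IMPORTERS.new view",
            "facets": {
                "columnLineage": {  # column-level lineage: outputs only
                    "_producer": "https://github.com/some/example",
                    "_schemaURL": "https://openlineage.io/spec/facets/1-1-0/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet",
                    "fields": {
                        "StockItemID": {  # output column
                            "inputFields": [  # source columns feeding it
                                {"namespace": SOURCE, "name": INPUT_TABLE, "field": "ID"}
                            ]
                        }
                    },
                }
            },
        }
    ],
}

# Emit the event; raise if the endpoint rejects it.
requests.post(
    OPENLINEAGE_URL,
    json=start_event,
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    timeout=30,
).raise_for_status()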
Airflow has a more complex, hierarchical structure
The general structure above applies to Spark, in particular. For Airflow, there is a nested hierarchical structure that differentiates between an overall DAG and its individual tasks. Each DAG and each task will need to follow the points outlined here, and in addition there are further requirements to link together the DAG and its tasks using additional OpenLineage facets. For now, these are beyond the scope of this document; if you want to integrate Airflow specifically, we would recommend using one of the out-of-the-box integrations linked above.
Events¶
For any given run, there must be at least two events:
- `START` to indicate that a run has begun
- One of the following to indicate that the run has finished:
    - `COMPLETE` to signify that execution of the run has concluded
    - `ABORT` to signify the run has been stopped abnormally
    - `FAIL` to signify the run has failed
Atlan's OpenLineage processing will merge all inputs and outputs across all events for a given run to construct the lineage for that run. The merge will happen only when a completion event has been received, and completion events will only be processed if there is a `START` event for that same run.
Only provide inputs and outputs in one of the events
For simplicity, this means you only need to provide the inputs and outputs for lineage in one of the events. For example, if you provide them in the start event (like in the example above), then the completion event can be as simple as:
{
  "eventTime": "2024-07-01T08:23:38.360567Z", // (1)
  "producer": "https://github.com/some/example",
  "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
  "eventType": "COMPLETE",
  "run": {
    "runId": "eefd52c3-5871-4f0e-8ff5-237e9a6efb53", // (2)
    "facets": {}
  },
  "job": { // (3)
    "namespace": "ol-spark",
    "name": "test-job-006",
    "facets": {}
  }
}
1. The time at which the job run finished.
2. The `runId` must match the `runId` used in the event marking the start of the job run.
3. The job details must match those used in the event marking the start of the job run.
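For illustration, here is a rough end-to-end sketch of such a run lifecycle in Python: one `runId` shared by both events, lineage carried on the start event, and a minimal completion event. The endpoint URL and token are the same placeholder assumptions as in the earlier sketch, not documented Atlan values.

# Sketch of the run lifecycle: one runId, a START carrying the lineage,
# and a minimal COMPLETE. Endpoint and token remain placeholders.
import uuid
from datetime import datetime, timezone

import requests

OPENLINEAGE_URL = "https://tenant.atlan.com/path/to/openlineage"  # placeholder
API_TOKEN = "..."  # placeholder


def emit(event: dict) -> None:
    """POST a single OpenLineage event, failing loudly on HTTP errors."""
    requests.post(
        OPENLINEAGE_URL,
        json=event,
        headers={"Authorization": f"Bearer {API_TOKEN}"},
        timeout=30,
    ).raise_for_status()


def base_event(event_type: str, run_id: str) -> dict:
    """Builds the boilerplate shared by every event of the same job run."""
    return {
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "producer": "https://github.com/some/example",
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
        "eventType": event_type,
        "job": {"namespace": "ol-spark", "name": "test-job-006", "facets": {}},
        "run": {"runId": run_id, "facets": {}},
    }


run_id = str(uuid.uuid4())  # constant for the whole run

start = base_event("START", run_id)
start["inputs"] = [...]   # the input datasets, as in the earlier example
start["outputs"] = [...]  # the output datasets, as in the earlier example
emit(start)

# ... the job does its work ...

emit(base_event("COMPLETE", run_id))  # minimal: no inputs or outputs needed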
Naming¶
Finally, the names used in the payloads must align to assets in Atlan as follows:
- The `namespace` of a job must match the name of a Spark or Airflow connection in Atlan.
- The `namespace` of a dataset should follow the source-specific naming standards of OpenLineage.
- The `name` of a dataset should use a `.`-qualified form. For example, a table should be `DATABASE_NAME.SCHEMA_NAME.TABLE_NAME`.
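To illustrate these conventions, a couple of hypothetical helper functions for a Snowflake source might look like the following; the account locator is a placeholder, and these helpers are not part of any Atlan or OpenLineage SDK.

# Hypothetical helpers for building dataset coordinates that follow the
# naming rules above, assuming a Snowflake source.
def snowflake_namespace(account: str) -> str:
    """Dataset namespace per OpenLineage's Snowflake naming standard."""
    return f"snowflake://{account}.snowflakecomputing.com"


def dataset_name(database: str, schema: str, table: str) -> str:
    """Dot-qualified dataset name: DATABASE_NAME.SCHEMA_NAME.TABLE_NAME."""
    return f"{database}.{schema}.{table}"


assert snowflake_namespace("abc123") == "snowflake://abc123.snowflakecomputing.com"
assert (
    dataset_name("RAW", "WIDEWORLDIMPORTERS_SALESFORCE", "ORG_EMAIL_ADDRESS_SECURITY")
    == "RAW.WIDEWORLDIMPORTERS_SALESFORCE.ORG_EMAIL_ADDRESS_SECURITY"
)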
What if an asset used in lineage has not yet been crawled into Atlan?
The connection you refer to must exist in Atlan before you can emit OpenLineage events for it. In practice, this means you must first configure OpenLineage (for example, for Spark assets) before sending any events with a job `namespace` that refers to such a connection.
However, any input or output datasets that do not (yet) exist in Atlan will be created automatically as part of the lineage processing, but only as partial assets. This means they will appear in lineage, but will not be discoverable or able to be enriched in any other way in Atlan's UI.
Once such assets are crawled, they will be promoted automatically to "full" (not partial) assets, and then they will be discoverable and can be enriched just like any other asset in Atlan.