diff --git a/build.gradle b/build.gradle
index e3c4f5efe6bb63..be4d7ee8a562b9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -48,6 +48,7 @@ buildscript {
// see also datahub-frontend/play.gradle
ext.playVersion = '2.8.22'
ext.playScalaVersion = '2.13'
+ ext.akkaVersion = '2.6.21' // 2.7.0+ has incompatible license
ext.log4jVersion = '2.23.1'
ext.slf4jVersion = '1.7.36'
ext.logbackClassic = '1.4.14'
@@ -105,7 +106,14 @@ project.ext.spec = [
]
project.ext.externalDependency = [
- 'akkaHttp': "com.typesafe.akka:akka-http-core_$playScalaVersion:10.2.10",
+ 'akkaHttp': "com.typesafe.akka:akka-http-core_$playScalaVersion:10.2.10", // max version due to licensing
+ 'akkaActor': "com.typesafe.akka:akka-actor_$playScalaVersion:$akkaVersion",
+ 'akkaStream': "com.typesafe.akka:akka-stream_$playScalaVersion:$akkaVersion",
+ 'akkaActorTyped': "com.typesafe.akka:akka-actor-typed_$playScalaVersion:$akkaVersion",
+ 'akkaSlf4j': "com.typesafe.akka:akka-slf4j_$playScalaVersion:$akkaVersion",
+ 'akkaJackson': "com.typesafe.akka:akka-serialization-jackson_$playScalaVersion:$akkaVersion",
+ 'akkaParsing': "com.typesafe.akka:akka-parsing_$playScalaVersion:$akkaVersion",
+ 'akkaProtobuf': "com.typesafe.akka:akka-protobuf-v3_$playScalaVersion:$akkaVersion",
'antlr4Runtime': 'org.antlr:antlr4-runtime:4.9.3',
'antlr4': 'org.antlr:antlr4:4.9.3',
'assertJ': 'org.assertj:assertj-core:3.11.1',
diff --git a/datahub-frontend/play.gradle b/datahub-frontend/play.gradle
index 266962721a80a8..d513c3c232d9a0 100644
--- a/datahub-frontend/play.gradle
+++ b/datahub-frontend/play.gradle
@@ -55,6 +55,13 @@ dependencies {
implementation externalDependency.antlr4Runtime
implementation externalDependency.antlr4
implementation externalDependency.akkaHttp
+ implementation externalDependency.akkaActor
+ implementation externalDependency.akkaStream
+ implementation externalDependency.akkaActorTyped
+ implementation externalDependency.akkaSlf4j
+ implementation externalDependency.akkaJackson
+ implementation externalDependency.akkaParsing
+ implementation externalDependency.akkaProtobuf
implementation externalDependency.jerseyCore
implementation externalDependency.jerseyGuava
diff --git a/docs/automations/snowflake-tag-propagation.md b/docs/automations/snowflake-tag-propagation.md
index b72224642b0f07..8eded451644cce 100644
--- a/docs/automations/snowflake-tag-propagation.md
+++ b/docs/automations/snowflake-tag-propagation.md
@@ -4,6 +4,8 @@ import FeatureAvailability from '@site/src/components/FeatureAvailability';
+> Note that this Automation in currently in open **Beta**. With any questions or issues, please reach out to your Acryl representative.
+
## Introduction
Snowflake Tag Propagation is an automation that allows you to sync DataHub Glossary Terms and Tags on
@@ -15,6 +17,41 @@ both columns and tables back to Snowflake. This automation is available in DataH
- Automatically Add DataHub Tags to Snowflake Tables and Columns
- Automatically Remove DataHub Glossary Terms and Tags from Snowflake Tables and Columns when they are removed in DataHub
+## Prerequisites
+
+### Permissions Required for Tag Management
+
+- `CREATE TAG`: Required to create new tags in Snowflake.
+Ensure the user or role has this privilege on the specific schema or database where tags will be created.
+- `APPLY TAG`: Required to assign tags to Snowflake objects such as tables, columns, or other database objects.
+This permission must be granted at the database, schema, or object level depending on the scope.
+
+
+### Permissions Required for Object Access
+
+- `USAGE` on the database and schema: Allows access to the database and schema to view and apply changes.
+- `SELECT` on the objects (tables, views, etc.): Enables the automation to read metadata and verify existing tags.
+
+### Example Permission Grant Statements
+
+To grant the necessary permissions for a specific role (DATAHUB_AUTOMATION_ROLE), you can use the following SQL commands:
+
+```sql
+-- Tag management permissions
+GRANT CREATE TAG ON SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE;
+GRANT APPLY TAG ON SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE;
+
+-- Object access for metadata operations
+GRANT USAGE ON DATABASE your_database TO ROLE DATAHUB_AUTOMATION_ROLE;
+GRANT USAGE ON SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE;
+GRANT SELECT ON ALL TABLES IN SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE;
+
+-- Future privileges for tagging
+GRANT SELECT ON FUTURE TABLES IN SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE;
+GRANT APPLY TAG ON FUTURE TABLES IN SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE;
+```
+
+
## Enabling Snowflake Tag Sync
1. **Navigate to Automations**: Click on 'Govern' > 'Automations' in the navigation bar.
diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py
index e097fd1f221ea5..6330fe0291660d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/feast.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py
@@ -42,10 +42,14 @@
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
BrowsePathsClass,
+ GlobalTagsClass,
MLFeaturePropertiesClass,
MLFeatureTablePropertiesClass,
MLPrimaryKeyPropertiesClass,
+ OwnerClass,
+ OwnershipClass,
StatusClass,
+ TagAssociationClass,
)
# FIXME: ValueType module cannot be used as a type
@@ -91,6 +95,24 @@ class FeastRepositorySourceConfig(ConfigModel):
environment: str = Field(
default=DEFAULT_ENV, description="Environment to use when constructing URNs"
)
+ # owner_mappings example:
+ # This must be added to the recipe in order to extract owners, otherwise NO owners will be extracted
+ # owner_mappings:
+ # - feast_owner_name: ""
+ # datahub_owner_urn: "urn:li:corpGroup:"
+ # datahub_ownership_type: "BUSINESS_OWNER"
+ owner_mappings: Optional[List[Dict[str, str]]] = Field(
+ default=None, description="Mapping of owner names to owner types"
+ )
+ enable_owner_extraction: bool = Field(
+ default=False,
+ description="If this is disabled, then we NEVER try to map owners. "
+ "If this is enabled, then owner_mappings is REQUIRED to extract ownership.",
+ )
+ enable_tag_extraction: bool = Field(
+ default=False,
+ description="If this is disabled, then we NEVER try to extract tags.",
+ )
@platform_name("Feast")
@@ -215,10 +237,15 @@ def _get_entity_workunit(
"""
feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
+ aspects = (
+ [StatusClass(removed=False)]
+ + self._get_tags(entity)
+ + self._get_owners(entity)
+ )
entity_snapshot = MLPrimaryKeySnapshot(
urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name),
- aspects=[StatusClass(removed=False)],
+ aspects=aspects,
)
entity_snapshot.aspects.append(
@@ -243,10 +270,11 @@ def _get_feature_workunit(
Generate an MLFeature work unit for a Feast feature.
"""
feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
+ aspects = [StatusClass(removed=False)] + self._get_tags(field)
feature_snapshot = MLFeatureSnapshot(
urn=builder.make_ml_feature_urn(feature_view_name, field.name),
- aspects=[StatusClass(removed=False)],
+ aspects=aspects,
)
feature_sources = []
@@ -295,13 +323,18 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU
"""
feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
+ aspects = (
+ [
+ BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]),
+ StatusClass(removed=False),
+ ]
+ + self._get_tags(feature_view)
+ + self._get_owners(feature_view)
+ )
feature_view_snapshot = MLFeatureTableSnapshot(
urn=builder.make_ml_feature_table_urn("feast", feature_view_name),
- aspects=[
- BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]),
- StatusClass(removed=False),
- ],
+ aspects=aspects,
)
feature_view_snapshot.aspects.append(
@@ -360,6 +393,64 @@ def _get_on_demand_feature_view_workunit(
return MetadataWorkUnit(id=on_demand_feature_view_name, mce=mce)
+ # If a tag is specified in a Feast object, then the tag will be ingested into Datahub if enable_tag_extraction is
+ # True, otherwise NO tags will be ingested
+ def _get_tags(self, obj: Union[Entity, FeatureView, FeastField]) -> list:
+ """
+ Extracts tags from the given object and returns a list of aspects.
+ """
+ aspects: List[Union[GlobalTagsClass]] = []
+
+ # Extract tags
+ if self.source_config.enable_tag_extraction:
+ if obj.tags.get("name"):
+ tag_name: str = obj.tags["name"]
+ tag_association = TagAssociationClass(
+ tag=builder.make_tag_urn(tag_name)
+ )
+ global_tags_aspect = GlobalTagsClass(tags=[tag_association])
+ aspects.append(global_tags_aspect)
+
+ return aspects
+
+ # If an owner is specified in a Feast object, it will only be ingested into Datahub if owner_mappings is specified
+ # and enable_owner_extraction is True in FeastRepositorySourceConfig, otherwise NO owners will be ingested
+ def _get_owners(self, obj: Union[Entity, FeatureView, FeastField]) -> list:
+ """
+ Extracts owners from the given object and returns a list of aspects.
+ """
+ aspects: List[Union[OwnershipClass]] = []
+
+ # Extract owner
+ if self.source_config.enable_owner_extraction:
+ owner = getattr(obj, "owner", None)
+ if owner:
+ # Create owner association, skipping if None
+ owner_association = self._create_owner_association(owner)
+ if owner_association: # Only add valid owner associations
+ owners_aspect = OwnershipClass(owners=[owner_association])
+ aspects.append(owners_aspect)
+
+ return aspects
+
+ def _create_owner_association(self, owner: str) -> Optional[OwnerClass]:
+ """
+ Create an OwnerClass instance for the given owner using the owner mappings.
+ """
+ if self.source_config.owner_mappings is not None:
+ for mapping in self.source_config.owner_mappings:
+ if mapping["feast_owner_name"] == owner:
+ ownership_type_class: str = mapping.get(
+ "datahub_ownership_type", "TECHNICAL_OWNER"
+ )
+ datahub_owner_urn = mapping.get("datahub_owner_urn")
+ if datahub_owner_urn:
+ return OwnerClass(
+ owner=datahub_owner_urn,
+ type=ownership_type_class,
+ )
+ return None
+
@classmethod
def create(cls, config_dict, ctx):
config = FeastRepositorySourceConfig.parse_obj(config_dict)
diff --git a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json
index 1b91925289845b..a4fd9843c5cf49 100644
--- a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json
+++ b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json
@@ -9,8 +9,33 @@
"removed": false
}
},
+ {
+ "com.linkedin.pegasus2avro.common.GlobalTags": {
+ "tags": [
+ {
+ "tag": "urn:li:tag:deprecated"
+ }
+ ]
+ }
+ },
+ {
+ "com.linkedin.pegasus2avro.common.Ownership": {
+ "owners": [
+ {
+ "owner": "urn:li:corpGroup:MOCK_OWNER",
+ "type": "BUSINESS_OWNER"
+ }
+ ],
+ "ownerTypes": {},
+ "lastModified": {
+ "time": 0,
+ "actor": "urn:li:corpuser:unknown"
+ }
+ }
+ },
{
"com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": {
+ "customProperties": {},
"description": "Driver ID",
"dataType": "ORDINAL",
"sources": [
@@ -23,7 +48,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "feast-repository-test"
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -36,8 +62,18 @@
"removed": false
}
},
+ {
+ "com.linkedin.pegasus2avro.common.GlobalTags": {
+ "tags": [
+ {
+ "tag": "urn:li:tag:needs_documentation"
+ }
+ ]
+ }
+ },
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
+ "customProperties": {},
"description": "Conv rate",
"dataType": "CONTINUOUS",
"sources": [
@@ -50,7 +86,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "feast-repository-test"
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -65,6 +102,7 @@
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
+ "customProperties": {},
"description": "Acc rate",
"dataType": "CONTINUOUS",
"sources": [
@@ -77,7 +115,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "feast-repository-test"
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -92,6 +131,7 @@
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
+ "customProperties": {},
"description": "Avg daily trips",
"dataType": "ORDINAL",
"sources": [
@@ -104,7 +144,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "feast-repository-test"
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -119,6 +160,7 @@
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
+ "customProperties": {},
"description": "String feature",
"dataType": "TEXT",
"sources": [
@@ -131,7 +173,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "feast-repository-test"
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -151,6 +194,30 @@
"removed": false
}
},
+ {
+ "com.linkedin.pegasus2avro.common.GlobalTags": {
+ "tags": [
+ {
+ "tag": "urn:li:tag:deprecated"
+ }
+ ]
+ }
+ },
+ {
+ "com.linkedin.pegasus2avro.common.Ownership": {
+ "owners": [
+ {
+ "owner": "urn:li:corpGroup:MOCK_OWNER",
+ "type": "BUSINESS_OWNER"
+ }
+ ],
+ "ownerTypes": {},
+ "lastModified": {
+ "time": 0,
+ "actor": "urn:li:corpuser:unknown"
+ }
+ }
+ },
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": {
"customProperties": {},
@@ -170,7 +237,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "feast-repository-test"
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -189,7 +257,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "feast-repository-test"
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -204,6 +273,7 @@
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
+ "customProperties": {},
"dataType": "CONTINUOUS",
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)",
@@ -216,7 +286,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "feast-repository-test"
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -231,6 +302,7 @@
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
+ "customProperties": {},
"dataType": "CONTINUOUS",
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)",
@@ -243,7 +315,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "feast-repository-test"
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -278,7 +351,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "feast-repository-test"
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
}
},
{
@@ -297,7 +371,40 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
- "runId": "feast-repository-test"
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
+ }
+},
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:deprecated",
+ "changeType": "UPSERT",
+ "aspectName": "tagKey",
+ "aspect": {
+ "json": {
+ "name": "deprecated"
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1586847600000,
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
+ }
+},
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:needs_documentation",
+ "changeType": "UPSERT",
+ "aspectName": "tagKey",
+ "aspect": {
+ "json": {
+ "name": "needs_documentation"
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1586847600000,
+ "runId": "feast-repository-test",
+ "lastRunId": "no-run-id-provided"
}
}
]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db b/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db
index a511ff56c97705..5dca29d92afe53 100644
Binary files a/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db and b/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db differ
diff --git a/metadata-ingestion/tests/integration/feast/feature_store/features.py b/metadata-ingestion/tests/integration/feast/feature_store/features.py
index a6e6cd3616e924..dcfd417637958c 100644
--- a/metadata-ingestion/tests/integration/feast/feature_store/features.py
+++ b/metadata-ingestion/tests/integration/feast/feature_store/features.py
@@ -19,6 +19,8 @@
join_keys=["driver_id"],
value_type=ValueType.INT64,
description="Driver ID",
+ owner="MOCK_OWNER",
+ tags={"name": "deprecated"},
)
driver_hourly_stats_view = FeatureView(
@@ -29,7 +31,7 @@
Field(
name="conv_rate",
dtype=feast.types.Float64,
- tags=dict(description="Conv rate"),
+ tags={"name": "needs_documentation", "description": "Conv rate"},
),
Field(
name="acc_rate",
@@ -49,7 +51,8 @@
],
online=True,
source=driver_hourly_stats_source,
- tags={},
+ tags={"name": "deprecated"},
+ owner="MOCK_OWNER",
)
input_request = RequestSource(
diff --git a/metadata-ingestion/tests/integration/feast/test_feast_repository.py b/metadata-ingestion/tests/integration/feast/test_feast_repository.py
index a6bdce67222896..7f04337145dc36 100644
--- a/metadata-ingestion/tests/integration/feast/test_feast_repository.py
+++ b/metadata-ingestion/tests/integration/feast/test_feast_repository.py
@@ -19,6 +19,15 @@ def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time):
"config": {
"path": str(test_resources_dir / "feature_store"),
"environment": "PROD",
+ "enable_tag_extraction": True,
+ "enable_owner_extraction": True,
+ "owner_mappings": [
+ {
+ "feast_owner_name": "MOCK_OWNER",
+ "datahub_owner_urn": "urn:li:corpGroup:MOCK_OWNER",
+ "datahub_ownership_type": "BUSINESS_OWNER",
+ }
+ ],
},
},
"sink": {