Commit

Merge branch 'master' into david-leifker-patch-2
david-leifker authored Nov 28, 2024
2 parents ca068c7 + f3eda31 commit f62fc30
Showing 8 changed files with 282 additions and 20 deletions.
10 changes: 9 additions & 1 deletion build.gradle
@@ -48,6 +48,7 @@ buildscript {
// see also datahub-frontend/play.gradle
ext.playVersion = '2.8.22'
ext.playScalaVersion = '2.13'
ext.akkaVersion = '2.6.21' // 2.7.0+ has incompatible license
ext.log4jVersion = '2.23.1'
ext.slf4jVersion = '1.7.36'
ext.logbackClassic = '1.4.14'
@@ -105,7 +106,14 @@ project.ext.spec = [
]

project.ext.externalDependency = [
'akkaHttp': "com.typesafe.akka:akka-http-core_$playScalaVersion:10.2.10",
'akkaHttp': "com.typesafe.akka:akka-http-core_$playScalaVersion:10.2.10", // max version due to licensing
'akkaActor': "com.typesafe.akka:akka-actor_$playScalaVersion:$akkaVersion",
'akkaStream': "com.typesafe.akka:akka-stream_$playScalaVersion:$akkaVersion",
'akkaActorTyped': "com.typesafe.akka:akka-actor-typed_$playScalaVersion:$akkaVersion",
'akkaSlf4j': "com.typesafe.akka:akka-slf4j_$playScalaVersion:$akkaVersion",
'akkaJackson': "com.typesafe.akka:akka-serialization-jackson_$playScalaVersion:$akkaVersion",
'akkaParsing': "com.typesafe.akka:akka-parsing_$playScalaVersion:$akkaVersion",
'akkaProtobuf': "com.typesafe.akka:akka-protobuf-v3_$playScalaVersion:$akkaVersion",
'antlr4Runtime': 'org.antlr:antlr4-runtime:4.9.3',
'antlr4': 'org.antlr:antlr4:4.9.3',
'assertJ': 'org.assertj:assertj-core:3.11.1',
7 changes: 7 additions & 0 deletions datahub-frontend/play.gradle
@@ -55,6 +55,13 @@ dependencies {
implementation externalDependency.antlr4Runtime
implementation externalDependency.antlr4
implementation externalDependency.akkaHttp
implementation externalDependency.akkaActor
implementation externalDependency.akkaStream
implementation externalDependency.akkaActorTyped
implementation externalDependency.akkaSlf4j
implementation externalDependency.akkaJackson
implementation externalDependency.akkaParsing
implementation externalDependency.akkaProtobuf

implementation externalDependency.jerseyCore
implementation externalDependency.jerseyGuava
37 changes: 37 additions & 0 deletions docs/automations/snowflake-tag-propagation.md
@@ -4,6 +4,8 @@ import FeatureAvailability from '@site/src/components/FeatureAvailability';

<FeatureAvailability saasOnly />

> Note that this Automation is currently in open **Beta**. If you have any questions or issues, please reach out to your Acryl representative.
## Introduction

Snowflake Tag Propagation is an automation that allows you to sync DataHub Glossary Terms and Tags on
@@ -15,6 +17,41 @@ both columns and tables back to Snowflake. This automation is available in DataH
- Automatically Add DataHub Tags to Snowflake Tables and Columns
- Automatically Remove DataHub Glossary Terms and Tags from Snowflake Tables and Columns when they are removed in DataHub

## Prerequisites

### Permissions Required for Tag Management

- `CREATE TAG`: Required to create new tags in Snowflake.
Ensure the user or role has this privilege on the specific schema or database where tags will be created.
- `APPLY TAG`: Required to assign tags to Snowflake objects such as tables, columns, or other database objects.
This permission must be granted at the database, schema, or object level depending on the scope.


### Permissions Required for Object Access

- `USAGE` on the database and schema: Allows access to the database and schema to view and apply changes.
- `SELECT` on the objects (tables, views, etc.): Enables the automation to read metadata and verify existing tags.

### Example Permission Grant Statements

To grant the necessary permissions for a specific role (DATAHUB_AUTOMATION_ROLE), you can use the following SQL commands:

```sql
-- Tag management permissions
GRANT CREATE TAG ON SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE;
GRANT APPLY TAG ON SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE;

-- Object access for metadata operations
GRANT USAGE ON DATABASE your_database TO ROLE DATAHUB_AUTOMATION_ROLE;
GRANT USAGE ON SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE;
GRANT SELECT ON ALL TABLES IN SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE;

-- Future privileges for tagging
GRANT SELECT ON FUTURE TABLES IN SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE;
GRANT APPLY TAG ON FUTURE TABLES IN SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE;
```
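
If you prefer to apply these grants programmatically rather than from a worksheet, here is a minimal sketch using the `snowflake-connector-python` package (an assumption of this example; any SQL client works). The account, user, role, database, and schema names are placeholders:

```python
import snowflake.connector

# The same grant statements as above; substitute your own database/schema/role.
GRANTS = [
    "GRANT CREATE TAG ON SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE",
    "GRANT APPLY TAG ON SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE",
    "GRANT USAGE ON DATABASE your_database TO ROLE DATAHUB_AUTOMATION_ROLE",
    "GRANT USAGE ON SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE",
    "GRANT SELECT ON ALL TABLES IN SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE",
    "GRANT SELECT ON FUTURE TABLES IN SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE",
    "GRANT APPLY TAG ON FUTURE TABLES IN SCHEMA your_database.your_schema TO ROLE DATAHUB_AUTOMATION_ROLE",
]

# Connect with a role that is allowed to grant these privileges
# (for example SECURITYADMIN, or the role that owns the objects).
with snowflake.connector.connect(
    account="your_account",
    user="your_admin_user",
    password="your_password",
    role="SECURITYADMIN",
) as conn:
    with conn.cursor() as cur:
        for stmt in GRANTS:
            cur.execute(stmt)
```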


## Enabling Snowflake Tag Sync

1. **Navigate to Automations**: Click on 'Govern' > 'Automations' in the navigation bar.
103 changes: 97 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/feast.py
@@ -42,10 +42,14 @@
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
BrowsePathsClass,
GlobalTagsClass,
MLFeaturePropertiesClass,
MLFeatureTablePropertiesClass,
MLPrimaryKeyPropertiesClass,
OwnerClass,
OwnershipClass,
StatusClass,
TagAssociationClass,
)

# FIXME: ValueType module cannot be used as a type
@@ -91,6 +95,24 @@ class FeastRepositorySourceConfig(ConfigModel):
environment: str = Field(
default=DEFAULT_ENV, description="Environment to use when constructing URNs"
)
# owner_mappings example:
# This must be added to the recipe in order to extract owners, otherwise NO owners will be extracted
# owner_mappings:
# - feast_owner_name: "<owner>"
# datahub_owner_urn: "urn:li:corpGroup:<owner>"
# datahub_ownership_type: "BUSINESS_OWNER"
owner_mappings: Optional[List[Dict[str, str]]] = Field(
default=None, description="Mapping of owner names to owner types"
)
enable_owner_extraction: bool = Field(
default=False,
description="If this is disabled, then we NEVER try to map owners. "
"If this is enabled, then owner_mappings is REQUIRED to extract ownership.",
)
enable_tag_extraction: bool = Field(
default=False,
description="If this is disabled, then we NEVER try to extract tags.",
)


@platform_name("Feast")
@@ -215,10 +237,15 @@ def _get_entity_workunit(
"""

feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
aspects = (
[StatusClass(removed=False)]
+ self._get_tags(entity)
+ self._get_owners(entity)
)

entity_snapshot = MLPrimaryKeySnapshot(
urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name),
aspects=[StatusClass(removed=False)],
aspects=aspects,
)

entity_snapshot.aspects.append(
@@ -243,10 +270,11 @@ def _get_feature_workunit(
Generate an MLFeature work unit for a Feast feature.
"""
feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
aspects = [StatusClass(removed=False)] + self._get_tags(field)

feature_snapshot = MLFeatureSnapshot(
urn=builder.make_ml_feature_urn(feature_view_name, field.name),
aspects=[StatusClass(removed=False)],
aspects=aspects,
)

feature_sources = []
@@ -295,13 +323,18 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU
"""

feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
aspects = (
[
BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]),
StatusClass(removed=False),
]
+ self._get_tags(feature_view)
+ self._get_owners(feature_view)
)

feature_view_snapshot = MLFeatureTableSnapshot(
urn=builder.make_ml_feature_table_urn("feast", feature_view_name),
aspects=[
BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]),
StatusClass(removed=False),
],
aspects=aspects,
)

feature_view_snapshot.aspects.append(
@@ -360,6 +393,64 @@ def _get_on_demand_feature_view_workunit(

return MetadataWorkUnit(id=on_demand_feature_view_name, mce=mce)

# If a tag is specified in a Feast object, then the tag will be ingested into Datahub if enable_tag_extraction is
# True, otherwise NO tags will be ingested
def _get_tags(self, obj: Union[Entity, FeatureView, FeastField]) -> list:
"""
Extracts tags from the given object and returns a list of aspects.
"""
aspects: List[Union[GlobalTagsClass]] = []

# Extract tags
if self.source_config.enable_tag_extraction:
if obj.tags.get("name"):
tag_name: str = obj.tags["name"]
tag_association = TagAssociationClass(
tag=builder.make_tag_urn(tag_name)
)
global_tags_aspect = GlobalTagsClass(tags=[tag_association])
aspects.append(global_tags_aspect)

return aspects

# If an owner is specified in a Feast object, it will only be ingested into Datahub if owner_mappings is specified
# and enable_owner_extraction is True in FeastRepositorySourceConfig, otherwise NO owners will be ingested
def _get_owners(self, obj: Union[Entity, FeatureView, FeastField]) -> list:
"""
Extracts owners from the given object and returns a list of aspects.
"""
aspects: List[Union[OwnershipClass]] = []

# Extract owner
if self.source_config.enable_owner_extraction:
owner = getattr(obj, "owner", None)
if owner:
# Create owner association, skipping if None
owner_association = self._create_owner_association(owner)
if owner_association: # Only add valid owner associations
owners_aspect = OwnershipClass(owners=[owner_association])
aspects.append(owners_aspect)

return aspects

def _create_owner_association(self, owner: str) -> Optional[OwnerClass]:
"""
Create an OwnerClass instance for the given owner using the owner mappings.
"""
if self.source_config.owner_mappings is not None:
for mapping in self.source_config.owner_mappings:
if mapping["feast_owner_name"] == owner:
ownership_type_class: str = mapping.get(
"datahub_ownership_type", "TECHNICAL_OWNER"
)
datahub_owner_urn = mapping.get("datahub_owner_urn")
if datahub_owner_urn:
return OwnerClass(
owner=datahub_owner_urn,
type=ownership_type_class,
)
return None

@classmethod
def create(cls, config_dict, ctx):
config = FeastRepositorySourceConfig.parse_obj(config_dict)
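
For quick reference, here is a minimal sketch of how the new Feast source options introduced above might be populated. It mirrors the `owner_mappings` comment in `FeastRepositorySourceConfig`; the repository `path` setting is assumed from the existing source config (it is not part of this diff), and the path and owner values are placeholders:

```python
from datahub.ingestion.source.feast import FeastRepositorySourceConfig

config = FeastRepositorySourceConfig.parse_obj(
    {
        "path": "/path/to/feature_repo",  # assumed existing setting: Feast repo root
        "enable_tag_extraction": True,    # otherwise no tags are extracted
        "enable_owner_extraction": True,  # otherwise no owners are extracted
        # Required when enable_owner_extraction is True
        "owner_mappings": [
            {
                "feast_owner_name": "data-team",
                "datahub_owner_urn": "urn:li:corpGroup:data-team",
                "datahub_ownership_type": "BUSINESS_OWNER",
            }
        ],
    }
)

# Owners and tags are only emitted when the corresponding flags are enabled.
assert config.enable_tag_extraction
assert config.owner_mappings is not None
```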