Skip to content

Commit

Permalink
fix(dataproduct): optimize data product sideeffect (#11961)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Nov 26, 2024
1 parent 3b1a8ca commit 094433c
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.dataproduct.DataProductAssociation;
import com.linkedin.dataproduct.DataProductAssociationArray;
import com.linkedin.dataproduct.DataProductProperties;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCLItem;
Expand All @@ -31,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
Expand Down Expand Up @@ -65,77 +67,108 @@ private static Stream<MCPItem> generatePatchRemove(
MCLItem mclItem, @Nonnull RetrieverContext retrieverContext) {

if (DATA_PRODUCT_PROPERTIES_ASPECT_NAME.equals(mclItem.getAspectName())) {
List<MCPItem> mcpItems = new ArrayList<>();

DataProductProperties dataProductProperties = mclItem.getAspect(DataProductProperties.class);
if (dataProductProperties == null) {
log.error("Unable to process data product properties for urn: {}", mclItem.getUrn());
return Stream.empty();
}
Map<String, List<GenericJsonPatch.PatchOp>> patchOpMap = new HashMap<>();
for (DataProductAssociation dataProductAssociation :
DataProductAssociationArray newDataProductAssociationArray =
Optional.ofNullable(dataProductProperties.getAssets())
.orElse(new DataProductAssociationArray())) {
RelatedEntitiesScrollResult result =
retrieverContext
.getGraphRetriever()
.scrollRelatedEntities(
null,
QueryUtils.newFilter(
"urn", dataProductAssociation.getDestinationUrn().toString()),
null,
EMPTY_FILTER,
ImmutableList.of("DataProductContains"),
QueryUtils.newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING),
Collections.emptyList(),
null,
10, // Should only ever be one, if ever greater than ten will decrease over time
// to become consistent
null,
null);
if (!result.getEntities().isEmpty()) {
for (RelatedEntities entity : result.getEntities()) {
if (!mclItem.getUrn().equals(UrnUtils.getUrn(entity.getSourceUrn()))) {
GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
patchOp.setOp(PatchOperationType.REMOVE.getValue());
patchOp.setPath(String.format("/assets/%s", entity.getDestinationUrn()));
patchOpMap
.computeIfAbsent(entity.getSourceUrn(), urn -> new ArrayList<>())
.add(patchOp);
}
.orElse(new DataProductAssociationArray());

DataProductProperties previousDataProductProperties =
mclItem.getPreviousAspect(DataProductProperties.class);

if (!ChangeType.UPSERT.equals(mclItem.getChangeType())
|| previousDataProductProperties == null) {
// CREATE/CREATE_ENTITY/RESTATE
return generateUnsetMCPs(mclItem, newDataProductAssociationArray, retrieverContext);
} else {
// UPSERT with previous
DataProductAssociationArray oldDataProductAssociationArray =
Optional.ofNullable(previousDataProductProperties.getAssets())
.orElse(new DataProductAssociationArray());

DataProductAssociationArray additions =
newDataProductAssociationArray.stream()
.filter(association -> !oldDataProductAssociationArray.contains(association))
.collect(Collectors.toCollection(DataProductAssociationArray::new));

return generateUnsetMCPs(mclItem, additions, retrieverContext);
}
}
return Stream.empty();
}

private static Stream<MCPItem> generateUnsetMCPs(
@Nonnull MCLItem dataProductItem,
@Nonnull DataProductAssociationArray dataProductAssociations,
@Nonnull RetrieverContext retrieverContext) {
List<MCPItem> mcpItems = new ArrayList<>();
Map<String, List<GenericJsonPatch.PatchOp>> patchOpMap = new HashMap<>();

for (DataProductAssociation dataProductAssociation : dataProductAssociations) {
RelatedEntitiesScrollResult result =
retrieverContext
.getGraphRetriever()
.scrollRelatedEntities(
null,
QueryUtils.newFilter(
"urn", dataProductAssociation.getDestinationUrn().toString()),
null,
EMPTY_FILTER,
ImmutableList.of("DataProductContains"),
QueryUtils.newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING),
Collections.emptyList(),
null,
10, // Should only ever be one, if ever greater than ten will decrease over time
// to become consistent
null,
null);
if (!result.getEntities().isEmpty()) {
for (RelatedEntities entity : result.getEntities()) {
if (!dataProductItem.getUrn().equals(UrnUtils.getUrn(entity.getSourceUrn()))) {
GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
patchOp.setOp(PatchOperationType.REMOVE.getValue());
patchOp.setPath(String.format("/assets/%s", entity.getDestinationUrn()));
patchOpMap
.computeIfAbsent(entity.getSourceUrn(), urn -> new ArrayList<>())
.add(patchOp);
}
}
}
for (String urn : patchOpMap.keySet()) {
EntitySpec entitySpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME);
mcpItems.add(
PatchItemImpl.builder()
.urn(UrnUtils.getUrn(urn))
.entitySpec(
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
.aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
.aspectSpec(entitySpec.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
.patch(
GenericJsonPatch.builder()
.arrayPrimaryKeys(
Map.of(
DataProductPropertiesTemplate.ASSETS_FIELD_NAME,
List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME)))
.patch(patchOpMap.get(urn))
.build()
.getJsonPatch())
.auditStamp(mclItem.getAuditStamp())
.systemMetadata(mclItem.getSystemMetadata())
.build(retrieverContext.getAspectRetriever().getEntityRegistry()));
}
return mcpItems.stream();
}
return Stream.empty();
for (String urn : patchOpMap.keySet()) {
EntitySpec entitySpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME);
mcpItems.add(
PatchItemImpl.builder()
.urn(UrnUtils.getUrn(urn))
.entitySpec(
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
.aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
.aspectSpec(entitySpec.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
.patch(
GenericJsonPatch.builder()
.arrayPrimaryKeys(
Map.of(
DataProductPropertiesTemplate.ASSETS_FIELD_NAME,
List.of(DataProductPropertiesTemplate.KEY_FIELD_NAME)))
.patch(patchOpMap.get(urn))
.build()
.getJsonPatch())
.auditStamp(dataProductItem.getAuditStamp())
.systemMetadata(dataProductItem.getSystemMetadata())
.build(retrieverContext.getAspectRetriever().getEntityRegistry()));
}

return mcpItems.stream();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.GraphRetriever;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.models.graph.RelatedEntities;
import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult;
Expand Down Expand Up @@ -47,13 +48,7 @@
public class DataProductUnsetSideEffectTest {
private static final EntityRegistry TEST_REGISTRY = new TestEntityRegistry();
private static final List<ChangeType> SUPPORTED_CHANGE_TYPES =
List.of(
ChangeType.CREATE,
ChangeType.PATCH,
ChangeType.CREATE_ENTITY,
ChangeType.UPSERT,
ChangeType.DELETE,
ChangeType.RESTATE);
List.of(ChangeType.CREATE, ChangeType.CREATE_ENTITY, ChangeType.UPSERT, ChangeType.RESTATE);
private static final Urn TEST_PRODUCT_URN =
UrnUtils.getUrn("urn:li:dataProduct:someDataProductId");

Expand Down Expand Up @@ -358,6 +353,109 @@ public void testBulkAssetMove() {
}
}

@Test
public void testUpsertWithPreviousAspect() {
DataProductUnsetSideEffect test = new DataProductUnsetSideEffect();
test.setConfig(TEST_PLUGIN_CONFIG);

// Case 1: UPSERT with new additions
DataProductProperties previousProperties = new DataProductProperties();
DataProductAssociationArray previousAssociations = new DataProductAssociationArray();
DataProductAssociation previousAssociation = new DataProductAssociation();
previousAssociation.setDestinationUrn(DATASET_URN_1);
previousAssociations.add(previousAssociation);
previousProperties.setAssets(previousAssociations);

// New properties include both old and new datasets
DataProductProperties newProperties = new DataProductProperties();
DataProductAssociationArray newAssociations = new DataProductAssociationArray();
DataProductAssociation association1 = new DataProductAssociation();
association1.setDestinationUrn(DATASET_URN_1);
DataProductAssociation association2 = new DataProductAssociation();
association2.setDestinationUrn(DATASET_URN_2);
newAssociations.add(association1);
newAssociations.add(association2);
newProperties.setAssets(newAssociations);

// Create change item with previous aspect
SystemAspect prevData = mock(SystemAspect.class);
when(prevData.getRecordTemplate()).thenReturn(previousProperties);

ChangeItemImpl dataProductPropertiesChangeItem =
ChangeItemImpl.builder()
.urn(TEST_PRODUCT_URN)
.aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
.changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME)
.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
.recordTemplate(newProperties)
.previousSystemAspect(prevData)
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
.build(mockAspectRetriever);

List<MCPItem> testOutput =
test.postMCPSideEffect(
List.of(
MCLItemImpl.builder()
.build(
dataProductPropertiesChangeItem,
null,
null,
retrieverContext.getAspectRetriever())),
retrieverContext)
.toList();

// Verify that only one patch is generated for the new dataset
assertEquals(
testOutput.size(), 1, "Expected removal of previous data product for new dataset only");
MCPItem patchItem = testOutput.get(0);
assertEquals(
patchItem.getUrn(), TEST_PRODUCT_URN_2, "Patch should target the old data product");
GenericJsonPatch.PatchOp expectedPatchOp = new GenericJsonPatch.PatchOp();
expectedPatchOp.setOp(PatchOperationType.REMOVE.getValue());
expectedPatchOp.setPath(String.format("/assets/%s", DATASET_URN_2));

// Case 2: UPSERT with no new additions
DataProductProperties sameProperties = new DataProductProperties();
DataProductAssociationArray sameAssociations = new DataProductAssociationArray();
DataProductAssociation sameAssociation = new DataProductAssociation();
sameAssociation.setDestinationUrn(DATASET_URN_1);
sameAssociations.add(sameAssociation);
sameProperties.setAssets(sameAssociations);

SystemAspect prevSameData = mock(SystemAspect.class);
when(prevData.getRecordTemplate()).thenReturn(sameProperties);

ChangeItemImpl noChangeItem =
ChangeItemImpl.builder()
.urn(TEST_PRODUCT_URN)
.aspectName(DATA_PRODUCT_PROPERTIES_ASPECT_NAME)
.changeType(ChangeType.UPSERT)
.entitySpec(TEST_REGISTRY.getEntitySpec(DATA_PRODUCT_ENTITY_NAME))
.aspectSpec(
TEST_REGISTRY
.getEntitySpec(DATA_PRODUCT_ENTITY_NAME)
.getAspectSpec(DATA_PRODUCT_PROPERTIES_ASPECT_NAME))
.recordTemplate(sameProperties)
.previousSystemAspect(prevSameData)
.auditStamp(AuditStampUtils.createDefaultAuditStamp())
.build(mockAspectRetriever);

List<MCPItem> noChangeOutput =
test.postMCPSideEffect(
List.of(
MCLItemImpl.builder()
.build(noChangeItem, null, null, retrieverContext.getAspectRetriever())),
retrieverContext)
.toList();

// Verify no patches are generated when there are no new additions
assertEquals(noChangeOutput.size(), 0, "Expected no changes when assets are the same");
}

private static DataProductProperties getTestDataProductProperties(Urn destinationUrn) {
DataProductProperties dataProductProperties = new DataProductProperties();
DataProductAssociationArray dataProductAssociations = new DataProductAssociationArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ public MCPSideEffect dataProductUnsetSideEffect() {
AspectPluginConfig.builder()
.enabled(true)
.className(DataProductUnsetSideEffect.class.getName())
.supportedOperations(
List.of("CREATE", "CREATE_ENTITY", "UPSERT", "RESTATE", "DELETE", "PATCH"))
.supportedOperations(List.of("CREATE", "CREATE_ENTITY", "UPSERT", "RESTATE"))
.supportedEntityAspectNames(
List.of(
AspectPluginConfig.EntityAspectName.builder()
Expand Down

0 comments on commit 094433c

Please sign in to comment.