Skip to content

Commit

Permalink
Fixed trigger ID randomization for new workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed Nov 5, 2024
1 parent e17f55f commit 169a0ef
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 3 deletions.
25 changes: 25 additions & 0 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8193,6 +8193,31 @@ func FixWorkflowPosition(ctx context.Context, workflow Workflow) Workflow {
}

func SetWorkflow(ctx context.Context, workflow Workflow, id string, optionalEditedSecondsOffset ...int) error {
// FIXME: Due to a possibility of ID reusage on duplication, we re-randomize ID's IF the workflow is new
// Due to caching, this is kind of fine.
foundWorkflow, err := GetWorkflow(ctx, id)
if err != nil || foundWorkflow.ID == "" {
log.Printf("[INFO] Workflow %s doesn't exist, randomizing IDs for Triggers", id)

// Old ID + Org ID as seed -> generate new uuid
for triggerIndex, trigger := range workflow.Triggers {
uuidSeed := fmt.Sprintf("%s_%s", trigger.ID, workflow.OrgId)
newTriggerId := uuid.NewV5(uuid.NamespaceOID, uuidSeed).String()
for branchIndex, branch := range workflow.Branches {
if branch.SourceID == trigger.ID {
workflow.Branches[branchIndex].SourceID = newTriggerId
}

if branch.DestinationID == trigger.ID {
workflow.Branches[branchIndex].DestinationID = newTriggerId
}
}

workflow.Triggers[triggerIndex].ID = newTriggerId
workflow.Triggers[triggerIndex].Status = "stopped"
}
}

// Overwriting to be sure these are matching
// No real point in having id + workflow.ID anymore
id = workflow.ID
Expand Down
12 changes: 9 additions & 3 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -16651,7 +16651,6 @@ func GetReplacementNodes(ctx context.Context, execution WorkflowExecution, trigg
}

childNodes := FindChildNodes(workflowExecution.Workflow, workflowAction, []string{}, []string{})
//log.Printf("Found %d childnodes of %s", len(childNodes), workflowAction)
newActions := []Action{}
branches := []Branch{}

Expand Down Expand Up @@ -29142,9 +29141,16 @@ func checkExecutionStatus(ctx context.Context, exec *WorkflowExecution) *Workflo
if workflowChanged {
workflow.Actions = originalActions

err = SetWorkflow(ctx, *workflow, workflow.ID)
// This causes too many writes and can't be handled at scale. Removing for now. Only setting cache.
cacheKey := fmt.Sprintf("workflow_%s", workflow.ID)
data, err := json.Marshal(workflow)
if err != nil {
log.Printf("[ERROR] Failed updating workflow from execution validator. This is NOT critical as we keep cache %s: %s", workflow.ID, err)
log.Printf("[ERROR] Failed marshalling validation of %s: %s", workflow.ID, err)
} else {
err = SetCache(ctx, cacheKey, data, 30)
if err != nil {
log.Printf("[ERROR] Failed setting cache for workflow '%s': %s", cacheKey, err)
}
}
}

Expand Down

0 comments on commit 169a0ef

Please sign in to comment.