Skip to content

Commit

Permalink
Fix SQL query to retrieve MySQL foreign key constraints (#2296)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored Jul 16, 2024
1 parent c033213 commit a6c79b9
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 53 deletions.
1 change: 1 addition & 0 deletions backend/gen/go/db/dbschemas/mysql/system.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/pkg/dbschemas/sql/mysql/queries/system.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ FROM
JOIN information_schema.key_column_usage kcu
ON
kcu.constraint_name = rc.constraint_name
AND kcu.constraint_schema = rc.constraint_schema
JOIN information_schema.columns as c
ON
c.table_schema = kcu.table_schema
Expand Down
1 change: 1 addition & 0 deletions backend/pkg/dbschemas/sql/mysql/schema/system.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ create table information_schema.key_column_usage (
);

create table information_schema.referential_constraints (
constraint_schema text not null,
constraint_name text not null,
update_rule text not null,
delete_rule text not null
Expand Down
81 changes: 29 additions & 52 deletions backend/pkg/sqlmanager/mysql/mysql-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ func (m *MysqlManager) GetTableConstraintsBySchema(ctx context.Context, schemas
}, nil
}

func (m *MysqlManager) getForeignKeyConstraints(ctx context.Context, schemas []string) ([]*sqlmanager_shared.ForeignKeyConstraintsRow, error) {
holder := make([][]*mysql_queries.GetForeignKeyConstraintsRow, len(schemas))
// Key is schema.table value is list of tables that key depends on
func (m *MysqlManager) getForeignKeyConstraintsMap(ctx context.Context, schemas []string) (map[string][]*sqlmanager_shared.ForeignConstraint, error) {
fksBySchema := make([][]*mysql_queries.GetForeignKeyConstraintsRow, len(schemas)) // groupped by schema
errgrp, errctx := errgroup.WithContext(ctx)
for idx := range schemas {
idx := idx
Expand All @@ -98,7 +99,7 @@ func (m *MysqlManager) getForeignKeyConstraints(ctx context.Context, schemas []s
if err != nil {
return err
}
holder[idx] = constraints
fksBySchema[idx] = constraints
return nil
})
}
Expand All @@ -107,59 +108,35 @@ func (m *MysqlManager) getForeignKeyConstraints(ctx context.Context, schemas []s
return nil, err
}

output := []*mysql_queries.GetForeignKeyConstraintsRow{}
for _, schemas := range holder {
output = append(output, schemas...)
}
result := []*sqlmanager_shared.ForeignKeyConstraintsRow{}
for _, row := range output {
result = append(result, &sqlmanager_shared.ForeignKeyConstraintsRow{
SchemaName: row.SchemaName,
TableName: row.TableName,
ColumnName: row.ColumnName,
IsNullable: sqlmanager_shared.ConvertNullableTextToBool(row.IsNullable),
ConstraintName: row.ConstraintName,
ForeignSchemaName: row.ForeignSchemaName,
ForeignTableName: row.ForeignTableName,
ForeignColumnName: row.ForeignColumnName,
})
}
return result, nil
}

// Key is schema.table value is list of tables that key depends on
func (m *MysqlManager) getForeignKeyConstraintsMap(ctx context.Context, schemas []string) (map[string][]*sqlmanager_shared.ForeignConstraint, error) {
fkConstraints, err := m.getForeignKeyConstraints(ctx, schemas)
if err != nil {
return nil, err
}
groupedFks := map[string][]*sqlmanager_shared.ForeignKeyConstraintsRow{} // grouped by constraint name
for _, row := range fkConstraints {
groupedFks[row.ConstraintName] = append(groupedFks[row.ConstraintName], row)
}
constraints := map[string][]*sqlmanager_shared.ForeignConstraint{}
for _, fks := range groupedFks {
cols := []string{}
notNullable := []bool{}
fkCols := []string{}
for _, fk := range fks {
cols = append(cols, fk.ColumnName)
notNullable = append(notNullable, !fk.IsNullable)
fkCols = append(fkCols, fk.ForeignColumnName)
for _, fksSchema := range fksBySchema {
groupedFks := map[string][]*mysql_queries.GetForeignKeyConstraintsRow{} // grouped by constraint name
for _, row := range fksSchema {
groupedFks[row.ConstraintName] = append(groupedFks[row.ConstraintName], row)
}
for _, fks := range groupedFks {
cols := []string{}
notNullable := []bool{}
fkCols := []string{}
for _, fk := range fks {
cols = append(cols, fk.ColumnName)
notNullable = append(notNullable, !sqlmanager_shared.ConvertNullableTextToBool(fk.IsNullable))
fkCols = append(fkCols, fk.ForeignColumnName)
}
row := fks[0]
tableName := sqlmanager_shared.BuildTable(row.SchemaName, row.TableName)
constraints[tableName] = append(constraints[tableName], &sqlmanager_shared.ForeignConstraint{
Columns: cols,
NotNullable: notNullable,
ForeignKey: &sqlmanager_shared.ForeignKey{
Table: sqlmanager_shared.BuildTable(row.ForeignSchemaName, row.ForeignTableName),
Columns: fkCols,
},
})
}
row := fks[0]
tableName := sqlmanager_shared.BuildTable(row.SchemaName, row.TableName)
constraints[tableName] = append(constraints[tableName], &sqlmanager_shared.ForeignConstraint{
Columns: cols,
NotNullable: notNullable,
ForeignKey: &sqlmanager_shared.ForeignKey{
Table: sqlmanager_shared.BuildTable(row.ForeignSchemaName, row.ForeignTableName),
Columns: fkCols,
},
})
}

return constraints, err
return constraints, nil
}

func (m *MysqlManager) GetPrimaryKeyConstraints(ctx context.Context, schemas []string) ([]*sqlmanager_shared.PrimaryKey, error) {
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/workflows/datasync/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ func Workflow(wfctx workflow.Context, req *WorkflowRequest) (*WorkflowResponse,
logger := log.With(logger, withBenthosConfigResponseLoggerTags(bc)...)
future := invokeSync(bc, childctx, &started, &completed, logger)
workselector.AddFuture(future, func(f workflow.Future) {
logger.Info("config sync completed")
var result sync_activity.SyncResponse
err := f.Get(childctx, &result)
if err != nil {
Expand All @@ -134,6 +133,7 @@ func Workflow(wfctx workflow.Context, req *WorkflowRequest) (*WorkflowResponse,
cancelHandler()
activityErr = err
}
logger.Info("config sync completed")
delete(redisDependsOn, neosync_benthos.BuildBenthosTable(bc.TableSchema, bc.TableName))
// clean up redis
err = runRedisCleanUpActivity(wfctx, logger, actOptResp, redisDependsOn, req.JobId, wfinfo.WorkflowExecution.ID, redisConfigs)
Expand Down

0 comments on commit a6c79b9

Please sign in to comment.