Skip to content

Commit

Permalink
support OceanBase Binlog Service
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Jul 19, 2024
1 parent 59db6fa commit ae2d7b8
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 32 deletions.
1 change: 1 addition & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type MigrationContext struct {
GoogleCloudPlatform bool
AzureMySQL bool
AttemptInstantDDL bool
OceanBaseBinlogService bool

config ContextConfig
configMutex *sync.Mutex
Expand Down
5 changes: 2 additions & 3 deletions go/base/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
package base

import (
gosql "database/sql"
"fmt"
"os"
"regexp"
"strings"
"time"

gosql "database/sql"

"github.com/github/gh-ost/go/mysql"
)

Expand Down Expand Up @@ -75,7 +74,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
// AliyunRDS set users port to "NULL", replace it by gh-ost param
// GCP set users port to "NULL", replace it by gh-ost param
// Azure MySQL set users port to a different value by design, replace it by gh-ost para
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL {
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL || migrationContext.OceanBaseBinlogService {
port = connectionConfig.Key.Port
} else {
portQuery := `select @@global.port`
Expand Down
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func main() {
flag.BoolVar(&migrationContext.AliyunRDS, "aliyun-rds", false, "set to 'true' when you execute on Aliyun RDS.")
flag.BoolVar(&migrationContext.GoogleCloudPlatform, "gcp", false, "set to 'true' when you execute on a 1st generation Google Cloud Platform (GCP).")
flag.BoolVar(&migrationContext.AzureMySQL, "azure", false, "set to 'true' when you execute on Azure Database on MySQL.")
flag.BoolVar(&migrationContext.OceanBaseBinlogService, "oceanbase", false, "set to 'true' when you execute on OceanBase Binlog Service")

executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")
Expand Down
81 changes: 56 additions & 25 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (this *Applier) InitDBConnections() (err error) {
if err := this.validateAndReadTimeZone(); err != nil {
return err
}
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBaseBinlogService {
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
return err
} else {
Expand Down Expand Up @@ -670,24 +670,35 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
return chunkSize, rowsAffected, duration, nil
}

// LockOriginalTable places a write lock on the original table
func (this *Applier) LockOriginalTable() error {
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
this.migrationContext.Log.Infof("Locking %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
// lockTables places a write lock on the specific tables
func (this *Applier) lockTables(tableNames ...string) error {
databaseName := this.migrationContext.DatabaseName
query := `lock /* gh-ost */ tables `
for i, tableName := range tableNames {
if i != 0 {
query = query + `, `
}
query = query + fmt.Sprintf(`%s.%s write`, databaseName, tableName)
}
this.migrationContext.Log.Infof("Locking tables %v in database %s", tableNames, databaseName)
this.migrationContext.LockTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
this.migrationContext.Log.Infof("Table locked")
this.migrationContext.Log.Infof("Tables %v in database %s locked", tableNames, databaseName)
return nil
}

// LockOriginalTable places a write lock on the original table
func (this *Applier) LockOriginalTable() error {
return this.lockTables(this.migrationContext.OriginalTableName)
}

// LockOriginAndGhostTable places a write lock on the original table and the ghost table
func (this *Applier) LockOriginAndGhostTable() error {
return this.lockTables(this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName())
}

// UnlockTables makes tea. No wait, it unlocks tables.
func (this *Applier) UnlockTables() error {
query := `unlock /* gh-ost */ tables`
Expand Down Expand Up @@ -968,7 +979,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke

tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
Expand Down Expand Up @@ -1037,25 +1048,31 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
return nil
}

// AtomicCutoverRename
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
tx, err := this.db.Begin()
func (this *Applier) atomicCutoverRename(db *gosql.DB, sessionIdChan chan int64, tablesRenamed chan<- error) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
tx.Rollback()
sessionIdChan <- -1
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
if sessionIdChan != nil {
sessionIdChan <- -1
}
if tablesRenamed != nil {
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
}
}()
var sessionId int64
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
return err

if sessionIdChan != nil {
var sessionId int64
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId
}
sessionIdChan <- sessionId

this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
return err
}
Expand All @@ -1072,14 +1089,28 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
)
this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query)
if _, err := tx.Exec(query); err != nil {
tablesRenamed <- err
if tablesRenamed != nil {
tablesRenamed <- err
}
return this.migrationContext.Log.Errore(err)
}
tablesRenamed <- nil
if tablesRenamed != nil {
tablesRenamed <- nil
}
this.migrationContext.Log.Infof("Tables renamed")
return nil
}

// AtomicCutoverRename renames tables for atomic cut over in non lock session
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
return this.atomicCutoverRename(this.db, sessionIdChan, tablesRenamed)
}

// AtomicCutoverRenameWithLock renames tables for atomic cut over in the lock session
func (this *Applier) AtomicCutoverRenameWithLock() error {
return this.atomicCutoverRename(this.singletonDB, nil, nil)
}

func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show /* gh-ost */ global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (this *Inspector) InitDBConnections() (err error) {
if err := this.validateConnection(); err != nil {
return err
}
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBaseBinlogService {
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
return err
} else {
Expand Down
46 changes: 43 additions & 3 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ func (this *Migrator) canStopStreaming() bool {

// onChangelogEvent is called when a binlog event operation on the changelog table is intercepted.
func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
if dmlEvent.NewColumnValues == nil {
// in some compatible systems, such as OceanBase Binlog Service, an UPSERT event is
// converted to a DELETE event and an INSERT event, we need to skip the DELETE event.
return nil
}
// Hey, I created the changelog table, I know the type of columns it has!
switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint {
case "state":
Expand Down Expand Up @@ -551,9 +556,15 @@ func (this *Migrator) cutOver() (err error) {

switch this.migrationContext.CutOverType {
case base.CutOverAtomic:
// Atomic solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
err = this.atomicCutOver()
if this.migrationContext.OceanBaseBinlogService || !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") {
// Atomic solution for latest MySQL: cut over the tables in the same session where the origin
// table and ghost table are both locked, it can only work on MySQL 8.0.13 or later versions
err = this.atomicCutOverMySQL8()
} else {
// Atomic solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
err = this.atomicCutOver()
}
case base.CutOverTwoStep:
err = this.cutOverTwoStep()
default:
Expand Down Expand Up @@ -632,6 +643,35 @@ func (this *Migrator) cutOverTwoStep() (err error) {
return nil
}

// atomicCutOverMySQL8 will lock down the original table and the ghost table, execute
// what's left of last DML entries, and atomically swap original->old, then new->original.
// It requires to execute RENAME TABLE when the table is LOCKED under WRITE LOCK, which is
// supported from MySQL 8.0.13, see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-13.html.
func (this *Migrator) atomicCutOverMySQL8() (err error) {
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)

if err := this.retryOperation(this.applier.LockOriginAndGhostTable); err != nil {
return err
}

if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
return err
}
if err := this.applier.AtomicCutoverRenameWithLock(); err != nil {
return err
}
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
return err
}

lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
this.migrationContext.Log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
}

// atomicCutOver
func (this *Migrator) atomicCutOver() (err error) {
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
Expand Down
45 changes: 45 additions & 0 deletions go/mysql/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package mysql
import (
gosql "database/sql"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -211,3 +212,47 @@ func Kill(db *gosql.DB, connectionID string) error {
_, err := db.Exec(`KILL QUERY %s`, connectionID)
return err
}

func versionTokens(version string, digits int) []int {
v := strings.Split(version, "-")[0]
tokens := strings.Split(v, ".")
intTokens := make([]int, digits)
for i := range tokens {
if i >= digits {
break
}
intTokens[i], _ = strconv.Atoi(tokens[i])
}
return intTokens
}

func isSmallerVersion(version string, otherVersion string, digits int) bool {
v := versionTokens(version, digits)
o := versionTokens(otherVersion, digits)
for i := 0; i < len(v); i++ {
if v[i] < o[i] {
return true
}
if v[i] > o[i] {
return false
}
if i == digits {
break
}
}
return false
}

// IsSmallerMajorVersion tests two versions against another and returns true if
// the former is a smaller "major" version than the latter.
// e.g. 5.5.36 is NOT a smaller major version as compared to 5.5.40, but IS as compared to 5.6.9
func IsSmallerMajorVersion(version string, otherVersion string) bool {
return isSmallerVersion(version, otherVersion, 2)
}

// IsSmallerMinorVersion tests two versions against another and returns true if
// the former is a smaller "minor" version than the latter.
// e.g. 5.5.36 is a smaller major version as compared to 5.5.40, as well as compared to 5.6.7
func IsSmallerMinorVersion(version string, otherVersion string) bool {
return isSmallerVersion(version, otherVersion, 3)
}

0 comments on commit ae2d7b8

Please sign in to comment.