Skip to content

Commit

Permalink
Add support for creating namespaced databases
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Nov 27, 2024
1 parent f05ea4c commit c60671d
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 16 deletions.
108 changes: 96 additions & 12 deletions pkg/db/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,27 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/xmtp/xmtpd/pkg/migrations"
)

func newPGXDB(
ctx context.Context,
dsn string,
waitForDB, statementTimeout time.Duration,
) (*sql.DB, error) {
func waitUntilDBReady(ctx context.Context, db *pgxpool.Pool, waitTime time.Duration) error {
waitUntil := time.Now().Add(waitTime)

err := db.Ping(ctx)

for err != nil && time.Now().Before(waitUntil) {
time.Sleep(3 * time.Second)
err = db.Ping(ctx)
}
return err
}

func parseConfig(dsn string, statementTimeout time.Duration) (*pgxpool.Config, error) {
config, err := pgxpool.ParseConfig(dsn)
if err != nil {
return nil, err
Expand All @@ -24,31 +33,106 @@ func newPGXDB(
config.ConnConfig.RuntimeParams["statement_timeout"] = fmt.Sprint(
statementTimeout.Milliseconds(),
)
return config, nil
}

func newPGXDB(
ctx context.Context,
config *pgxpool.Config,
waitForDB time.Duration,
) (*sql.DB, error) {
dbPool, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
return nil, err
}

if err = waitUntilDBReady(ctx, dbPool, waitForDB); err != nil {
return nil, err
}

db := stdlib.OpenDBFromPool(dbPool)

waitUntil := time.Now().Add(waitForDB)
return db, nil
}

err = db.Ping()
for err != nil && time.Now().Before(waitUntil) {
time.Sleep(3 * time.Second)
err = db.Ping()
// Creates a new database with the given namespace if it doesn't exist
func createNamespace(
ctx context.Context,
config *pgxpool.Config,
namespace string,
waitForDB time.Duration,
) error {
// Make a copy of the config so we don't dirty it
config = config.Copy()
// Change the database to postgres so we are able to create new DBs
config.ConnConfig.Database = "postgres"

// Create a temporary connection to the postgres DB
adminConn, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
return fmt.Errorf("failed to connect to postgres: %w", err)
}
defer adminConn.Close()

if err = waitUntilDBReady(ctx, adminConn, waitForDB); err != nil {
return err
}

// Create database if it doesn't exist
_, err = adminConn.Exec(ctx, fmt.Sprintf(`CREATE DATABASE "%s"`, namespace))
if err != nil {
// Ignore error if database already exists
if !strings.Contains(err.Error(), "already exists") {
return fmt.Errorf("failed to create database: %w", err)
}
}

return db, err
return nil
}

// Creates a new database with the given namespace if it doesn't exist and returns the full DSN for the new database.
func NewNamespacedDB(
ctx context.Context,
dsn string,
namespace string,
waitForDB, statementTimeout time.Duration,
) (*sql.DB, error) {
// Parse the DSN to get the config
config, err := parseConfig(dsn, statementTimeout)
if err != nil {
return nil, fmt.Errorf("failed to parse DSN: %w", err)
}

if err = createNamespace(ctx, config, namespace, waitForDB); err != nil {
return nil, err
}

config.ConnConfig.Database = namespace

db, err := newPGXDB(ctx, config, waitForDB)
if err != nil {
return nil, err
}

err = migrations.Migrate(ctx, db)
if err != nil {
return nil, err
}

return db, nil
}

func NewDB(
ctx context.Context,
dsn string,
waitForDB, statementTimeout time.Duration,
) (*sql.DB, error) {
db, err := newPGXDB(ctx, dsn, waitForDB, statementTimeout)
config, err := parseConfig(dsn, statementTimeout)
if err != nil {
return nil, err
}

db, err := newPGXDB(ctx, config, waitForDB)
if err != nil {
return nil, err
}
Expand Down
34 changes: 34 additions & 0 deletions pkg/db/pgx_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package db

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/testutils"
)

func TestNamespacedDB(t *testing.T) {
startingDsn := testutils.LocalTestDBDSNPrefix + "/foo" + testutils.LocalTestDBDSNSuffix
newDBName := "bar"
// Create namespaced DB
namespacedDB, err := NewNamespacedDB(
context.Background(),
startingDsn,
newDBName,
time.Second,
time.Second,
)
require.NoError(t, err)

result, err := namespacedDB.Query("SELECT current_database();")
require.NoError(t, err)
defer result.Close()

require.True(t, result.Next())
var dbName string
err = result.Scan(&dbName)
require.NoError(t, err)
require.Equal(t, "bar", dbName)
}
8 changes: 4 additions & 4 deletions pkg/testutils/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
)

const (
localTestDBDSNPrefix = "postgres://postgres:xmtp@localhost:8765"
localTestDBDSNSuffix = "?sslmode=disable"
LocalTestDBDSNPrefix = "postgres://postgres:xmtp@localhost:8765"
LocalTestDBDSNSuffix = "?sslmode=disable"
)

func openDB(t testing.TB, dsn string) (*sql.DB, string, func()) {
Expand All @@ -28,15 +28,15 @@ func openDB(t testing.TB, dsn string) (*sql.DB, string, func()) {
}

func newCtlDB(t testing.TB) (*sql.DB, string, func()) {
return openDB(t, localTestDBDSNPrefix+localTestDBDSNSuffix)
return openDB(t, LocalTestDBDSNPrefix+LocalTestDBDSNSuffix)
}

func newInstanceDB(t testing.TB, ctx context.Context, ctlDB *sql.DB) (*sql.DB, string, func()) {
dbName := "test_" + RandomStringLower(12)
_, err := ctlDB.Exec("CREATE DATABASE " + dbName)
require.NoError(t, err)

db, dsn, cleanup := openDB(t, localTestDBDSNPrefix+"/"+dbName+localTestDBDSNSuffix)
db, dsn, cleanup := openDB(t, LocalTestDBDSNPrefix+"/"+dbName+LocalTestDBDSNSuffix)
require.NoError(t, migrations.Migrate(ctx, db))

return db, dsn, func() {
Expand Down

0 comments on commit c60671d

Please sign in to comment.