-
Notifications
You must be signed in to change notification settings - Fork 3
Postgres #62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Postgres #62
Changes from 15 commits
fa05d57
351a6dc
b1b982d
2d9462f
6643158
fd8c76f
b6681a2
da5b115
18309f0
b651814
160df17
11659ed
37fd588
ca444bc
c483fb7
4b803bf
f202e18
ad129b8
352ed9f
5e807d5
494dca9
1f7b6b7
af75628
6916d34
1a91556
fb56414
5e29dc9
6e114a9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| test: test_mssql test_pgsql | ||
|
|
||
|
|
||
| test_mssql: | ||
| docker compose --progress plain -f docker-compose.mssql.yml run test | ||
|
|
||
| test_pgsql: | ||
| docker compose --progress plain -f docker-compose.pgsql.yml run test |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ package sqlcode | |
| import ( | ||
| "context" | ||
| "database/sql" | ||
| "database/sql/driver" | ||
| ) | ||
|
|
||
| type DB interface { | ||
|
|
@@ -11,6 +12,7 @@ type DB interface { | |
| QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row | ||
| Conn(ctx context.Context) (*sql.Conn, error) | ||
| BeginTx(ctx context.Context, txOptions *sql.TxOptions) (*sql.Tx, error) | ||
| Driver() driver.Driver | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a required interface for sql.DB and shouldn't be too much of a hassle to support in internal libraries. |
||
| } | ||
|
|
||
| var _ DB = &sql.DB{} | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,11 +3,25 @@ package sqlcode | |
| import ( | ||
| "context" | ||
| "database/sql" | ||
|
|
||
| mssql "github.com/denisenkom/go-mssqldb" | ||
|
||
| "github.com/jackc/pgx/v5/stdlib" | ||
| ) | ||
|
|
||
| func Exists(ctx context.Context, dbc DB, schemasuffix string) (bool, error) { | ||
| var schemaID int | ||
| err := dbc.QueryRowContext(ctx, `select isnull(schema_id(@p1), 0)`, SchemaName(schemasuffix)).Scan(&schemaID) | ||
|
|
||
| driver := dbc.Driver() | ||
| var qs string | ||
|
|
||
| if _, ok := driver.(*mssql.Driver); ok { | ||
| qs = `select isnull(schema_id(@p1), 0)` | ||
| } | ||
| if _, ok := driver.(*stdlib.Driver); ok { | ||
| qs = `select coalesce((select oid from pg_namespace where nspname = $1),0)` | ||
| } | ||
|
|
||
| err := dbc.QueryRowContext(ctx, qs, SchemaName(schemasuffix)).Scan(&schemaID) | ||
| if err != nil { | ||
| return false, err | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,9 @@ import ( | |
| "time" | ||
|
|
||
| mssql "github.com/denisenkom/go-mssqldb" | ||
| "github.com/jackc/pgx/v5" | ||
| "github.com/jackc/pgx/v5/stdlib" | ||
| pgxstdlib "github.com/jackc/pgx/v5/stdlib" | ||
| "github.com/vippsas/sqlcode/sqlparser" | ||
| ) | ||
|
|
||
|
|
@@ -77,24 +80,25 @@ func impersonate(ctx context.Context, dbc DB, username string, f func(conn *sql. | |
| // Upload will create and upload the schema; resulting in an error | ||
| // if the schema already exists | ||
| func (d *Deployable) Upload(ctx context.Context, dbc DB) error { | ||
| // First, impersonate a user with minimal privileges to get at least | ||
| // some level of sandboxing so that migration scripts can't do anything | ||
| // the caller didn't expect them to. | ||
| return impersonate(ctx, dbc, "sqlcode-deploy-sandbox-user", func(conn *sql.Conn) error { | ||
| driver := dbc.Driver() | ||
| qs := make(map[string][]interface{}, 1) | ||
|
|
||
| var uploadFunc = func(conn *sql.Conn) error { | ||
| tx, err := conn.BeginTx(ctx, nil) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| _, err = tx.ExecContext(ctx, `sqlcode.CreateCodeSchema`, | ||
| sql.Named("schemasuffix", d.SchemaSuffix), | ||
| ) | ||
| if err != nil { | ||
| _ = tx.Rollback() | ||
| return err | ||
| for q, args := range qs { | ||
| _, err = tx.ExecContext(ctx, q, args...) | ||
|
|
||
| if err != nil { | ||
| _ = tx.Rollback() | ||
| return fmt.Errorf("failed to execute (%s) with arg(%s) in schema %s: %w", q, args, d.SchemaSuffix, err) | ||
| } | ||
| } | ||
|
|
||
| preprocessed, err := Preprocess(d.CodeBase, d.SchemaSuffix) | ||
| preprocessed, err := Preprocess(d.CodeBase, d.SchemaSuffix, dbc.Driver()) | ||
| if err != nil { | ||
| _ = tx.Rollback() | ||
| return err | ||
|
|
@@ -103,15 +107,16 @@ func (d *Deployable) Upload(ctx context.Context, dbc DB) error { | |
| _, err := tx.ExecContext(ctx, b.Lines) | ||
| if err != nil { | ||
| _ = tx.Rollback() | ||
| sqlerr, ok := err.(mssql.Error) | ||
| if !ok { | ||
| return err | ||
| } else { | ||
| return SQLUserError{ | ||
| if sqlerr, ok := err.(mssql.Error); ok { | ||
| return MSSQLUserError{ | ||
| Wrapped: sqlerr, | ||
| Batch: b, | ||
| } | ||
| } | ||
|
|
||
| // TODO(ks) PGSQLUserError | ||
| return fmt.Errorf("failed to upload deployable:%s in schema:%s:%w", d.CodeBase, d.SchemaSuffix, err) | ||
|
|
||
| } | ||
| } | ||
| err = tx.Commit() | ||
|
|
@@ -123,8 +128,36 @@ func (d *Deployable) Upload(ctx context.Context, dbc DB) error { | |
|
|
||
| return nil | ||
|
|
||
| }) | ||
| } | ||
|
|
||
| if _, ok := driver.(*mssql.Driver); ok { | ||
| // First, impersonate a user with minimal privileges to get at least | ||
| // some level of sandboxing so that migration scripts can't do anything | ||
| // the caller didn't expect them to. | ||
| qs["sqlcode.CreateCodeSchema"] = []interface { | ||
| }{ | ||
| sql.Named("schemasuffix", d.SchemaSuffix), | ||
| } | ||
|
|
||
| return impersonate(ctx, dbc, "sqlcode-deploy-sandbox-user", uploadFunc) | ||
| } | ||
|
|
||
| if _, ok := driver.(*stdlib.Driver); ok { | ||
| qs[`set role "sqlcode-deploy-sandbox-user"`] = nil | ||
| qs[`call sqlcode.createcodeschema(@schemasuffix)`] = []interface{}{ | ||
| pgx.NamedArgs{"schemasuffix": d.SchemaSuffix}, | ||
| } | ||
| conn, err := dbc.Conn(ctx) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer func() { | ||
| _ = conn.Close() | ||
| }() | ||
| return uploadFunc(conn) | ||
| } | ||
|
|
||
| return fmt.Errorf("failed to determine sql driver to upload schema: %s", d.SchemaSuffix) | ||
| } | ||
|
|
||
| // EnsureUploaded checks that the schema with the suffix already exists, | ||
|
|
@@ -137,36 +170,51 @@ func (d *Deployable) EnsureUploaded(ctx context.Context, dbc DB) error { | |
| return nil | ||
| } | ||
|
|
||
| driver := dbc.Driver() | ||
| lockResourceName := "sqlcode.EnsureUploaded/" + d.SchemaSuffix | ||
|
|
||
| var lockRetCode int | ||
| var lockQs string | ||
| var unlockQs string | ||
| var err error | ||
|
|
||
| // When a lock is opened with the Transaction lock owner, | ||
| // that lock is released when the transaction is committed or rolled back. | ||
| var lockRetCode int | ||
| err := dbc.QueryRowContext(ctx, ` | ||
| declare @retcode int; | ||
| exec @retcode = sp_getapplock @Resource = @resource, @LockMode = 'Shared', @LockOwner = 'Session', @LockTimeout = @timeoutMs; | ||
| select @retcode; | ||
| `, | ||
| sql.Named("resource", lockResourceName), | ||
| sql.Named("timeoutMs", 20000), | ||
| ).Scan(&lockRetCode) | ||
| if _, ok := driver.(*pgxstdlib.Driver); ok { | ||
| lockQs = `select sqlcode.get_applock(@resource, @timeout)` | ||
| unlockQs = `select sqlcode.release_applock(@resource)` | ||
|
|
||
| err = dbc.QueryRowContext(ctx, lockQs, pgx.NamedArgs{ | ||
| "resource": lockResourceName, | ||
| "timeoutMs": 20000, | ||
| }).Scan(&lockRetCode) | ||
|
|
||
| defer func() { | ||
| dbc.ExecContext(ctx, unlockQs, pgx.NamedArgs{"resource": lockResourceName}) | ||
| }() | ||
| } | ||
|
|
||
| if _, ok := driver.(*mssql.Driver); ok { | ||
| // TODO | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: Add back the mssql sql to lock on upload. |
||
|
|
||
| defer func() { | ||
| // TODO: This returns an error if the lock is already released | ||
| _, _ = dbc.ExecContext(ctx, unlockQs, | ||
| sql.Named("Resource", lockResourceName), | ||
| sql.Named("LockOwner", "Session"), | ||
| ) | ||
| }() | ||
| } | ||
|
|
||
| if err != nil { | ||
| return err | ||
| } | ||
| if lockRetCode < 0 { | ||
| return errors.New("was not able to get lock before timeout") | ||
| } | ||
|
|
||
| defer func() { | ||
| _, _ = dbc.ExecContext(ctx, `sp_releaseapplock`, | ||
| sql.Named("Resource", lockResourceName), | ||
| sql.Named("LockOwner", "Session"), | ||
| ) | ||
| }() | ||
|
|
||
| exists, err := Exists(ctx, dbc, d.SchemaSuffix) | ||
| if err != nil { | ||
| return err | ||
| return fmt.Errorf("unable to determine if schema %s exists: %w", d.SchemaSuffix, err) | ||
| } | ||
|
|
||
| if exists { | ||
|
|
@@ -195,11 +243,28 @@ func (d Deployable) DropAndUpload(ctx context.Context, dbc DB) error { | |
| } | ||
|
|
||
| // Patch will preprocess the sql passed in so that it will call SQL code | ||
| // deployed by the receiver Deployable | ||
| // deployed by the receiver Deployable for SQL Server. | ||
| // NOTE: This will be deprecated and eventually replaced with CodePatch. | ||
| func (d Deployable) Patch(sql string) string { | ||
| return preprocessString(d.SchemaSuffix, sql) | ||
| } | ||
|
|
||
| // CodePatch will preprocess the sql passed in to call | ||
| // the correct SQL code deployed to the provided database. | ||
| // Q: Nameing? DBPatch, PatchV2, ??? | ||
| func (d Deployable) CodePatch(dbc *sql.DB, sql string) string { | ||
| driver := dbc.Driver() | ||
| if _, ok := driver.(*mssql.Driver); ok { | ||
| return codeSchemaRegexp.ReplaceAllString(sql, fmt.Sprintf(`[code@%s]`, d.SchemaSuffix)) | ||
| } | ||
|
|
||
| if _, ok := driver.(*stdlib.Driver); ok { | ||
| return codeSchemaRegexp.ReplaceAllString(sql, fmt.Sprintf(`"code@%s"`, d.SchemaSuffix)) | ||
| } | ||
|
|
||
| panic("unhandled sql driver") | ||
| } | ||
|
|
||
| func (d *Deployable) markAsUploaded(dbc DB) { | ||
| d.uploaded[dbc] = struct{}{} | ||
| } | ||
|
|
@@ -211,7 +276,6 @@ func (d *Deployable) IsUploadedFromCache(dbc DB) bool { | |
|
|
||
| // TODO: StringConst. This requires parsing a SQL literal, a bit too complex | ||
| // to code up just-in-case | ||
|
|
||
| func (d Deployable) IntConst(s string) (int, error) { | ||
| for _, declare := range d.CodeBase.Declares { | ||
| if declare.VariableName == s { | ||
|
|
@@ -280,10 +344,28 @@ func (s *SchemaObject) Suffix() string { | |
|
|
||
| // Return a list of sqlcode schemas that have been uploaded to the database. | ||
| // This includes all current and unused schemas. | ||
| func (d *Deployable) ListUploaded(ctx context.Context, dbc DB) []*SchemaObject { | ||
| func (d *Deployable) ListUploaded(ctx context.Context, dbc DB) ([]*SchemaObject, error) { | ||
| objects := []*SchemaObject{} | ||
| impersonate(ctx, dbc, "sqlcode-deploy-sandbox-user", func(conn *sql.Conn) error { | ||
| rows, err := conn.QueryContext(ctx, ` | ||
| driver := dbc.Driver() | ||
| var qs string | ||
|
|
||
| var list = func(conn *sql.Conn) error { | ||
| rows, err := conn.QueryContext(ctx, qs) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| for rows.Next() { | ||
| zero := &SchemaObject{} | ||
| rows.Scan(&zero.Name, &zero.Objects, &zero.SchemaId, &zero.CreateDate, &zero.ModifyDate) | ||
| objects = append(objects, zero) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| if _, ok := driver.(*mssql.Driver); ok { | ||
| qs = ` | ||
| select | ||
| s.name | ||
| , s.schema_id | ||
|
|
@@ -298,18 +380,32 @@ func (d *Deployable) ListUploaded(ctx context.Context, dbc DB) []*SchemaObject { | |
| from sys.objects o | ||
| where o.schema_id = s.schema_id | ||
| ) as o | ||
| where s.name like 'code@%'`) | ||
| where s.name like 'code@%'` | ||
| impersonate(ctx, dbc, "sqlcode-deploy-sandbox-user", list) | ||
| } | ||
|
|
||
| // TODO(ks) the timestamps for schemas | ||
| if _, ok := driver.(*stdlib.Driver); ok { | ||
| qs = `select nspname as name | ||
| , oid as schema_id | ||
| , 0 as objects | ||
| , '' as create_date | ||
| , '' as modify_date | ||
| from pg_namespace | ||
| where nspname like 'code@%' | ||
| order by nspname` | ||
| conn, err := dbc.Conn(ctx) | ||
| if err != nil { | ||
| return err | ||
| return nil, err | ||
| } | ||
|
|
||
| for rows.Next() { | ||
| zero := &SchemaObject{} | ||
| rows.Scan(&zero.Name, &zero.Objects, &zero.SchemaId, &zero.CreateDate, &zero.ModifyDate) | ||
| objects = append(objects, zero) | ||
| err = list(conn) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| defer func() { | ||
| _ = conn.Close() | ||
| }() | ||
| } | ||
|
|
||
| return nil | ||
| }) | ||
| return objects | ||
| return objects, nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: The cli module is not yet compatible with postgres.