diff --git a/.gitignore b/.gitignore index 09835820..a653472a 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ /vendor dist node_modules +.vscode diff --git a/README.md b/README.md index c86a48ae..e9413acb 100644 --- a/README.md +++ b/README.md @@ -253,6 +253,33 @@ DATABASE_URL="sqlite:/tmp/database.sqlite3" DATABASE_URL="clickhouse://username:password@127.0.0.1:9000/database_name" ``` +To work with ClickHouse cluster, there are 4 connection query parameters that can be supplied: + +- `on_cluster` - Indicataion to use cluster statements and replicated migration table. (default: `false`) If this parameter is not supplied, other cluster related query parameters are ignored. +```sh +DATABASE_URL="clickhouse://username:password@127.0.0.1:9000/database_name?on_cluster" + +DATABASE_URL="clickhouse://username:password@127.0.0.1:9000/database_name?on_cluster=true" +``` + +- `cluster_macro` (Optional) - Macro value to be used for ON CLUSTER statements and for the replciated migration table engine zookeeper path. (default: `{cluster}`) + +```sh +DATABASE_URL="clickhouse://username:password@127.0.0.1:9000/database_name?on_cluster&cluster_macro={my_cluster}" +``` + +- `replica_macro` (Optional) - Macro value to be used for the replica name in the replciated migration table engine. (default: `{replica}`) + +```sh +DATABASE_URL="clickhouse://username:password@127.0.0.1:9000/database_name?on_cluster&replica_macro={my_replica}" +``` + +- `zoo_path` (Optional) - The path to the table migration in ClickHouse/Zoo Keeper. (default: `/clickhouse/tables//{table}`) + +```sh +DATABASE_URL="clickhouse://username:password@127.0.0.1:9000/database_name?on_cluster&zoo_path=/zk/path/tables" +``` + [See other supported connection options](https://github.com/ClickHouse/clickhouse-go#dsn). ### Creating Migrations diff --git a/docker-compose.yml b/docker-compose.yml index ef80af87..70396bbc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,8 +10,12 @@ services: - mysql - postgres - clickhouse + - clickhouse-cluster-01 + - clickhouse-cluster-02 environment: CLICKHOUSE_TEST_URL: clickhouse://clickhouse:9000/dbmate_test + CLICKHOUSE_CLUSTER_01_TEST_URL: clickhouse://ch-cluster-01:9000/dbmate_test + CLICKHOUSE_CLUSTER_02_TEST_URL: clickhouse://ch-cluster-02:9000/dbmate_test MYSQL_TEST_URL: mysql://root:root@mysql/dbmate_test POSTGRES_TEST_URL: postgres://postgres:postgres@postgres/dbmate_test?sslmode=disable SQLITE_TEST_URL: sqlite3:/tmp/dbmate_test.sqlite3 @@ -35,3 +39,27 @@ services: clickhouse: image: clickhouse/clickhouse-server:22.8 + + zookeeper: + image: zookeeper + hostname: zookeeper + + clickhouse-cluster-01: + image: clickhouse/clickhouse-server:22.8 + hostname: ch-cluster-01 + environment: + - CLICKHOUSE_CONFIG=/etc/clickhouse-server/config.xml + depends_on: + - zookeeper + volumes: + - ./pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01:/etc/clickhouse-server + + clickhouse-cluster-02: + image: clickhouse/clickhouse-server:22.8 + hostname: ch-cluster-02 + environment: + - CLICKHOUSE_CONFIG=/etc/clickhouse-server/config.xml + depends_on: + - zookeeper + volumes: + - ./pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02:/etc/clickhouse-server diff --git a/pkg/driver/clickhouse/clickhouse.go b/pkg/driver/clickhouse/clickhouse.go index 28935d71..0f176f64 100644 --- a/pkg/driver/clickhouse/clickhouse.go +++ b/pkg/driver/clickhouse/clickhouse.go @@ -25,6 +25,7 @@ type Driver struct { migrationsTableName string databaseURL *url.URL log io.Writer + clusterParameters *ClusterParameters } // NewDriver initializes the driver @@ -33,6 +34,7 @@ func NewDriver(config dbmate.DriverConfig) dbmate.Driver { migrationsTableName: config.MigrationsTableName, databaseURL: config.DatabaseURL, log: config.Log, + clusterParameters: ExtractClusterParametersFromURL(config.DatabaseURL), } } @@ -74,6 +76,8 @@ func connectionString(initialURL *url.URL) string { u.RawQuery = query.Encode() + u = ClearClusterParametersFromURL(u) + return u.String() } @@ -95,6 +99,15 @@ func (drv *Driver) openClickHouseDB() (*sql.DB, error) { return sql.Open("clickhouse", clickhouseURL.String()) } +func (drv *Driver) onClusterClause() string { + clusterClause := "" + if drv.clusterParameters.OnCluster { + escapedClusterMacro := drv.escapeString(drv.clusterParameters.ClusterMacro) + clusterClause = fmt.Sprintf(" ON CLUSTER '%s'", escapedClusterMacro) + } + return clusterClause +} + func (drv *Driver) databaseName() string { name := strings.TrimLeft(dbutil.MustParseURL(connectionString(drv.databaseURL)).Path, "/") if name == "" { @@ -115,6 +128,12 @@ func (drv *Driver) quoteIdentifier(str string) string { return fmt.Sprintf(`"%s"`, str) } +func (drv *Driver) escapeString(str string) string { + quoteEscaper := strings.NewReplacer(`'`, `\'`, `\`, `\\`) + str = quoteEscaper.Replace(str) + return str +} + // CreateDatabase creates the specified database func (drv *Driver) CreateDatabase() error { name := drv.databaseName() @@ -126,7 +145,9 @@ func (drv *Driver) CreateDatabase() error { } defer dbutil.MustClose(db) - _, err = db.Exec("create database " + drv.quoteIdentifier(name)) + q := fmt.Sprintf("CREATE DATABASE %s%s", drv.quoteIdentifier(name), drv.onClusterClause()) + + _, err = db.Exec(q) return err } @@ -142,14 +163,16 @@ func (drv *Driver) DropDatabase() error { } defer dbutil.MustClose(db) - _, err = db.Exec("drop database if exists " + drv.quoteIdentifier(name)) + q := fmt.Sprintf("DROP DATABASE IF EXISTS %s%s", drv.quoteIdentifier(name), drv.onClusterClause()) + + _, err = db.Exec(q) return err } func (drv *Driver) schemaDump(db *sql.DB, buf *bytes.Buffer, databaseName string) error { buf.WriteString("\n--\n-- Database schema\n--\n\n") - buf.WriteString("CREATE DATABASE IF NOT EXISTS " + drv.quoteIdentifier(databaseName) + ";\n\n") + buf.WriteString(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s%s;\n\n", drv.quoteIdentifier(databaseName), drv.onClusterClause())) tables, err := dbutil.QueryColumn(db, "show tables") if err != nil { @@ -250,15 +273,22 @@ func (drv *Driver) MigrationsTableExists(db *sql.DB) (bool, error) { // CreateMigrationsTable creates the schema migrations table func (drv *Driver) CreateMigrationsTable(db *sql.DB) error { + engineClause := "ReplacingMergeTree(ts)" + if drv.clusterParameters.OnCluster { + escapedZooPath := drv.escapeString(drv.clusterParameters.ZooPath) + escapedReplicaMacro := drv.escapeString(drv.clusterParameters.ReplicaMacro) + engineClause = fmt.Sprintf("ReplicatedReplacingMergeTree('%s', '%s', ts)", escapedZooPath, escapedReplicaMacro) + } + _, err := db.Exec(fmt.Sprintf(` - create table if not exists %s ( + create table if not exists %s%s ( version String, ts DateTime default now(), applied UInt8 default 1 - ) engine = ReplacingMergeTree(ts) + ) engine = %s primary key version order by version - `, drv.quotedMigrationsTableName())) + `, drv.quotedMigrationsTableName(), drv.onClusterClause(), engineClause)) return err } diff --git a/pkg/driver/clickhouse/clickhouse_cluster_test.go b/pkg/driver/clickhouse/clickhouse_cluster_test.go new file mode 100644 index 00000000..0c50a351 --- /dev/null +++ b/pkg/driver/clickhouse/clickhouse_cluster_test.go @@ -0,0 +1,351 @@ +package clickhouse + +import ( + "database/sql" + "fmt" + "os" + "testing" + + "github.com/amacneil/dbmate/v2/pkg/dbmate" + "github.com/amacneil/dbmate/v2/pkg/dbutil" + + "github.com/stretchr/testify/require" +) + +func testClickHouseDriverCluster01(t *testing.T) *Driver { + u := fmt.Sprintf("%s?on_cluster", os.Getenv("CLICKHOUSE_CLUSTER_01_TEST_URL")) + return testClickHouseDriverURL(t, u) +} + +func testClickHouseDriverCluster02(t *testing.T) *Driver { + u := fmt.Sprintf("%s?on_cluster", os.Getenv("CLICKHOUSE_CLUSTER_02_TEST_URL")) + return testClickHouseDriverURL(t, u) +} + +func assertDatabaseExists(t *testing.T, drv *Driver, shouldExist bool) { + db, err := sql.Open("clickhouse", drv.databaseURL.String()) + require.NoError(t, err) + defer dbutil.MustClose(db) + + err = db.Ping() + if shouldExist { + require.NoError(t, err) + } else { + require.EqualError(t, err, "code: 81, message: Database dbmate_test doesn't exist") + } +} + +// Makes sure driver creatinon is atomic +func TestDriverCreationSanity(t *testing.T) { + url := fmt.Sprintf("%s?on_cluster", os.Getenv("CLICKHOUSE_CLUSTER_01_TEST_URL")) + u := dbutil.MustParseURL(url) + dbm := dbmate.New(u) + drv, err := dbm.Driver() + require.NoError(t, err) + drvAgain, err := dbm.Driver() + require.NoError(t, err) + + require.Equal(t, drv, drvAgain) +} + +func TestOnClusterClause(t *testing.T) { + cases := []struct { + input string + expected string + }{ + // not on cluster + {"clickhouse://myhost:9000", ""}, + // on_cluster supplied + {"clickhouse://myhost:9000?on_cluster", " ON CLUSTER '{cluster}'"}, + // on_cluster with supplied macro + {"clickhouse://myhost:9000?on_cluster&cluster_macro={cluster2}", " ON CLUSTER '{cluster2}'"}, + } + + for _, c := range cases { + t.Run(c.input, func(t *testing.T) { + drv := testClickHouseDriverURL(t, c.input) + + actual := drv.onClusterClause() + require.Equal(t, c.expected, actual) + }) + } +} + +func TestClickHouseCreateDropDatabaseOnCluster(t *testing.T) { + drv01 := testClickHouseDriverCluster01(t) + drv02 := testClickHouseDriverCluster02(t) + + // drop any existing database + err := drv01.DropDatabase() + require.NoError(t, err) + + // create database + err = drv01.CreateDatabase() + require.NoError(t, err) + + // check that database exists and we can connect to it + assertDatabaseExists(t, drv01, true) + // check that database exists on the other clickhouse node and we can connect to it + assertDatabaseExists(t, drv02, true) + + // drop the database + err = drv01.DropDatabase() + require.NoError(t, err) + + // check that database no longer exists + assertDatabaseExists(t, drv01, false) + // check that database no longer exists on the other clickhouse node + assertDatabaseExists(t, drv02, false) +} + +func TestClickHouseDumpSchemaOnCluster(t *testing.T) { + drv := testClickHouseDriverCluster01(t) + drv.migrationsTableName = "test_migrations" + + // prepare database + db := prepTestClickHouseDB(t, drv) + defer dbutil.MustClose(db) + err := drv.CreateMigrationsTable(db) + require.NoError(t, err) + + // insert migration + tx, err := db.Begin() + require.NoError(t, err) + err = drv.InsertMigration(tx, "abc1") + require.NoError(t, err) + err = tx.Commit() + require.NoError(t, err) + tx, err = db.Begin() + require.NoError(t, err) + err = drv.InsertMigration(tx, "abc2") + require.NoError(t, err) + err = tx.Commit() + require.NoError(t, err) + + // DumpSchema should return schema + schema, err := drv.DumpSchema(db) + require.NoError(t, err) + require.Contains(t, string(schema), "CREATE TABLE "+drv.databaseName()+".test_migrations") + require.Contains(t, string(schema), "ENGINE = ReplicatedReplacingMergeTree") + require.Contains(t, string(schema), "--\n"+ + "-- Dbmate schema migrations\n"+ + "--\n\n"+ + "INSERT INTO test_migrations (version) VALUES\n"+ + " ('abc1'),\n"+ + " ('abc2');\n") + + // DumpSchema should return error if command fails + drv.databaseURL.Path = "/fakedb" + db, err = sql.Open("clickhouse", drv.databaseURL.String()) + require.NoError(t, err) + + schema, err = drv.DumpSchema(db) + require.Nil(t, schema) + require.EqualError(t, err, "code: 81, message: Database fakedb doesn't exist") +} + +func TestClickHouseCreateMigrationsTableOnCluster(t *testing.T) { + testCases := []struct { + name string + migrationsTable string + expectedTableName string + }{ + { + name: "default table", + migrationsTable: "", + expectedTableName: "schema_migrations", + }, + { + name: "custom table", + migrationsTable: "testMigrations", + expectedTableName: "\"testMigrations\"", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + drv01 := testClickHouseDriverCluster01(t) + drv02 := testClickHouseDriverCluster02(t) + if tc.migrationsTable != "" { + drv01.migrationsTableName = tc.migrationsTable + drv02.migrationsTableName = tc.migrationsTable + } + + db01 := prepTestClickHouseDB(t, drv01) + defer dbutil.MustClose(db01) + + db02 := prepTestClickHouseDB(t, drv02) + defer dbutil.MustClose(db02) + + // migrations table should not exist + exists, err := drv01.MigrationsTableExists(db01) + require.NoError(t, err) + require.Equal(t, false, exists) + + // migrations table should not exist on the other node + exists, err = drv02.MigrationsTableExists(db02) + require.NoError(t, err) + require.Equal(t, false, exists) + + // create table + err = drv01.CreateMigrationsTable(db01) + require.NoError(t, err) + + // migrations table should exist + exists, err = drv01.MigrationsTableExists(db01) + require.NoError(t, err) + require.Equal(t, true, exists) + + // migrations table should exist on other node + exists, err = drv02.MigrationsTableExists(db02) + require.NoError(t, err) + require.Equal(t, true, exists) + + // create table should be idempotent + err = drv01.CreateMigrationsTable(db01) + require.NoError(t, err) + }) + } +} + +func TestClickHouseSelectMigrationsOnCluster(t *testing.T) { + drv01 := testClickHouseDriverCluster01(t) + drv02 := testClickHouseDriverCluster02(t) + drv01.migrationsTableName = "test_migrations" + drv02.migrationsTableName = "test_migrations" + + db01 := prepTestClickHouseDB(t, drv01) + defer dbutil.MustClose(db01) + + db02 := prepTestClickHouseDB(t, drv02) + defer dbutil.MustClose(db02) + + err := drv01.CreateMigrationsTable(db01) + require.NoError(t, err) + + tx, err := db01.Begin() + require.NoError(t, err) + stmt, err := tx.Prepare("insert into test_migrations (version) values (?)") + require.NoError(t, err) + _, err = stmt.Exec("abc2") + require.NoError(t, err) + _, err = stmt.Exec("abc1") + require.NoError(t, err) + _, err = stmt.Exec("abc3") + require.NoError(t, err) + err = tx.Commit() + require.NoError(t, err) + + migrations01, err := drv01.SelectMigrations(db01, -1) + require.NoError(t, err) + require.Equal(t, true, migrations01["abc1"]) + require.Equal(t, true, migrations01["abc2"]) + require.Equal(t, true, migrations01["abc3"]) + + // Assert select on other node + migrations02, err := drv02.SelectMigrations(db02, -1) + require.NoError(t, err) + require.Equal(t, true, migrations02["abc1"]) + require.Equal(t, true, migrations02["abc2"]) + require.Equal(t, true, migrations02["abc3"]) + + // test limit param + migrations01, err = drv01.SelectMigrations(db01, 1) + require.NoError(t, err) + require.Equal(t, true, migrations01["abc3"]) + require.Equal(t, false, migrations01["abc1"]) + require.Equal(t, false, migrations01["abc2"]) + + // test limit param on other node + migrations02, err = drv02.SelectMigrations(db02, 1) + require.NoError(t, err) + require.Equal(t, true, migrations02["abc3"]) + require.Equal(t, false, migrations02["abc1"]) + require.Equal(t, false, migrations02["abc2"]) +} + +func TestClickHouseInsertMigrationOnCluster(t *testing.T) { + drv01 := testClickHouseDriverCluster01(t) + drv02 := testClickHouseDriverCluster02(t) + drv01.migrationsTableName = "test_migrations" + drv02.migrationsTableName = "test_migrations" + + db01 := prepTestClickHouseDB(t, drv01) + defer dbutil.MustClose(db01) + + db02 := prepTestClickHouseDB(t, drv02) + defer dbutil.MustClose(db02) + + err := drv01.CreateMigrationsTable(db01) + require.NoError(t, err) + + count01 := 0 + err = db01.QueryRow("select count(*) from test_migrations").Scan(&count01) + require.NoError(t, err) + require.Equal(t, 0, count01) + + count02 := 0 + err = db02.QueryRow("select count(*) from test_migrations").Scan(&count02) + require.NoError(t, err) + require.Equal(t, 0, count02) + + // insert migration + tx, err := db01.Begin() + require.NoError(t, err) + err = drv01.InsertMigration(tx, "abc1") + require.NoError(t, err) + err = tx.Commit() + require.NoError(t, err) + + err = db01.QueryRow("select count(*) from test_migrations where version = 'abc1'").Scan(&count01) + require.NoError(t, err) + require.Equal(t, 1, count01) + + err = db02.QueryRow("select count(*) from test_migrations where version = 'abc1'").Scan(&count02) + require.NoError(t, err) + require.Equal(t, 1, count02) +} + +func TestClickHouseDeleteMigrationOnCluster(t *testing.T) { + drv01 := testClickHouseDriverCluster01(t) + drv02 := testClickHouseDriverCluster02(t) + drv01.migrationsTableName = "test_migrations" + drv02.migrationsTableName = "test_migrations" + + db01 := prepTestClickHouseDB(t, drv01) + defer dbutil.MustClose(db01) + + db02 := prepTestClickHouseDB(t, drv02) + defer dbutil.MustClose(db02) + + err := drv01.CreateMigrationsTable(db01) + require.NoError(t, err) + + tx, err := db01.Begin() + require.NoError(t, err) + stmt, err := tx.Prepare("insert into test_migrations (version) values (?)") + require.NoError(t, err) + _, err = stmt.Exec("abc2") + require.NoError(t, err) + _, err = stmt.Exec("abc1") + require.NoError(t, err) + err = tx.Commit() + require.NoError(t, err) + + tx, err = db01.Begin() + require.NoError(t, err) + err = drv01.DeleteMigration(tx, "abc2") + require.NoError(t, err) + err = tx.Commit() + require.NoError(t, err) + + count01 := 0 + err = db01.QueryRow("select count(*) from test_migrations final where applied").Scan(&count01) + require.NoError(t, err) + require.Equal(t, 1, count01) + + count02 := 0 + err = db02.QueryRow("select count(*) from test_migrations final where applied").Scan(&count02) + require.NoError(t, err) + require.Equal(t, 1, count02) +} diff --git a/pkg/driver/clickhouse/clickhouse_test.go b/pkg/driver/clickhouse/clickhouse_test.go index ffd7ee03..ef1f521c 100644 --- a/pkg/driver/clickhouse/clickhouse_test.go +++ b/pkg/driver/clickhouse/clickhouse_test.go @@ -3,7 +3,6 @@ package clickhouse import ( "database/sql" "net/url" - "os" "testing" "github.com/amacneil/dbmate/v2/pkg/dbmate" @@ -12,32 +11,6 @@ import ( "github.com/stretchr/testify/require" ) -func testClickHouseDriver(t *testing.T) *Driver { - u := dbutil.MustParseURL(os.Getenv("CLICKHOUSE_TEST_URL")) - drv, err := dbmate.New(u).Driver() - require.NoError(t, err) - - return drv.(*Driver) -} - -func prepTestClickHouseDB(t *testing.T) *sql.DB { - drv := testClickHouseDriver(t) - - // drop any existing database - err := drv.DropDatabase() - require.NoError(t, err) - - // create database - err = drv.CreateDatabase() - require.NoError(t, err) - - // connect database - db, err := sql.Open("clickhouse", drv.databaseURL.String()) - require.NoError(t, err) - - return db -} - func TestGetDriver(t *testing.T) { db := dbmate.New(dbutil.MustParseURL("clickhouse://")) drvInterface, err := db.Driver() @@ -123,7 +96,7 @@ func TestClickHouseDumpSchema(t *testing.T) { drv.migrationsTableName = "test_migrations" // prepare database - db := prepTestClickHouseDB(t) + db := prepTestClickHouseDB(t, drv) defer dbutil.MustClose(db) err := drv.CreateMigrationsTable(db) require.NoError(t, err) @@ -203,7 +176,7 @@ func TestClickHouseDatabaseExists_Error(t *testing.T) { func TestClickHouseCreateMigrationsTable(t *testing.T) { t.Run("default table", func(t *testing.T) { drv := testClickHouseDriver(t) - db := prepTestClickHouseDB(t) + db := prepTestClickHouseDB(t, drv) defer dbutil.MustClose(db) // migrations table should not exist @@ -242,7 +215,7 @@ func TestClickHouseCreateMigrationsTable(t *testing.T) { drv := testClickHouseDriver(t) drv.migrationsTableName = "testMigrations" - db := prepTestClickHouseDB(t) + db := prepTestClickHouseDB(t, drv) defer dbutil.MustClose(db) // migrations table should not exist @@ -282,7 +255,7 @@ func TestClickHouseSelectMigrations(t *testing.T) { drv := testClickHouseDriver(t) drv.migrationsTableName = "test_migrations" - db := prepTestClickHouseDB(t) + db := prepTestClickHouseDB(t, drv) defer dbutil.MustClose(db) err := drv.CreateMigrationsTable(db) @@ -319,7 +292,7 @@ func TestClickHouseInsertMigration(t *testing.T) { drv := testClickHouseDriver(t) drv.migrationsTableName = "test_migrations" - db := prepTestClickHouseDB(t) + db := prepTestClickHouseDB(t, drv) defer dbutil.MustClose(db) err := drv.CreateMigrationsTable(db) @@ -347,7 +320,7 @@ func TestClickHouseDeleteMigration(t *testing.T) { drv := testClickHouseDriver(t) drv.migrationsTableName = "test_migrations" - db := prepTestClickHouseDB(t) + db := prepTestClickHouseDB(t, drv) defer dbutil.MustClose(db) err := drv.CreateMigrationsTable(db) @@ -418,3 +391,26 @@ func TestClickHouseQuotedMigrationsTableName(t *testing.T) { require.Equal(t, `"bizarre""$name"`, name) }) } + +func TestEscapeString(t *testing.T) { + cases := []struct { + input string + expected string + }{ + // nothig to escape + {`lets go`, `lets go`}, + // escape ' + {`let's go`, `let\'s go`}, + // escape \ + {`let\s go`, `let\\s go`}, + } + + for _, c := range cases { + t.Run(c.input, func(t *testing.T) { + drv := testClickHouseDriver(t) + + actual := drv.escapeString(c.input) + require.Equal(t, c.expected, actual) + }) + } +} diff --git a/pkg/driver/clickhouse/clickhouse_testutils_test.go b/pkg/driver/clickhouse/clickhouse_testutils_test.go new file mode 100644 index 00000000..0dfb4cff --- /dev/null +++ b/pkg/driver/clickhouse/clickhouse_testutils_test.go @@ -0,0 +1,40 @@ +package clickhouse + +import ( + "database/sql" + "os" + "testing" + + "github.com/amacneil/dbmate/v2/pkg/dbmate" + "github.com/amacneil/dbmate/v2/pkg/dbutil" + + "github.com/stretchr/testify/require" +) + +func testClickHouseDriverURL(t *testing.T, url string) *Driver { + u := dbutil.MustParseURL(url) + drv, err := dbmate.New(u).Driver() + require.NoError(t, err) + + return drv.(*Driver) +} + +func testClickHouseDriver(t *testing.T) *Driver { + return testClickHouseDriverURL(t, os.Getenv("CLICKHOUSE_TEST_URL")) +} + +func prepTestClickHouseDB(t *testing.T, drv *Driver) *sql.DB { + // drop any existing database + err := drv.DropDatabase() + require.NoError(t, err) + + // create database + err = drv.CreateDatabase() + require.NoError(t, err) + + // connect database + db, err := drv.Open() + require.NoError(t, err) + + return db +} diff --git a/pkg/driver/clickhouse/cluster_parameters.go b/pkg/driver/clickhouse/cluster_parameters.go new file mode 100644 index 00000000..3bd3212d --- /dev/null +++ b/pkg/driver/clickhouse/cluster_parameters.go @@ -0,0 +1,83 @@ +package clickhouse + +import ( + "fmt" + "net/url" +) + +const ( + OnClusterQueryParam = "on_cluster" + ZooPathQueryParam = "zoo_path" + ClusterMacroQueryParam = "cluster_macro" + ReplicaMacroQueryParam = "replica_macro" +) + +type ClusterParameters struct { + OnCluster bool + ZooPath string + ClusterMacro string + ReplicaMacro string +} + +func ClearClusterParametersFromURL(u *url.URL) *url.URL { + q := u.Query() + q.Del(OnClusterQueryParam) + q.Del(ClusterMacroQueryParam) + q.Del(ReplicaMacroQueryParam) + q.Del(ZooPathQueryParam) + u.RawQuery = q.Encode() + + return u +} + +func ExtractClusterParametersFromURL(u *url.URL) *ClusterParameters { + onCluster := extractOnCluster(u) + clusterMacro := extractClusterMacro(u) + replicaMacro := extractReplicaMacro(u) + zookeeperPath := extractZookeeperPath(u) + + r := &ClusterParameters{ + OnCluster: onCluster, + ZooPath: zookeeperPath, + ClusterMacro: clusterMacro, + ReplicaMacro: replicaMacro, + } + + return r +} + +func extractOnCluster(u *url.URL) bool { + v := u.Query() + hasOnCluster := v.Has(OnClusterQueryParam) + onClusterValue := v.Get(OnClusterQueryParam) + onCluster := hasOnCluster && (onClusterValue == "" || onClusterValue == "true") + return onCluster +} + +func extractClusterMacro(u *url.URL) string { + v := u.Query() + clusterMacro := v.Get(ClusterMacroQueryParam) + if clusterMacro == "" { + clusterMacro = "{cluster}" + } + return clusterMacro +} + +func extractReplicaMacro(u *url.URL) string { + v := u.Query() + replicaMacro := v.Get(ReplicaMacroQueryParam) + if replicaMacro == "" { + replicaMacro = "{replica}" + } + return replicaMacro +} + +func extractZookeeperPath(u *url.URL) string { + v := u.Query() + clusterMacro := extractClusterMacro(u) + zookeeperPath := v.Get(ZooPathQueryParam) + if zookeeperPath == "" { + zookeeperPath = fmt.Sprintf("/clickhouse/tables/%s/{table}", clusterMacro) + } + return zookeeperPath +} diff --git a/pkg/driver/clickhouse/cluster_parameters_test.go b/pkg/driver/clickhouse/cluster_parameters_test.go new file mode 100644 index 00000000..fb8c29f7 --- /dev/null +++ b/pkg/driver/clickhouse/cluster_parameters_test.go @@ -0,0 +1,97 @@ +package clickhouse + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/amacneil/dbmate/v2/pkg/dbutil" +) + +func TestOnCluster(t *testing.T) { + cases := []struct { + input string + expected bool + }{ + // param not supplied + {"clickhouse://myhost:9000", false}, + // empty on_cluster parameter + {"clickhouse://myhost:9000?on_cluster", true}, + // true on_cluster parameter + {"clickhouse://myhost:9000?on_cluster=true", true}, + // any other value on_cluster parameter + {"clickhouse://myhost:9000?on_cluster=falsy", false}, + } + + for _, c := range cases { + t.Run(c.input, func(t *testing.T) { + u := dbutil.MustParseURL(c.input) + + actual := extractOnCluster(u) + require.Equal(t, c.expected, actual) + }) + } +} + +func TestClusterMacro(t *testing.T) { + cases := []struct { + input string + expected string + }{ + // cluster_macro not supplied + {"clickhouse://myhost:9000", "{cluster}"}, + // cluster_macro supplied + {"clickhouse://myhost:9000?cluster_macro={cluster2}", "{cluster2}"}, + } + + for _, c := range cases { + t.Run(c.input, func(t *testing.T) { + u := dbutil.MustParseURL(c.input) + + actual := extractClusterMacro(u) + require.Equal(t, c.expected, actual) + }) + } +} + +func TestReplicaMacro(t *testing.T) { + cases := []struct { + input string + expected string + }{ + // replica_macro not supplied + {"clickhouse://myhost:9000", "{replica}"}, + // replica_macro supplied + {"clickhouse://myhost:9000?replica_macro={replica2}", "{replica2}"}, + } + + for _, c := range cases { + t.Run(c.input, func(t *testing.T) { + u := dbutil.MustParseURL(c.input) + + actual := extractReplicaMacro(u) + require.Equal(t, c.expected, actual) + }) + } +} + +func TestZookeeperPath(t *testing.T) { + cases := []struct { + input string + expected string + }{ + // zoo_path not supplied + {"clickhouse://myhost:9000", "/clickhouse/tables/{cluster}/{table}"}, + // zoo_path supplied + {"clickhouse://myhost:9000?zoo_path=/zk/path/tables", "/zk/path/tables"}, + } + + for _, c := range cases { + t.Run(c.input, func(t *testing.T) { + u := dbutil.MustParseURL(c.input) + + actual := extractZookeeperPath(u) + require.Equal(t, c.expected, actual) + }) + } +} diff --git a/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/config.d/database_atomic.xml b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/config.d/database_atomic.xml new file mode 100644 index 00000000..f0a41568 --- /dev/null +++ b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/config.d/database_atomic.xml @@ -0,0 +1,3 @@ + + 0 + diff --git a/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/config.d/docker_related_config.xml b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/config.d/docker_related_config.xml new file mode 100644 index 00000000..3025dc26 --- /dev/null +++ b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/config.d/docker_related_config.xml @@ -0,0 +1,12 @@ + + + :: + 0.0.0.0 + 1 + + + diff --git a/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/config.xml b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/config.xml new file mode 100644 index 00000000..23a7c8b4 --- /dev/null +++ b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/config.xml @@ -0,0 +1,127 @@ + + + + + trace + /var/log/clickhouse-server/clickhouse-server.log + /var/log/clickhouse-server/clickhouse-server.err.log + 100M + 2 + + + 8123 + 9000 + 9009 + + 4096 + 3 + 0 + 100 + 0 + 10000 + 0.9 + 4194304 + 0 + 8589934592 + 5368709120 + 1000 + 134217728 + 10000 + + /var/lib/clickhouse/ + /var/lib/clickhouse/tmp/ + /var/lib/clickhouse/user_files/ + + + + users.xml + + + /var/lib/clickhouse/access/ + + + + + false + false + false + false + + + default + default + true + + + + + + + + + + + true + + ch-cluster-01 + 9000 + + + + + + true + + ch-cluster-02 + 9000 + + + + + + + + zookeeper + 2181 + + + + + cluster-01 + shard-01 + ch_cluster-01 + + + + 3600 + 3600 + 60 + + + system + query_log
+ toYYYYMM(event_date) + + 7500 +
+ + *_dictionary.xml + + + + + /clickhouse/task_queue/ddl + + + + /var/lib/clickhouse/format_schemas/ + +
diff --git a/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/users.d/database_atomic_drop_sync.xml b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/users.d/database_atomic_drop_sync.xml new file mode 100644 index 00000000..386fce01 --- /dev/null +++ b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/users.d/database_atomic_drop_sync.xml @@ -0,0 +1,7 @@ + + + + 1 + + + diff --git a/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/users.d/default_profile.xml b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/users.d/default_profile.xml new file mode 100644 index 00000000..34cce837 --- /dev/null +++ b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/users.d/default_profile.xml @@ -0,0 +1,10 @@ + + + + + 2 + + + diff --git a/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/users.xml b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/users.xml new file mode 100644 index 00000000..96067d01 --- /dev/null +++ b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-01/users.xml @@ -0,0 +1,120 @@ + + + + + + + + + + random + + + + + 1 + + + + + + + + + + + + + ::/0 + + + + default + + + default + + + + + + + + + + + + + + 3600 + + + 0 + 0 + 0 + 0 + 0 + + + + diff --git a/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/config.d/database_atomic.xml b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/config.d/database_atomic.xml new file mode 100644 index 00000000..f0a41568 --- /dev/null +++ b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/config.d/database_atomic.xml @@ -0,0 +1,3 @@ + + 0 + diff --git a/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/config.d/docker_related_config.xml b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/config.d/docker_related_config.xml new file mode 100644 index 00000000..3025dc26 --- /dev/null +++ b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/config.d/docker_related_config.xml @@ -0,0 +1,12 @@ + + + :: + 0.0.0.0 + 1 + + + diff --git a/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/config.xml b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/config.xml new file mode 100644 index 00000000..b9a1e2cb --- /dev/null +++ b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/config.xml @@ -0,0 +1,127 @@ + + + + + trace + /var/log/clickhouse-server/clickhouse-server.log + /var/log/clickhouse-server/clickhouse-server.err.log + 100M + 2 + + + 8123 + 9000 + 9009 + + 4096 + 3 + 0 + 100 + 0 + 10000 + 0.9 + 4194304 + 0 + 8589934592 + 5368709120 + 1000 + 134217728 + 10000 + + /var/lib/clickhouse/ + /var/lib/clickhouse/tmp/ + /var/lib/clickhouse/user_files/ + + + + users.xml + + + /var/lib/clickhouse/access/ + + + + + false + false + false + false + + + default + default + true + + + + + + + + + + + true + + ch-cluster-01 + 9000 + + + + + + true + + ch-cluster-02 + 9000 + + + + + + + + zookeeper + 2181 + + + + + cluster-01 + shard-02 + ch_cluster-02 + + + + 3600 + 3600 + 60 + + + system + query_log
+ toYYYYMM(event_date) + + 7500 +
+ + *_dictionary.xml + + + + + /clickhouse/task_queue/ddl + + + + /var/lib/clickhouse/format_schemas/ + +
diff --git a/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/users.d/database_atomic_drop_sync.xml b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/users.d/database_atomic_drop_sync.xml new file mode 100644 index 00000000..386fce01 --- /dev/null +++ b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/users.d/database_atomic_drop_sync.xml @@ -0,0 +1,7 @@ + + + + 1 + + + diff --git a/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/users.d/default_profile.xml b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/users.d/default_profile.xml new file mode 100644 index 00000000..34cce837 --- /dev/null +++ b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/users.d/default_profile.xml @@ -0,0 +1,10 @@ + + + + + 2 + + + diff --git a/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/users.xml b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/users.xml new file mode 100644 index 00000000..96067d01 --- /dev/null +++ b/pkg/driver/clickhouse/testdata/cluster_config/ch-cluster-02/users.xml @@ -0,0 +1,120 @@ + + + + + + + + + + random + + + + + 1 + + + + + + + + + + + + + ::/0 + + + + default + + + default + + + + + + + + + + + + + + 3600 + + + 0 + 0 + 0 + 0 + 0 + + + +