From a74bd6f5c247abe96f6fa4bfb1c2d2cc418275cb Mon Sep 17 00:00:00 2001 From: NamanMahor Date: Thu, 2 Jul 2026 12:50:10 +0530 Subject: [PATCH 1/4] Merge getTable and Lookup function in information sche,a --- runtime/drivers/athena/information_schema.go | 109 +++--- .../drivers/athena/information_schema_test.go | 14 +- .../drivers/bigquery/information_schema.go | 93 ++---- .../drivers/clickhouse/information_schema.go | 114 ++----- .../clickhouse/information_schema_test.go | 165 ++++----- .../drivers/databricks/information_schema.go | 77 +---- .../databricks/information_schema_test.go | 48 +-- runtime/drivers/druid/information_schema.go | 106 ++---- .../drivers/druid/information_schema_test.go | 55 ++- runtime/drivers/duckdb/information_schema.go | 112 +------ .../drivers/duckdb/information_schema_test.go | 63 ++-- runtime/drivers/information_schema.go | 12 +- runtime/drivers/mysql/information_schema.go | 60 ++-- runtime/drivers/pinot/information_schema.go | 129 ++------ .../drivers/pinot/information_schema_test.go | 69 ++-- .../drivers/postgres/information_schema.go | 56 ++-- .../drivers/redshift/information_schema.go | 80 ++--- .../drivers/snowflake/information_schema.go | 71 ++-- .../drivers/starrocks/information_schema.go | 313 +++++++----------- runtime/server/connector_service.go | 12 +- 20 files changed, 584 insertions(+), 1174 deletions(-) diff --git a/runtime/drivers/athena/information_schema.go b/runtime/drivers/athena/information_schema.go index 86491edb6417..24f129428029 100644 --- a/runtime/drivers/athena/information_schema.go +++ b/runtime/drivers/athena/information_schema.go @@ -115,43 +115,67 @@ func (c *Connection) ListTables(ctx context.Context, database, databaseSchema st return res, next, nil } -func (c *Connection) GetTable(ctx context.Context, database, databaseSchema, table string) (*drivers.TableMetadata, error) { +func (c *Connection) Lookup(ctx context.Context, database, databaseSchema, name string) (*drivers.OlapTable, error) { q := fmt.Sprintf(` -SELECT - CASE t.table_type WHEN 'VIEW' THEN true ELSE false END AS view, - column_name, - data_type -FROM %s.information_schema.columns c -JOIN %s.information_schema.tables t - ON t.table_schema = c.table_schema AND t.table_name = c.table_name -WHERE c.table_schema = ? AND c.table_name = ? -ORDER BY c.ordinal_position -`, sqlSafeName(database), sqlSafeName(database)) + SELECT + CASE t.table_type WHEN 'VIEW' THEN true ELSE false END AS view, + column_name, + data_type + FROM %s.information_schema.columns c + JOIN %s.information_schema.tables t + ON t.table_schema = c.table_schema AND t.table_name = c.table_name + WHERE c.table_schema = ? AND c.table_name = ? + ORDER BY c.ordinal_position + `, sqlSafeName(database), sqlSafeName(database)) rows, err := c.Query(ctx, &drivers.Statement{ Query: q, - Args: []any{databaseSchema, table}, + Args: []any{databaseSchema, name}, }) if err != nil { return nil, err } defer rows.Close() - res := &drivers.TableMetadata{ - Schema: make(map[string]string), - } + var view bool var col, typ string + fields := make([]*runtimev1.StructType_Field, 0) for rows.Next() { - err = rows.Scan(&res.View, &col, &typ) - if err != nil { + if err = rows.Scan(&view, &col, &typ); err != nil { return nil, err } - res.Schema[col] = typ - } - if err := rows.Err(); err != nil { - return nil, err + fields = append(fields, &runtimev1.StructType_Field{ + Name: col, + Type: athenaTypeToRuntimeType(typ), + }) } - return res, nil + + return &drivers.OlapTable{ + Database: database, + DatabaseSchema: databaseSchema, + Name: name, + View: view, + Schema: &runtimev1.StructType{ + Fields: fields, + }, + UnsupportedCols: nil, + PhysicalSizeBytes: 0, + }, rows.Err() +} + +// All implements drivers.OLAPInformationSchema. +func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { + return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) +} + +// LoadPhysicalSize implements drivers.OLAPInformationSchema. +func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { + return nil +} + +// LoadDDL implements drivers.OLAPInformationSchema. +func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { + return nil // Not implemented } func (c *Connection) listCatalogs(ctx context.Context, client *athena.Client) ([]string, error) { @@ -181,47 +205,6 @@ func (c *Connection) listCatalogs(ctx context.Context, client *athena.Client) ([ return catalogs, nil } -// All implements drivers.OLAPInformationSchema. -func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { - return drivers.AllFromInformationSchema(ctx, like, pageSize, pageToken, c) -} - -// LoadPhysicalSize implements drivers.OLAPInformationSchema. -func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { - return nil -} - -// LoadDDL implements drivers.OLAPInformationSchema. -func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { - return nil // Not implemented -} - -// Lookup implements drivers.OLAPInformationSchema. -func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - meta, err := c.GetTable(ctx, db, schema, name) - if err != nil { - return nil, err - } - runtimeSchema := &runtimev1.StructType{ - Fields: make([]*runtimev1.StructType_Field, 0, len(meta.Schema)), - } - for name, typ := range meta.Schema { - runtimeSchema.Fields = append(runtimeSchema.Fields, &runtimev1.StructType_Field{ - Name: name, - Type: athenaTypeToRuntimeType(typ), - }) - } - return &drivers.OlapTable{ - Database: db, - DatabaseSchema: schema, - Name: name, - View: meta.View, - Schema: runtimeSchema, - UnsupportedCols: nil, - PhysicalSizeBytes: 0, - }, nil -} - func (c *Connection) listSchemasForCatalog(ctx context.Context, client *athena.Client, catalog string) ([]*drivers.DatabaseSchemaInfo, error) { // Use catalog if specified var q string diff --git a/runtime/drivers/athena/information_schema_test.go b/runtime/drivers/athena/information_schema_test.go index 979f0ac104d3..de331233c355 100644 --- a/runtime/drivers/athena/information_schema_test.go +++ b/runtime/drivers/athena/information_schema_test.go @@ -20,20 +20,20 @@ func TestGetTable(t *testing.T) { require.True(t, ok) // Test getting metadata for the all_datatypes table - metadata, err := infoSchema.GetTable(ctx, "awsdatacatalog", "integration_test", "all_datatypes") + metadata, err := infoSchema.Lookup(ctx, "awsdatacatalog", "integration_test", "all_datatypes") require.NoError(t, err) require.NotNil(t, metadata) require.False(t, metadata.View) require.NotEmpty(t, metadata.Schema) // Verify some expected columns exist - _, hasID := metadata.Schema["id"] + hasID := metadata.Schema.Fields[0].Name == "id" require.True(t, hasID, "Expected 'id' column in table schema") - _, hasInt32 := metadata.Schema["int32_col"] + hasInt32 := metadata.Schema.Fields[2].Name == "int32_col" require.True(t, hasInt32, "Expected 'int32_col' column in table schema") - _, hasFloat := metadata.Schema["float_col"] + hasFloat := metadata.Schema.Fields[4].Name == "float_col" require.True(t, hasFloat, "Expected 'float_col' column in table schema") }) @@ -53,17 +53,17 @@ func TestGetTable(t *testing.T) { }) // Get metadata for the view - metadata, err := infoSchema.GetTable(ctx, "awsdatacatalog", "integration_test", "test_view") + metadata, err := infoSchema.Lookup(ctx, "awsdatacatalog", "integration_test", "test_view") require.NoError(t, err) require.NotNil(t, metadata) require.True(t, metadata.View, "Expected test_view to be identified as a view") require.NotEmpty(t, metadata.Schema) // Verify columns from the view - _, hasID := metadata.Schema["id"] + hasID := metadata.Schema.Fields[0].Name == "id" require.True(t, hasID, "Expected 'id' column in view schema") - _, hasInt32 := metadata.Schema["int32_col"] + hasInt32 := metadata.Schema.Fields[1].Name == "int32_col" require.True(t, hasInt32, "Expected 'int32_col' column in view schema") }) } diff --git a/runtime/drivers/bigquery/information_schema.go b/runtime/drivers/bigquery/information_schema.go index 657904074680..7f2ae533aae7 100644 --- a/runtime/drivers/bigquery/information_schema.go +++ b/runtime/drivers/bigquery/information_schema.go @@ -124,54 +124,37 @@ func (c *Connection) ListTables(ctx context.Context, database, databaseSchema st return res, next, nil } -func (c *Connection) GetTable(ctx context.Context, database, databaseSchema, table string) (*drivers.TableMetadata, error) { - q := fmt.Sprintf(` - SELECT - CASE t.table_type WHEN 'VIEW' THEN true else false END AS is_view, - c.column_name, - c.data_type - FROM `+"`%s.%s.INFORMATION_SCHEMA.TABLES`"+` AS t - JOIN `+"`%s.%s.INFORMATION_SCHEMA.COLUMNS`"+` AS c - ON t.table_name = c.table_name - WHERE c.table_name = @table - ORDER BY c.ordinal_position - `, database, databaseSchema, database, databaseSchema) - +func (c *Connection) Lookup(ctx context.Context, database, databaseSchema, name string) (*drivers.OlapTable, error) { client, err := c.getClient(ctx) if err != nil { return nil, fmt.Errorf("failed to get BigQuery client: %w", err) } - cq := client.Query(q) - cq.Parameters = []bigquery.QueryParameter{ - {Name: "table", Value: table}, - } - it, err := cq.Read(ctx) - if err != nil { - return nil, fmt.Errorf("failed to run INFORMATION_SCHEMA query: %w", err) + var table *bigquery.Table + if database != "" { + table = client.DatasetInProject(database, databaseSchema).Table(name) + } else { + table = client.Dataset(databaseSchema).Table(name) } - r := &drivers.TableMetadata{ - Schema: make(map[string]string), + meta, err := table.Metadata(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get table metadata: %w", err) } - var row struct { - IsView bool `bigquery:"is_view"` - ColumnName string `bigquery:"column_name"` - DataType string `bigquery:"data_type"` + runtimeSchema, err := fromBQSchema(meta.Schema) + if err != nil { + return nil, err } - for { - err := it.Next(&row) - if errors.Is(err, iterator.Done) { - break - } - if err != nil { - return nil, fmt.Errorf("failed to iterate over schema rows: %w", err) - } - r.Schema[row.ColumnName] = row.DataType - r.View = row.IsView + tbl := &drivers.OlapTable{ + Database: database, + DatabaseSchema: databaseSchema, + Name: name, + View: meta.Type == bigquery.ViewTable, + Schema: runtimeSchema, + UnsupportedCols: nil, // all columns are currently being mapped though may not be as specific as in BigQuery + PhysicalSizeBytes: 0, } - - return r, nil + return tbl, nil } // All implements drivers.OLAPInformationSchema. @@ -212,37 +195,3 @@ func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) erro table.DDL = row.DDL return nil } - -// Lookup implements drivers.OLAPInformationSchema. -func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - client, err := c.getClient(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get BigQuery client: %w", err) - } - - var table *bigquery.Table - if db != "" { - table = client.DatasetInProject(db, schema).Table(name) - } else { - table = client.Dataset(schema).Table(name) - } - - meta, err := table.Metadata(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get table metadata: %w", err) - } - runtimeSchema, err := fromBQSchema(meta.Schema) - if err != nil { - return nil, err - } - tbl := &drivers.OlapTable{ - Database: db, - DatabaseSchema: schema, - Name: name, - View: meta.Type == bigquery.ViewTable, - Schema: runtimeSchema, - UnsupportedCols: nil, // all columns are currently being mapped though may not be as specific as in BigQuery - PhysicalSizeBytes: 0, - } - return tbl, nil -} diff --git a/runtime/drivers/clickhouse/information_schema.go b/runtime/drivers/clickhouse/information_schema.go index 9139eee13d72..5e55ce23453f 100644 --- a/runtime/drivers/clickhouse/information_schema.go +++ b/runtime/drivers/clickhouse/information_schema.go @@ -158,58 +158,51 @@ func (c *Connection) ListTables(ctx context.Context, database, databaseSchema st return res, next, nil } -func (c *Connection) GetTable(ctx context.Context, database, databaseSchema, table string) (*drivers.TableMetadata, error) { +func (c *Connection) Lookup(ctx context.Context, database, databaseSchema, name string) (*drivers.OlapTable, error) { conn, release, err := c.acquireMetaConn(ctx) if err != nil { return nil, err } defer func() { _ = release() }() - q := ` - SELECT - CASE WHEN match(engine, 'View') THEN true ELSE false END AS view, - c.name AS column_name, - c.type AS data_type - FROM system.tables t - LEFT JOIN system.columns c - ON t.database = c.database AND t.name = c.table - WHERE t.database = ? AND t.name = ? - ORDER BY c.position - ` - rows, err := conn.QueryxContext(ctx, q, databaseSchema, table) + var q string + var args []any + q = ` + SELECT + T.database AS SCHEMA, + T.database = currentDatabase() AS is_default_schema, + T.name AS NAME, + if(lower(T.engine) like '%view%', 'VIEW', 'TABLE') AS TABLE_TYPE, + C.name AS COLUMNS, + C.type AS COLUMN_TYPE, + C.position AS ORDINAL_POSITION + FROM system.tables T + JOIN system.columns C ON T.database = C.database AND T.name = C.table + WHERE T.database = coalesce(?, currentDatabase()) AND T.name = ? + ORDER BY SCHEMA, NAME, TABLE_TYPE, ORDINAL_POSITION + ` + if databaseSchema == "" { + args = append(args, nil, name) + } else { + args = append(args, databaseSchema, name) + } + + rows, err := conn.QueryxContext(ctx, q, args...) if err != nil { return nil, err } defer rows.Close() - schemaMap := make(map[string]string) - var view bool - var colName, dataType string - for rows.Next() { - if err := rows.Scan(&view, &colName, &dataType); err != nil { - return nil, err - } - if pbType, err := databaseTypeToPB(dataType, false); err != nil { - if errors.Is(err, errUnsupportedType) { - schemaMap[colName] = fmt.Sprintf("UNKNOWN(%s)", dataType) - } else { - return nil, err - } - } else if pbType.Code == runtimev1.Type_CODE_UNSPECIFIED { - schemaMap[colName] = fmt.Sprintf("UNKNOWN(%s)", dataType) - } else { - schemaMap[colName] = strings.TrimPrefix(pbType.Code.String(), "CODE_") - } + tables, err := scanTables(rows) + if err != nil { + return nil, err } - if err := rows.Err(); err != nil { - return nil, err + if len(tables) == 0 { + return nil, drivers.ErrNotFound } - return &drivers.TableMetadata{ - Schema: schemaMap, - View: view, - }, nil + return tables[0], nil } func (c *Connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { @@ -310,53 +303,6 @@ func (c *Connection) All(ctx context.Context, like string, pageSize uint32, page return tables, next, nil } -func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - conn, release, err := c.acquireMetaConn(ctx) - if err != nil { - return nil, err - } - defer func() { _ = release() }() - - var q string - var args []any - q = ` - SELECT - T.database AS SCHEMA, - T.database = currentDatabase() AS is_default_schema, - T.name AS NAME, - if(lower(T.engine) like '%view%', 'VIEW', 'TABLE') AS TABLE_TYPE, - C.name AS COLUMNS, - C.type AS COLUMN_TYPE, - C.position AS ORDINAL_POSITION - FROM system.tables T - JOIN system.columns C ON T.database = C.database AND T.name = C.table - WHERE T.database = coalesce(?, currentDatabase()) AND T.name = ? - ORDER BY SCHEMA, NAME, TABLE_TYPE, ORDINAL_POSITION - ` - if schema == "" { - args = append(args, nil, name) - } else { - args = append(args, schema, name) - } - - rows, err := conn.QueryxContext(ctx, q, args...) - if err != nil { - return nil, err - } - defer rows.Close() - - tables, err := scanTables(rows) - if err != nil { - return nil, err - } - - if len(tables) == 0 { - return nil, drivers.ErrNotFound - } - - return tables[0], nil -} - func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { if len(tables) == 0 { return nil diff --git a/runtime/drivers/clickhouse/information_schema_test.go b/runtime/drivers/clickhouse/information_schema_test.go index 5c1dbe391f44..b5b29081ca9e 100644 --- a/runtime/drivers/clickhouse/information_schema_test.go +++ b/runtime/drivers/clickhouse/information_schema_test.go @@ -31,14 +31,13 @@ func TestInformationSchema(t *testing.T) { require.NoError(t, err) testInformationSchemaSystemAllLike(t, conn) }) - t.Run("testInformationSchemaLookup", func(t *testing.T) { testInformationSchemaLookup(t, conn) }) t.Run("testInformationSchemaAllPagination", func(t *testing.T) { testInformationSchemaAllPagination(t, conn) }) t.Run("testInformationSchemaAllPaginationWithLike", func(t *testing.T) { testInformationSchemaAllPaginationWithLike(t, conn) }) t.Run("testInformationSchemaListDatabaseSchemas", func(t *testing.T) { testInformationSchemaListDatabaseSchemas(t, infoSchema) }) - t.Run("testInformationSchemaListTables", func(t *testing.T) { testInformationSchemaListTables(t, infoSchema) }) - t.Run("testInformationSchemaGetTable", func(t *testing.T) { testInformationSchemaGetTable(t, infoSchema) }) t.Run("testInformationSchemaListDatabaseSchemasPagination", func(t *testing.T) { testInformationSchemaListDatabaseSchemasPagination(t, infoSchema) }) + t.Run("testInformationSchemaListTables", func(t *testing.T) { testInformationSchemaListTables(t, infoSchema) }) t.Run("testInformationSchemaListTablesPagination", func(t *testing.T) { testInformationSchemaListTablesPagination(t, infoSchema) }) + t.Run("testInformationSchemaLookup", func(t *testing.T) { testInformationSchemaLookup(t, conn) }) t.Run("testLoadDDL", func(t *testing.T) { testLoadDDL(t, conn) }) } @@ -104,29 +103,6 @@ func testInformationSchemaSystemAllLike(t *testing.T, conn drivers.Handle) { require.Equal(t, "bar", tables[0].Name) } -func testInformationSchemaLookup(t *testing.T, conn drivers.Handle) { - olap, _ := conn.AsOLAP("") - ctx := context.Background() - table, err := olap.InformationSchema().Lookup(ctx, "", "", "foo") - require.NoError(t, err) - require.Equal(t, "foo", table.Name) - require.Equal(t, true, table.IsDefaultDatabaseSchema) - - _, err = olap.InformationSchema().Lookup(ctx, "", "", "bad") - require.Equal(t, drivers.ErrNotFound, err) - - table, err = olap.InformationSchema().Lookup(ctx, "", "", "model") - require.NoError(t, err) - require.Equal(t, "model", table.Name) - require.Equal(t, true, table.IsDefaultDatabaseSchema) - - table, err = olap.InformationSchema().Lookup(ctx, "", "other", "foo") - require.NoError(t, err) - require.Equal(t, "foo", table.Name) - require.Equal(t, "other", table.DatabaseSchema) - require.Equal(t, false, table.IsDefaultDatabaseSchema) -} - func testInformationSchemaAllPagination(t *testing.T, conn drivers.Handle) { olap, _ := conn.AsOLAP("") ctx := context.Background() @@ -208,6 +184,29 @@ func testInformationSchemaListDatabaseSchemas(t *testing.T, infoSchema drivers.I require.Equal(t, "other", databaseSchemaInfo[2].DatabaseSchema) } +func testInformationSchemaListDatabaseSchemasPagination(t *testing.T, infoSchema drivers.InformationSchema) { + ctx := context.Background() + pageSize := 2 + + // First page + page1, token1, err := infoSchema.ListDatabaseSchemas(ctx, uint32(pageSize), "") + require.NoError(t, err) + require.Len(t, page1, pageSize) + require.NotEmpty(t, token1) + + // second page + page2, token2, err := infoSchema.ListDatabaseSchemas(ctx, uint32(pageSize), token1) + require.NoError(t, err) + require.NotEmpty(t, page2) + require.Empty(t, token2) + + // Page size 0 + all, token, err := infoSchema.ListDatabaseSchemas(ctx, 0, "") + require.NoError(t, err) + require.Equal(t, len(all), 3) + require.Empty(t, token) +} + func testInformationSchemaListTables(t *testing.T, infoSchema drivers.InformationSchema) { tables, _, err := infoSchema.ListTables(context.Background(), "", "default", 0, "") require.NoError(t, err) @@ -230,82 +229,71 @@ func testInformationSchemaListTables(t *testing.T, infoSchema drivers.Informatio require.Equal(t, false, tables[1].View) } -func testInformationSchemaGetTable(t *testing.T, infoSchema drivers.InformationSchema) { - ctx := context.Background() - - // Existing table - foo, err := infoSchema.GetTable(ctx, "", "default", "foo") - require.NoError(t, err) - require.Len(t, foo.Schema, 2) - require.Equal(t, "STRING", foo.Schema["bar"]) - require.Equal(t, "INT32", foo.Schema["baz"]) - require.False(t, foo.View) - - // Non-existent table - noTable, err := infoSchema.GetTable(ctx, "", "default", "nonexistent_table") - require.NoError(t, err) - require.Empty(t, noTable.Schema) - - // View - model, err := infoSchema.GetTable(ctx, "", "default", "model") - require.NoError(t, err) - require.Equal(t, "UINT8", model.Schema["1"]) - require.Equal(t, "UINT8", model.Schema["2"]) - require.Equal(t, "UINT8", model.Schema["3"]) - require.True(t, model.View) - - ofoo, err := infoSchema.GetTable(ctx, "", "other", "foo") - require.NoError(t, err) - require.Equal(t, "STRING", ofoo.Schema["bar"]) - require.Equal(t, "INT32", ofoo.Schema["baz"]) - require.Equal(t, false, ofoo.View) - -} - -func testInformationSchemaListDatabaseSchemasPagination(t *testing.T, infoSchema drivers.InformationSchema) { +func testInformationSchemaListTablesPagination(t *testing.T, infoSchema drivers.InformationSchema) { ctx := context.Background() pageSize := 2 // First page - page1, token1, err := infoSchema.ListDatabaseSchemas(ctx, uint32(pageSize), "") + page1, token1, err := infoSchema.ListTables(ctx, "", "default", uint32(pageSize), "") require.NoError(t, err) require.Len(t, page1, pageSize) require.NotEmpty(t, token1) - // second page - page2, token2, err := infoSchema.ListDatabaseSchemas(ctx, uint32(pageSize), token1) + // Second page + page2, token2, err := infoSchema.ListTables(ctx, "", "default", uint32(pageSize), token1) require.NoError(t, err) require.NotEmpty(t, page2) require.Empty(t, token2) // Page size 0 - all, token, err := infoSchema.ListDatabaseSchemas(ctx, 0, "") + all, token, err := infoSchema.ListTables(ctx, "", "default", 0, "") require.NoError(t, err) - require.Equal(t, len(all), 3) + require.GreaterOrEqual(t, len(all), 3) require.Empty(t, token) } -func testInformationSchemaListTablesPagination(t *testing.T, infoSchema drivers.InformationSchema) { +func testInformationSchemaLookup(t *testing.T, conn drivers.Handle) { + olap, _ := conn.AsOLAP("") ctx := context.Background() - pageSize := 2 + table, err := olap.InformationSchema().Lookup(ctx, "", "", "foo") + require.NoError(t, err) + require.Equal(t, "foo", table.Name) + require.Equal(t, true, table.IsDefaultDatabaseSchema) - // First page - page1, token1, err := infoSchema.ListTables(ctx, "", "default", uint32(pageSize), "") + _, err = olap.InformationSchema().Lookup(ctx, "", "", "bad") + require.Equal(t, drivers.ErrNotFound, err) + + table, err = olap.InformationSchema().Lookup(ctx, "", "", "model") require.NoError(t, err) - require.Len(t, page1, pageSize) - require.NotEmpty(t, token1) + require.Equal(t, "model", table.Name) + require.Equal(t, true, table.IsDefaultDatabaseSchema) - // Second page - page2, token2, err := infoSchema.ListTables(ctx, "", "default", uint32(pageSize), token1) + table, err = olap.InformationSchema().Lookup(ctx, "", "other", "foo") require.NoError(t, err) - require.NotEmpty(t, page2) - require.Empty(t, token2) + require.Equal(t, "foo", table.Name) + require.Equal(t, "other", table.DatabaseSchema) + require.Equal(t, false, table.IsDefaultDatabaseSchema) +} - // Page size 0 - all, token, err := infoSchema.ListTables(ctx, "", "default", 0, "") +func testLoadDDL(t *testing.T, conn drivers.Handle) { + olap, _ := conn.AsOLAP("") + ctx := context.Background() + + // Test DDL for a table + table, err := olap.InformationSchema().Lookup(ctx, "", "", "foo") require.NoError(t, err) - require.GreaterOrEqual(t, len(all), 3) - require.Empty(t, token) + err = olap.InformationSchema().LoadDDL(ctx, table) + require.NoError(t, err) + require.Contains(t, table.DDL, "CREATE TABLE") + require.Contains(t, table.DDL, "foo") + + // Test DDL for a view + view, err := olap.InformationSchema().Lookup(ctx, "", "", "model") + require.NoError(t, err) + err = olap.InformationSchema().LoadDDL(ctx, view) + require.NoError(t, err) + require.Contains(t, view.DDL, "CREATE VIEW") + require.Contains(t, view.DDL, "model") } func prepareConn(t *testing.T, conn drivers.Handle) { @@ -375,24 +363,3 @@ func prepareConn(t *testing.T, conn drivers.Handle) { }) require.NoError(t, err) } - -func testLoadDDL(t *testing.T, conn drivers.Handle) { - olap, _ := conn.AsOLAP("") - ctx := context.Background() - - // Test DDL for a table - table, err := olap.InformationSchema().Lookup(ctx, "", "", "foo") - require.NoError(t, err) - err = olap.InformationSchema().LoadDDL(ctx, table) - require.NoError(t, err) - require.Contains(t, table.DDL, "CREATE TABLE") - require.Contains(t, table.DDL, "foo") - - // Test DDL for a view - view, err := olap.InformationSchema().Lookup(ctx, "", "", "model") - require.NoError(t, err) - err = olap.InformationSchema().LoadDDL(ctx, view) - require.NoError(t, err) - require.Contains(t, view.DDL, "CREATE VIEW") - require.Contains(t, view.DDL, "model") -} diff --git a/runtime/drivers/databricks/information_schema.go b/runtime/drivers/databricks/information_schema.go index 2ca6fc28a919..08cead19288b 100644 --- a/runtime/drivers/databricks/information_schema.go +++ b/runtime/drivers/databricks/information_schema.go @@ -135,7 +135,7 @@ func (c *connection) ListTables(ctx context.Context, database, databaseSchema st return res, next, nil } -func (c *connection) GetTable(ctx context.Context, database, databaseSchema, table string) (*drivers.TableMetadata, error) { +func (c *connection) Lookup(ctx context.Context, database, databaseSchema, name string) (*drivers.OlapTable, error) { prefix := catalogPrefix(database) q := fmt.Sprintf(` SELECT @@ -149,34 +149,40 @@ func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tab ORDER BY c.ordinal_position `, prefix, prefix) - db, err := c.getDB(ctx) + conn, err := c.getDB(ctx) if err != nil { return nil, err } - rows, err := db.QueryContext(ctx, q, databaseSchema, table) + rows, err := conn.QueryContext(ctx, q, databaseSchema, name) if err != nil { return nil, err } defer rows.Close() - t := &drivers.TableMetadata{ - Schema: make(map[string]string), - } - var colName, colType string var isView bool + var fields []*runtimev1.StructType_Field + var colName, colType string for rows.Next() { if err := rows.Scan(&isView, &colName, &colType); err != nil { return nil, err } - t.Schema[colName] = colType - t.View = isView + fields = append(fields, &runtimev1.StructType_Field{ + Name: colName, + Type: databaseTypeToPB(colType), + }) } if err := rows.Err(); err != nil { return nil, err } - return t, nil + return &drivers.OlapTable{ + Database: database, + DatabaseSchema: databaseSchema, + Name: name, + View: isView, + Schema: &runtimev1.StructType{Fields: fields}, + }, nil } // All implements drivers.OLAPInformationSchema. @@ -212,57 +218,6 @@ func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) erro return nil } -// Lookup implements drivers.OLAPInformationSchema. -func (c *connection) Lookup(ctx context.Context, database, schema, name string) (*drivers.OlapTable, error) { - prefix := catalogPrefix(database) - q := fmt.Sprintf(` - SELECT - CASE WHEN t.table_type = 'VIEW' THEN true ELSE false END AS is_view, - c.column_name, - c.data_type - FROM %sinformation_schema.tables t - JOIN %sinformation_schema.columns c - ON t.table_schema = c.table_schema AND t.table_name = c.table_name - WHERE t.table_schema = ? AND t.table_name = ? - ORDER BY c.ordinal_position - `, prefix, prefix) - - conn, err := c.getDB(ctx) - if err != nil { - return nil, err - } - - rows, err := conn.QueryContext(ctx, q, schema, name) - if err != nil { - return nil, err - } - defer rows.Close() - - var isView bool - var fields []*runtimev1.StructType_Field - var colName, colType string - for rows.Next() { - if err := rows.Scan(&isView, &colName, &colType); err != nil { - return nil, err - } - fields = append(fields, &runtimev1.StructType_Field{ - Name: colName, - Type: databaseTypeToPB(colType), - }) - } - if err := rows.Err(); err != nil { - return nil, err - } - - return &drivers.OlapTable{ - Database: database, - DatabaseSchema: schema, - Name: name, - View: isView, - Schema: &runtimev1.StructType{Fields: fields}, - }, nil -} - // catalogPrefix returns "." if catalog is non-empty, or "" otherwise. func catalogPrefix(catalog string) string { if catalog == "" { diff --git a/runtime/drivers/databricks/information_schema_test.go b/runtime/drivers/databricks/information_schema_test.go index 1a84ac58aaf5..34902a085064 100644 --- a/runtime/drivers/databricks/information_schema_test.go +++ b/runtime/drivers/databricks/information_schema_test.go @@ -51,7 +51,7 @@ func TestListTables(t *testing.T) { require.True(t, found, "expected all_datatypes table to be present") } -func TestGetTable(t *testing.T) { +func TestLookup(t *testing.T) { t.Skip("skipping due to inactive Databricks account") testmode.Expensive(t) @@ -59,7 +59,7 @@ func TestGetTable(t *testing.T) { is, ok := conn.AsInformationSchema() require.True(t, ok) - meta, err := is.GetTable(t.Context(), "", "integration_test", "all_datatypes") + meta, err := is.Lookup(t.Context(), "", "integration_test", "all_datatypes") require.NoError(t, err) require.NotNil(t, meta) require.False(t, meta.View) @@ -68,27 +68,31 @@ func TestGetTable(t *testing.T) { // Verify expected columns and types from the init SQL. // Databricks information_schema uses its own type aliases (e.g. SHORT, LONG, BYTE) // and strips precision/length from scalar types (e.g. DECIMAL instead of DECIMAL(18,6)). - expected := map[string]string{ - "id": "INT", - "boolean_col": "BOOLEAN", - "tinyint_col": "BYTE", - "smallint_col": "SHORT", - "int32_col": "INT", - "int64_col": "LONG", - "float_col": "FLOAT", - "double_col": "DOUBLE", - "decimal_col": "DECIMAL", - "string_col": "STRING", - "varchar_col": "STRING", - "date_col": "DATE", - "timestamp_col": "TIMESTAMP", - "timestamp_ntz_col": "TIMESTAMP_NTZ", - "binary_col": "BINARY", - "array_col": "ARRAY", - "map_col": "MAP", - "struct_col": "STRUCT", + expected := []struct { + Name string + Type string + }{ + {Name: "id", Type: "INT"}, + {Name: "boolean_col", Type: "BOOLEAN"}, + {Name: "tinyint_col", Type: "BYTE"}, + {Name: "smallint_col", Type: "SHORT"}, + {Name: "int32_col", Type: "INT"}, + {Name: "int64_col", Type: "LONG"}, + {Name: "float_col", Type: "FLOAT"}, + {Name: "double_col", Type: "DOUBLE"}, + {Name: "decimal_col", Type: "DECIMAL"}, + {Name: "string_col", Type: "STRING"}, + {Name: "tinyint_col", Type: "BYTE"}, + {Name: "varchar_col", Type: "STRING"}, + {Name: "date_col", Type: "DATE"}, + {Name: "timestamp_col", Type: "TIMESTAMP"}, + {Name: "timestamp_ntz_col", Type: "TIMESTAMP_NTZ"}, + {Name: "binary_col", Type: "BINARY"}, + {Name: "array_col", Type: "ARRAY"}, + {Name: "map_col", Type: "MAP"}, + {Name: "struct_col", Type: "STRUCT"}, } for col, typ := range expected { - require.Equal(t, typ, meta.Schema[col], "unexpected type for column %q", col) + require.Equal(t, typ, meta.Schema.Fields[col].Type.Code, "unexpected type for column %q", col) } } diff --git a/runtime/drivers/druid/information_schema.go b/runtime/drivers/druid/information_schema.go index a83382555aa8..f22223d69dff 100644 --- a/runtime/drivers/druid/information_schema.go +++ b/runtime/drivers/druid/information_schema.go @@ -3,7 +3,6 @@ package druid import ( "context" "fmt" - "strings" "github.com/jmoiron/sqlx" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" @@ -79,7 +78,7 @@ func (c *connection) ListTables(ctx context.Context, database, databaseSchema st return res, next, nil } -func (c *connection) GetTable(ctx context.Context, database, databaseSchema, table string) (*drivers.TableMetadata, error) { +func (c *connection) Lookup(ctx context.Context, _, _, name string) (*drivers.OlapTable, error) { // Ensure Coordinator is ready. // The issues is that the request // SELECT ... @@ -91,53 +90,42 @@ func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tab // if SQL tries to retrieve the dynamic schema and the will be no error from Druid Router // (because if the dynamic schema is empty - it's considered OK by the Druid cluster). q := "SELECT * FROM sys.segments LIMIT 1" - rows, err := c.db.QueryxContext(ctx, q) + rows, err := c.db.QueryxContext(ctx, q, name) if err != nil { return nil, err } rows.Close() q = ` - SELECT - T.TABLE_TYPE = 'VIEW' AS view, - C.COLUMN_NAME, - C.DATA_TYPE, - C.IS_NULLABLE = 'YES' AS is_nullable - FROM INFORMATION_SCHEMA.TABLES T - LEFT JOIN INFORMATION_SCHEMA.COLUMNS C - ON T.TABLE_SCHEMA = C.TABLE_SCHEMA AND T.TABLE_NAME = C.TABLE_NAME - WHERE T.TABLE_SCHEMA = ? AND T.TABLE_NAME = ? - ORDER BY C.ORDINAL_POSITION - ` - rows, err = c.db.QueryxContext(ctx, q, databaseSchema, table) + SELECT + T.TABLE_SCHEMA AS SCHEMA, + T.TABLE_NAME AS NAME, + T.TABLE_TYPE AS TABLE_TYPE, + C.COLUMN_NAME AS COLUMN_NAME, + C.DATA_TYPE AS COLUMN_TYPE, + C.IS_NULLABLE = 'YES' AS IS_NULLABLE + FROM INFORMATION_SCHEMA.TABLES T + JOIN INFORMATION_SCHEMA.COLUMNS C ON T.TABLE_SCHEMA = C.TABLE_SCHEMA AND T.TABLE_NAME = C.TABLE_NAME + WHERE T.TABLE_SCHEMA = 'druid' AND T.TABLE_NAME = ? + ORDER BY SCHEMA, NAME, TABLE_TYPE, C.ORDINAL_POSITION + ` + + rows, err = c.db.QueryxContext(ctx, q, name) if err != nil { return nil, err } defer rows.Close() - schemaMap := make(map[string]string) - var view, nullable bool - var colName, dataType string - for rows.Next() { - if err := rows.Scan(&view, &colName, &dataType, &nullable); err != nil { - return nil, err - } - pbType := databaseTypeToPB(dataType, nullable) - if pbType.Code == runtimev1.Type_CODE_UNSPECIFIED { - schemaMap[colName] = fmt.Sprintf("UNKNOWN(%s)", dataType) - } else { - schemaMap[colName] = strings.TrimPrefix(pbType.Code.String(), "CODE_") - } + tables, err := scanTables(rows) + if err != nil { + return nil, err } - if err := rows.Err(); err != nil { - return nil, err + if len(tables) == 0 { + return nil, drivers.ErrNotFound } - return &drivers.TableMetadata{ - Schema: schemaMap, - View: view, - }, nil + return tables[0], nil } func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { @@ -202,56 +190,6 @@ func (c *connection) All(ctx context.Context, like string, pageSize uint32, page return tables, next, nil } -func (c *connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - // Ensure Coordinator is ready. - // The issues is that the request - // SELECT ... - // FROM INFORMATION_SCHEMA.TABLES T - // JOIN INFORMATION_SCHEMA.COLUMNS C ON T.TABLE_SCHEMA = C.TABLE_SCHEMA AND T.TABLE_NAME = C.TABLE_NAME - // WHERE T.TABLE_SCHEMA = 'druid' AND T.TABLE_NAME = ? - // ORDER BY SCHEMA, NAME, TABLE_TYPE, C.ORDINAL_POSITION - // returns false-negative if the Coordinator is being restarted. Retrier is a more abstract component and it doesn't check - // if SQL tries to retrieve the dynamic schema and the will be no error from Druid Router - // (because if the dynamic schema is empty - it's considered OK by the Druid cluster). - q := "SELECT * FROM sys.segments LIMIT 1" - rows, err := c.db.QueryxContext(ctx, q, name) - if err != nil { - return nil, err - } - rows.Close() - - q = ` - SELECT - T.TABLE_SCHEMA AS SCHEMA, - T.TABLE_NAME AS NAME, - T.TABLE_TYPE AS TABLE_TYPE, - C.COLUMN_NAME AS COLUMN_NAME, - C.DATA_TYPE AS COLUMN_TYPE, - C.IS_NULLABLE = 'YES' AS IS_NULLABLE - FROM INFORMATION_SCHEMA.TABLES T - JOIN INFORMATION_SCHEMA.COLUMNS C ON T.TABLE_SCHEMA = C.TABLE_SCHEMA AND T.TABLE_NAME = C.TABLE_NAME - WHERE T.TABLE_SCHEMA = 'druid' AND T.TABLE_NAME = ? - ORDER BY SCHEMA, NAME, TABLE_TYPE, C.ORDINAL_POSITION - ` - - rows, err = c.db.QueryxContext(ctx, q, name) - if err != nil { - return nil, err - } - defer rows.Close() - - tables, err := scanTables(rows) - if err != nil { - return nil, err - } - - if len(tables) == 0 { - return nil, drivers.ErrNotFound - } - - return tables[0], nil -} - func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { return nil // Not implemented } diff --git a/runtime/drivers/druid/information_schema_test.go b/runtime/drivers/druid/information_schema_test.go index be651f8f7a87..169903295309 100644 --- a/runtime/drivers/druid/information_schema_test.go +++ b/runtime/drivers/druid/information_schema_test.go @@ -33,12 +33,10 @@ func TestInformationSchema(t *testing.T) { t.Run("testInformationSchemaAllLike", func(t *testing.T) { testInformationSchemaAllLike(t, olap, expectedTables) }) t.Run("testInformationSchemaAllPagination", func(t *testing.T) { testInformationSchemaAllPagination(t, olap, expectedTables) }) t.Run("testInformationSchemaAllPaginationWithLike", func(t *testing.T) { testInformationSchemaAllPaginationWithLike(t, olap, expectedTables) }) - t.Run("testInformationSchemaLookup", func(t *testing.T) { testInformationSchemaLookup(t, olap, expectedTables) }) t.Run("testInformationSchemaListDatabaseSchemas", func(t *testing.T) { testInformationSchemaListDatabaseSchemas(t, infoSchema, expectedTables) }) t.Run("testInformationSchemaListTables", func(t *testing.T) { testInformationSchemaListTables(t, infoSchema, expectedTables) }) - t.Run("testInformationSchemaGetTable", func(t *testing.T) { testInformationSchemaGetTable(t, infoSchema, expectedTables) }) t.Run("testInformationSchemaListTablesPagination", func(t *testing.T) { testInformationSchemaListTablesPagination(t, infoSchema, expectedTables) }) - + t.Run("testInformationSchemaLookup", func(t *testing.T) { testInformationSchemaLookup(t, olap, expectedTables) }) } type expectedTable struct { @@ -169,24 +167,6 @@ func testInformationSchemaAllPaginationWithLike(t *testing.T, olap drivers.OLAPS } } -func testInformationSchemaLookup(t *testing.T, olap drivers.OLAPStore, expected []expectedTable) { - ctx := context.Background() - - require.GreaterOrEqual(t, len(expected), 1, "expected one table for schema lookup test") - testTable := expected[0].Name - testSchema := expected[0].Schema - - // Lookup the table - table, err := olap.InformationSchema().Lookup(ctx, testSchema, "", testTable) - require.NoError(t, err) - require.Equal(t, testTable, table.Name) - require.Equal(t, testSchema, table.DatabaseSchema) - - // Lookup a table that does not exist - _, err = olap.InformationSchema().Lookup(ctx, "", "", "nonexistent_table") - require.Equal(t, drivers.ErrNotFound, err) -} - func testInformationSchemaListDatabaseSchemas(t *testing.T, infoSchema drivers.InformationSchema, expected []expectedTable) { ctx := context.Background() @@ -214,21 +194,6 @@ func testInformationSchemaListTables(t *testing.T, infoSchema drivers.Informatio } } -func testInformationSchemaGetTable(t *testing.T, infoSchema drivers.InformationSchema, expected []expectedTable) { - ctx := context.Background() - - require.GreaterOrEqual(t, len(expected), 1, "expected one table for schema get table test") - testTable := expected[0].Name - - // Lookup the table - table, err := infoSchema.GetTable(ctx, "", "druid", testTable) - require.NoError(t, err) - require.Greater(t, len(table.Schema), 1) - - table, err = infoSchema.GetTable(ctx, "", "druid", "nonexistent_table") - require.Equal(t, 0, len(table.Schema)) -} - func testInformationSchemaListTablesPagination(t *testing.T, infoSchema drivers.InformationSchema, expected []expectedTable) { ctx := context.Background() pageSize := 2 @@ -256,3 +221,21 @@ func testInformationSchemaListTablesPagination(t *testing.T, infoSchema drivers. require.Equal(t, tbl.Name, resultTables[i]) } } + +func testInformationSchemaLookup(t *testing.T, olap drivers.OLAPStore, expected []expectedTable) { + ctx := context.Background() + + require.GreaterOrEqual(t, len(expected), 1, "expected one table for schema lookup test") + testTable := expected[0].Name + testSchema := expected[0].Schema + + // Lookup the table + table, err := olap.InformationSchema().Lookup(ctx, testSchema, "", testTable) + require.NoError(t, err) + require.Equal(t, testTable, table.Name) + require.Equal(t, testSchema, table.DatabaseSchema) + + // Lookup a table that does not exist + _, err = olap.InformationSchema().Lookup(ctx, "", "", "nonexistent_table") + require.Equal(t, drivers.ErrNotFound, err) +} diff --git a/runtime/drivers/duckdb/information_schema.go b/runtime/drivers/duckdb/information_schema.go index f2f107bf57ee..e8936b068b43 100644 --- a/runtime/drivers/duckdb/information_schema.go +++ b/runtime/drivers/duckdb/information_schema.go @@ -130,96 +130,29 @@ func (c *connection) ListTables(ctx context.Context, database, databaseSchema st return res, next, nil } -func (c *connection) GetTable(ctx context.Context, database, databaseSchema, table string) (*drivers.TableMetadata, error) { - conn, release, err := c.acquireMetaConn(ctx) +func (c *connection) Lookup(ctx context.Context, _, _, name string) (*drivers.OlapTable, error) { + // TODO: this bypasses the acquireMetaConn call in the original implementation. Fix this. + db, release, err := c.acquireDB() if err != nil { return nil, err } defer func() { _ = release() }() - var args []any - var q string - if c.config.Path != "" || c.config.Attach != "" { - // for generic duckdb implementation, we use the current schema from the connection. - q = ` - SELECT - t.table_type = 'VIEW' AS view, - c.column_name, - c.data_type - FROM information_schema.tables t - LEFT JOIN information_schema.columns c - ON t.table_schema = c.table_schema AND t.table_name = c.table_name - WHERE t.table_catalog = ? AND t.table_schema = ? AND t.table_name = ? - ORDER BY c.ordinal_position - ` - args = []any{database, databaseSchema, table} - } else { - // for rduckdb implementation, we use the current database and schema from the connection. - // due to external table storage the information_schema always returns table type as view - // so we look at attached tables to determine if it is a view or table - q = ` - WITH attached AS ( - SELECT - DISTINCT regexp_extract(database_name, '^(.*?)__\d+__db$', 1) AS attached_table - FROM duckdb_databases() - WHERE regexp_matches(database_name, '^.+__\d+__db$') - ) - SELECT - CASE - WHEN a.attached_table IS NOT NULL THEN FALSE - ELSE t.table_type = 'VIEW' - END AS view, - c.column_name, - c.data_type - FROM information_schema.tables t - LEFT JOIN information_schema.columns c - ON t.table_schema = c.table_schema AND t.table_name = c.table_name - LEFT JOIN attached a - ON t.table_name = a.attached_table - WHERE t.table_catalog = current_database() AND t.table_schema = current_schema() AND t.table_name = ? - ORDER BY c.ordinal_position; - ` - args = []any{table} - } - rows, err := conn.QueryxContext(ctx, q, args...) + rows, _, err := db.Schema(ctx, "", name, 0, "") if err != nil { return nil, c.checkErr(err) } - defer rows.Close() - schemaMap := make(map[string]string) - var view bool - var colName, dataType string - for rows.Next() { - if err := rows.Scan(&view, &colName, &dataType); err != nil { - return nil, err - } - // For views that depend on secrets, we have an inaccessible schema since - // the secret is only set at write time. - if strings.HasPrefix(colName, "error(") && dataType == "\"NULL\"" { - return nil, fmt.Errorf("failed to get schema (try setting `materialize: true` — this usually happens for non-materialized views): %s", colName) - } - if pbType, err := databaseTypeToPB(dataType, false); err != nil { - if errors.Is(err, errUnsupportedType) { - schemaMap[colName] = fmt.Sprintf("UNKNOWN(%s)", dataType) - } else { - return nil, err - } - } else if pbType.Code == runtimev1.Type_CODE_UNSPECIFIED { - schemaMap[colName] = fmt.Sprintf("UNKNOWN(%s)", dataType) - } else { - schemaMap[colName] = strings.TrimPrefix(pbType.Code.String(), "CODE_") - } + tables, err := scanTables(rows) + if err != nil { + return nil, err } - if err := rows.Err(); err != nil { - return nil, err + if len(tables) == 0 { + return nil, drivers.ErrNotFound } - return &drivers.TableMetadata{ - Schema: schemaMap, - View: view, - }, nil + return tables[0], nil } func (c *connection) All(ctx context.Context, ilike string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { @@ -243,31 +176,6 @@ func (c *connection) All(ctx context.Context, ilike string, pageSize uint32, pag return tables, nextToken, nil } -func (c *connection) Lookup(ctx context.Context, _, _, name string) (*drivers.OlapTable, error) { - // TODO: this bypasses the acquireMetaConn call in the original implementation. Fix this. - db, release, err := c.acquireDB() - if err != nil { - return nil, err - } - defer func() { _ = release() }() - - rows, _, err := db.Schema(ctx, "", name, 0, "") - if err != nil { - return nil, c.checkErr(err) - } - - tables, err := scanTables(rows) - if err != nil { - return nil, err - } - - if len(tables) == 0 { - return nil, drivers.ErrNotFound - } - - return tables[0], nil -} - func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { // already populated in All and Lookup calls return nil diff --git a/runtime/drivers/duckdb/information_schema_test.go b/runtime/drivers/duckdb/information_schema_test.go index ff1747758db6..a4684f9cd441 100644 --- a/runtime/drivers/duckdb/information_schema_test.go +++ b/runtime/drivers/duckdb/information_schema_test.go @@ -34,13 +34,12 @@ func TestInformationSchema(t *testing.T) { databaseSchema := "main" t.Run("testInformationSchemaAll", func(t *testing.T) { testInformationSchemaAll(t, olap) }) t.Run("testInformationSchemaAllLike", func(t *testing.T) { testInformationSchemaAllLike(t, olap) }) - t.Run("testInformationSchemaLookup", func(t *testing.T) { testInformationSchemaLookup(t, olap) }) t.Run("testInformationSchemaAllPagination", func(t *testing.T) { testInformationSchemaAllPagination(t, olap) }) t.Run("testInformationSchemaAllPaginationWithLike", func(t *testing.T) { testInformationSchemaAllPaginationWithLike(t, olap) }) t.Run("testInformationSchemaListDatabaseSchemas", func(t *testing.T) { testInformationSchemaListDatabaseSchemas(t, infoSchema, database, databaseSchema) }) t.Run("testInformationSchemaListTables", func(t *testing.T) { testInformationSchemaListTables(t, infoSchema, database, databaseSchema) }) - t.Run("testInformationSchemaGetTable", func(t *testing.T) { testInformationSchemaGetTable(t, infoSchema, database, databaseSchema) }) t.Run("testInformationSchemaListTablesPagination", func(t *testing.T) { testInformationSchemaListTablesPagination(t, infoSchema, database, databaseSchema) }) + t.Run("testInformationSchemaLookup", func(t *testing.T) { testInformationSchemaLookup(t, olap) }) t.Run("testLoadDDL", func(t *testing.T) { testLoadDDL(t, olap) }) } @@ -75,13 +74,12 @@ schema_name: integration_test databaseSchema := "integration_test" t.Run("testInformationSchemaAll", func(t *testing.T) { testInformationSchemaAll(t, olap) }) t.Run("testInformationSchemaAllLike", func(t *testing.T) { testInformationSchemaAllLike(t, olap) }) - t.Run("testInformationSchemaLookup", func(t *testing.T) { testInformationSchemaLookup(t, olap) }) t.Run("testInformationSchemaAllPagination", func(t *testing.T) { testInformationSchemaAllPagination(t, olap) }) t.Run("testInformationSchemaAllPaginationWithLike", func(t *testing.T) { testInformationSchemaAllPaginationWithLike(t, olap) }) t.Run("testInformationSchemaListDatabaseSchemas", func(t *testing.T) { testInformationSchemaListDatabaseSchemas(t, infoSchema, database, databaseSchema) }) t.Run("testInformationSchemaListTables", func(t *testing.T) { testInformationSchemaListTables(t, infoSchema, database, databaseSchema) }) - t.Run("testInformationSchemaGetTable", func(t *testing.T) { testInformationSchemaGetTable(t, infoSchema, database, databaseSchema) }) t.Run("testInformationSchemaListTablesPagination", func(t *testing.T) { testInformationSchemaListTablesPagination(t, infoSchema, database, databaseSchema) }) + t.Run("testInformationSchemaLookup", func(t *testing.T) { testInformationSchemaLookup(t, olap) }) } func testInformationSchemaAll(t *testing.T, olap drivers.OLAPStore) { @@ -124,26 +122,6 @@ func testInformationSchemaAllLike(t *testing.T, olap drivers.OLAPStore) { require.Equal(t, 0, len(tables)) } -func testInformationSchemaLookup(t *testing.T, olap drivers.OLAPStore) { - ctx := context.Background() - bar, err := olap.InformationSchema().Lookup(ctx, "", "", "bar") - require.NoError(t, err) - require.Equal(t, "bar", bar.Name) - require.Equal(t, 2, len(bar.Schema.Fields)) - require.Equal(t, "bar", bar.Schema.Fields[0].Name) - require.Equal(t, runtimev1.Type_CODE_STRING, bar.Schema.Fields[0].Type.Code) - require.Equal(t, "baz", bar.Schema.Fields[1].Name) - require.Equal(t, runtimev1.Type_CODE_INT32, bar.Schema.Fields[1].Type.Code) - require.Equal(t, false, bar.View) - - _, err = olap.InformationSchema().Lookup(ctx, "", "", "nonexistent_table") - require.Equal(t, drivers.ErrNotFound, err) - - table, err := olap.InformationSchema().Lookup(ctx, "", "", "model") - require.NoError(t, err) - require.Equal(t, "model", table.Name) -} - func testInformationSchemaAllPagination(t *testing.T, olap drivers.OLAPStore) { ctx := context.Background() pageSize := 2 @@ -233,23 +211,6 @@ func testInformationSchemaListTables(t *testing.T, infoSchema drivers.Informatio require.Equal(t, true, model.View) } -func testInformationSchemaGetTable(t *testing.T, infoSchema drivers.InformationSchema, database, databaseSchema string) { - ctx := context.Background() - bar, err := infoSchema.GetTable(ctx, database, databaseSchema, "bar") - require.NoError(t, err) - require.Equal(t, 2, len(bar.Schema)) - require.Equal(t, "STRING", bar.Schema["bar"]) - require.Equal(t, "INT32", bar.Schema["baz"]) - require.Equal(t, false, bar.View) - - noTable, err := infoSchema.GetTable(ctx, database, databaseSchema, "nonexistent_table") - require.Equal(t, 0, len(noTable.Schema)) - - table, err := infoSchema.GetTable(ctx, database, databaseSchema, "model") - require.NoError(t, err) - require.Equal(t, true, table.View) -} - func testInformationSchemaListTablesPagination(t *testing.T, infoSchema drivers.InformationSchema, database, databaseSchema string) { ctx := context.Background() pageSize := 2 @@ -304,3 +265,23 @@ func testLoadDDL(t *testing.T, olap drivers.OLAPStore) { require.Contains(t, view.DDL, "CREATE VIEW") require.Contains(t, view.DDL, "model") } + +func testInformationSchemaLookup(t *testing.T, olap drivers.OLAPStore) { + ctx := context.Background() + bar, err := olap.InformationSchema().Lookup(ctx, "", "", "bar") + require.NoError(t, err) + require.Equal(t, "bar", bar.Name) + require.Equal(t, 2, len(bar.Schema.Fields)) + require.Equal(t, "bar", bar.Schema.Fields[0].Name) + require.Equal(t, runtimev1.Type_CODE_STRING, bar.Schema.Fields[0].Type.Code) + require.Equal(t, "baz", bar.Schema.Fields[1].Name) + require.Equal(t, runtimev1.Type_CODE_INT32, bar.Schema.Fields[1].Type.Code) + require.Equal(t, false, bar.View) + + _, err = olap.InformationSchema().Lookup(ctx, "", "", "nonexistent_table") + require.Equal(t, drivers.ErrNotFound, err) + + table, err := olap.InformationSchema().Lookup(ctx, "", "", "model") + require.NoError(t, err) + require.Equal(t, "model", table.Name) +} diff --git a/runtime/drivers/information_schema.go b/runtime/drivers/information_schema.go index 03cc70a531d5..dd2f334372aa 100644 --- a/runtime/drivers/information_schema.go +++ b/runtime/drivers/information_schema.go @@ -11,20 +11,18 @@ type InformationSchema interface { // All returns metadata about all tables and views. // The like argument can optionally be passed to filter the tables by name. All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*OlapTable, string, error) + // ListDatabaseSchemas returns all schemas across databases + ListDatabaseSchemas(ctx context.Context, pageSize uint32, pageToken string) ([]*DatabaseSchemaInfo, string, error) + // ListTables returns all tables in a schema. + ListTables(ctx context.Context, database, databaseSchema string, pageSize uint32, pageToken string) ([]*TableInfo, string, error) // Lookup returns metadata about a specific tables and views. - Lookup(ctx context.Context, db, schema, name string) (*OlapTable, error) + Lookup(ctx context.Context, database, databaseSchema, name string) (*OlapTable, error) // LoadPhysicalSize populates the PhysicalSizeBytes field of table metadata. // It should be called aft`er All or Lookup and not on manually created tables. LoadPhysicalSize(ctx context.Context, tables []*OlapTable) error // LoadDDL populates the DDL field of a single table's metadata. // Drivers that don't support DDL retrieval should return nil (leaving DDL empty). LoadDDL(ctx context.Context, table *OlapTable) error - // ListDatabaseSchemas returns all schemas across databases - ListDatabaseSchemas(ctx context.Context, pageSize uint32, pageToken string) ([]*DatabaseSchemaInfo, string, error) - // ListTables returns all tables in a schema. - ListTables(ctx context.Context, database, databaseSchema string, pageSize uint32, pageToken string) ([]*TableInfo, string, error) - // GetTable returns metadata about a specific table. - GetTable(ctx context.Context, database, databaseSchema, table string) (*TableMetadata, error) } // OlapTable represents a table in an information schema. diff --git a/runtime/drivers/mysql/information_schema.go b/runtime/drivers/mysql/information_schema.go index c9f631837b83..f95f952a8253 100644 --- a/runtime/drivers/mysql/information_schema.go +++ b/runtime/drivers/mysql/information_schema.go @@ -129,7 +129,7 @@ func (c *connection) ListTables(ctx context.Context, database, databaseSchema st return res, next, nil } -func (c *connection) GetTable(ctx context.Context, database, databaseSchema, table string) (*drivers.TableMetadata, error) { +func (c *connection) Lookup(ctx context.Context, database, databaseSchema, name string) (*drivers.OlapTable, error) { q := ` SELECT CASE WHEN t.table_type = 'VIEW' THEN true ELSE false END AS view, @@ -147,28 +147,35 @@ func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tab return nil, err } - rows, err := db.QueryxContext(ctx, q, databaseSchema, table) + rows, err := db.QueryxContext(ctx, q, databaseSchema, name) if err != nil { return nil, err } defer rows.Close() - res := &drivers.TableMetadata{ - Schema: make(map[string]string), - } + var view bool + var col, typ string + fields := make([]*runtimev1.StructType_Field, 0) for rows.Next() { - var colName, dataType string - if err := rows.Scan(&res.View, &colName, &dataType); err != nil { + if err = rows.Scan(&view, &col, &typ); err != nil { return nil, err } - res.Schema[colName] = dataType - } - - if err := rows.Err(); err != nil { - return nil, err + fields = append(fields, &runtimev1.StructType_Field{ + Name: col, + Type: databaseTypeToPB(typ, true), + }) } - - return res, nil + return &drivers.OlapTable{ + Database: database, + DatabaseSchema: databaseSchema, + Name: name, + View: view, + Schema: &runtimev1.StructType{ + Fields: fields, + }, + UnsupportedCols: nil, + PhysicalSizeBytes: 0, + }, rows.Err() } // All implements drivers.OLAPInformationSchema. @@ -214,28 +221,3 @@ func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) erro } return rows.Err() } - -// Lookup implements drivers.OLAPInformationSchema. -func (c *connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - meta, err := c.GetTable(ctx, db, schema, name) - if err != nil { - return nil, err - } - - rtSchema := &runtimev1.StructType{} - for name, typ := range meta.Schema { - rtSchema.Fields = append(rtSchema.Fields, &runtimev1.StructType_Field{ - Name: name, - Type: databaseTypeToPB(typ, true), - }) - } - return &drivers.OlapTable{ - Database: db, - DatabaseSchema: schema, - Name: name, - View: meta.View, - Schema: rtSchema, - UnsupportedCols: nil, - PhysicalSizeBytes: 0, - }, nil -} diff --git a/runtime/drivers/pinot/information_schema.go b/runtime/drivers/pinot/information_schema.go index 77306bf161dc..be77fb9b2fcb 100644 --- a/runtime/drivers/pinot/information_schema.go +++ b/runtime/drivers/pinot/information_schema.go @@ -127,7 +127,7 @@ func (c *connection) ListTables(ctx context.Context, database, databaseSchema st return result, next, nil } -func (c *connection) GetTable(ctx context.Context, database, databaseSchema, name string) (*drivers.TableMetadata, error) { +func (c *connection) Lookup(ctx context.Context, database, databaseSchema, name string) (*drivers.OlapTable, error) { req, _ := http.NewRequestWithContext(ctx, http.MethodGet, c.schemaURL+"/tables/"+name+"/schema", http.NoBody) for k, v := range c.headers { req.Header.Set(k, v) @@ -150,47 +150,51 @@ func (c *connection) GetTable(ctx context.Context, database, databaseSchema, nam return nil, err } - schema := make(map[string]string) + unsupportedCols := make(map[string]string) + var schemaFields []*runtimev1.StructType_Field for _, field := range schemaResponse.DateTimeFieldSpecs { if field.DataType != "TIMESTAMP" && field.DataType != "LONG" { - schema[field.Name] = fmt.Sprintf("UNKNOWN(%s)", field.DataType+"_DATE_TIME") + unsupportedCols[field.Name] = field.DataType + "_(DATE_TIME_FIELD)" continue } - pbType := databaseTypeToPB(field.DataType, !field.NotNull, true) - if pbType.Code == runtimev1.Type_CODE_UNSPECIFIED { - schema[field.Name] = fmt.Sprintf("UNKNOWN(%s)", field.DataType) - } else { - schema[field.Name] = strings.TrimPrefix(pbType.Code.String(), "CODE_") - } + schemaFields = append(schemaFields, &runtimev1.StructType_Field{Name: field.Name, Type: databaseTypeToPB(field.DataType, !field.NotNull, true)}) } for _, field := range schemaResponse.DimensionFieldSpecs { - // Skip fields where SingleValueField is false (i.e. arrays). - if field.SingleValueField != nil && !*field.SingleValueField { - schema[field.Name] = fmt.Sprintf("UNKNOWN(%s)", field.DataType+"_ARRAY") - continue + singleValueField := true + if field.SingleValueField != nil { + singleValueField = *field.SingleValueField } - pbType := databaseTypeToPB(field.DataType, !field.NotNull, true) - if pbType.Code == runtimev1.Type_CODE_UNSPECIFIED { - schema[field.Name] = fmt.Sprintf("UNKNOWN(%s)", field.DataType) - } else { - schema[field.Name] = strings.TrimPrefix(pbType.Code.String(), "CODE_") + if !singleValueField { + // Skip array fields for now + unsupportedCols[field.Name] = field.DataType + "_ARRAY" + continue } + schemaFields = append(schemaFields, &runtimev1.StructType_Field{Name: field.Name, Type: databaseTypeToPB(field.DataType, !field.NotNull, singleValueField)}) } for _, field := range schemaResponse.MetricFieldSpecs { - // Skip fields where SingleValueField is false (i.e. arrays). - if field.SingleValueField != nil && !*field.SingleValueField { - schema[field.Name] = fmt.Sprintf("UNKNOWN(%s)", field.DataType+"_ARRAY") - continue + singleValueField := true + if field.SingleValueField != nil { + singleValueField = *field.SingleValueField } - pbType := databaseTypeToPB(field.DataType, !field.NotNull, true) - if pbType.Code == runtimev1.Type_CODE_UNSPECIFIED { - schema[field.Name] = fmt.Sprintf("UNKNOWN(%s)", field.DataType) - } else { - schema[field.Name] = strings.TrimPrefix(pbType.Code.String(), "CODE_") + if !singleValueField { + // Skip array fields for now + unsupportedCols[field.Name] = field.DataType + "_ARRAY" + continue } + schemaFields = append(schemaFields, &runtimev1.StructType_Field{Name: field.Name, Type: databaseTypeToPB(field.DataType, !field.NotNull, singleValueField)}) + } + + table := &drivers.OlapTable{ + Database: "", + DatabaseSchema: databaseSchema, + Name: name, + View: false, + Schema: &runtimev1.StructType{Fields: schemaFields}, + UnsupportedCols: unsupportedCols, + PhysicalSizeBytes: -1, } - return &drivers.TableMetadata{Schema: schema, View: false}, nil + return table, nil } func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { @@ -291,75 +295,6 @@ func (c *connection) All(ctx context.Context, like string, pageSize uint32, page return tables, next, nil } -func (c *connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - req, _ := http.NewRequestWithContext(ctx, http.MethodGet, c.schemaURL+"/tables/"+name+"/schema", http.NoBody) - for k, v := range c.headers { - req.Header.Set(k, v) - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - var schemaResponse pinotSchema - err = json.NewDecoder(resp.Body).Decode(&schemaResponse) - if err != nil { - return nil, err - } - - unsupportedCols := make(map[string]string) - var schemaFields []*runtimev1.StructType_Field - for _, field := range schemaResponse.DateTimeFieldSpecs { - if field.DataType != "TIMESTAMP" && field.DataType != "LONG" { - unsupportedCols[field.Name] = field.DataType + "_(DATE_TIME_FIELD)" - continue - } - schemaFields = append(schemaFields, &runtimev1.StructType_Field{Name: field.Name, Type: databaseTypeToPB(field.DataType, !field.NotNull, true)}) - } - for _, field := range schemaResponse.DimensionFieldSpecs { - singleValueField := true - if field.SingleValueField != nil { - singleValueField = *field.SingleValueField - } - if !singleValueField { - // Skip array fields for now - unsupportedCols[field.Name] = field.DataType + "_ARRAY" - continue - } - schemaFields = append(schemaFields, &runtimev1.StructType_Field{Name: field.Name, Type: databaseTypeToPB(field.DataType, !field.NotNull, singleValueField)}) - } - for _, field := range schemaResponse.MetricFieldSpecs { - singleValueField := true - if field.SingleValueField != nil { - singleValueField = *field.SingleValueField - } - if !singleValueField { - // Skip array fields for now - unsupportedCols[field.Name] = field.DataType + "_ARRAY" - continue - } - schemaFields = append(schemaFields, &runtimev1.StructType_Field{Name: field.Name, Type: databaseTypeToPB(field.DataType, !field.NotNull, singleValueField)}) - } - - table := &drivers.OlapTable{ - Database: "", - DatabaseSchema: "", - Name: name, - View: false, - Schema: &runtimev1.StructType{Fields: schemaFields}, - UnsupportedCols: unsupportedCols, - PhysicalSizeBytes: -1, - } - - return table, nil -} - // LoadDDL implements drivers.OLAPInformationSchema. func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { return nil // Not implemented diff --git a/runtime/drivers/pinot/information_schema_test.go b/runtime/drivers/pinot/information_schema_test.go index 9d39431e979d..bb4309a372a3 100644 --- a/runtime/drivers/pinot/information_schema_test.go +++ b/runtime/drivers/pinot/information_schema_test.go @@ -29,14 +29,12 @@ func TestInformationSchema(t *testing.T) { t.Run("testInformationSchemaAll", func(t *testing.T) { testInformationSchemaAll(t, olap) }) t.Run("testInformationSchemaAllLike", func(t *testing.T) { testInformationSchemaAllLike(t, olap) }) - t.Run("testInformationSchemaLookup", func(t *testing.T) { testInformationSchemaLookup(t, olap) }) t.Run("testInformationSchemaAllPagination", func(t *testing.T) { testInformationSchemaAllPagination(t, olap) }) t.Run("testInformationSchemaAllPaginationWithLike", func(t *testing.T) { testInformationSchemaAllPaginationWithLike(t, olap) }) t.Run("testInformationSchemaListDatabaseSchemas", func(t *testing.T) { testInformationSchemaListDatabaseSchemas(t, infoSchema) }) t.Run("testInformationSchemaListTables", func(t *testing.T) { testInformationSchemaListTables(t, infoSchema) }) - t.Run("testInformationSchemaGetTable", func(t *testing.T) { testInformationSchemaGetTable(t, infoSchema) }) t.Run("testInformationSchemaListTablesPagination", func(t *testing.T) { testInformationSchemaListTablesPagination(t, infoSchema) }) - + t.Run("testInformationSchemaLookup", func(t *testing.T) { testInformationSchemaLookup(t, olap) }) } func testInformationSchemaAll(t *testing.T, olap drivers.OLAPStore) { @@ -72,30 +70,6 @@ func testInformationSchemaAllLike(t *testing.T, olap drivers.OLAPStore) { require.Equal(t, 0, len(tables)) } -func testInformationSchemaLookup(t *testing.T, olap drivers.OLAPStore) { - ctx := context.Background() - starbucksStores, err := olap.InformationSchema().Lookup(ctx, "", "", "starbucksStores") - require.NoError(t, err) - require.Equal(t, "starbucksStores", starbucksStores.Name) - - require.Equal(t, 5, len(starbucksStores.Schema.Fields)) - require.Equal(t, "starbucksStores", starbucksStores.Name) - require.Equal(t, "lon", starbucksStores.Schema.Fields[0].Name) - require.Equal(t, runtimev1.Type_CODE_FLOAT32, starbucksStores.Schema.Fields[0].Type.Code) - require.Equal(t, "lat", starbucksStores.Schema.Fields[1].Name) - require.Equal(t, runtimev1.Type_CODE_FLOAT32, starbucksStores.Schema.Fields[1].Type.Code) - require.Equal(t, "name", starbucksStores.Schema.Fields[2].Name) - require.Equal(t, runtimev1.Type_CODE_STRING, starbucksStores.Schema.Fields[2].Type.Code) - require.Equal(t, "address", starbucksStores.Schema.Fields[3].Name) - require.Equal(t, runtimev1.Type_CODE_STRING, starbucksStores.Schema.Fields[3].Type.Code) - require.Equal(t, "location_st_point", starbucksStores.Schema.Fields[4].Name) - require.Equal(t, runtimev1.Type_CODE_BYTES, starbucksStores.Schema.Fields[4].Type.Code) - require.Equal(t, false, starbucksStores.View) - - _, err = olap.InformationSchema().Lookup(ctx, "", "", "nonexistent_table") - require.ErrorContains(t, err, "unexpected status code: 404") -} - func testInformationSchemaAllPagination(t *testing.T, olap drivers.OLAPStore) { ctx := context.Background() pageSize := 4 @@ -185,23 +159,6 @@ func testInformationSchemaListTables(t *testing.T, infoSchema drivers.Informatio require.Equal(t, "testUnnest", tables[9].Name) } -func testInformationSchemaGetTable(t *testing.T, infoSchema drivers.InformationSchema) { - ctx := context.Background() - starbucksStores, err := infoSchema.GetTable(ctx, "", "default", "starbucksStores") - require.NoError(t, err) - - require.Equal(t, 5, len(starbucksStores.Schema)) - require.Equal(t, "FLOAT32", starbucksStores.Schema["lon"]) - require.Equal(t, "FLOAT32", starbucksStores.Schema["lat"]) - require.Equal(t, "STRING", starbucksStores.Schema["name"]) - require.Equal(t, "STRING", starbucksStores.Schema["address"]) - require.Equal(t, "BYTES", starbucksStores.Schema["location_st_point"]) - require.Equal(t, false, starbucksStores.View) - - _, err = infoSchema.GetTable(ctx, "", "default", "nonexistent_table") - require.ErrorContains(t, err, "unexpected status code: 404") -} - func testInformationSchemaListTablesPagination(t *testing.T, infoSchema drivers.InformationSchema) { ctx := context.Background() pageSize := 4 @@ -236,3 +193,27 @@ func testInformationSchemaListTablesPagination(t *testing.T, infoSchema drivers. require.Equal(t, 10, len(tables)) require.Empty(t, nextToken) } + +func testInformationSchemaLookup(t *testing.T, olap drivers.OLAPStore) { + ctx := context.Background() + starbucksStores, err := olap.InformationSchema().Lookup(ctx, "", "", "starbucksStores") + require.NoError(t, err) + require.Equal(t, "starbucksStores", starbucksStores.Name) + + require.Equal(t, 5, len(starbucksStores.Schema.Fields)) + require.Equal(t, "starbucksStores", starbucksStores.Name) + require.Equal(t, "lon", starbucksStores.Schema.Fields[0].Name) + require.Equal(t, runtimev1.Type_CODE_FLOAT32, starbucksStores.Schema.Fields[0].Type.Code) + require.Equal(t, "lat", starbucksStores.Schema.Fields[1].Name) + require.Equal(t, runtimev1.Type_CODE_FLOAT32, starbucksStores.Schema.Fields[1].Type.Code) + require.Equal(t, "name", starbucksStores.Schema.Fields[2].Name) + require.Equal(t, runtimev1.Type_CODE_STRING, starbucksStores.Schema.Fields[2].Type.Code) + require.Equal(t, "address", starbucksStores.Schema.Fields[3].Name) + require.Equal(t, runtimev1.Type_CODE_STRING, starbucksStores.Schema.Fields[3].Type.Code) + require.Equal(t, "location_st_point", starbucksStores.Schema.Fields[4].Name) + require.Equal(t, runtimev1.Type_CODE_BYTES, starbucksStores.Schema.Fields[4].Type.Code) + require.Equal(t, false, starbucksStores.View) + + _, err = olap.InformationSchema().Lookup(ctx, "", "", "nonexistent_table") + require.ErrorContains(t, err, "unexpected status code: 404") +} diff --git a/runtime/drivers/postgres/information_schema.go b/runtime/drivers/postgres/information_schema.go index a2e0b9ad94dd..fdb4c0dd877c 100644 --- a/runtime/drivers/postgres/information_schema.go +++ b/runtime/drivers/postgres/information_schema.go @@ -129,7 +129,7 @@ func (c *connection) ListTables(ctx context.Context, database, databaseSchema st return res, next, rows.Err() } -func (c *connection) GetTable(ctx context.Context, database, databaseSchema, table string) (*drivers.TableMetadata, error) { +func (c *connection) Lookup(ctx context.Context, database, databaseSchema, name string) (*drivers.OlapTable, error) { q := ` SELECT CASE WHEN lower(t.table_type) = 'view' THEN true ELSE false END AS view, @@ -146,9 +146,9 @@ func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tab var args []any if databaseSchema != "" { - args = append(args, databaseSchema, table) + args = append(args, databaseSchema, name) } else { - args = append(args, nil, table) + args = append(args, nil, name) } rows, err := db.QueryContext(ctx, q, args...) if err != nil { @@ -156,18 +156,28 @@ func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tab } defer rows.Close() - columns := make(map[string]string) - var name, typ string var view bool + var col, typ string + fields := make([]*runtimev1.StructType_Field, 0) for rows.Next() { - if err := rows.Scan(&view, &name, &typ); err != nil { + if err = rows.Scan(&view, &col, &typ); err != nil { return nil, err } - columns[name] = typ + fields = append(fields, &runtimev1.StructType_Field{ + Name: col, + Type: databaseTypeToPB(typ), + }) } - return &drivers.TableMetadata{ - View: view, - Schema: columns, + return &drivers.OlapTable{ + Database: database, + DatabaseSchema: databaseSchema, + Name: name, + View: view, + Schema: &runtimev1.StructType{ + Fields: fields, + }, + UnsupportedCols: nil, + PhysicalSizeBytes: 0, }, rows.Err() } @@ -236,29 +246,3 @@ func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) erro table.DDL = ddl return nil } - -// Lookup implements drivers.OLAPInformationSchema. -func (c *connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - meta, err := c.GetTable(ctx, db, schema, name) - if err != nil { - return nil, err - } - - rtSchema := &runtimev1.StructType{} - for name, typ := range meta.Schema { - t := databaseTypeToPB(typ) - rtSchema.Fields = append(rtSchema.Fields, &runtimev1.StructType_Field{ - Name: name, - Type: t, - }) - } - return &drivers.OlapTable{ - Database: db, - DatabaseSchema: schema, - Name: name, - View: meta.View, - Schema: rtSchema, - UnsupportedCols: nil, - PhysicalSizeBytes: 0, - }, nil -} diff --git a/runtime/drivers/redshift/information_schema.go b/runtime/drivers/redshift/information_schema.go index f2c4e29ce274..0d8e18ba9705 100644 --- a/runtime/drivers/redshift/information_schema.go +++ b/runtime/drivers/redshift/information_schema.go @@ -139,16 +139,18 @@ func (c *Connection) ListTables(ctx context.Context, database, databaseSchema st return res, next, nil } -func (c *Connection) GetTable(ctx context.Context, database, databaseSchema, table string) (*drivers.TableMetadata, error) { - // Query to get column name and data type +func (c *Connection) Lookup(ctx context.Context, database, databaseSchema, name string) (*drivers.OlapTable, error) { q := fmt.Sprintf(` -SELECT - column_name, - data_type -FROM svv_all_columns -WHERE database_name = %s AND schema_name = %s AND table_name = %s -ORDER BY ordinal_position; -`, escapeStringValue(database), escapeStringValue(databaseSchema), escapeStringValue(table)) + SELECT + CASE WHEN t.table_type = 'VIEW' THEN true ELSE false END AS view, + c.column_name, + c.data_type + FROM svv_all_tables t + JOIN svv_all_columns c + ON t.database_name = c.database_name and t.schema_name = c.schema_name AND t.table_name = c.table_name + WHERE t.database_name = %s AND t.schema_name = %s AND t.table_name = %s + ORDER BY ordinal_position; + `, escapeStringValue(database), escapeStringValue(databaseSchema), escapeStringValue(name)) client, err := c.getClient(ctx) if err != nil { @@ -167,25 +169,37 @@ ORDER BY ordinal_position; if err != nil { return nil, fmt.Errorf("failed to get query results: %w", err) } - - var column, dataType string - schemaMap := make(map[string]string, len(result.Records)) + var view bool + fields := make([]*runtimev1.StructType_Field, len(result.Records)) for _, record := range result.Records { - colField, ok := record[0].(*types.FieldMemberStringValue) + viewField, ok := record[0].(*types.FieldMemberBooleanValue) + if !ok { + return nil, fmt.Errorf("unexpected type for column_name field") + } + view = viewField.Value + colField, ok := record[1].(*types.FieldMemberStringValue) if !ok { return nil, fmt.Errorf("unexpected type for column_name field") } - typeField, ok := record[1].(*types.FieldMemberStringValue) + typeField, ok := record[0].(*types.FieldMemberStringValue) if !ok { return nil, fmt.Errorf("unexpected type for data_type field") } - column = colField.Value - dataType = typeField.Value - schemaMap[column] = dataType + fields = append(fields, &runtimev1.StructType_Field{ + Name: colField.Value, + Type: redshiftTypeToRuntimeType(typeField.Value), + }) } - - return &drivers.TableMetadata{ - Schema: schemaMap, + return &drivers.OlapTable{ + Database: database, + DatabaseSchema: databaseSchema, + Name: name, + View: view, + Schema: &runtimev1.StructType{ + Fields: fields, + }, + UnsupportedCols: nil, + PhysicalSizeBytes: 0, }, nil } @@ -207,29 +221,3 @@ func (c *Connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.Ola func (c *Connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { return nil // Not implemented } - -// Lookup implements drivers.OLAPInformationSchema. -func (c *Connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - meta, err := c.GetTable(ctx, db, schema, name) - if err != nil { - return nil, err - } - runtimeSchema := &runtimev1.StructType{ - Fields: make([]*runtimev1.StructType_Field, 0, len(meta.Schema)), - } - for name, typ := range meta.Schema { - runtimeSchema.Fields = append(runtimeSchema.Fields, &runtimev1.StructType_Field{ - Name: name, - Type: redshiftTypeToRuntimeType(typ), - }) - } - return &drivers.OlapTable{ - Database: db, - DatabaseSchema: schema, - Name: name, - View: meta.View, - Schema: runtimeSchema, - UnsupportedCols: nil, - PhysicalSizeBytes: 0, - }, nil -} diff --git a/runtime/drivers/snowflake/information_schema.go b/runtime/drivers/snowflake/information_schema.go index 721d0711513a..f3476316bf88 100644 --- a/runtime/drivers/snowflake/information_schema.go +++ b/runtime/drivers/snowflake/information_schema.go @@ -140,7 +140,7 @@ func (c *connection) ListTables(ctx context.Context, database, databaseSchema st return res, next, nil } -func (c *connection) GetTable(ctx context.Context, database, databaseSchema, table string) (*drivers.TableMetadata, error) { +func (c *connection) Lookup(ctx context.Context, database, databaseSchema, name string) (*drivers.OlapTable, error) { q := fmt.Sprintf(` SELECT CASE WHEN t.table_type = 'VIEW' THEN true ELSE false END as is_view, @@ -158,31 +158,39 @@ func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tab return nil, err } - rows, err := db.QueryxContext(ctx, q, databaseSchema, table) + rows, err := db.QueryxContext(ctx, q, databaseSchema, name) if err != nil { return nil, err } defer rows.Close() - t := &drivers.TableMetadata{ - Schema: make(map[string]string), - } - var ( - colName, colType string - isView bool - ) + var view bool + var col, typ string + fields := make([]*runtimev1.StructType_Field, 0) for rows.Next() { - if err := rows.Scan(&isView, &colName, &colType); err != nil { + if err := rows.Scan(&view, &col, &typ); err != nil { return nil, err } - t.Schema[colName] = colType - t.View = isView - } - if err := rows.Err(); err != nil { - return nil, err + t, err := databaseTypeToPB(typ, 0, true) // add scale and nullability if needed + if err != nil { + return nil, err + } + fields = append(fields, &runtimev1.StructType_Field{ + Name: col, + Type: t, + }) } - - return t, nil + return &drivers.OlapTable{ + Database: database, + DatabaseSchema: databaseSchema, + Name: name, + View: view, + Schema: &runtimev1.StructType{ + Fields: fields, + }, + UnsupportedCols: nil, + PhysicalSizeBytes: 0, + }, rows.Err() } // All implements drivers.OLAPInformationSchema. @@ -220,35 +228,6 @@ func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) erro return nil } -// Lookup implements drivers.OLAPInformationSchema. -func (c *connection) Lookup(ctx context.Context, db, schema, name string) (*drivers.OlapTable, error) { - meta, err := c.GetTable(ctx, db, schema, name) - if err != nil { - return nil, err - } - - rtSchema := &runtimev1.StructType{} - for name, typ := range meta.Schema { - t, err := databaseTypeToPB(typ, 0, true) // add scale and nullability if needed - if err != nil { - return nil, err - } - rtSchema.Fields = append(rtSchema.Fields, &runtimev1.StructType_Field{ - Name: name, - Type: t, - }) - } - return &drivers.OlapTable{ - Database: db, - DatabaseSchema: schema, - Name: name, - View: meta.View, - Schema: rtSchema, - UnsupportedCols: nil, - PhysicalSizeBytes: 0, - }, nil -} - func getCurrentDatabaseAndSchema(ctx context.Context, db *sql.DB) (string, string, error) { query := "SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()" diff --git a/runtime/drivers/starrocks/information_schema.go b/runtime/drivers/starrocks/information_schema.go index 6e5496089f7b..191135e3ee4b 100644 --- a/runtime/drivers/starrocks/information_schema.go +++ b/runtime/drivers/starrocks/information_schema.go @@ -10,18 +10,82 @@ import ( ) // StarRocks Uses fully qualified names (catalog.information_schema.tables) instead of SET CATALOG/USE -// All returns metadata about all tables and views. -// For StarRocks, we query from the configured catalog's information_schema. -func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { +// ListDatabaseSchemas returns a list of database schemas in StarRocks. +// StarRocks structure: Catalog -> Database -> Table +// We map: Database = catalog, DatabaseSchema = database +func (c *connection) ListDatabaseSchemas(ctx context.Context, pageSize uint32, pageToken string) ([]*drivers.DatabaseSchemaInfo, string, error) { db := c.db catalog := c.configProp.Catalog - // Build query using fully qualified information_schema path - // Pattern: catalog.information_schema.tables + // Query information_schema.schemata using fully qualified path + q := fmt.Sprintf(` + SELECT schema_name + FROM %s.information_schema.schemata + WHERE schema_name NOT IN ('information_schema', '_statistics_', 'mysql', 'sys') + `, safeSQLName(catalog)) + + args := []any{} + + if pageToken != "" { + q += " AND schema_name > ?" + args = append(args, pageToken) + } + + q += " ORDER BY schema_name" + + if pageSize > 0 { + q += fmt.Sprintf(" LIMIT %d", pageSize+1) + } + + rows, err := db.QueryxContext(ctx, q, args...) + if err != nil { + return nil, "", err + } + defer rows.Close() + + var schemas []*drivers.DatabaseSchemaInfo + for rows.Next() { + var schemaName string + if err := rows.Scan(&schemaName); err != nil { + return nil, "", err + } + + // StarRocks mapping: Database = catalog, DatabaseSchema = database + schemas = append(schemas, &drivers.DatabaseSchemaInfo{ + Database: catalog, // Catalog name (e.g., default_catalog, iceberg_catalog) + DatabaseSchema: schemaName, // Database name (e.g., sales, analytics) + }) + } + + if err := rows.Err(); err != nil { + return nil, "", err + } + + // Handle pagination + var nextToken string + if pageSize > 0 && uint32(len(schemas)) > pageSize { + schemas = schemas[:pageSize] + nextToken = schemas[len(schemas)-1].DatabaseSchema + } + + return schemas, nextToken, nil +} + +// ListTables returns a list of tables in a specific database schema. +// database parameter = catalog, databaseSchema parameter = database +func (c *connection) ListTables(ctx context.Context, database, databaseSchema string, pageSize uint32, pageToken string) ([]*drivers.TableInfo, string, error) { + db := c.db + + // StarRocks mapping: database parameter = catalog + catalog := database + + // StarRocks mapping: databaseSchema parameter = database + dbSchema := databaseSchema + + // Query information_schema.tables using fully qualified path q := fmt.Sprintf(` SELECT - table_schema, table_name, CASE WHEN table_type = 'VIEW' THEN true @@ -29,22 +93,17 @@ func (c *connection) All(ctx context.Context, like string, pageSize uint32, page ELSE false END AS is_view FROM %s.information_schema.tables - WHERE table_schema NOT IN ('information_schema', '_statistics_', 'mysql', 'sys') + WHERE table_schema = ? `, safeSQLName(catalog)) - args := []any{} - - if like != "" { - q += " AND table_name LIKE ?" - args = append(args, like) - } + args := []any{dbSchema} if pageToken != "" { q += " AND table_name > ?" args = append(args, pageToken) } - q += " ORDER BY table_schema, table_name" + q += " ORDER BY table_name" if pageSize > 0 { q += fmt.Sprintf(" LIMIT %d", pageSize+1) @@ -56,19 +115,17 @@ func (c *connection) All(ctx context.Context, like string, pageSize uint32, page } defer rows.Close() - var tables []*drivers.OlapTable + var tables []*drivers.TableInfo for rows.Next() { - var schema, name string + var tableName string var isView bool - if err := rows.Scan(&schema, &name, &isView); err != nil { + if err := rows.Scan(&tableName, &isView); err != nil { return nil, "", err } - tables = append(tables, &drivers.OlapTable{ - Database: catalog, // StarRocks catalog -> Rill database - DatabaseSchema: schema, // StarRocks database -> Rill databaseSchema - Name: name, - View: isView, + tables = append(tables, &drivers.TableInfo{ + Name: tableName, + View: isView, }) } @@ -88,7 +145,7 @@ func (c *connection) All(ctx context.Context, like string, pageSize uint32, page // Lookup returns metadata about a specific table or view. // database parameter = catalog, schema parameter = database in StarRocks terms. -func (c *connection) Lookup(ctx context.Context, database, schema, name string) (*drivers.OlapTable, error) { +func (c *connection) Lookup(ctx context.Context, database, databaseSchema, name string) (*drivers.OlapTable, error) { db := c.db // StarRocks mapping: database parameter = catalog @@ -100,7 +157,7 @@ func (c *connection) Lookup(ctx context.Context, database, schema, name string) // StarRocks mapping: schema parameter = database // If schema is empty, use connector's configured database - dbSchema := schema + dbSchema := databaseSchema if dbSchema == "" { dbSchema = c.configProp.Database } @@ -184,114 +241,18 @@ func (c *connection) Lookup(ctx context.Context, database, schema, name string) }, nil } -// LoadPhysicalSize populates the PhysicalSizeBytes field of table metadata. -// For external catalogs, this may not be available. -func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { - // StarRocks doesn't easily expose physical size for external tables - // For internal tables, we could query be_tablets but it's complex - // Return without error, leaving PhysicalSizeBytes as 0 - return nil -} - -// LoadDDL implements drivers.OLAPInformationSchema. -func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { - db := c.db - - catalog := table.Database - if catalog == "" { - catalog = c.configProp.Catalog - } - schema := table.DatabaseSchema - if schema == "" { - schema = c.configProp.Database - } - - q := fmt.Sprintf("SHOW CREATE TABLE %s.%s.%s", safeSQLName(catalog), safeSQLName(schema), safeSQLName(table.Name)) - var name, ddl string - err := db.QueryRowxContext(ctx, q).Scan(&name, &ddl) - if err != nil { - return err - } - table.DDL = ddl - return nil -} - -// ListDatabaseSchemas returns a list of database schemas in StarRocks. -// StarRocks structure: Catalog -> Database -> Table -// We map: Database = catalog, DatabaseSchema = database -func (c *connection) ListDatabaseSchemas(ctx context.Context, pageSize uint32, pageToken string) ([]*drivers.DatabaseSchemaInfo, string, error) { +// All returns metadata about all tables and views. +// For StarRocks, we query from the configured catalog's information_schema. +func (c *connection) All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*drivers.OlapTable, string, error) { db := c.db catalog := c.configProp.Catalog - // Query information_schema.schemata using fully qualified path - q := fmt.Sprintf(` - SELECT schema_name - FROM %s.information_schema.schemata - WHERE schema_name NOT IN ('information_schema', '_statistics_', 'mysql', 'sys') - `, safeSQLName(catalog)) - - args := []any{} - - if pageToken != "" { - q += " AND schema_name > ?" - args = append(args, pageToken) - } - - q += " ORDER BY schema_name" - - if pageSize > 0 { - q += fmt.Sprintf(" LIMIT %d", pageSize+1) - } - - rows, err := db.QueryxContext(ctx, q, args...) - if err != nil { - return nil, "", err - } - defer rows.Close() - - var schemas []*drivers.DatabaseSchemaInfo - for rows.Next() { - var schemaName string - if err := rows.Scan(&schemaName); err != nil { - return nil, "", err - } - - // StarRocks mapping: Database = catalog, DatabaseSchema = database - schemas = append(schemas, &drivers.DatabaseSchemaInfo{ - Database: catalog, // Catalog name (e.g., default_catalog, iceberg_catalog) - DatabaseSchema: schemaName, // Database name (e.g., sales, analytics) - }) - } - - if err := rows.Err(); err != nil { - return nil, "", err - } - - // Handle pagination - var nextToken string - if pageSize > 0 && uint32(len(schemas)) > pageSize { - schemas = schemas[:pageSize] - nextToken = schemas[len(schemas)-1].DatabaseSchema - } - - return schemas, nextToken, nil -} - -// ListTables returns a list of tables in a specific database schema. -// database parameter = catalog, databaseSchema parameter = database -func (c *connection) ListTables(ctx context.Context, database, databaseSchema string, pageSize uint32, pageToken string) ([]*drivers.TableInfo, string, error) { - db := c.db - - // StarRocks mapping: database parameter = catalog - catalog := database - - // StarRocks mapping: databaseSchema parameter = database - dbSchema := databaseSchema - - // Query information_schema.tables using fully qualified path + // Build query using fully qualified information_schema path + // Pattern: catalog.information_schema.tables q := fmt.Sprintf(` SELECT + table_schema, table_name, CASE WHEN table_type = 'VIEW' THEN true @@ -299,17 +260,22 @@ func (c *connection) ListTables(ctx context.Context, database, databaseSchema st ELSE false END AS is_view FROM %s.information_schema.tables - WHERE table_schema = ? + WHERE table_schema NOT IN ('information_schema', '_statistics_', 'mysql', 'sys') `, safeSQLName(catalog)) - args := []any{dbSchema} + args := []any{} + + if like != "" { + q += " AND table_name LIKE ?" + args = append(args, like) + } if pageToken != "" { q += " AND table_name > ?" args = append(args, pageToken) } - q += " ORDER BY table_name" + q += " ORDER BY table_schema, table_name" if pageSize > 0 { q += fmt.Sprintf(" LIMIT %d", pageSize+1) @@ -321,17 +287,19 @@ func (c *connection) ListTables(ctx context.Context, database, databaseSchema st } defer rows.Close() - var tables []*drivers.TableInfo + var tables []*drivers.OlapTable for rows.Next() { - var tableName string + var schema, name string var isView bool - if err := rows.Scan(&tableName, &isView); err != nil { + if err := rows.Scan(&schema, &name, &isView); err != nil { return nil, "", err } - tables = append(tables, &drivers.TableInfo{ - Name: tableName, - View: isView, + tables = append(tables, &drivers.OlapTable{ + Database: catalog, // StarRocks catalog -> Rill database + DatabaseSchema: schema, // StarRocks database -> Rill databaseSchema + Name: name, + View: isView, }) } @@ -349,61 +317,34 @@ func (c *connection) ListTables(ctx context.Context, database, databaseSchema st return tables, nextToken, nil } -// GetTable returns metadata about a specific table. -func (c *connection) GetTable(ctx context.Context, database, databaseSchema, tableName string) (*drivers.TableMetadata, error) { - db := c.db - - // StarRocks mapping: database parameter = catalog - catalog := database - - // StarRocks mapping: databaseSchema parameter = database - dbSchema := databaseSchema - - // Query table metadata and columns using JOIN - query := fmt.Sprintf(` - SELECT - CASE - WHEN t.table_type = 'VIEW' THEN true - WHEN t.table_type = 'MATERIALIZED VIEW' THEN true - ELSE false - END AS is_view, - c.column_name, - c.data_type - FROM %s.information_schema.tables t - JOIN %s.information_schema.columns c - ON t.table_schema = c.table_schema - AND t.table_name = c.table_name - WHERE t.table_schema = ? AND LOWER(t.table_name) = LOWER(?) - ORDER BY c.ordinal_position - `, safeSQLName(catalog), safeSQLName(catalog)) - - rows, err := db.QueryxContext(ctx, query, dbSchema, tableName) - if err != nil { - return nil, fmt.Errorf("failed to get table metadata: %w", err) - } - defer rows.Close() +// LoadPhysicalSize populates the PhysicalSizeBytes field of table metadata. +// For external catalogs, this may not be available. +func (c *connection) LoadPhysicalSize(ctx context.Context, tables []*drivers.OlapTable) error { + // StarRocks doesn't easily expose physical size for external tables + // For internal tables, we could query be_tablets but it's complex + // Return without error, leaving PhysicalSizeBytes as 0 + return nil +} - schema := make(map[string]string) - var isView bool +// LoadDDL implements drivers.OLAPInformationSchema. +func (c *connection) LoadDDL(ctx context.Context, table *drivers.OlapTable) error { + db := c.db - for rows.Next() { - var colName, dataType string - if err := rows.Scan(&isView, &colName, &dataType); err != nil { - return nil, err - } - schema[colName] = dataType + catalog := table.Database + if catalog == "" { + catalog = c.configProp.Catalog } - - if err := rows.Err(); err != nil { - return nil, err + schema := table.DatabaseSchema + if schema == "" { + schema = c.configProp.Database } - if len(schema) == 0 { - return nil, fmt.Errorf("table not found") + q := fmt.Sprintf("SHOW CREATE TABLE %s.%s.%s", safeSQLName(catalog), safeSQLName(schema), safeSQLName(table.Name)) + var name, ddl string + err := db.QueryRowxContext(ctx, q).Scan(&name, &ddl) + if err != nil { + return err } - - return &drivers.TableMetadata{ - View: isView, - Schema: schema, - }, nil + table.DDL = ddl + return nil } diff --git a/runtime/server/connector_service.go b/runtime/server/connector_service.go index 185e51c26e34..8182ecee6102 100644 --- a/runtime/server/connector_service.go +++ b/runtime/server/connector_service.go @@ -194,12 +194,20 @@ func (s *Server) GetTable(ctx context.Context, req *runtimev1.GetTableRequest) ( return nil, fmt.Errorf("connector %q does not implement information schema", req.Connector) } - tableMetadata, err := is.GetTable(ctx, req.Database, req.DatabaseSchema, req.Table) + tableMetadata, err := is.Lookup(ctx, req.Database, req.DatabaseSchema, req.Table) if err != nil { return nil, err } + schema := make(map[string]string) + for _, field := range tableMetadata.Schema.Fields { + if field.Type.Code == runtimev1.Type_CODE_UNSPECIFIED { + schema[field.Name] = fmt.Sprintf("UNKNOWN(%s)", field.Type.RawType) + } else { + schema[field.Name] = field.Type.RawType + } + } return &runtimev1.GetTableResponse{ - Schema: tableMetadata.Schema, + Schema: schema, }, nil } From 41001b58b644cc80b0d99480320ed9db770b1b9a Mon Sep 17 00:00:00 2001 From: NamanMahor Date: Thu, 2 Jul 2026 13:04:15 +0530 Subject: [PATCH 2/4] Lint --- runtime/drivers/athena/information_schema.go | 2 +- runtime/drivers/mysql/information_schema.go | 2 +- runtime/drivers/postgres/information_schema.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/runtime/drivers/athena/information_schema.go b/runtime/drivers/athena/information_schema.go index 24f129428029..5bc92e9de901 100644 --- a/runtime/drivers/athena/information_schema.go +++ b/runtime/drivers/athena/information_schema.go @@ -141,7 +141,7 @@ func (c *Connection) Lookup(ctx context.Context, database, databaseSchema, name var col, typ string fields := make([]*runtimev1.StructType_Field, 0) for rows.Next() { - if err = rows.Scan(&view, &col, &typ); err != nil { + if err := rows.Scan(&view, &col, &typ); err != nil { return nil, err } fields = append(fields, &runtimev1.StructType_Field{ diff --git a/runtime/drivers/mysql/information_schema.go b/runtime/drivers/mysql/information_schema.go index f95f952a8253..3381d709cc9d 100644 --- a/runtime/drivers/mysql/information_schema.go +++ b/runtime/drivers/mysql/information_schema.go @@ -157,7 +157,7 @@ func (c *connection) Lookup(ctx context.Context, database, databaseSchema, name var col, typ string fields := make([]*runtimev1.StructType_Field, 0) for rows.Next() { - if err = rows.Scan(&view, &col, &typ); err != nil { + if err := rows.Scan(&view, &col, &typ); err != nil { return nil, err } fields = append(fields, &runtimev1.StructType_Field{ diff --git a/runtime/drivers/postgres/information_schema.go b/runtime/drivers/postgres/information_schema.go index fdb4c0dd877c..3806a5dca23e 100644 --- a/runtime/drivers/postgres/information_schema.go +++ b/runtime/drivers/postgres/information_schema.go @@ -160,7 +160,7 @@ func (c *connection) Lookup(ctx context.Context, database, databaseSchema, name var col, typ string fields := make([]*runtimev1.StructType_Field, 0) for rows.Next() { - if err = rows.Scan(&view, &col, &typ); err != nil { + if err := rows.Scan(&view, &col, &typ); err != nil { return nil, err } fields = append(fields, &runtimev1.StructType_Field{ From 1939316bd7f54b0bad92084c05e8437b6dc00c0f Mon Sep 17 00:00:00 2001 From: NamanMahor Date: Thu, 2 Jul 2026 13:51:50 +0530 Subject: [PATCH 3/4] fixes for type comming as unknown --- runtime/drivers/athena/olap.go | 9 ++++++++- runtime/drivers/postgres/olap.go | 2 +- runtime/drivers/redshift/information_schema.go | 8 ++++---- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/runtime/drivers/athena/olap.go b/runtime/drivers/athena/olap.go index 3ce0698ccaef..2d8bede88770 100644 --- a/runtime/drivers/athena/olap.go +++ b/runtime/drivers/athena/olap.go @@ -256,7 +256,14 @@ func (r *rows) runtimeSchema() *runtimev1.StructType { func athenaTypeToRuntimeType(colType string) *runtimev1.Type { t := &runtimev1.Type{RawType: colType} - switch strings.ToLower(colType) { + + typeLower := strings.ToLower(colType) + baseType := typeLower + if idx := strings.Index(typeLower, "("); idx != -1 { + baseType = typeLower[:idx] + } + + switch baseType { case "tinyint": t.Code = runtimev1.Type_CODE_INT8 case "smallint": diff --git a/runtime/drivers/postgres/olap.go b/runtime/drivers/postgres/olap.go index 0bd9bfc94a72..bd88c3ffd1f5 100644 --- a/runtime/drivers/postgres/olap.go +++ b/runtime/drivers/postgres/olap.go @@ -141,7 +141,7 @@ func databaseTypeToPB(dbt string) *runtimev1.Type { return t } - switch dbt { + switch strings.ToUpper(dbt) { case "NUMERIC", "DECIMAL": t.Code = runtimev1.Type_CODE_DECIMAL case "INT2", "SMALLINT", "SMALLSERIAL": diff --git a/runtime/drivers/redshift/information_schema.go b/runtime/drivers/redshift/information_schema.go index 0d8e18ba9705..dc01a40382f4 100644 --- a/runtime/drivers/redshift/information_schema.go +++ b/runtime/drivers/redshift/information_schema.go @@ -171,7 +171,7 @@ func (c *Connection) Lookup(ctx context.Context, database, databaseSchema, name } var view bool fields := make([]*runtimev1.StructType_Field, len(result.Records)) - for _, record := range result.Records { + for i, record := range result.Records { viewField, ok := record[0].(*types.FieldMemberBooleanValue) if !ok { return nil, fmt.Errorf("unexpected type for column_name field") @@ -181,14 +181,14 @@ func (c *Connection) Lookup(ctx context.Context, database, databaseSchema, name if !ok { return nil, fmt.Errorf("unexpected type for column_name field") } - typeField, ok := record[0].(*types.FieldMemberStringValue) + typeField, ok := record[2].(*types.FieldMemberStringValue) if !ok { return nil, fmt.Errorf("unexpected type for data_type field") } - fields = append(fields, &runtimev1.StructType_Field{ + fields[i] = &runtimev1.StructType_Field{ Name: colField.Value, Type: redshiftTypeToRuntimeType(typeField.Value), - }) + } } return &drivers.OlapTable{ Database: database, From e6edbf680bd7b630bb7dbbcdc3f72ec1d106a19b Mon Sep 17 00:00:00 2001 From: NamanMahor Date: Thu, 2 Jul 2026 14:20:12 +0530 Subject: [PATCH 4/4] review comments --- runtime/server/connector_service.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/runtime/server/connector_service.go b/runtime/server/connector_service.go index 8182ecee6102..2c7124d604f9 100644 --- a/runtime/server/connector_service.go +++ b/runtime/server/connector_service.go @@ -194,17 +194,24 @@ func (s *Server) GetTable(ctx context.Context, req *runtimev1.GetTableRequest) ( return nil, fmt.Errorf("connector %q does not implement information schema", req.Connector) } - tableMetadata, err := is.Lookup(ctx, req.Database, req.DatabaseSchema, req.Table) + table, err := is.Lookup(ctx, req.Database, req.DatabaseSchema, req.Table) if err != nil { return nil, err } - schema := make(map[string]string) - for _, field := range tableMetadata.Schema.Fields { + size := len(table.Schema.Fields) + if table.UnsupportedCols != nil { + size += len(table.UnsupportedCols) + } + schema := make(map[string]string, size) + for _, field := range table.Schema.Fields { + typ := field.Type.RawType if field.Type.Code == runtimev1.Type_CODE_UNSPECIFIED { - schema[field.Name] = fmt.Sprintf("UNKNOWN(%s)", field.Type.RawType) - } else { - schema[field.Name] = field.Type.RawType + typ = fmt.Sprintf("UNKNOWN(%s)", typ) } + schema[field.Name] = typ + } + for name, typ := range table.UnsupportedCols { + schema[name] = fmt.Sprintf("UNKNOWN(%s)", typ) } return &runtimev1.GetTableResponse{