Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
export SPARK_MASTER_HOST=doris--spark-iceberg

# wait iceberg-rest start
while [[ ! $(curl -s --fail http://rest:8181/v1/config) ]]; do
while ! curl -s --fail http://rest:8181/v1/config >/dev/null; do
sleep 1
done

set -ex

mkdir -p /opt/spark/events
SPARK_THRIFT_EXTENSIONS="org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"

for f in /opt/spark/sbin/*; do
ln -s $f /usr/local/bin/$(basename $f)
Expand All @@ -39,7 +40,6 @@ done
start-master.sh -p 7077
start-worker.sh spark://doris--spark-iceberg:7077
start-history-server.sh
start-thriftserver.sh --driver-java-options "-Dderby.system.home=/tmp/derby"

# The creation of a Spark SQL client is time-consuming,
# and reopening a new client for each SQL file execution leads to significant overhead.
Expand Down Expand Up @@ -68,6 +68,39 @@ END_TIME3=$(date +%s)
EXECUTION_TIME3=$((END_TIME3 - START_TIME3))
echo "Script iceberg load total: {} executed in $EXECUTION_TIME3 seconds"

spark-sql \
--master spark://doris--spark-iceberg:7077 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
-e "CREATE DATABASE IF NOT EXISTS demo.default"

start-thriftserver.sh \
--master spark://doris--spark-iceberg:7077 \
--conf "spark.sql.extensions=${SPARK_THRIFT_EXTENSIONS}" \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.cores.max=8 \
--conf spark.executor.cores=4 \
--conf spark.executor.memory=8g \
--conf spark.driver.memory=4g \
--conf spark.sql.shuffle.partitions=16 \
--conf spark.default.parallelism=16 \
--driver-java-options "-Dderby.system.home=/tmp/derby"

SPARK_THRIFT_READY_ATTEMPTS=0
while ! beeline \
-u "jdbc:hive2://localhost:10000/default" \
-n hadoop \
-p hadoop \
-e "SELECT 1" >/tmp/spark-thriftserver-ready.log 2>&1; do
SPARK_THRIFT_READY_ATTEMPTS=$((SPARK_THRIFT_READY_ATTEMPTS + 1))
if [ "${SPARK_THRIFT_READY_ATTEMPTS}" -ge 120 ]; then
echo "ERROR: Spark thriftserver did not become ready after ${SPARK_THRIFT_READY_ATTEMPTS} attempts" >&2
cat /tmp/spark-thriftserver-ready.log >&2 || true
tail -n 200 /opt/spark/logs/*HiveThriftServer2*.out >&2 || true
exit 1
fi
sleep 1
done

touch /mnt/SUCCESS;

tail -f /dev/null
1 change: 1 addition & 0 deletions docker/thirdparties/docker-compose/iceberg/iceberg.env
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
NOTEBOOK_SERVER_PORT=8888
SPARK_DRIVER_UI_PORT=8080
SPARK_HISTORY_UI_PORT=10000
SPARK_THRIFT_PORT=11000
REST_CATALOG_PORT=18181
MINIO_UI_PORT=9000
MINIO_API_PORT=19001
2 changes: 2 additions & 0 deletions docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ services:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- ${SPARK_THRIFT_PORT}:10000
entrypoint: /bin/sh /mnt/scripts/entrypoint.sh
user: root
networks:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,4 @@ VALUES (1, NULL, 100.0),
(2, 'NULL', 200.0),
(3, '\\N', 300.0),
(4, 'null', 400.0),
(5, 'A', 500.0);
(5, 'A', 500.0);
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

# Example:
spark.sql.session.timeZone Asia/Shanghai

spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.demo.type rest
spark.sql.catalog.demo.uri http://rest:8181
Expand All @@ -42,4 +43,4 @@ spark.sql.catalog.paimon.warehouse s3://warehouse/wh
spark.sql.catalog.paimon.s3.endpoint http://minio:9000
spark.sql.catalog.paimon.s3.access-key admin
spark.sql.catalog.paimon.s3.secret-key password
spark.sql.catalog.paimon.s3.region us-east-1
spark.sql.catalog.paimon.s3.region us-east-1
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ class Suite implements GroovyInterceptable {

private AmazonS3 s3Client = null
private FileSystem fs = null
private String sparkIcebergContainerNameCache = null

Suite(String name, String group, SuiteContext context, SuiteCluster cluster) {
this.name = name
Expand Down Expand Up @@ -1618,80 +1617,41 @@ class Suite implements GroovyInterceptable {
return result
}

/**
* Get the spark-iceberg container name by querying docker.
* Uses 'docker ps --filter name=spark-iceberg' to find the container.
*/
private String getSparkIcebergContainerName() {
if (!Strings.isNullOrEmpty(sparkIcebergContainerNameCache)) {
return sparkIcebergContainerNameCache
}

try {
// Use docker ps with filter to find containers with 'spark-iceberg' in the name
String command = "docker ps --filter name=spark-iceberg --format {{.Names}}"
def process = command.execute()
process.waitFor()
String output = process.in.text.trim()
private List<List<Object>> spark_sql(String sqlStr, boolean isOrder = false) {
String cleanedSqlStr = sqlStr.replaceAll("\\s*;\\s*\$", "")
logger.info("Execute Spark JDBC SQL: ${cleanedSqlStr}".toString())
logger.info("Spark JDBC URL: ${context.getSparkIcebergJdbcUrl()}".toString())
return sql_impl(context.getSparkIcebergConnection(), cleanedSqlStr, isOrder)
}

if (output) {
// Get the first matching container
String containerName = output.split('\n')[0].trim()
if (containerName) {
sparkIcebergContainerNameCache = containerName
logger.info("Found spark-iceberg container: ${containerName}".toString())
return containerName
}
}
private List spark_sql_multi(Object sqlStatements, boolean isOrder = false) {
def statements = sqlStatements.toString().split(';').collect { it.trim() }.findAll { it }

logger.warn("No spark-iceberg container found via docker ps")
return null
} catch (Exception e) {
logger.warn("Failed to get spark-iceberg container via docker ps: ${e.message}".toString())
return null
if (statements.isEmpty()) {
return []
}

logger.info("Execute Spark JDBC SQL statements via ${context.getSparkIcebergJdbcUrl()}: ${statements}".toString())
Connection sparkConn = context.getSparkIcebergConnection()
return statements.collect { statement -> sql_impl(sparkConn, statement, isOrder) }
}

/**
* Execute Spark SQL on the spark-iceberg container via docker exec.
* Execute Spark SQL on the Spark ThriftServer via Hive JDBC.
*
* Usage in test suite:
* spark_iceberg "CREATE TABLE demo.test_db.t1 (id INT) USING iceberg"
* spark_iceberg "INSERT INTO demo.test_db.t1 VALUES (1)"
* def result = spark_iceberg "SELECT * FROM demo.test_db.t1"
*
* The container name is found by querying 'docker ps --filter name=spark-iceberg'
*/
String spark_iceberg(String sqlStr, int timeoutSeconds = 120) {
String containerName = getSparkIcebergContainerName()
if (containerName == null) {
throw new RuntimeException("spark-iceberg container not found. Please ensure the container is running.")
}
String masterUrl = "spark://${containerName}:7077"

// Escape double quotes in SQL string for shell command
String escapedSql = sqlStr.replaceAll('"', '\\\\"')

// Build docker exec command
String command = """docker exec ${containerName} spark-sql --master ${masterUrl} --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -e "${escapedSql}" """

logger.info("Executing Spark Iceberg SQL: ${sqlStr}".toString())
logger.info("Container: ${containerName}".toString())

try {
String result = cmd(command, timeoutSeconds)
logger.info("Spark Iceberg SQL result: ${result}".toString())
return result
} catch (Exception e) {
logger.error("Spark Iceberg SQL failed: ${e.message}".toString())
throw e
}
List<List<Object>> spark_iceberg(String sqlStr, boolean isOrder = false) {
return spark_sql(sqlStr, isOrder)
}

/**
* Execute multiple Spark SQL statements on the spark-iceberg container.
* Execute multiple Spark SQL statements on the Spark ThriftServer via Hive JDBC.
* Statements are separated by semicolons.
* All statements are executed in one spark-sql process to reduce startup overhead.
* All statements are executed on one JDBC connection to reduce startup overhead.
*
* Usage:
* spark_iceberg_multi '''
Expand All @@ -1700,46 +1660,36 @@ class Suite implements GroovyInterceptable {
* INSERT INTO demo.test_db.t1 VALUES (1);
* '''
*/
List<String> spark_iceberg_multi(String sqlStatements, int timeoutSeconds = 300) {
def statements = sqlStatements.split(';').collect { it.trim() }.findAll { it }

if (statements.isEmpty()) {
return []
}

String combinedSql = statements.collect { "${it};" }.join(" ")
return [spark_iceberg(combinedSql, timeoutSeconds)]
List spark_iceberg_multi(Object sqlStatements, boolean isOrder = false) {
return spark_sql_multi(sqlStatements, isOrder)
}

/**
* Execute Spark SQL on the spark-iceberg container with Paimon extensions enabled.
* Execute Spark SQL with the Paimon catalog on the Spark ThriftServer via Hive JDBC.
*
* Usage in test suite:
* spark_paimon "CREATE TABLE paimon.test_db.t1 (id INT) USING paimon"
* spark_paimon "INSERT INTO paimon.test_db.t1 VALUES (1)"
* def result = spark_paimon "SELECT * FROM paimon.test_db.t1"
*/
String spark_paimon(String sqlStr, int timeoutSeconds = 120) {
String containerName = getSparkIcebergContainerName()
if (containerName == null) {
throw new RuntimeException("spark-iceberg container not found. Please ensure the container is running.")
}
String masterUrl = "spark://${containerName}:7077"

String escapedSql = sqlStr.replaceAll('"', '\\\\"')
String command = """docker exec ${containerName} spark-sql --master ${masterUrl} --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions -e "${escapedSql}" """

logger.info("Executing Spark Paimon SQL: ${sqlStr}".toString())
logger.info("Container: ${containerName}".toString())
List<List<Object>> spark_paimon(String sqlStr, boolean isOrder = false) {
return spark_sql(sqlStr, isOrder)
}

try {
String result = cmd(command, timeoutSeconds)
logger.info("Spark Paimon SQL result: ${result}".toString())
return result
} catch (Exception e) {
logger.error("Spark Paimon SQL failed: ${e.message}".toString())
throw e
}
/**
* Execute multiple Spark SQL statements with the Paimon catalog on the Spark ThriftServer via Hive JDBC.
* Statements are separated by semicolons.
* All statements are executed on one JDBC connection to reduce startup overhead.
*
* Usage:
* spark_paimon_multi '''
* CREATE DATABASE IF NOT EXISTS paimon.test_db;
* CREATE TABLE paimon.test_db.t1 (id INT) USING paimon;
* INSERT INTO paimon.test_db.t1 VALUES (1);
* '''
*/
List spark_paimon_multi(Object sqlStatements, boolean isOrder = false) {
return spark_sql_multi(sqlStatements, isOrder)
}

List<List<Object>> db2_docker(String sqlStr, boolean isOrder = false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class SuiteContext implements Closeable {
public final ThreadLocal<Connection> threadHive2DockerConn = new ThreadLocal<>()
public final ThreadLocal<Connection> threadHive3DockerConn = new ThreadLocal<>()
public final ThreadLocal<Connection> threadHiveRemoteConn = new ThreadLocal<>()
public final ThreadLocal<Connection> threadSparkIcebergConn = new ThreadLocal<>()
public final ThreadLocal<Connection> threadDB2DockerConn = new ThreadLocal<>()
private final ThreadLocal<Syncer> syncer = new ThreadLocal<>()
public final Config config
Expand Down Expand Up @@ -239,6 +240,15 @@ class SuiteContext implements Closeable {
return threadConn
}

Connection getSparkIcebergConnection() {
def threadConn = threadSparkIcebergConn.get()
if (threadConn == null) {
threadConn = getConnectionBySparkIcebergConfig()
threadSparkIcebergConn.set(threadConn)
}
return threadConn
}

Connection getDB2DockerConnection() {
def threadConn = threadDB2DockerConn.get()
if (threadConn == null) {
Expand Down Expand Up @@ -314,6 +324,21 @@ class SuiteContext implements Closeable {
return DriverManager.getConnection(hiveJdbcUrl, hiveJdbcUser, hiveJdbcPassword)
}

Connection getConnectionBySparkIcebergConfig() {
Class.forName("org.apache.hive.jdbc.HiveDriver");
String sparkJdbcUser = "hadoop"
String sparkJdbcPassword = "hadoop"
String sparkJdbcUrl = getSparkIcebergJdbcUrl()
log.info("Create Spark Iceberg JDBC connection to ${sparkJdbcUrl}".toString())
return DriverManager.getConnection(sparkJdbcUrl, sparkJdbcUser, sparkJdbcPassword)
}

String getSparkIcebergJdbcUrl() {
String sparkHost = config.otherConfigs.get("externalEnvIp")
String sparkPort = config.otherConfigs.get("iceberg_spark_thrift_port") ?: "11000"
return "jdbc:hive2://${sparkHost}:${sparkPort}/default"
}

Connection getConnectionByDB2DockerConfig() {
Class.forName("com.ibm.db2.jcc.DB2Driver");
String db2Host = config.otherConfigs.get("externalEnvIp")
Expand Down Expand Up @@ -616,6 +641,16 @@ class SuiteContext implements Closeable {
log.warn("Close connection failed", t)
}
}

Connection spark_iceberg_conn = threadSparkIcebergConn.get()
if (spark_iceberg_conn != null) {
threadSparkIcebergConn.remove()
try {
spark_iceberg_conn.close()
} catch (Throwable t) {
log.warn("Close connection failed", t)
}
}

}

Expand Down
Loading
Loading