JDBC — один из самых распространённых источников данных в Spark.
В этом разделе мы подробно рассмотрим, как
использовать официальный коннектор JDBC для ClickHouse со Spark.
Java
Scala
Python
Spark SQL
public static void main(String[] args) {
// Инициализация сеанса Spark
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();
String jdbcURL = "jdbc:ch://localhost:8123/default";
String query = "select * from example_table where id > 2";
//---------------------------------------------------------------------------------------------------
// Загрузка таблицы из ClickHouse с помощью метода jdbc
//---------------------------------------------------------------------------------------------------
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "123456");
Dataset<Row> df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties);
df1.show();
//---------------------------------------------------------------------------------------------------
// Загрузка таблицы из ClickHouse с помощью метода load
//---------------------------------------------------------------------------------------------------
Dataset<Row> df2 = spark.read()
.format("jdbc")
.option("url", jdbcURL)
.option("user", "default")
.option("password", "123456")
.option("query", query)
.load();
df2.show();
// Остановка сеанса Spark
spark.stop();
}
object ReadData extends App {
// Инициализация сеанса Spark
val spark: SparkSession = SparkSession.builder.appName("example").master("local").getOrCreate
val jdbcURL = "jdbc:ch://localhost:8123/default"
val query: String = "select * from example_table where id > 2"
//---------------------------------------------------------------------------------------------------
// Загрузка таблицы из ClickHouse с помощью метода jdbc
//---------------------------------------------------------------------------------------------------
val connectionProperties = new Properties()
connectionProperties.put("user", "default")
connectionProperties.put("password", "123456")
val df1: Dataset[Row] = spark.read.
jdbc(jdbcURL, s"($query)", connectionProperties)
df1.show()
//---------------------------------------------------------------------------------------------------
// Загрузка таблицы из ClickHouse с помощью метода load
//---------------------------------------------------------------------------------------------------
val df2: Dataset[Row] = spark.read
.format("jdbc")
.option("url", jdbcURL)
.option("user", "default")
.option("password", "123456")
.option("query", query)
.load()
df2.show()
// Остановка сеанса Spark// Остановка сеанса Spark
spark.stop()
}
from pyspark.sql import SparkSession
jar_files = [
"jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar"
]
# Инициализация сеанса Spark с JAR-файлами
spark = SparkSession.builder \
.appName("example") \
.master("local") \
.config("spark.jars", ",".join(jar_files)) \
.getOrCreate()
url = "jdbc:ch://localhost:8123/default"
user = "your_user"
password = "your_password"
query = "select * from example_table where id > 2"
driver = "com.clickhouse.jdbc.ClickHouseDriver"
df = (spark.read
.format('jdbc')
.option('driver', driver)
.option('url', url)
.option('user', user)
.option('password', password).option(
'query', query).load())
df.show()
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:ch://localhost:8123/default",
dbtable "schema.tablename",
user "username",
password "password",
driver "com.clickhouse.jdbc.ClickHouseDriver"
);
SELECT * FROM jdbcTable;
Java
Scala
Python
Spark SQL
public static void main(String[] args) {
// Инициализация сеанса Spark
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();
// Сведения о подключении JDBC
String jdbcUrl = "jdbc:ch://localhost:8123/default";
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "123456");
// Создание примера DataFrame
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false)
});
List<Row> rows = new ArrayList<Row>();
rows.add(RowFactory.create(1, "John"));
rows.add(RowFactory.create(2, "Doe"));
Dataset<Row> df = spark.createDataFrame(rows, schema);
//---------------------------------------------------------------------------------------------------
// Запись df в ClickHouse с использованием метода jdbc
//---------------------------------------------------------------------------------------------------
df.write()
.mode(SaveMode.Append)
.jdbc(jdbcUrl, "example_table", jdbcProperties);
//---------------------------------------------------------------------------------------------------
// Запись df в ClickHouse с использованием метода save
//---------------------------------------------------------------------------------------------------
df.write()
.format("jdbc")
.mode("append")
.option("url", jdbcUrl)
.option("dbtable", "example_table")
.option("user", "default")
.option("password", "123456")
.save();
// Остановка сеанса Spark
spark.stop();
}
object WriteData extends App {
val spark: SparkSession = SparkSession.builder.appName("example").master("local").getOrCreate
// Сведения о подключении JDBC
val jdbcUrl: String = "jdbc:ch://localhost:8123/default"
val jdbcProperties: Properties = new Properties
jdbcProperties.put("user", "default")
jdbcProperties.put("password", "123456")
// Создание примера DataFrame
val rows = Seq(Row(1, "John"), Row(2, "Doe"))
val schema = List(
StructField("id", DataTypes.IntegerType, nullable = false),
StructField("name", StringType, nullable = true)
)
val df: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(rows),
StructType(schema)
)
//---------------------------------------------------------------------------------------------------//---------------------------------------------------------------------------------------------------
// Запись df в ClickHouse с использованием метода jdbc
//---------------------------------------------------------------------------------------------------//---------------------------------------------------------------------------------------------------
df.write
.mode(SaveMode.Append)
.jdbc(jdbcUrl, "example_table", jdbcProperties)
//---------------------------------------------------------------------------------------------------//---------------------------------------------------------------------------------------------------
// Запись df в ClickHouse с использованием метода save
//---------------------------------------------------------------------------------------------------//---------------------------------------------------------------------------------------------------
df.write
.format("jdbc")
.mode("append")
.option("url", jdbcUrl)
.option("dbtable", "example_table")
.option("user", "default")
.option("password", "123456")
.save()
// Остановка сеанса Spark// Остановка сеанса Spark
spark.stop()
}
from pyspark.sql import SparkSession
from pyspark.sql import Row
jar_files = [
"jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar"
]
# Инициализация сеанса Spark с JAR-файлами
spark = SparkSession.builder \
.appName("example") \
.master("local") \
.config("spark.jars", ",".join(jar_files)) \
.getOrCreate()
# Создание DataFrame
data = [Row(id=11, name="John"), Row(id=12, name="Doe")]
df = spark.createDataFrame(data)
url = "jdbc:ch://localhost:8123/default"
user = "your_user"
password = "your_password"
driver = "com.clickhouse.jdbc.ClickHouseDriver"
# Запись DataFrame в ClickHouse
df.write \
.format("jdbc") \
.option("driver", driver) \
.option("url", url) \
.option("user", user) \
.option("password", password) \
.option("dbtable", "example_table") \
.mode("append") \
.save()
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:ch://localhost:8123/default",
dbtable "schema.tablename",
user "username",
password "password",
driver "com.clickhouse.jdbc.ClickHouseDriver"
);
-- resultTable можно создать с помощью df.createTempView или Spark SQL
INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable;
При использовании Spark JDBC Spark читает данные, используя одну партицию. Чтобы добиться более высокого параллелизма, необходимо указать
partitionColumn, lowerBound, upperBound и numPartitions, которые задают, как разбивать таблицу на партиции при
параллельном чтении несколькими воркерами.
Подробнее см. в официальной документации Apache Spark
по настройкам JDBC.
- Spark JDBC не поддерживает сложные типы (MAP, ARRAY, STRUCT) из-за отсутствия диалекта ClickHouse — для полной поддержки сложных типов используйте нативный коннектор Spark-ClickHouse.
- На данный момент с помощью JDBC можно выполнять вставку данных только в существующие таблицы (сейчас нет возможности автоматически создавать
таблицу при вставке из DF, как это Spark делает с другими коннекторами).
Последнее изменение 10 июня 2026 г.