Delta Table to Iceberg metadata migration is failing
I am trying to migrate Delta tables to Iceberg using Scala/Spark while keeping the data intact, following this guide - https://iceberg.apache.org/docs/1.4.3/delta-lake-migration/
Here is the sample code (credentials and directory names are changed for posting here) -
package com.test.iceberg.poc

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.delta.DeltaLakeToIcebergMigrationActionsProvider

object DeltaToIcebergMigration {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Delta to Iceberg Migration")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.hadoop.fs.s3a.endpoint", ";)
.config("spark.hadoop.fs.s3a.access.key", "access_key")
.config("spark.hadoop.fs.s3a.secret.key", "secret_key")
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3a.impl", ".apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.sql.legacy.timeParserPolicy", "LEGACY")
.config("spark.master", "local[*]")
.getOrCreate()
try {
val sourceDeltaLakeTableLocation = "s3a://sample-bucket-dev/one/two/three/four/five_tmp/"
val destTableLocation = "s3a://sample-bucket-dev/one/two/three/demo-iceberg-warehouse/default/five_tmp_iceberg"
val destTableIdentifier = TableIdentifier.of("default", "five_tmp_iceberg")
val hadoopConf = spark.sparkContext.hadoopConfiguration
val icebergCatalog = new HadoopCatalog(hadoopConf, "s3a://sample-bucket-dev/one/two/three/demo-iceberg-warehouse/")
if (DeltaTable.isDeltaTable(spark, sourceDeltaLakeTableLocation)) {
DeltaLakeToIcebergMigrationActionsProvider
.defaultActions()
.snapshotDeltaLakeTable(sourceDeltaLakeTableLocation)
.as(destTableIdentifier)
.icebergCatalog(icebergCatalog)
.tableLocation(destTableLocation)
.deltaLakeConfiguration(hadoopConf)
.execute()
} else {
println(s"Skipping migration: No valid Delta table found at $sourceDeltaLakeTableLocation")
}
println(s"Iceberg metadata created at: $destTableLocation")
val icebergTable = spark.read.format("iceberg").load("iceberg.default.five_tmp_iceberg")
icebergTable.show(10)
} catch {
case e: Exception =>
println(s"Migration failed: ${e.getMessage}")
e.printStackTrace()
} finally {
spark.stop()
}
}
}
It ends up failing with,
Migration failed: Delta Lake table at s3a://sample-bucket-dev/one/two/three/four/five_tmp/ contains no constructable snapshot
org.apache.iceberg.exceptions.ValidationException: Delta Lake table at s3a://sample-bucket-dev/one/two/three/four/five_tmp/ contains no constructable snapshot
	at org.apache.iceberg.delta.BaseSnapshotDeltaLakeTableAction.commitInitialDeltaSnapshotToIcebergTransaction(BaseSnapshotDeltaLakeTableAction.java:270)
	at org.apache.iceberg.delta.BaseSnapshotDeltaLakeTableAction.execute(BaseSnapshotDeltaLakeTableAction.java:180)
	at org.apache.iceberg.delta.BaseSnapshotDeltaLakeTableAction.execute(BaseSnapshotDeltaLakeTableAction.java:76)
...
The Delta tables themselves were created correctly earlier, with the _delta_log directory intact, and I can read and print them without any issue.
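For example, a plain Delta read along these lines works (a minimal sanity check, reusing sourceDeltaLakeTableLocation from the code above):

    // Sanity check: the source Delta table is readable via the standard Delta reader
    val deltaDf = spark.read.format("delta").load(sourceDeltaLakeTableLocation)
    deltaDf.printSchema()
    deltaDf.show(5)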
One thing I noticed while running in debug mode: while executing this,
DeltaLakeToIcebergMigrationActionsProvider
  .defaultActions()
  .snapshotDeltaLakeTable(sourceDeltaLakeTableLocation)
  .as(destTableIdentifier)
  .icebergCatalog(icebergCatalog)
  .tableLocation(destTableLocation)
  .deltaLakeConfiguration(hadoopConf)
  .execute()
control passes into the BaseSnapshotDeltaLakeTableAction class, which lives in this Maven dependency:
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-delta-lake</artifactId>
    <version>1.2.1</version>
</dependency>
Inside that class, it builds the full path to each Delta table Parquet file using this method:
private static String getFullFilePath(String path, String tableRoot) {
  URI dataFileUri = URI.create(path);
  String decodedPath = URLDecoder.decode(path, StandardCharsets.UTF_8);
  if (dataFileUri.isAbsolute()) {
    return decodedPath;
  } else {
    return tableRoot + File.separator + decodedPath;
  }
}
The path it forms looks like this,
s3a://sample-bucket-dev/one/two/three/four/five_tmp\import_date=2025-03-04/part-00014-dac0df00-d1d8-4a5f-ba36-c0f392816ada.c000.snappy.parquet
Here, the method above inserts that backslash before the import_date partition directory. The resulting path is malformed, and eventually the following error message is printed to the console:
File s3a://sample-bucket-dev/one/two/three/four/five_tmp\import_date=2025-03-04/part-00014-dac0df00-d1d8-4a5f-ba36-c0f392816ada.c000.snappy.parquet is referenced in the logs of Delta Lake table at s3a://sample-bucket-dev/one/two/three/four/five_tmp/, but cannot be found in the storage
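That backslash is exactly what java.io.File.separator evaluates to on a Windows JVM, so my working assumption is that the relative-path branch of getFullFilePath joins the table root and the partition path with the OS separator instead of "/". A minimal sketch of that join (tableRoot and relativePath copied from the error above):

    // Minimal sketch: reproduce the path join from getFullFilePath above.
    // Assumption: the JVM runs on Windows, where File.separator is "\".
    val tableRoot = "s3a://sample-bucket-dev/one/two/three/four/five_tmp"
    val relativePath =
      "import_date=2025-03-04/part-00014-dac0df00-d1d8-4a5f-ba36-c0f392816ada.c000.snappy.parquet"
    println(java.io.File.separator) // "\" on Windows, "/" on Linux/macOS
    println(tableRoot + java.io.File.separator + relativePath)
    // On Windows this prints the same malformed URI as in the error above:
    //   s3a://.../five_tmp\import_date=2025-03-04/part-00014-...snappy.parquet
    // On Linux/macOS the separator is "/" and the URI would be valid.

If that is indeed the cause, running the same migration on a JVM where File.separator is "/" would presumably produce a valid S3A URI, though I have not verified this.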
What am I possibly missing here?