Delta Table to Iceberg metadata migration is failing

I am trying to migrate Delta tables to Iceberg using Scala/Spark while keeping the data intact, following this guide: https://iceberg.apache.org/docs/1.4.3/delta-lake-migration/

Here is the sample code (credentials and directory names have been changed for posting here):

package com.test.iceberg.poc

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.delta.DeltaLakeToIcebergMigrationActionsProvider

object DeltaToIcebergMigration {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Delta to Iceberg Migration")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", ".apache.spark.sql.delta.catalog.DeltaCatalog")
      .config("spark.hadoop.fs.s3a.endpoint", ";)
      .config("spark.hadoop.fs.s3a.access.key", "access_key")
      .config("spark.hadoop.fs.s3a.secret.key", "secret_key")
      .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true")
      .config("spark.hadoop.fs.s3a.path.style.access", "true")
      .config("spark.hadoop.fs.s3a.impl", ".apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
      .config("spark.master", "local[*]")
      .getOrCreate()

    try {
      val sourceDeltaLakeTableLocation = "s3a://sample-bucket-dev/one/two/three/four/five_tmp/"

      val destTableLocation = "s3a://sample-bucket-dev/one/two/three/demo-iceberg-warehouse/default/five_tmp_iceberg"

      val destTableIdentifier = TableIdentifier.of("default", "five_tmp_iceberg")

      val hadoopConf = spark.sparkContext.hadoopConfiguration

      val icebergCatalog = new HadoopCatalog(hadoopConf, "s3a://sample-bucket-dev/one/two/three/demo-iceberg-warehouse/")

      if (DeltaTable.isDeltaTable(spark, sourceDeltaLakeTableLocation)) {

        DeltaLakeToIcebergMigrationActionsProvider
          .defaultActions()
          .snapshotDeltaLakeTable(sourceDeltaLakeTableLocation) 
          .as(destTableIdentifier) 
          .icebergCatalog(icebergCatalog) 
          .tableLocation(destTableLocation) 
          .deltaLakeConfiguration(hadoopConf) 
          .execute()
      } else {
        println(s"Skipping migration: No valid Delta table found at $sourceDeltaLakeTableLocation")
      }


      println(s"Iceberg metadata created at: $destTableLocation")

      val icebergTable = spark.read.format("iceberg").load("iceberg.default.five_tmp_iceberg")
      icebergTable.show(10)

    } catch {
      case e: Exception =>
        println(s"Migration failed: ${e.getMessage}")
        e.printStackTrace()
    } finally {
      spark.stop()
    }
  }
}

It ends up failing with:

Migration failed: Delta Lake table at s3a://sample-bucket-dev/one/two/three/four/five_tmp/ contains no constructable snapshot
org.apache.iceberg.exceptions.ValidationException: Delta Lake table at s3a://sample-bucket-dev/one/two/three/four/five_tmp/ contains no constructable snapshot
    at org.apache.iceberg.delta.BaseSnapshotDeltaLakeTableAction.commitInitialDeltaSnapshotToIcebergTransaction(BaseSnapshotDeltaLakeTableAction.java:270)
    at org.apache.iceberg.delta.BaseSnapshotDeltaLakeTableAction.execute(BaseSnapshotDeltaLakeTableAction.java:180)
    at org.apache.iceberg.delta.BaseSnapshotDeltaLakeTableAction.execute(BaseSnapshotDeltaLakeTableAction.java:76)
...

The Delta tables were created correctly earlier, with _delta_log intact; I can read and print them as well.
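For reference, a quick sketch of the kind of check I mean (same Spark session, source location, and placeholder bucket as above; this verification code is not part of the migration job itself):

import org.apache.hadoop.fs.Path

// Reading the source Delta table directly works and shows the expected rows
spark.read.format("delta").load(sourceDeltaLakeTableLocation).show(10)

// The _delta_log directory is present and contains the commit JSON files
val fs = new Path(sourceDeltaLakeTableLocation)
  .getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(sourceDeltaLakeTableLocation, "_delta_log"))
  .foreach(status => println(status.getPath))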

One thing I have noticed while running in debug mode: while executing this,

DeltaLakeToIcebergMigrationActionsProvider
              .defaultActions()
              .snapshotDeltaLakeTable(sourceDeltaLakeTableLocation) 
              .as(destTableIdentifier) 
              .icebergCatalog(icebergCatalog) 
              .tableLocation(destTableLocation) 
              .deltaLakeConfiguration(hadoopConf) 
              .execute()

control goes into the BaseSnapshotDeltaLakeTableAction class, which resides in the Maven dependency below:

<dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-delta-lake</artifactId>
      <version>1.2.1</version>
</dependency>

Inside that class, it goes on to build the full path to each Delta table Parquet file using this method:

private static String getFullFilePath(String path, String tableRoot) {
    URI dataFileUri = URI.create(path);
    String decodedPath = URLDecoder.decode(path, StandardCharsets.UTF_8);
    if (dataFileUri.isAbsolute()) {
      return decodedPath;
    } else {
      return tableRoot + File.separator + decodedPath;
    }
  }

The path it forms looks like this:

s3a://sample-bucket-dev/one/two/three/four/five_tmp\import_date=2025-03-04/part-00014-dac0df00-d1d8-4a5f-ba36-c0f392816ada.c000.snappy.parquet

Here, the backslash before the import_date partition is added by the method above. The path looks malformed, and eventually the following error message is printed to the console:

File s3a://sample-bucket-dev/one/two/three/four/five_tmp\import_date=2025-03-04/part-00014-dac0df00-d1d8-4a5f-ba36-c0f392816ada.c000.snappy.parquet is referenced in the logs of Delta Lake table at s3a://sample-bucket-dev/one/two/three/four/five_tmp/, but cannot be found in the storage
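My working theory, just a sketch of what I suspect and assuming the job runs on a Windows JVM where java.io.File.separator is "\": the concatenation in getFullFilePath joins the table root and the relative data-file path with File.separator instead of "/", which would produce exactly the malformed path above. The values below are the same placeholder paths as earlier:

import java.io.File
import java.net.URLDecoder
import java.nio.charset.StandardCharsets

// Same placeholder table root plus one relative data-file path from the Delta log
val tableRoot = "s3a://sample-bucket-dev/one/two/three/four/five_tmp"
val relativePath = "import_date=2025-03-04/part-00014-dac0df00-d1d8-4a5f-ba36-c0f392816ada.c000.snappy.parquet"

// Mirrors the non-absolute branch of getFullFilePath
val decodedPath = URLDecoder.decode(relativePath, StandardCharsets.UTF_8)
val joined = tableRoot + File.separator + decodedPath

// On Windows: s3a://...five_tmp\import_date=2025-03-04/part-00014-...snappy.parquet
// On Linux:   s3a://...five_tmp/import_date=2025-03-04/part-00014-...snappy.parquet
println(joined)

If that is what is happening, running the same job from a Linux machine or container (where File.separator is "/") should be enough to confirm it.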

What am I missing here possibly?
