我有一个简单的Delta Live Tables管道,它将多个csv文件从cloudFiles(s3存储)流式读取到发布到蜂巢元存储的Delta表中。
我有两个要求使我的情况更加复杂/独特:
- 由于csv文件的格式,我需要将
skipRows参数用于autoLoader。这需要使用Databricks运行时的预览通道(编写时为v11.3)。source - 我需要将表
columnMapping.mode属性设置为name,因为csv数据的列名中包含Delta/Parquet本机不允许的字符。来源
以上两个功能似乎都是预览/测试功能,所以我观察到的行为可能是一个bug。
我的管道定义如下:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
s3_url = "s3://<path_to_csvs>"
@dlt.table(
comment="...",
table_properties={
'delta.minReaderVersion' : '2',
'delta.minWriterVersion' : '5',
'delta.columnMapping.mode' : 'name',
'quality': 'bronze'
}
)
def bronze_my_csv_data_raw():
return (
spark.readStream.format("cloudFiles")
.option("skipRows", 1)
.option("header", "true")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("pathGlobFilter", "*.csv")
.load(s3_url)
)
当第一次设置并运行管道时,这会按预期工作,但在进行更改并运行管道的“完全刷新所有”(以刷新所有数据)时,我会收到以下错误:
com.databricks.sql.transaction.tahoe.DeltaColumnMappingUnsupportedException: Schema change is detected: old schema: root new schema: root |-- TIMESTAMP: string (nullable = true) |-- RECORD: string (nullable = true) |-- Samples_Max: string (nullable = true) ... Schema changes are not allowed during the change of column mapping mode.
即使更改目标表名以创建新的空表,也会发生这种情况。一旦发生,即使在常规(非完全刷新)运行中,也会发生相同的错误。
如有任何帮助,将不胜感激