PySpark作业是否可以在增量表中写入,并在同一代码中从该表中读取?这就是我要做的。
问题陈述:我在控制台上打印数据以查看流动情况时遇到问题。
from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.sql.types import * from delta import * spark = SparkSession \ .builder \ .appName("test") \ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ .config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.0") \ .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ .getOrCreate() kafka_df = spark.readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "demo.topic") \ .option("startingOffsets", "earliest") \ .load() \ .withColumn("ingested_timestamp", unix_timestamp()) \ .withColumn("value_str", col("value").cast(StringType())) \ .select("ingested_timestamp", "value_str") # code to write in the delta table called events stream = kafka_df.writeStream \ .format("delta") \ .outputMode("append") \ .option("checkpointLocation", "./data/tmp/delta/events/_checkpoints/") \ .toTable("events") # code to read the same delta table read_df = spark.read.format("delta").table("events"); read_df.show(5) stream.awaitTermination()
使用以下命令运行代码时不会出错。
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,io.delta:delta-core_2.12:2.1.0 kafka_and_create_delta_table.py
我正在尝试将我要刷新到Kafka的数据可视化到Delta表中,以确保数据流动良好,底层组件也工作良好。
即使向我的主题发送了流量,我也可以看到一个空表。
Found no committed offset for the partition demo.topic-0 +------------------+---------+ |ingested_timestamp|value_str| +------------------+---------+ +------------------+---------+
任何形式的帮助都会有帮助。