PySpark作业是否可以在增量表中写入,并在同一代码中从该表中读取?这就是我要做的。
问题陈述:我在控制台上打印数据以查看流动情况时遇到问题。
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta import *
spark = SparkSession \
.builder \
.appName("test") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.0") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "demo.topic") \
.option("startingOffsets", "earliest") \
.load() \
.withColumn("ingested_timestamp", unix_timestamp()) \
.withColumn("value_str", col("value").cast(StringType())) \
.select("ingested_timestamp", "value_str")
# code to write in the delta table called events
stream = kafka_df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "./data/tmp/delta/events/_checkpoints/") \
.toTable("events")
# code to read the same delta table
read_df = spark.read.format("delta").table("events");
read_df.show(5)
stream.awaitTermination()
使用以下命令运行代码时不会出错。
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,io.delta:delta-core_2.12:2.1.0 kafka_and_create_delta_table.py
我正在尝试将我要刷新到Kafka的数据可视化到Delta表中,以确保数据流动良好,底层组件也工作良好。
即使向我的主题发送了流量,我也可以看到一个空表。
Found no committed offset for the partition demo.topic-0 +------------------+---------+ |ingested_timestamp|value_str| +------------------+---------+ +------------------+---------+
任何形式的帮助都会有帮助。