PysparkのDataFrameで差分のみを取得する方法

技術記

PysparkのDataFrameで差分のみを取得する方法

端的に言えば, left_anti で join すれば差分情報のみにすることができます。

目的

今回の目的は まったく同じの列を持つ2つのDataFrameがあり、1つは新しいデータがあり、1つは古いデータがある状態です。 古いデータから 更新があった行 追加された行 のみを抽出した差分のDataFrameを作る事が目的です。

内容

  • left_anti join を使い、 new_df (新しいデータ)から old_df (古いデータ)に存在しない行を取得します。
  • left_anti は、new_df の中で old_df に存在しない行のみを抽出するので、新規追加された行や変更された行が対象になります。
  • 完全一致する行は old_df に存在するので除外される ため、変更行も含まれます。
  • キー列を基準にした単純な存在チェックではなく、全カラムを比較 することで、値の変更も考慮します。

left_antiとは

https://spark.apache.org/docs/3.5.3/sql-ref-syntax-qry-select-join.html アンチ結合は、右側と一致しない左側の関係からの値を返します。これは、左アンチ結合とも呼ばれます。

leftを new_df , rightを old_df として、 left_anti join で下記の図のように差分を取得します。 

Left anti ベン図

実際のコード

結果

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 例としてデータを作成
data1 = [
    (1, "A", 100, "X"),  # 変更なし
    (2, "B", 200, "Y"),  # 変更なし
    (3, "C", 300, "Z")   # 変更あり(新データでは col3 が 999 になっている)
]
data2 = [
    (1, "A", 100, "X"),  # 変更なし
    (2, "B", 200, "Y"),  # 変更なし
    (3, "C", 999, "Z"),  # 変更あり
    (4, "D", 400, "W")   # 新規追加
]

# 古いデータ
old_df = spark.createDataFrame(data1, ["col1", "col2", "col3", "col4"])
# 新しいデータ
new_df = spark.createDataFrame(data2, ["col1", "col2", "col3", "col4"])

# new_df にあって old_df にない(追加 or 変更)行を取得
df3 = new_df.alias("new").join(old_df.alias("old"), on=old_df.columns, how="left_anti")

# 結果表示
df3.show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   3|   C| 999|   Z|  # col3 の値が 300 -> 999 に変更された行
|   4|   D| 400|   W|  # 新しく追加された行
+----+----+----+----+

補足: 更新された行のみを取得する場合

もし 更新された行(df1 にはあるが df2 には無い)のみを取得したい場合は、df1 を基準に left_anti を適用します。

df_deleted = df1.alias("old").join(df2.alias("new"), on=df1.columns, how="left_anti")
df_deleted.show()

結果(削除された行)

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   3|   C| 300|   Z|  # df1にあったがdf2で値が変更された
+----+----+----+----+

コメント

タイトルとURLをコピーしました