pyspark の基本操作まとめ

プログラミング

PySparkの基本的なコマンドをまとめました。

PySparkの初期化

PySparkを使用するには、SparkSessionを初期化する必要があります。以下はその基本的なコードです。

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyPySparkApp") \
    .getOrCreate()

csvデータの読み込みと表示

csvデータをPySparkで読み込み、データの中身を確認してみましょう。

# データの読み込み
df = spark.read.csv("demo_data.csv", header=True, inferSchema=True)

# データの表示
df.show()

データの基本統計量

データの基本的な統計量を取得することもできます。

# 基本統計量の表示
df.describe().show()

データのフィルタリング

特定の条件を満たすデータを抽出するには、filterメソッドを使用します。

# 年齢が30歳以上のデータを抽出
filtered_df = df.filter(df['Age'] >= 30)
filtered_df.show()

データの集計

データを集計するためには、groupByと集計関数を組み合わせます。

# 平均年齢と最高給与の計算
result_df = df.groupBy().agg({'Age': 'mean', 'Salary': 'max'})
result_df.show()

データのソート

データを特定の列で昇順または降順にソートすることができます。

# 給与の降順でソート
sorted_df = df.orderBy('Salary', ascending=False)
sorted_df.show()

データの列の選択と新しい列の追加

特定の列を選択したり、新しい列を追加することができます。

# 名前と給与列の選択
selected_df = df.select('Name', 'Salary')

# 新しい列の追加
df_with_bonus = df.withColumn('Bonus', df['Salary'] * 0.1)
selected_df.show()
df_with_bonus.show()

データの欠損値の処理

データ内の欠損値を処理するために、naメソッドを使用できます。

# 欠損値を含む行を削除
cleaned_df = df.na.drop()

# 特定の列の欠損値を置換
filled_df = df.na.fill({'Salary': 0})

cleaned_df.show()
filled_df.show()

データの結合

複数のデータフレームを結合することができます。

# 別のデモデータの作成
other_data = spark.createDataFrame([('John', 'Engineering'), ('Alice', 'Marketing')],
                                   ['Name', 'Department'])

# データの結合
joined_df = df.join(other_data, on='Name', how='inner')
joined_df.show()

データの保存

処理したデータをファイルに保存することができます。

# CSVファイルに保存
df.write.csv('output_data.csv', header=True)

# Parquet形式で保存
df.write.parquet('output_data.parquet')

データのサンプリング

データセットからランダムにサンプルを抽出することができます。

# ランダムに10%のデータをサンプリング
sampled_df = df.sample(fraction=0.1, seed=42)
sampled_df.show()

データのパーティショニング

データを特定のキーでパーティション分割することで、処理の効率を向上させることができます。

# 名前を基準にデータをパーティション分割
partitioned_df = df.repartition('Name')

データのカラムの変更とリネーム

既存の列のデータを変更したり、列の名前を変更することができます。

# 年齢列の2倍の値に変更
df = df.withColumn('Age', df['Age'] * 2)

# 列のリネーム
df = df.withColumnRenamed('Salary', 'MonthlySalary')
df.show()

データのUDF(User Defined Function)の適用

独自の関数をデータに適用することができます。

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# 簡単なUDFの定義
@udf(IntegerType())
def add_bonus(salary):
    return salary + 1000

# UDFの適用
df = df.withColumn('SalaryWithBonus', add_bonus(df['MonthlySalary']))
df.show()

sqlクエリの実行

SQLクエリを使用してデータを処理することも可能です。

# テーブルの登録
df.createOrReplaceTempView('employee_data')

# SQLクエリの実行
result_sql = spark.sql('SELECT Name, MonthlySalary FROM employee_data WHERE Age >= 30')
result_sql.show()

データのキャッシュ

データを一時的にキャッシュすることで、後続の処理が高速になります。

# データのキャッシュ
df.cache()

データのクリーンアップ

不要なデータやキャッシュを解放してメモリを効果的に管理します。

# キャッシュの解放
df.unpersist()

データの可視化

PySparkではデータを可視化するためのツールが限られていますが、Pandasデータフレームに変換してからMatplotlibやSeabornなどのライブラリを使用して可視化が可能です。

import pandas as pd
import matplotlib.pyplot as plt

# PySparkデータフレームをPandasデータフレームに変換
pandas_df = df.toPandas()

# ヒストグラムの描画
pandas_df['Age'].plot(kind='hist', bins=10)
plt.title('Age Distribution')
plt.show()

コメント

タイトルとURLをコピーしました