PySparkの基本的なコマンドをまとめました。
PySparkの初期化
PySparkを使用するには、SparkSession
を初期化する必要があります。以下はその基本的なコードです。
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MyPySparkApp") \
.getOrCreate()
csvデータの読み込みと表示
csvデータをPySparkで読み込み、データの中身を確認してみましょう。
# データの読み込み
df = spark.read.csv("demo_data.csv", header=True, inferSchema=True)
# データの表示
df.show()
データの基本統計量
データの基本的な統計量を取得することもできます。
# 基本統計量の表示
df.describe().show()
データのフィルタリング
特定の条件を満たすデータを抽出するには、filter
メソッドを使用します。
# 年齢が30歳以上のデータを抽出
filtered_df = df.filter(df['Age'] >= 30)
filtered_df.show()
データの集計
データを集計するためには、groupBy
と集計関数を組み合わせます。
# 平均年齢と最高給与の計算
result_df = df.groupBy().agg({'Age': 'mean', 'Salary': 'max'})
result_df.show()
データのソート
データを特定の列で昇順または降順にソートすることができます。
# 給与の降順でソート
sorted_df = df.orderBy('Salary', ascending=False)
sorted_df.show()
データの列の選択と新しい列の追加
特定の列を選択したり、新しい列を追加することができます。
# 名前と給与列の選択
selected_df = df.select('Name', 'Salary')
# 新しい列の追加
df_with_bonus = df.withColumn('Bonus', df['Salary'] * 0.1)
selected_df.show()
df_with_bonus.show()
データの欠損値の処理
データ内の欠損値を処理するために、na
メソッドを使用できます。
# 欠損値を含む行を削除
cleaned_df = df.na.drop()
# 特定の列の欠損値を置換
filled_df = df.na.fill({'Salary': 0})
cleaned_df.show()
filled_df.show()
データの結合
複数のデータフレームを結合することができます。
# 別のデモデータの作成
other_data = spark.createDataFrame([('John', 'Engineering'), ('Alice', 'Marketing')],
['Name', 'Department'])
# データの結合
joined_df = df.join(other_data, on='Name', how='inner')
joined_df.show()
データの保存
処理したデータをファイルに保存することができます。
# CSVファイルに保存
df.write.csv('output_data.csv', header=True)
# Parquet形式で保存
df.write.parquet('output_data.parquet')
データのサンプリング
データセットからランダムにサンプルを抽出することができます。
# ランダムに10%のデータをサンプリング
sampled_df = df.sample(fraction=0.1, seed=42)
sampled_df.show()
データのパーティショニング
データを特定のキーでパーティション分割することで、処理の効率を向上させることができます。
# 名前を基準にデータをパーティション分割
partitioned_df = df.repartition('Name')
データのカラムの変更とリネーム
既存の列のデータを変更したり、列の名前を変更することができます。
# 年齢列の2倍の値に変更
df = df.withColumn('Age', df['Age'] * 2)
# 列のリネーム
df = df.withColumnRenamed('Salary', 'MonthlySalary')
df.show()
データのUDF(User Defined Function)の適用
独自の関数をデータに適用することができます。
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# 簡単なUDFの定義
@udf(IntegerType())
def add_bonus(salary):
return salary + 1000
# UDFの適用
df = df.withColumn('SalaryWithBonus', add_bonus(df['MonthlySalary']))
df.show()
sqlクエリの実行
SQLクエリを使用してデータを処理することも可能です。
# テーブルの登録
df.createOrReplaceTempView('employee_data')
# SQLクエリの実行
result_sql = spark.sql('SELECT Name, MonthlySalary FROM employee_data WHERE Age >= 30')
result_sql.show()
データのキャッシュ
データを一時的にキャッシュすることで、後続の処理が高速になります。
# データのキャッシュ
df.cache()
データのクリーンアップ
不要なデータやキャッシュを解放してメモリを効果的に管理します。
# キャッシュの解放
df.unpersist()
データの可視化
PySparkではデータを可視化するためのツールが限られていますが、Pandasデータフレームに変換してからMatplotlibやSeabornなどのライブラリを使用して可視化が可能です。
import pandas as pd
import matplotlib.pyplot as plt
# PySparkデータフレームをPandasデータフレームに変換
pandas_df = df.toPandas()
# ヒストグラムの描画
pandas_df['Age'].plot(kind='hist', bins=10)
plt.title('Age Distribution')
plt.show()
コメント