In [1]:
# Cell 0: Imports and Spark session
import os, sys, datetime, pathlib
from pyspark.sql import SparkSession, functions as F
print("Python:", sys.version)
spark = SparkSession.builder.appName("de1-lab1").getOrCreate()
print("Spark:", spark.version)
Python: 3.10.18 (main, Jun 5 2025, 13:14:17) [GCC 11.2.0]
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/07 21:39:53 WARN Utils: Your hostname, Wandaogo, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/07 21:39:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/07 21:39:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark: 4.0.1
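The startup warnings above are harmless for a local run; the log output itself points to sc.setLogLevel. A minimal sketch, assuming the default log4j profile, for quieting the rest of the session:

# Optional: reduce Spark log verbosity for the remainder of the session
spark.sparkContext.setLogLevel("ERROR")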
In [2]:
# Cell 1: Load the CSV inputs
src_a = "/home/bibawandaogo/data engineering 1/data/lab1_dataset_a.csv"
src_b = "/home/bibawandaogo/data engineering 1/data/lab1_dataset_b.csv"
df_a = spark.read.option("header","true").option("inferSchema","true").csv(src_a)
df_b = spark.read.option("header","true").option("inferSchema","true").csv(src_b)
df = df_a.unionByName(df_b)
df.cache()
print("Rows:", df.count())
df.printSchema()
df.show(5, truncate=False)
Rows: 4
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- value: integer (nullable = true)

+---+-------+-----+
|id |name   |value|
+---+-------+-----+
|1  |Alice  |10   |
|2  |Bob    |20   |
|3  |Charlie|30   |
|4  |David  |40   |
+---+-------+-----+
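unionByName matches columns by name and fails if the two inputs disagree on their headers. A minimal sketch, assuming Spark 3.1+ behavior, of the more tolerant variant (hypothetical here, since both files share the same header):

# Tolerant union: columns missing from one side are filled with nulls
df = df_a.unionByName(df_b, allowMissingColumns=True)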
In [4]:
# Check the columns
print("Available columns:")
print(df.columns)
print("\nFull schema:")
df.printSchema()
print("\nFirst rows:")
df.show()
Available columns:
['id', 'name', 'value']

Full schema:
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- value: integer (nullable = true)

First rows:
+---+-------+-----+
| id|   name|value|
+---+-------+-----+
|  1|  Alice|   10|
|  2|    Bob|   20|
|  3|Charlie|   30|
|  4|  David|   40|
+---+-------+-----+
In [6]:
# Cell 0: Imports and Spark session
import os, sys, datetime, pathlib
from pyspark.sql import SparkSession, functions as F
print("Python:", sys.version)
spark = SparkSession.builder.appName("de1-lab1").getOrCreate()
print("Spark:", spark.version)
Python: 3.10.18 (main, Jun 5 2025, 13:14:17) [GCC 11.2.0]
Spark: 4.0.1
In [7]:
# Cell 1: Load the CSV inputs
src_a = "/home/bibawandaogo/data engineering 1/data/lab1_dataset_a.csv"
src_b = "/home/bibawandaogo/data engineering 1/data/lab1_dataset_b.csv"
print(f"File A exists: {os.path.exists(src_a)}")
print(f"File B exists: {os.path.exists(src_b)}")
df_a = spark.read.option("header","true").option("inferSchema","true").csv(src_a)
df_b = spark.read.option("header","true").option("inferSchema","true").csv(src_b)
df = df_a.unionByName(df_b)
df.cache()
print("\n✅ Rows:", df.count())
print("\n✅ Columns:", df.columns)
df.printSchema()
df.show(3, truncate=False)
File A exists: True
File B exists: True

✅ Rows: 10

✅ Columns: ['id', 'category', 'value', 'text']
root
 |-- id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- text: string (nullable = true)

+---+--------+-----+-----------------------------+
|id |category|value|text                         |
+---+--------+-----+-----------------------------+
|1  |A       |100  |hello world spark programming|
|2  |B       |200  |data engineering with pyspark|
|3  |A       |150  |hello spark hello world      |
+---+--------+-----+-----------------------------+
only showing top 3 rows
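inferSchema triggers an extra pass over the files. A minimal sketch, assuming the column types shown above stay fixed, of the same load with an explicit schema (an optional variant, not one of the graded cells):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema matching the inferred one above (id, category, value, text)
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("category", StringType(), True),
    StructField("value", IntegerType(), True),
    StructField("text", StringType(), True),
])
df_a = spark.read.option("header", "true").schema(schema).csv(src_a)
df_b = spark.read.option("header", "true").schema(schema).csv(src_b)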
In [8]:
# Cell 2: Top-N with RDD API
rdd = df.select("text").rdd.flatMap(lambda row: (row[0] or "").lower().split())
pair = rdd.map(lambda t: (t, 1))
counts = pair.reduceByKey(lambda a,b: a+b)
top_rdd = counts.sortBy(lambda kv: (-kv[1], kv[0])).take(10)
print("\n" + "=" * 60)
print("🔤 Top 10 Tokens (RDD API)")
print("=" * 60)
print(f"{'Token':<20} {'Count':>10}")
print("-" * 60)
for t, c in top_rdd:
    print(f"{t:<20} {c:>10}")
print("=" * 60)
# Save as CSV
pathlib.Path("outputs").mkdir(exist_ok=True)
with open("outputs/top10_rdd.csv","w",encoding="utf-8") as f:
f.write("token,count\n")
for t,c in top_rdd:
f.write(f"{t},{c}\n")
print("✅ Wrote outputs/top10_rdd.csv")
============================================================
🔤 Top 10 Tokens (RDD API)
============================================================
Token                     Count
------------------------------------------------------------
hello                         5
spark                         4
world                         4
and                           3
data                          3
pyspark                       3
big                           2
dataframes                    2
engineering                   2
learning                      2
============================================================
✅ Wrote outputs/top10_rdd.csv
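The sortBy + take combination sorts the full RDD before taking the head. A minimal alternative sketch, assuming the same counts RDD, using takeOrdered, which only keeps the top elements per partition:

# Equivalent Top-10 without a full sort: takeOrdered tracks a bounded set per partition
top_rdd_alt = counts.takeOrdered(10, key=lambda kv: (-kv[1], kv[0]))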
In [9]:
# Cell 2.5: RDD plan — evidence
_ = counts.count()
plan_rdd = df._jdf.queryExecution().executedPlan().toString()
pathlib.Path("proof").mkdir(exist_ok=True)
with open("proof/plan_rdd.txt","w") as f:
f.write(str(datetime.datetime.now()) + "\n\n")
f.write(plan_rdd)
print("✅ Saved proof/plan_rdd.txt")
# Affiche les premières lignes du plan
with open("proof/plan_rdd.txt","r") as f:
content = f.read()
print("\n" + "=" * 60)
print("📋 RDD Execution Plan (first 300 chars):")
print("=" * 60)
print(content[:300])
print("...")
print("=" * 60)
✅ Saved proof/plan_rdd.txt
============================================================
📋 RDD Execution Plan (first 300 chars):
============================================================
2025-12-07 21:46:11.736238
InMemoryTableScan [id#301, category#302, value#303, text#304]
+- InMemoryRelation [id#301, category#302, value#303, text#304], StorageLevel(disk, memory, deserialized, 1 replicas)
+- Union
:- FileScan csv [id#301,category#302,value#303,text#304] Ba
...
============================================================
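Note that the plan saved above is the DataFrame scan feeding the RDD, captured through the private _jdf handle. A minimal sketch, assuming the counts RDD from Cell 2 is still defined, of recording the RDD lineage itself with the public toDebugString API:

# RDD lineage (dependency chain) as evidence for the RDD pipeline itself
lineage = (counts.toDebugString() or b"").decode("utf-8")
with open("proof/lineage_rdd.txt", "w") as f:
    f.write(lineage)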
In [10]:
# Cell 3: Top-N with DataFrame API
from pyspark.sql.functions import explode, split, lower, col, count, desc, asc
tokens = explode(split(lower(col("text")), "\\s+")).alias("token")
df_tokens = df.select(tokens).where(col("token") != "")
agg_df = df_tokens.groupBy("token").agg(count("*").alias("count"))
top_df = agg_df.orderBy(desc("count"), asc("token")).limit(10)
print("\n" + "=" * 60)
print("🔤 Top 10 Tokens (DataFrame API)")
print("=" * 60)
top_df.show(truncate=False)
print("=" * 60)
# Save as CSV
top_df.coalesce(1).write.mode("overwrite").option("header","true").csv("outputs/top10_df_tmp")
# Move single part file to stable path
import glob, shutil
part = glob.glob("outputs/top10_df_tmp/part*")[0]
shutil.copy(part, "outputs/top10_df.csv")
print("✅ Wrote outputs/top10_df.csv")
============================================================
🔤 Top 10 Tokens (DataFrame API)
============================================================
+-----------+-----+
|token      |count|
+-----------+-----+
|hello      |5    |
|spark      |4    |
|world      |4    |
|and        |3    |
|data       |3    |
|pyspark    |3    |
|big        |2    |
|dataframes |2    |
|engineering|2    |
|learning   |2    |
+-----------+-----+

============================================================
✅ Wrote outputs/top10_df.csv
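Both APIs produce the same ranking here. A minimal cross-check sketch, assuming top_rdd (Cell 2) and top_df are both still in scope, that compares the two results programmatically:

# Compare the RDD and DataFrame Top-10 token counts
df_pairs = [(r["token"], r["count"]) for r in top_df.collect()]
assert df_pairs == list(top_rdd), "RDD and DataFrame Top-10 results differ"
print("RDD and DataFrame results match")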
In [11]:
# Cell 3.5: DataFrame plan — evidence
plan_df = top_df._jdf.queryExecution().executedPlan().toString()
with open("proof/plan_df.txt","w") as f:
f.write(str(datetime.datetime.now()) + "\n\n")
f.write(plan_df)
print("✅ Saved proof/plan_df.txt")
# Affiche les premières lignes du plan
with open("proof/plan_df.txt","r") as f:
content = f.read()
print("\n" + "=" * 60)
print("📋 DataFrame Execution Plan (first 300 chars):")
print("=" * 60)
print(content[:300])
print("...")
print("=" * 60)
✅ Saved proof/plan_df.txt
============================================================
📋 DataFrame Execution Plan (first 300 chars):
============================================================
2025-12-07 21:46:42.402778
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=10, orderBy=[count#698L DESC NULLS LAST,token#697 ASC NULLS FIRST], output=[token#697,count#698L])
+- HashAggregate(keys=[token#697], functions=[count(1)], output=[token#697, count#698L])
+- Excha
...
============================================================
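The plan is again read through the private _jdf handle. A minimal sketch, assuming the same top_df, of capturing the public explain() output instead (explain prints rather than returns, so stdout is redirected):

import io
from contextlib import redirect_stdout

# Capture the formatted physical plan via the public DataFrame API
buf = io.StringIO()
with redirect_stdout(buf):
    top_df.explain("formatted")
with open("proof/plan_df_public.txt", "w") as f:
    f.write(buf.getvalue())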
In [12]:
# Cell 4: Projection experiment: select("*") vs minimal projection
print("\n" + "=" * 60)
print("📊 PROJECTION EXPERIMENT")
print("=" * 60)
# Case A: select all columns then aggregate on 'category'
print("\n🔴 Case A: select('*') then aggregate")
print("-" * 60)
all_cols = df.select("*").groupBy("category").agg(F.sum("value").alias("sum_value"))
all_cols.explain("formatted")
_ = all_cols.count() # trigger
print("\n" + "=" * 60)
# Case B: minimal projection then aggregate
print("\n🔵 Case B: minimal projection then aggregate")
print("-" * 60)
proj = df.select("category","value").groupBy("category").agg(F.sum("value").alias("sum_value"))
proj.explain("formatted")
_ = proj.count() # trigger
print("\n" + "=" * 60)
print("✅ Plans displayed above")
print("=" * 60)
============================================================
📊 PROJECTION EXPERIMENT
============================================================
🔴 Case A: select('*') then aggregate
------------------------------------------------------------
== Physical Plan ==
AdaptiveSparkPlan (9)
+- HashAggregate (8)
   +- Exchange (7)
      +- HashAggregate (6)
         +- InMemoryTableScan (1)
            +- InMemoryRelation (2)
               +- Union (5)
                  :- Scan csv (3)
                  +- Scan csv (4)
(1) InMemoryTableScan
Output [2]: [category#302, value#303]
Arguments: [category#302, value#303]
(2) InMemoryRelation
Arguments: [id#301, category#302, value#303, text#304], StorageLevel(disk, memory, deserialized, 1 replicas)
(3) Scan csv
Output [4]: [id#301, category#302, value#303, text#304]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab1_dataset_a.csv]
ReadSchema: struct<id:int,category:string,value:int,text:string>
(4) Scan csv
Output [4]: [id#322, category#323, value#324, text#325]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab1_dataset_b.csv]
ReadSchema: struct<id:int,category:string,value:int,text:string>
(5) Union
(6) HashAggregate
Input [2]: [category#302, value#303]
Keys [1]: [category#302]
Functions [1]: [partial_sum(value#303)]
Aggregate Attributes [1]: [sum#1036L]
Results [2]: [category#302, sum#1037L]
(7) Exchange
Input [2]: [category#302, sum#1037L]
Arguments: hashpartitioning(category#302, 200), ENSURE_REQUIREMENTS, [plan_id=512]
(8) HashAggregate
Input [2]: [category#302, sum#1037L]
Keys [1]: [category#302]
Functions [1]: [sum(value#303)]
Aggregate Attributes [1]: [sum(value#303)#975L]
Results [2]: [category#302, sum(value#303)#975L AS sum_value#970L]
(9) AdaptiveSparkPlan
Output [2]: [category#302, sum_value#970L]
Arguments: isFinalPlan=false
============================================================
🔵 Case B: minimal projection then aggregate
------------------------------------------------------------
== Physical Plan ==
AdaptiveSparkPlan (9)
+- HashAggregate (8)
   +- Exchange (7)
      +- HashAggregate (6)
         +- InMemoryTableScan (1)
            +- InMemoryRelation (2)
               +- Union (5)
                  :- Scan csv (3)
                  +- Scan csv (4)
(1) InMemoryTableScan
Output [2]: [category#302, value#303]
Arguments: [category#302, value#303]
(2) InMemoryRelation
Arguments: [id#301, category#302, value#303, text#304], StorageLevel(disk, memory, deserialized, 1 replicas)
(3) Scan csv
Output [4]: [id#301, category#302, value#303, text#304]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab1_dataset_a.csv]
ReadSchema: struct<id:int,category:string,value:int,text:string>
(4) Scan csv
Output [4]: [id#322, category#323, value#324, text#325]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab1_dataset_b.csv]
ReadSchema: struct<id:int,category:string,value:int,text:string>
(5) Union
(6) HashAggregate
Input [2]: [category#302, value#303]
Keys [1]: [category#302]
Functions [1]: [partial_sum(value#303)]
Aggregate Attributes [1]: [sum#1208L]
Results [2]: [category#302, sum#1209L]
(7) Exchange
Input [2]: [category#302, sum#1209L]
Arguments: hashpartitioning(category#302, 200), ENSURE_REQUIREMENTS, [plan_id=643]
(8) HashAggregate
Input [2]: [category#302, sum#1209L]
Keys [1]: [category#302]
Functions [1]: [sum(value#303)]
Aggregate Attributes [1]: [sum(value#303)#1147L]
Results [2]: [category#302, sum(value#303)#1147L AS sum_value#1144L]
(9) AdaptiveSparkPlan
Output [2]: [category#302, sum_value#1144L]
Arguments: isFinalPlan=false
============================================================
✅ Plans displayed above
============================================================
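The two plans are identical: in both cases the optimizer prunes the input down to [category, value] before the InMemoryTableScan, so select('*') costs nothing extra against the cached df. A minimal follow-up sketch, assuming the same source paths, that repeats the comparison on an uncached read, where column pruning would typically show up directly in the csv scan's ReadSchema:

# Re-read without caching so projection pruning is visible at the file scan
df_fresh = (spark.read.option("header", "true").option("inferSchema", "true").csv(src_a)
            .unionByName(spark.read.option("header", "true").option("inferSchema", "true").csv(src_b)))
df_fresh.select("category", "value") \
    .groupBy("category").agg(F.sum("value").alias("sum_value")) \
    .explain("formatted")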
In [13]:
# Cell 5: Cleanup
spark.stop()
print("\n✅ Spark session stopped.")
print("=" * 60)
print("🎉 Lab 1 Practice Complete!")
print("=" * 60)
✅ Spark session stopped.
============================================================
🎉 Lab 1 Practice Complete!
============================================================