In [1]:
from pyspark.sql import SparkSession, functions as F, types as T
from datetime import datetime as _dt
import pathlib
import os
# Initialise Spark
spark = SparkSession.builder \
.appName("de1-lab2") \
.master("local[*]") \
.config("spark.driver.memory", "8g") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
print(f"✅ Spark {spark.version} initialized")
# Directories
base = "data/"
os.makedirs(base, exist_ok=True)
os.makedirs("proof", exist_ok=True)
os.makedirs("outputs/lab2", exist_ok=True)
# Explicit schemas
customers_schema = T.StructType([
T.StructField("customer_id", T.IntegerType(), False),
T.StructField("name", T.StringType(), True),
T.StructField("email", T.StringType(), True),
T.StructField("created_at", T.TimestampType(), True),
])
brands_schema = T.StructType([
T.StructField("brand_id", T.IntegerType(), False),
T.StructField("brand_name", T.StringType(), True),
])
categories_schema = T.StructType([
T.StructField("category_id", T.IntegerType(), False),
T.StructField("category_name", T.StringType(), True),
])
products_schema = T.StructType([
T.StructField("product_id", T.IntegerType(), False),
T.StructField("product_name", T.StringType(), True),
T.StructField("brand_id", T.IntegerType(), True),
T.StructField("category_id", T.IntegerType(), True),
T.StructField("price", T.DoubleType(), True),
])
orders_schema = T.StructType([
T.StructField("order_id", T.IntegerType(), False),
T.StructField("customer_id", T.IntegerType(), True),
T.StructField("order_date", T.TimestampType(), True),
])
order_items_schema = T.StructType([
T.StructField("order_item_id", T.IntegerType(), False),
T.StructField("order_id", T.IntegerType(), True),
T.StructField("product_id", T.IntegerType(), True),
T.StructField("quantity", T.IntegerType(), True),
T.StructField("unit_price", T.DoubleType(), True),
])
print("✅ All schemas defined")
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/09 20:41:17 WARN Utils: Your hostname, Wandaogo, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/09 20:41:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/09 20:41:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
✅ Spark 4.0.1 initialized
✅ All schemas defined
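Note: spark.sql.shuffle.partitions is set to 200, which is high for a dataset this small; every shuffle (the Exchange nodes in the plans below) fans out into 200 mostly-empty tasks. A minimal sketch of runtime settings better suited to a tiny local run, with adaptive query execution left on so Spark can coalesce shuffle partitions on its own. This is an illustrative assumption, not part of the lab, and it was not applied here (the recorded plans still show hashpartitioning(..., 200)):
# Illustrative tuning for a tiny local run (not executed in this session).
spark.conf.set("spark.sql.shuffle.partitions", "8")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
print(spark.conf.get("spark.sql.shuffle.partitions"))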
In [2]:
import pandas as pd
from datetime import datetime, timedelta
import random
print("📊 Generating test data...\n")
# 1. Customers
customers_data = {
"customer_id": list(range(1, 11)),
"name": ["Alice Martin", "Bob Dupont", "Caroline Lefevre", "David Laurent",
"Emma Rousseau", "Frank Moreau", "Gabrielle Petit", "Henry Bernard",
"Isabelle Dubois", "Jean-Pierre Garnier"],
"email": [f"customer{i}@email.com" for i in range(1, 11)],
"created_at": [datetime(2024, 1, 1) + timedelta(days=i*10) for i in range(10)]
}
df_customers = pd.DataFrame(customers_data)
df_customers.to_csv(f"{base}lab2_customers.csv", index=False)
print(f"✅ lab2_customers.csv: {len(df_customers)} rows")
# 2. Brands
brands_data = {
"brand_id": [1, 2, 3, 4, 5],
"brand_name": ["TechCorp", "ElectroMax", "GadgetPlus", "ProTech", "InnovateLabs"]
}
df_brands = pd.DataFrame(brands_data)
df_brands.to_csv(f"{base}lab2_brands.csv", index=False)
print(f"✅ lab2_brands.csv: {len(df_brands)} rows")
# 3. Categories
categories_data = {
"category_id": [1, 2, 3, 4, 5],
"category_name": ["Smartphones", "Laptops", "Tablets", "Accessories", "Wearables"]
}
df_categories = pd.DataFrame(categories_data)
df_categories.to_csv(f"{base}lab2_categories.csv", index=False)
print(f"✅ lab2_categories.csv: {len(df_categories)} rows")
# 4. Products
products_data = {
"product_id": list(range(1, 21)),
"product_name": [
"iPhone 15", "Samsung Galaxy S24", "Google Pixel 8", "OnePlus 12",
"MacBook Pro", "Dell XPS 15", "HP Pavilion", "Lenovo ThinkPad",
"iPad Air", "Samsung Tab S9", "Apple Watch Ultra", "Airpods Pro",
"USB-C Cable", "Phone Case", "Screen Protector", "Wireless Charger",
"Xiaomi Band 8", "Fitbit Charge 6", "AirTag", "MagSafe Mount"
],
"brand_id": [1, 2, 1, 3, 1, 2, 2, 4, 1, 2, 1, 1, 5, 5, 5, 5, 3, 4, 1, 5],
"category_id": [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 5, 5, 4, 4, 4, 4, 5, 5, 4, 4],
"price": [999.99, 899.99, 799.99, 749.99, 1999.99, 1499.99, 699.99, 1299.99,
699.99, 599.99, 799.99, 249.99, 19.99, 14.99, 9.99, 29.99,
79.99, 149.99, 99.99, 39.99]
}
df_products = pd.DataFrame(products_data)
df_products.to_csv(f"{base}lab2_products.csv", index=False)
print(f"✅ lab2_products.csv: {len(df_products)} rows")
# 5. Orders
orders_data = {
"order_id": list(range(1, 51)),
"customer_id": [random.randint(1, 10) for _ in range(50)],
"order_date": [datetime(2024, 6, 1) + timedelta(days=random.randint(0, 180)) for _ in range(50)]
}
df_orders = pd.DataFrame(orders_data)
df_orders.to_csv(f"{base}lab2_orders.csv", index=False)
print(f"✅ lab2_orders.csv: {len(df_orders)} rows")
# 6. Order Items
order_items_data = {
"order_item_id": list(range(1, 101)),
"order_id": [random.choice(df_orders["order_id"]) for _ in range(100)],
"product_id": [random.randint(1, 20) for _ in range(100)],
"quantity": [random.randint(1, 5) for _ in range(100)],
}
df_order_items = pd.DataFrame(order_items_data)
# Add unit_price from products
df_order_items = df_order_items.merge(df_products[["product_id", "price"]], on="product_id")
df_order_items["unit_price"] = df_order_items["price"]
df_order_items = df_order_items[["order_item_id", "order_id", "product_id", "quantity", "unit_price"]]
df_order_items.to_csv(f"{base}lab2_order_items.csv", index=False)
print(f"✅ lab2_order_items.csv: {len(df_order_items)} rows")
print("\n" + "="*60)
print("✅ All test data generated successfully!")
print("="*60)
📊 Generating test data...
✅ lab2_customers.csv: 10 rows
✅ lab2_brands.csv: 5 rows
✅ lab2_categories.csv: 5 rows
✅ lab2_products.csv: 20 rows
✅ lab2_orders.csv: 50 rows
✅ lab2_order_items.csv: 100 rows
============================================================
✅ All test data generated successfully!
============================================================
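The generator above never seeds random, so orders and order items change on every run and the counts and GMV recorded in the outputs below reflect one particular execution. A small hedged sketch of how the data could be pinned (the seed value 42 is an arbitrary assumption, not part of the lab):
# Reproducibility sketch: seed Python's RNG before generating orders/order_items
# so that re-running the notebook recreates exactly the same CSVs.
import random
random.seed(42)  # any fixed value makes the generation deterministic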
In [3]:
print("\n" + "="*60)
print("📥 STEP 1: DATA INGESTION")
print("="*60)
# Load all the CSVs with explicit schemas
customers = spark.read \
.schema(customers_schema) \
.option("header","true") \
.csv(f"{base}lab2_customers.csv")
brands = spark.read \
.schema(brands_schema) \
.option("header","true") \
.csv(f"{base}lab2_brands.csv")
categories = spark.read \
.schema(categories_schema) \
.option("header","true") \
.csv(f"{base}lab2_categories.csv")
products = spark.read \
.schema(products_schema) \
.option("header","true") \
.csv(f"{base}lab2_products.csv")
orders = spark.read \
.schema(orders_schema) \
.option("header","true") \
.csv(f"{base}lab2_orders.csv")
order_items = spark.read \
.schema(order_items_schema) \
.option("header","true") \
.csv(f"{base}lab2_order_items.csv")
# Print the row counts
print("\n📊 Operational table row counts:\n")
for name, df in [("customers", customers), ("brands", brands), ("categories", categories),
                 ("products", products), ("orders", orders), ("order_items", order_items)]:
    count = df.count()
    print(f" {name:15s}: {count:4d} rows")
print("\n" + "="*60)
# Data profile
print("\n📈 Data profiles:\n")
print("customers:")
customers.show(3)
print("\norders:")
orders.show(3)
print("\norder_items:")
order_items.show(3)
============================================================
📥 STEP 1: DATA INGESTION
============================================================

📊 Operational table row counts:

 customers      :   10 rows
 brands         :    5 rows
 categories     :    5 rows
 products       :   20 rows
 orders         :   50 rows
 order_items    :  100 rows

============================================================

📈 Data profiles:

customers:
+-----------+----------------+-------------------+-------------------+
|customer_id|            name|              email|         created_at|
+-----------+----------------+-------------------+-------------------+
|          1|    Alice Martin|customer1@email.com|2024-01-01 00:00:00|
|          2|      Bob Dupont|customer2@email.com|2024-01-11 00:00:00|
|          3|Caroline Lefevre|customer3@email.com|2024-01-21 00:00:00|
+-----------+----------------+-------------------+-------------------+
only showing top 3 rows

orders:
+--------+-----------+-------------------+
|order_id|customer_id|         order_date|
+--------+-----------+-------------------+
|       1|          6|2024-10-01 00:00:00|
|       2|          9|2024-09-05 00:00:00|
|       3|          1|2024-06-23 00:00:00|
+--------+-----------+-------------------+
only showing top 3 rows

order_items:
+-------------+--------+----------+--------+----------+
|order_item_id|order_id|product_id|quantity|unit_price|
+-------------+--------+----------+--------+----------+
|            1|      49|         6|       4|   1499.99|
|            2|      17|         2|       2|    899.99|
|            3|      26|        18|       4|    149.99|
+-------------+--------+----------+--------+----------+
only showing top 3 rows
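Because the reads use explicit schemas, any value that fails to parse (a malformed timestamp, a non-numeric id) silently becomes NULL under Spark's default PERMISSIVE mode. A quick hedged sanity check, counting NULLs per column after ingestion (an addition for verification, not part of the original lab):
# Count NULLs per column for each ingested table; non-zero counts on business
# keys would indicate parsing problems rather than genuinely missing data.
for name, df in [("customers", customers), ("orders", orders), ("order_items", order_items)]:
    nulls = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])
    print(name)
    nulls.show()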
In [4]:
print("\n" + "="*60)
print("🔍 EXECUTION PLAN - INGESTION")
print("="*60)
# Simple ingestion plan
ingest = orders.join(order_items, "order_id").select("order_id").distinct()
print("\n📋 Spark plan (formatted):\n")
ingest.explain("formatted")
# Save the proof
with open("proof/plan_ingest.txt", "w") as f:
    f.write("=== INGESTION PLAN ===\n")
    f.write(f"Timestamp: {_dt.now()}\n\n")
    f.write(ingest._jdf.queryExecution().executedPlan().toString())
print("✅ Plan saved to proof/plan_ingest.txt")
# Also save a CSV summary of the ingestion
with open("proof/ingestion_summary.csv", "w") as f:
    f.write("Operation,Rows,Details\n")
    f.write(f"Orders,{orders.count()},All orders\n")
    f.write(f"OrderItems,{order_items.count()},All order items\n")
    f.write(f"Join Result,{ingest.count()},Orders with at least one item\n")
print("✅ Summary saved to proof/ingestion_summary.csv")
============================================================
🔍 EXECUTION PLAN - INGESTION
============================================================
📋 Spark plan (formatted):
== Physical Plan ==
AdaptiveSparkPlan (11)
+- HashAggregate (10)
+- Exchange (9)
+- HashAggregate (8)
+- Project (7)
+- BroadcastHashJoin Inner BuildLeft (6)
:- BroadcastExchange (3)
: +- Filter (2)
: +- Scan csv (1)
+- Filter (5)
+- Scan csv (4)
(1) Scan csv
Output [1]: [order_id#13]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int>
(2) Filter
Input [1]: [order_id#13]
Condition : isnotnull(order_id#13)
(3) BroadcastExchange
Input [1]: [order_id#13]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=257]
(4) Scan csv
Output [1]: [order_id#17]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int>
(5) Filter
Input [1]: [order_id#17]
Condition : isnotnull(order_id#17)
(6) BroadcastHashJoin
Left keys [1]: [order_id#13]
Right keys [1]: [order_id#17]
Join type: Inner
Join condition: None
(7) Project
Output [1]: [order_id#13]
Input [2]: [order_id#13, order_id#17]
(8) HashAggregate
Input [1]: [order_id#13]
Keys [1]: [order_id#13]
Functions: []
Aggregate Attributes: []
Results [1]: [order_id#13]
(9) Exchange
Input [1]: [order_id#13]
Arguments: hashpartitioning(order_id#13, 200), ENSURE_REQUIREMENTS, [plan_id=262]
(10) HashAggregate
Input [1]: [order_id#13]
Keys [1]: [order_id#13]
Functions: []
Aggregate Attributes: []
Results [1]: [order_id#13]
(11) AdaptiveSparkPlan
Output [1]: [order_id#13]
Arguments: isFinalPlan=false
✅ Plan saved to proof/plan_ingest.txt
✅ Summary saved to proof/ingestion_summary.csv
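The plan above uses BroadcastHashJoin with no shuffle on the join itself because both CSVs are far below spark.sql.autoBroadcastJoinThreshold (10 MB by default); the only Exchange comes from distinct(). A hedged sketch of how one could inspect the threshold and force a non-broadcast strategy for comparison (the shuffle_hash hint is standard Spark, but this experiment is not part of the lab):
# Check the broadcast threshold, then ask for a shuffle hash join instead of a
# broadcast join and compare the resulting physical plan with the one above.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
shuffled = (orders.hint("shuffle_hash")
    .join(order_items, "order_id")
    .select("order_id")
    .distinct())
shuffled.explain("formatted")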
In [5]:
print("\n" + "="*60)
print("🔑 STEP 2: SURROGATE KEY FUNCTION")
print("="*60)
def sk(cols):
    """
    Generate a stable, positive 64-bit surrogate key
    from natural key columns using xxhash64.
    """
    return F.abs(F.xxhash64(*[F.col(c) for c in cols]))
print("""
✅ sk() function defined:
- Uses xxhash64 for a stable hash
- Returns a positive value via abs()
- Deterministic (same natural key = same SK)
Example:
sk(["customer_id"]) → stable hash of customer_id
""")
============================================================
🔑 STEP 2: SURROGATE KEY FUNCTION
============================================================

✅ sk() function defined:
- Uses xxhash64 for a stable hash
- Returns a positive value via abs()
- Deterministic (same natural key = same SK)
Example:
sk(["customer_id"]) → stable hash of customer_id
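Two properties of sk() are worth keeping in mind: abs() folds the 64-bit hash space in half (slightly raising the collision probability), and xxhash64 of the same integer is the same in every table, so customer 1 and brand 1 end up with identical surrogate key values, as the dimension output below shows. A hedged sketch demonstrating determinism plus a hypothetical salted variant (sk_salted is my illustration, not part of the lab):
# Determinism check: the same natural key always hashes to the same surrogate key.
demo = spark.createDataFrame([(1,), (2,), (1,)], ["customer_id"])
demo.select("customer_id", sk(["customer_id"]).alias("customer_sk")).show()

# Hypothetical variant: salt the hash with the dimension name so that
# customer 1 and brand 1 no longer share the same surrogate key value.
def sk_salted(dim_name, cols):
    return F.abs(F.xxhash64(F.lit(dim_name), *[F.col(c) for c in cols]))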
In [6]:
print("\n" + "="*60)
print("📐 STEP 3: BUILDING THE DIMENSIONS")
print("="*60)
# dim_customer
dim_customer = customers.select(
sk(["customer_id"]).alias("customer_sk"),
"customer_id",
"name",
"email",
"created_at"
)
# dim_brand
dim_brand = brands.select(
sk(["brand_id"]).alias("brand_sk"),
"brand_id",
"brand_name"
)
# dim_category
dim_category = categories.select(
sk(["category_id"]).alias("category_sk"),
"category_id",
"category_name"
)
# dim_product
dim_product = products.select(
sk(["product_id"]).alias("product_sk"),
"product_id",
"product_name",
sk(["brand_id"]).alias("brand_sk"),
sk(["category_id"]).alias("category_sk"),
"price"
)
# Show the results
print("\n📊 Dimensions created:\n")
print(f"dim_customer: {dim_customer.count()} rows")
dim_customer.show(3)
print(f"\ndim_brand: {dim_brand.count()} rows")
dim_brand.show(3)
print(f"\ndim_category: {dim_category.count()} rows")
dim_category.show(3)
print(f"\ndim_product: {dim_product.count()} rows")
dim_product.show(3)
# Save the statistics
with open("proof/dimensions_summary.csv", "w") as f:
    f.write("Dimension,Rows,Columns\n")
    f.write(f"dim_customer,{dim_customer.count()},5\n")
    f.write(f"dim_brand,{dim_brand.count()},3\n")
    f.write(f"dim_category,{dim_category.count()},3\n")
    f.write(f"dim_product,{dim_product.count()},6\n")
print("\n✅ Dimensions saved to proof/dimensions_summary.csv")
============================================================
📐 STEP 3: BUILDING THE DIMENSIONS
============================================================

📊 Dimensions created:

dim_customer: 10 rows
+-------------------+-----------+----------------+-------------------+-------------------+
|        customer_sk|customer_id|            name|              email|         created_at|
+-------------------+-----------+----------------+-------------------+-------------------+
|6698625589789238999|          1|    Alice Martin|customer1@email.com|2024-01-01 00:00:00|
|8420071140774656230|          2|      Bob Dupont|customer2@email.com|2024-01-11 00:00:00|
|6258084186791473711|          3|Caroline Lefevre|customer3@email.com|2024-01-21 00:00:00|
+-------------------+-----------+----------------+-------------------+-------------------+
only showing top 3 rows

dim_brand: 5 rows
+-------------------+--------+----------+
|           brand_sk|brand_id|brand_name|
+-------------------+--------+----------+
|6698625589789238999|       1|  TechCorp|
|8420071140774656230|       2|ElectroMax|
|6258084186791473711|       3|GadgetPlus|
+-------------------+--------+----------+
only showing top 3 rows

dim_category: 5 rows
+-------------------+-----------+-------------+
|        category_sk|category_id|category_name|
+-------------------+-----------+-------------+
|6698625589789238999|          1|  Smartphones|
|8420071140774656230|          2|      Laptops|
|6258084186791473711|          3|      Tablets|
+-------------------+-----------+-------------+
only showing top 3 rows

dim_product: 20 rows
+-------------------+----------+------------------+-------------------+-------------------+------+
|         product_sk|product_id|      product_name|           brand_sk|        category_sk| price|
+-------------------+----------+------------------+-------------------+-------------------+------+
|6698625589789238999|         1|         iPhone 15|6698625589789238999|6698625589789238999|999.99|
|8420071140774656230|         2|Samsung Galaxy S24|8420071140774656230|6698625589789238999|899.99|
|6258084186791473711|         3|    Google Pixel 8|6698625589789238999|6698625589789238999|799.99|
+-------------------+----------+------------------+-------------------+-------------------+------+
only showing top 3 rows

✅ Dimensions saved to proof/dimensions_summary.csv
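Since dim_product derives brand_sk and category_sk with the same sk() call used in the parent dimensions, referential integrity should hold by construction; a hedged left-anti-join check makes that explicit (an addition for verification, not part of the original lab):
# Every brand_sk / category_sk referenced by dim_product should exist in its
# parent dimension; a non-zero count here would reveal orphan references.
orphan_brands = dim_product.join(dim_brand, "brand_sk", "left_anti")
orphan_categories = dim_product.join(dim_category, "category_sk", "left_anti")
print("orphan brand refs:", orphan_brands.count())
print("orphan category refs:", orphan_categories.count())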
In [7]:
print("\n" + "="*60)
print("📅 STEP 4: DATE DIMENSION")
print("="*60)
# Extract the distinct order dates
dates = orders.select(F.to_date("order_date").alias("date")).distinct()
# Build dim_date
dim_date = dates.select(
sk(["date"]).alias("date_sk"),
F.col("date"),
F.year("date").alias("year"),
F.month("date").alias("month"),
F.dayofmonth("date").alias("day"),
F.date_format("date", "E").alias("dow"), # Day of week
F.quarter("date").alias("quarter"),
F.weekofyear("date").alias("week_of_year")
)
print(f"\n📊 dim_date: {dim_date.count()} unique days\n")
dim_date.show(10, truncate=False)
# Date range statistics
min_date = orders.agg(F.min("order_date")).collect()[0][0]
max_date = orders.agg(F.max("order_date")).collect()[0][0]
print("\n📈 Date range:")
print(f" Min date: {min_date}")
print(f" Max date: {max_date}")
# Save
with open("proof/date_dimension_summary.csv", "w") as f:
    f.write("Metric,Value\n")
    f.write(f"Unique days,{dim_date.count()}\n")
    f.write(f"Min date,{min_date}\n")
    f.write(f"Max date,{max_date}\n")
print("\n✅ Date dimension saved")
============================================================
📅 STEP 4: DATE DIMENSION
============================================================

📊 dim_date: 45 unique days

+-------------------+----------+----+-----+---+---+-------+------------+
|date_sk            |date      |year|month|day|dow|quarter|week_of_year|
+-------------------+----------+----+-----+---+---+-------+------------+
|1307272247578339239|2024-08-27|2024|8    |27 |Tue|3      |35          |
|4822692307583801426|2024-11-02|2024|11   |2  |Sat|4      |44          |
|8872159615915572291|2024-10-24|2024|10   |24 |Thu|4      |43          |
|7593407256401673777|2024-09-10|2024|9    |10 |Tue|3      |37          |
|3799421252167961828|2024-08-05|2024|8    |5  |Mon|3      |32          |
|2795412895394378798|2024-10-25|2024|10   |25 |Fri|4      |43          |
|2857658719716137024|2024-07-18|2024|7    |18 |Thu|3      |29          |
|5690026203634146595|2024-08-29|2024|8    |29 |Thu|3      |35          |
|4425985170592395865|2024-08-07|2024|8    |7  |Wed|3      |32          |
|2425830812311755070|2024-07-17|2024|7    |17 |Wed|3      |29          |
+-------------------+----------+----+-----+---+---+-------+------------+
only showing top 10 rows

📈 Date range:
 Min date: 2024-06-14 00:00:00
 Max date: 2024-11-22 00:00:00

✅ Date dimension saved
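dim_date above only contains the 45 dates that actually appear in orders, so calendar gaps simply vanish from time-series queries. A hedged sketch of building a gap-free calendar over the observed range instead, using Spark SQL's sequence (the date bounds come from the min/max printed above; this is an alternative, not the lab's approach):
# Gap-free calendar between the observed min and max order dates.
calendar = spark.sql("""
    SELECT explode(sequence(to_date('2024-06-14'), to_date('2024-11-22'), interval 1 day)) AS date
""")
dim_date_full = calendar.select(
    sk(["date"]).alias("date_sk"),
    "date",
    F.year("date").alias("year"),
    F.month("date").alias("month"),
    F.dayofmonth("date").alias("day"),
    F.date_format("date", "E").alias("dow"),
    F.quarter("date").alias("quarter"),
    F.weekofyear("date").alias("week_of_year"),
)
print(dim_date_full.count())  # should be 162 days, both endpoints inclusive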
In [10]:
# First pass: join, then project immediately (aliases defined here so this cell
# is self-contained; the next cell rebuilds the same fact table in one clean pipeline)
oi = order_items.alias("oi")
p = products.alias("p")
o = orders.alias("o")
c = customers.alias("c")
df_fact = (oi
.join(p, F.col("oi.product_id") == F.col("p.product_id"), "inner")
.join(o, F.col("oi.order_id") == F.col("o.order_id"), "inner")
.join(c, F.col("o.customer_id") == F.col("c.customer_id"), "inner")
)
# Project immediately to avoid column ambiguity
df_fact = df_fact.select(
F.col("oi.order_id").alias("order_id"),
F.col("oi.product_id").alias("product_id"),
F.col("oi.quantity").alias("quantity"),
F.col("oi.unit_price").alias("unit_price"),
F.col("o.customer_id").alias("customer_id"),
F.col("o.order_date").alias("order_date"),
F.col("p.price").alias("product_price")
)
# No more ambiguity from here on
df_fact = (df_fact
.withColumn("date", F.to_date("order_date"))
.withColumn("date_sk", sk(["date"]))
.withColumn("customer_sk", sk(["customer_id"]))
.withColumn("product_sk", sk(["product_id"]))
.withColumn("subtotal", F.col("quantity") * F.col("unit_price"))
.withColumn("year", F.year("date"))
.withColumn("month", F.month("date"))
)
In [11]:
print("\n" + "="*60)
print("📊 STEP 5: BUILDING THE FACT TABLE (CLEAN)")
print("="*60)
# Aliases
oi = order_items.alias("oi")
p = products.alias("p")
o = orders.alias("o")
c = customers.alias("c")
print("\n🔗 Join sequence:")
print(" order_items (oi)")
print(" → JOIN products (p)")
print(" → JOIN orders (o)")
print(" → JOIN customers (c)")
# STEP 1: Joins
df_joined = (oi
.join(p, F.col("oi.product_id") == F.col("p.product_id"), "inner")
.join(o, F.col("oi.order_id") == F.col("o.order_id"), "inner")
.join(c, F.col("o.customer_id") == F.col("c.customer_id"), "inner")
)
# STEP 2: Immediate projection to remove ambiguity
df_joined = df_joined.select(
F.col("oi.order_id").alias("order_id"),
F.col("oi.product_id").alias("product_id"),
F.col("oi.quantity").alias("quantity"),
F.col("oi.unit_price").alias("unit_price"),
F.col("o.customer_id").alias("customer_id"),
F.col("o.order_date").alias("order_date"),
F.col("p.price").alias("product_price")
)
# STEP 3: Transformations
df_fact = (df_joined
.withColumn("date", F.to_date("order_date"))
.withColumn("date_sk", sk(["date"]))
.withColumn("customer_sk", sk(["customer_id"]))
.withColumn("product_sk", sk(["product_id"]))
.withColumn("quantity", F.col("quantity").cast("int"))
.withColumn("unit_price", F.col("unit_price").cast("double"))
.withColumn("subtotal", F.col("quantity") * F.col("unit_price"))
.withColumn("year", F.year("date"))
.withColumn("month", F.month("date"))
.select(
"order_id",
"date_sk",
"customer_sk",
"product_sk",
"quantity",
"unit_price",
"subtotal",
"year",
"month"
)
)
print(f"\n✅ fact_sales built successfully: {df_fact.count()} rows\n")
df_fact.show(5)
# Statistics (note: each fact row is an order line item, not a whole order)
print("\n📈 fact_sales statistics:")
fact_stats = df_fact.agg(
F.count("order_id").alias("fact_rows"),
F.sum("quantity").alias("total_quantity"),
F.sum("subtotal").alias("total_gmv"),
F.avg("subtotal").alias("avg_line_subtotal"),
F.min("subtotal").alias("min_line_subtotal"),
F.max("subtotal").alias("max_line_subtotal")
).collect()[0]
print(f" Fact rows (line items): {int(fact_stats['fact_rows'])}")
print(f" Total quantity: {int(fact_stats['total_quantity'])}")
print(f" Total GMV: ${fact_stats['total_gmv']:.2f}")
print(f" Average line subtotal: ${fact_stats['avg_line_subtotal']:.2f}")
print(f" Min line subtotal: ${fact_stats['min_line_subtotal']:.2f}")
print(f" Max line subtotal: ${fact_stats['max_line_subtotal']:.2f}")
# Save the stats
with open("proof/fact_sales_summary.csv", "w") as f:
    f.write("Metric,Value\n")
    f.write(f"Fact rows (line items),{int(fact_stats['fact_rows'])}\n")
    f.write(f"Total quantity,{int(fact_stats['total_quantity'])}\n")
    f.write(f"Total GMV,${fact_stats['total_gmv']:.2f}\n")
    f.write(f"Average line subtotal,${fact_stats['avg_line_subtotal']:.2f}\n")
print("\n✅ fact_sales summary saved to proof/fact_sales_summary.csv")
print("="*60)
============================================================
📊 STEP 5: BUILDING THE FACT TABLE (CLEAN)
============================================================

🔗 Join sequence:
 order_items (oi)
 → JOIN products (p)
 → JOIN orders (o)
 → JOIN customers (c)

✅ fact_sales built successfully: 100 rows

+--------+-------------------+-------------------+-------------------+--------+----------+------------------+----+-----+
|order_id|            date_sk|        customer_sk|         product_sk|quantity|unit_price|          subtotal|year|month|
+--------+-------------------+-------------------+-------------------+--------+----------+------------------+----+-----+
|      49|5690026203634146595|8420071140774656230| 233500712460350175|       4|   1499.99|           5999.96|2024|    8|
|      17|3271524206859782347|8420071140774656230|8420071140774656230|       2|    899.99|           1799.98|2024|   10|
|      26|6437972260669345058| 504019808641096632|5346497071442387076|       4|    149.99|            599.96|2024|   11|
|       2|8799929592043627498|2852032610340310743|6698625589789238999|       2|    999.99|           1999.98|2024|    9|
|      50|4575922205773423797|2786828215451145335|8285521376477742517|       3|   1299.99|3899.9700000000003|2024|   11|
+--------+-------------------+-------------------+-------------------+--------+----------+------------------+----+-----+
only showing top 5 rows

📈 fact_sales statistics:
 Fact rows (line items): 100
 Total quantity: 295
 Total GMV: $196872.05
 Average line subtotal: $1968.72
 Min line subtotal: $9.99
 Max line subtotal: $9999.95

✅ fact_sales summary saved to proof/fact_sales_summary.csv
============================================================
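Before treating fact_sales as authoritative it is worth checking that every surrogate key in the fact resolves to a dimension row and that no measure is NULL; a hedged verification sketch (an addition, not part of the original lab):
# Fact-to-dimension integrity: fact rows whose SK has no match in the dimension.
missing_customers = df_fact.join(dim_customer, "customer_sk", "left_anti").count()
missing_products = df_fact.join(dim_product, "product_sk", "left_anti").count()
missing_dates = df_fact.join(dim_date, "date_sk", "left_anti").count()
print("facts without customer:", missing_customers)
print("facts without product:", missing_products)
print("facts without date:", missing_dates)

# Measures should never be NULL in an additive fact table.
df_fact.filter(
    F.col("quantity").isNull() | F.col("unit_price").isNull() | F.col("subtotal").isNull()
).show()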
In [12]:
print("\n" + "="*60)
print("🔍 EXECUTION PLAN - FACT_SALES")
print("="*60 + "\n")
df_fact.explain("formatted")
# Save the plan
with open("proof/plan_fact_join.txt", "w") as f:
    f.write("=== FACT_SALES PLAN ===\n")
    f.write(f"Timestamp: {_dt.now()}\n\n")
    f.write(df_fact._jdf.queryExecution().executedPlan().toString())
print("\n✅ Plan saved to proof/plan_fact_join.txt")
============================================================
🔍 EXECUTION PLAN - FACT_SALES
============================================================
== Physical Plan ==
AdaptiveSparkPlan (19)
+- Project (18)
+- Project (17)
+- BroadcastHashJoin Inner BuildRight (16)
:- Project (12)
: +- BroadcastHashJoin Inner BuildRight (11)
: :- Project (7)
: : +- BroadcastHashJoin Inner BuildRight (6)
: : :- Filter (2)
: : : +- Scan csv (1)
: : +- BroadcastExchange (5)
: : +- Filter (4)
: : +- Scan csv (3)
: +- BroadcastExchange (10)
: +- Filter (9)
: +- Scan csv (8)
+- BroadcastExchange (15)
+- Filter (14)
+- Scan csv (13)
(1) Scan csv
Output [4]: [order_id#17, product_id#18, quantity#19, unit_price#20]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(product_id), IsNotNull(order_id)]
ReadSchema: struct<order_id:int,product_id:int,quantity:int,unit_price:double>
(2) Filter
Input [4]: [order_id#17, product_id#18, quantity#19, unit_price#20]
Condition : (isnotnull(product_id#18) AND isnotnull(order_id#17))
(3) Scan csv
Output [1]: [product_id#8]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_products.csv]
PushedFilters: [IsNotNull(product_id)]
ReadSchema: struct<product_id:int>
(4) Filter
Input [1]: [product_id#8]
Condition : isnotnull(product_id#8)
(5) BroadcastExchange
Input [1]: [product_id#8]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=2150]
(6) BroadcastHashJoin
Left keys [1]: [product_id#18]
Right keys [1]: [product_id#8]
Join type: Inner
Join condition: None
(7) Project
Output [4]: [order_id#17, product_id#18, quantity#19, unit_price#20]
Input [5]: [order_id#17, product_id#18, quantity#19, unit_price#20, product_id#8]
(8) Scan csv
Output [3]: [order_id#13, customer_id#14, order_date#15]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id), IsNotNull(customer_id)]
ReadSchema: struct<order_id:int,customer_id:int,order_date:timestamp>
(9) Filter
Input [3]: [order_id#13, customer_id#14, order_date#15]
Condition : (isnotnull(order_id#13) AND isnotnull(customer_id#14))
(10) BroadcastExchange
Input [3]: [order_id#13, customer_id#14, order_date#15]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=2154]
(11) BroadcastHashJoin
Left keys [1]: [order_id#17]
Right keys [1]: [order_id#13]
Join type: Inner
Join condition: None
(12) Project
Output [6]: [order_id#17, product_id#18, quantity#19, unit_price#20, customer_id#14, order_date#15]
Input [7]: [order_id#17, product_id#18, quantity#19, unit_price#20, order_id#13, customer_id#14, order_date#15]
(13) Scan csv
Output [1]: [customer_id#0]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_customers.csv]
PushedFilters: [IsNotNull(customer_id)]
ReadSchema: struct<customer_id:int>
(14) Filter
Input [1]: [customer_id#0]
Condition : isnotnull(customer_id#0)
(15) BroadcastExchange
Input [1]: [customer_id#0]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=2158]
(16) BroadcastHashJoin
Left keys [1]: [customer_id#14]
Right keys [1]: [customer_id#0]
Join type: Inner
Join condition: None
(17) Project
Output [6]: [order_id#17, product_id#18, quantity#19, unit_price#20, customer_id#14, cast(order_date#15 as date) AS date#392]
Input [7]: [order_id#17, product_id#18, quantity#19, unit_price#20, customer_id#14, order_date#15, customer_id#0]
(18) Project
Output [9]: [order_id#17, abs(xxhash64(date#392, 42)) AS date_sk#393L, abs(xxhash64(customer_id#14, 42)) AS customer_sk#394L, abs(xxhash64(product_id#18, 42)) AS product_sk#395L, quantity#19, unit_price#20, (cast(quantity#19 as double) * unit_price#20) AS subtotal#398, year(date#392) AS year#399, month(date#392) AS month#400]
Input [6]: [order_id#17, product_id#18, quantity#19, unit_price#20, customer_id#14, date#392]
(19) AdaptiveSparkPlan
Output [9]: [order_id#17, date_sk#393L, customer_sk#394L, product_sk#395L, quantity#19, unit_price#20, subtotal#398, year#399, month#400]
Arguments: isFinalPlan=false
✅ Plan saved to proof/plan_fact_join.txt
In [13]:
print("\n" + "="*60)
print("💾 STEP 6: WRITING PARQUET OUTPUTS")
print("="*60)
base_out = "outputs/lab2"
# Create the output directory
os.makedirs(base_out, exist_ok=True)
# Write the dimensions
print("\n📝 Writing dimensions...")
(dim_customer.write
.mode("overwrite")
.parquet(f"{base_out}/dim_customer"))
print(f" ✅ dim_customer → {base_out}/dim_customer")
(dim_brand.write
.mode("overwrite")
.parquet(f"{base_out}/dim_brand"))
print(f" ✅ dim_brand → {base_out}/dim_brand")
(dim_category.write
.mode("overwrite")
.parquet(f"{base_out}/dim_category"))
print(f" ✅ dim_category → {base_out}/dim_category")
(dim_product.write
.mode("overwrite")
.parquet(f"{base_out}/dim_product"))
print(f" ✅ dim_product → {base_out}/dim_product")
(dim_date.write
.mode("overwrite")
.parquet(f"{base_out}/dim_date"))
print(f" ✅ dim_date → {base_out}/dim_date")
# Write the fact table (partitioned by year, month)
print("\n📝 Writing the fact table (partitioned by year/month)...")
(df_fact.write
.mode("overwrite")
.partitionBy("year", "month")
.parquet(f"{base_out}/fact_sales"))
print(f" ✅ fact_sales → {base_out}/fact_sales")
print("\n" + "="*60)
print("✅ All Parquet outputs written successfully!")
print("="*60)
# Check the files on disk
import subprocess
result = subprocess.run(f"ls -lhR {base_out}", shell=True, capture_output=True, text=True)
print("\n📂 Output structure:")
print(result.stdout)
============================================================
💾 STEP 6: WRITING PARQUET OUTPUTS
============================================================

📝 Writing dimensions...
 ✅ dim_customer → outputs/lab2/dim_customer
 ✅ dim_brand → outputs/lab2/dim_brand
 ✅ dim_category → outputs/lab2/dim_category
 ✅ dim_product → outputs/lab2/dim_product
 ✅ dim_date → outputs/lab2/dim_date

📝 Writing the fact table (partitioned by year/month)...
 ✅ fact_sales → outputs/lab2/fact_sales

============================================================
✅ All Parquet outputs written successfully!
============================================================

📂 Output structure:
outputs/lab2:
total 24K
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 dim_brand
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 dim_category
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 dim_customer
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 dim_date
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 dim_product
drwxr-xr-x 3 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 fact_sales

outputs/lab2/dim_brand:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 0 Dec 9 20:49 _SUCCESS
-rw-r--r-- 1 bibawandaogo bibawandaogo 1.2K Dec 9 20:49 part-00000-fc4094ef-c911-499c-988e-18aeb0944a64-c000.snappy.parquet

outputs/lab2/dim_category:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 0 Dec 9 20:49 _SUCCESS
-rw-r--r-- 1 bibawandaogo bibawandaogo 1.2K Dec 9 20:49 part-00000-1402ab54-95f4-4aec-9019-d4e590fa5d12-c000.snappy.parquet

outputs/lab2/dim_customer:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 0 Dec 9 20:49 _SUCCESS
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.0K Dec 9 20:49 part-00000-79204bd6-e80c-4244-8b9e-8bdcef05a561-c000.snappy.parquet

outputs/lab2/dim_date:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 0 Dec 9 20:49 _SUCCESS
-rw-r--r-- 1 bibawandaogo bibawandaogo 3.3K Dec 9 20:49 part-00000-e31640ee-0680-4931-b223-66cf148aa595-c000.snappy.parquet

outputs/lab2/dim_product:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 0 Dec 9 20:49 _SUCCESS
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.7K Dec 9 20:49 part-00000-d65d61a9-9702-4815-8bdd-4fad64667a61-c000.snappy.parquet

outputs/lab2/fact_sales:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 0 Dec 9 20:49 _SUCCESS
drwxr-xr-x 8 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 year=2024

outputs/lab2/fact_sales/year=2024:
total 24K
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 month=10
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 month=11
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 month=6
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 month=7
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 month=8
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec 9 20:49 month=9

outputs/lab2/fact_sales/year=2024/month=10:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.8K Dec 9 20:49 part-00000-6f995a37-e29f-4269-a02d-43c701036080.c000.snappy.parquet

outputs/lab2/fact_sales/year=2024/month=11:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.9K Dec 9 20:49 part-00000-6f995a37-e29f-4269-a02d-43c701036080.c000.snappy.parquet

outputs/lab2/fact_sales/year=2024/month=6:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.6K Dec 9 20:49 part-00000-6f995a37-e29f-4269-a02d-43c701036080.c000.snappy.parquet

outputs/lab2/fact_sales/year=2024/month=7:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.6K Dec 9 20:49 part-00000-6f995a37-e29f-4269-a02d-43c701036080.c000.snappy.parquet

outputs/lab2/fact_sales/year=2024/month=8:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.8K Dec 9 20:49 part-00000-6f995a37-e29f-4269-a02d-43c701036080.c000.snappy.parquet

outputs/lab2/fact_sales/year=2024/month=9:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.8K Dec 9 20:49 part-00000-6f995a37-e29f-4269-a02d-43c701036080.c000.snappy.parquet
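Partitioning fact_sales by year and month pays off when readers filter on those columns: Spark only lists and reads the matching directories. A hedged sketch of verifying partition pruning on the files just written (the filter values are taken from the partition layout above):
# Read the partitioned fact table back and filter on the partition columns;
# the scan node of the plan should show PartitionFilters on year and month,
# i.e. only the year=2024/month=8 directory needs to be read.
fact_read = spark.read.parquet("outputs/lab2/fact_sales")
aug = fact_read.filter((F.col("year") == 2024) & (F.col("month") == 8))
aug.explain("formatted")
print(aug.count())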
In [14]:
print("\n" + "="*60)
print("⚡ STEP 7: OPTIMIZATION - PROJECTION")
print("="*60)
import time
# ========== CASE A: JOIN THEN PROJECT (LATE) ==========
print("\n📊 CASE A: Join then project (late projection)")
print("-" * 60)
start_a = time.time()
df_a = (orders.join(order_items, "order_id")
.join(products, "product_id")
.groupBy(F.to_date("order_date").alias("d"))
.agg(F.sum(F.col("quantity") * F.col("price")).alias("gmv")))
print("\n🔍 Spark plan (CASE A):\n")
df_a.explain("formatted")
count_a = df_a.count()
time_a = time.time() - start_a
print("\n📈 CASE A results:")
print(f" Rows: {count_a}")
print(f" Time: {time_a:.3f}s")
df_a.show()
# Save the plan
with open("proof/plan_case_a_late_projection.txt", "w") as f:
    f.write("=== CASE A: LATE PROJECTION ===\n")
    f.write(f"Timestamp: {_dt.now()}\n")
    f.write(f"Duration: {time_a:.3f}s\n\n")
    f.write(df_a._jdf.queryExecution().executedPlan().toString())
# ========== CASE B: PROJECT THEN JOIN (EARLY) ==========
print("\n📊 CASE B: Project then join (early projection)")
print("-" * 60)
start_b = time.time()
df_b = (orders.select("order_id", "order_date")
.join(order_items.select("order_id", "product_id", "quantity"), "order_id")
.join(products.select("product_id", "price"), "product_id")
.groupBy(F.to_date("order_date").alias("d"))
.agg(F.sum(F.col("quantity") * F.col("price")).alias("gmv")))
print("\n🔍 Spark plan (CASE B):\n")
df_b.explain("formatted")
count_b = df_b.count()
time_b = time.time() - start_b
print("\n📈 CASE B results:")
print(f" Rows: {count_b}")
print(f" Time: {time_b:.3f}s")
df_b.show()
# Save the plan
with open("proof/plan_case_b_early_projection.txt", "w") as f:
    f.write("=== CASE B: EARLY PROJECTION ===\n")
    f.write(f"Timestamp: {_dt.now()}\n")
    f.write(f"Duration: {time_b:.3f}s\n\n")
    f.write(df_b._jdf.queryExecution().executedPlan().toString())
# ========== COMPARISON ==========
print("\n" + "="*60)
print("📊 COMPARISON: CASE A vs CASE B")
print("="*60)
improvement = ((time_a - time_b) / time_a * 100) if time_a > 0 else 0
print(f"\nLate projection (A): {time_a:.3f}s")
print(f"Early projection (B): {time_b:.3f}s")
print(f"Improvement: {improvement:+.1f}%")
if time_b < time_a:
    print(f"\n✅ Early projection (B) ran {time_a/time_b:.2f}x faster in this run")
else:
    print("\n⚠️ Performance is similar (tiny dataset)")
# Save the metrics
with open("proof/projection_comparison.csv", "w") as f:
    f.write("Case,Approach,Time(s),Rows,Improvement(%)\n")
    f.write(f"A,Late projection,{time_a:.3f},{count_a},{0:.1f}\n")
    f.write(f"B,Early projection,{time_b:.3f},{count_b},{improvement:.1f}\n")
print("\n✅ Metrics saved to proof/projection_comparison.csv")
============================================================
⚡ STEP 7: OPTIMIZATION - PROJECTION
============================================================
📊 CASE A: Join then project (late projection)
------------------------------------------------------------
🔍 Spark plan (CASE A):
== Physical Plan ==
AdaptiveSparkPlan (16)
+- HashAggregate (15)
+- Exchange (14)
+- HashAggregate (13)
+- Project (12)
+- BroadcastHashJoin Inner BuildRight (11)
:- Project (7)
: +- BroadcastHashJoin Inner BuildLeft (6)
: :- BroadcastExchange (3)
: : +- Filter (2)
: : +- Scan csv (1)
: +- Filter (5)
: +- Scan csv (4)
+- BroadcastExchange (10)
+- Filter (9)
+- Scan csv (8)
(1) Scan csv
Output [2]: [order_id#13, order_date#15]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int,order_date:timestamp>
(2) Filter
Input [2]: [order_id#13, order_date#15]
Condition : isnotnull(order_id#13)
(3) BroadcastExchange
Input [2]: [order_id#13, order_date#15]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=2675]
(4) Scan csv
Output [3]: [order_id#17, product_id#18, quantity#19]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id), IsNotNull(product_id)]
ReadSchema: struct<order_id:int,product_id:int,quantity:int>
(5) Filter
Input [3]: [order_id#17, product_id#18, quantity#19]
Condition : (isnotnull(order_id#17) AND isnotnull(product_id#18))
(6) BroadcastHashJoin
Left keys [1]: [order_id#13]
Right keys [1]: [order_id#17]
Join type: Inner
Join condition: None
(7) Project
Output [3]: [order_date#15, product_id#18, quantity#19]
Input [5]: [order_id#13, order_date#15, order_id#17, product_id#18, quantity#19]
(8) Scan csv
Output [2]: [product_id#8, price#12]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_products.csv]
PushedFilters: [IsNotNull(product_id)]
ReadSchema: struct<product_id:int,price:double>
(9) Filter
Input [2]: [product_id#8, price#12]
Condition : isnotnull(product_id#8)
(10) BroadcastExchange
Input [2]: [product_id#8, price#12]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=2679]
(11) BroadcastHashJoin
Left keys [1]: [product_id#18]
Right keys [1]: [product_id#8]
Join type: Inner
Join condition: None
(12) Project
Output [3]: [quantity#19, price#12, cast(order_date#15 as date) AS _groupingexpression#558]
Input [5]: [order_date#15, product_id#18, quantity#19, product_id#8, price#12]
(13) HashAggregate
Input [3]: [quantity#19, price#12, _groupingexpression#558]
Keys [1]: [_groupingexpression#558]
Functions [1]: [partial_sum((cast(quantity#19 as double) * price#12))]
Aggregate Attributes [1]: [sum#559]
Results [2]: [_groupingexpression#558, sum#560]
(14) Exchange
Input [2]: [_groupingexpression#558, sum#560]
Arguments: hashpartitioning(_groupingexpression#558, 200), ENSURE_REQUIREMENTS, [plan_id=2684]
(15) HashAggregate
Input [2]: [_groupingexpression#558, sum#560]
Keys [1]: [_groupingexpression#558]
Functions [1]: [sum((cast(quantity#19 as double) * price#12))]
Aggregate Attributes [1]: [sum((cast(quantity#19 as double) * price#12))#557]
Results [2]: [_groupingexpression#558 AS d#544, sum((cast(quantity#19 as double) * price#12))#557 AS gmv#545]
(16) AdaptiveSparkPlan
Output [2]: [d#544, gmv#545]
Arguments: isFinalPlan=false
📈 CASE A results:
 Rows: 37
 Time: 1.089s
+----------+------------------+
| d| gmv|
+----------+------------------+
|2024-08-27| 3909.94|
|2024-10-24|5539.9400000000005|
|2024-11-02| 2324.92|
|2024-09-10| 5259.94|
|2024-08-05| 1399.98|
|2024-08-29|19749.789999999997|
|2024-08-07| 39.99|
|2024-11-05|15599.880000000001|
|2024-06-23|16699.910000000003|
|2024-08-15| 3559.91|
|2024-09-12| 949.97|
|2024-11-07| 3569.92|
|2024-09-25|13679.779999999999|
|2024-10-01| 1399.96|
|2024-09-21| 1859.95|
|2024-11-11| 819.94|
|2024-10-15| 6649.87|
|2024-09-09| 699.99|
|2024-10-28|2099.9700000000003|
|2024-09-05| 12979.89|
+----------+------------------+
only showing top 20 rows
📊 CASE B: Project then join (early projection)
------------------------------------------------------------
🔍 Spark plan (CASE B):
== Physical Plan ==
AdaptiveSparkPlan (16)
+- HashAggregate (15)
+- Exchange (14)
+- HashAggregate (13)
+- Project (12)
+- BroadcastHashJoin Inner BuildRight (11)
:- Project (7)
: +- BroadcastHashJoin Inner BuildLeft (6)
: :- BroadcastExchange (3)
: : +- Filter (2)
: : +- Scan csv (1)
: +- Filter (5)
: +- Scan csv (4)
+- BroadcastExchange (10)
+- Filter (9)
+- Scan csv (8)
(1) Scan csv
Output [2]: [order_id#13, order_date#15]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int,order_date:timestamp>
(2) Filter
Input [2]: [order_id#13, order_date#15]
Condition : isnotnull(order_id#13)
(3) BroadcastExchange
Input [2]: [order_id#13, order_date#15]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=3299]
(4) Scan csv
Output [3]: [order_id#17, product_id#18, quantity#19]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id), IsNotNull(product_id)]
ReadSchema: struct<order_id:int,product_id:int,quantity:int>
(5) Filter
Input [3]: [order_id#17, product_id#18, quantity#19]
Condition : (isnotnull(order_id#17) AND isnotnull(product_id#18))
(6) BroadcastHashJoin
Left keys [1]: [order_id#13]
Right keys [1]: [order_id#17]
Join type: Inner
Join condition: None
(7) Project
Output [3]: [order_date#15, product_id#18, quantity#19]
Input [5]: [order_id#13, order_date#15, order_id#17, product_id#18, quantity#19]
(8) Scan csv
Output [2]: [product_id#8, price#12]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_products.csv]
PushedFilters: [IsNotNull(product_id)]
ReadSchema: struct<product_id:int,price:double>
(9) Filter
Input [2]: [product_id#8, price#12]
Condition : isnotnull(product_id#8)
(10) BroadcastExchange
Input [2]: [product_id#8, price#12]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=3303]
(11) BroadcastHashJoin
Left keys [1]: [product_id#18]
Right keys [1]: [product_id#8]
Join type: Inner
Join condition: None
(12) Project
Output [3]: [quantity#19, price#12, cast(order_date#15 as date) AS _groupingexpression#599]
Input [5]: [order_date#15, product_id#18, quantity#19, product_id#8, price#12]
(13) HashAggregate
Input [3]: [quantity#19, price#12, _groupingexpression#599]
Keys [1]: [_groupingexpression#599]
Functions [1]: [partial_sum((cast(quantity#19 as double) * price#12))]
Aggregate Attributes [1]: [sum#600]
Results [2]: [_groupingexpression#599, sum#601]
(14) Exchange
Input [2]: [_groupingexpression#599, sum#601]
Arguments: hashpartitioning(_groupingexpression#599, 200), ENSURE_REQUIREMENTS, [plan_id=3308]
(15) HashAggregate
Input [2]: [_groupingexpression#599, sum#601]
Keys [1]: [_groupingexpression#599]
Functions [1]: [sum((cast(quantity#19 as double) * price#12))]
Aggregate Attributes [1]: [sum((cast(quantity#19 as double) * price#12))#598]
Results [2]: [_groupingexpression#599 AS d#591, sum((cast(quantity#19 as double) * price#12))#598 AS gmv#592]
(16) AdaptiveSparkPlan
Output [2]: [d#591, gmv#592]
Arguments: isFinalPlan=false
📈 CASE B results:
 Rows: 37
 Time: 0.764s
+----------+------------------+
| d| gmv|
+----------+------------------+
|2024-08-27| 3909.94|
|2024-10-24|5539.9400000000005|
|2024-11-02| 2324.92|
|2024-09-10| 5259.94|
|2024-08-05| 1399.98|
|2024-08-29|19749.789999999997|
|2024-08-07| 39.99|
|2024-11-05|15599.880000000001|
|2024-06-23|16699.910000000003|
|2024-08-15| 3559.91|
|2024-09-12| 949.97|
|2024-11-07| 3569.92|
|2024-09-25|13679.779999999999|
|2024-10-01| 1399.96|
|2024-09-21| 1859.95|
|2024-11-11| 819.94|
|2024-10-15| 6649.87|
|2024-09-09| 699.99|
|2024-10-28|2099.9700000000003|
|2024-09-05| 12979.89|
+----------+------------------+
only showing top 20 rows
============================================================
📊 COMPARISON: CASE A vs CASE B
============================================================
Late projection (A): 1.089s
Early projection (B): 0.764s
Improvement: +29.8%
✅ Early projection (B) ran 1.42x faster in this run
✅ Metrics saved to proof/projection_comparison.csv
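The two formatted plans above are identical apart from internal IDs: Catalyst pushes the column pruning down in Case A as well, so the measured gap on this tiny dataset mostly reflects JVM/codegen warm-up rather than where the projection was written. A hedged sketch of making that explicit by comparing the optimized plans with the IDs stripped (it reuses the internal _jdf accessor already used elsewhere in this notebook):
import re

# Normalize expression IDs like #558 / #599L so structurally identical plans compare equal.
def normalized_plan(df):
    plan = df._jdf.queryExecution().optimizedPlan().toString()
    return re.sub(r"#\d+L?", "#", plan)

print("identical optimized plans:", normalized_plan(df_a) == normalized_plan(df_b))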
In [15]:
print("\n" + "="*60)
print("📋 STEP 8: FINAL SUMMARY")
print("="*60)
summary_metrics = {
"Timestamp": str(_dt.now()),
"Spark Version": spark.version,
"Total Customers": dim_customer.count(),
"Total Brands": dim_brand.count(),
"Total Categories": dim_category.count(),
"Total Products": dim_product.count(),
"Total Dates": dim_date.count(),
"Total Fact Rows (Order Lines)": df_fact.count(),
"Total GMV": fact_stats['total_gmv'],
"Avg Line Subtotal": fact_stats['avg_line_subtotal'],
}
# Print the summary
print("\n📊 KEY METRICS:\n")
for key, value in summary_metrics.items():
    if isinstance(value, float):
        print(f" {key:25s}: ${value:,.2f}")
    else:
        print(f" {key:25s}: {value}")
# Save to CSV
with open("proof/lab2_metrics_final.csv", "w") as f:
    f.write("Metric,Value\n")
    for key, value in summary_metrics.items():
        f.write(f"{key},{value}\n")
print("\n✅ Metrics saved to proof/lab2_metrics_final.csv")
# List all the proof files
print("\n" + "="*60)
print("📁 PROOF FILES GENERATED:")
print("="*60)
proof_files = os.listdir("proof")
for i, file in enumerate(sorted(proof_files), 1):
    size = os.path.getsize(f"proof/{file}")
    print(f" {i}. proof/{file} ({size:,} bytes)")
print("\n" + "="*60)
# Stop Spark
spark.stop()
print("✅ Spark session stopped")
print("\n🎉 LAB 2 PRACTICE COMPLETED SUCCESSFULLY!")
print("="*60)
============================================================
📋 STEP 8: FINAL SUMMARY
============================================================

📊 KEY METRICS:

 Timestamp                : 2025-12-09 20:51:27.883443
 Spark Version            : 4.0.1
 Total Customers          : 10
 Total Brands             : 5
 Total Categories         : 5
 Total Products           : 20
 Total Dates              : 45
 Total Fact Rows (Order Lines): 100
 Total GMV                : $196,872.05
 Avg Line Subtotal        : $1,968.72

✅ Metrics saved to proof/lab2_metrics_final.csv

============================================================
📁 PROOF FILES GENERATED:
============================================================
 1. proof/date_dimension_summary.csv (92 bytes)
 2. proof/dimensions_summary.csv (92 bytes)
 3. proof/fact_sales_summary.csv (102 bytes)
 4. proof/ingestion_summary.csv (130 bytes)
 5. proof/lab2_metrics_final.csv (249 bytes)
 6. proof/plan_case_a_late_projection.txt (2,462 bytes)
 7. proof/plan_case_b_early_projection.txt (2,463 bytes)
 8. proof/plan_df.txt (1,535 bytes)
 9. proof/plan_fact_join.txt (3,221 bytes)
 10. proof/plan_formatted.txt (715 bytes)
 11. proof/plan_ingest.txt (1,317 bytes)
 12. proof/plan_rdd.txt (883 bytes)
 13. proof/projection_comparison.csv (116 bytes)

============================================================
✅ Spark session stopped

🎉 LAB 2 PRACTICE COMPLETED SUCCESSFULLY!
============================================================