In [1]:
from pyspark.sql import SparkSession, functions as F, types as T
from datetime import datetime as _dt
import pathlib
import os

# Initialise Spark
spark = SparkSession.builder \
    .appName("de1-lab2") \
    .master("local[*]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

print(f"✅ Spark {spark.version} initialized")

# Directories
base = "data/"
os.makedirs(base, exist_ok=True)
os.makedirs("proof", exist_ok=True)
os.makedirs("outputs/lab2", exist_ok=True)

# Explicit schemas
customers_schema = T.StructType([
    T.StructField("customer_id", T.IntegerType(), False),
    T.StructField("name", T.StringType(), True),
    T.StructField("email", T.StringType(), True),
    T.StructField("created_at", T.TimestampType(), True),
])

brands_schema = T.StructType([
    T.StructField("brand_id", T.IntegerType(), False),
    T.StructField("brand_name", T.StringType(), True),
])

categories_schema = T.StructType([
    T.StructField("category_id", T.IntegerType(), False),
    T.StructField("category_name", T.StringType(), True),
])

products_schema = T.StructType([
    T.StructField("product_id", T.IntegerType(), False),
    T.StructField("product_name", T.StringType(), True),
    T.StructField("brand_id", T.IntegerType(), True),
    T.StructField("category_id", T.IntegerType(), True),
    T.StructField("price", T.DoubleType(), True),
])

orders_schema = T.StructType([
    T.StructField("order_id", T.IntegerType(), False),
    T.StructField("customer_id", T.IntegerType(), True),
    T.StructField("order_date", T.TimestampType(), True),
])

order_items_schema = T.StructType([
    T.StructField("order_item_id", T.IntegerType(), False),
    T.StructField("order_id", T.IntegerType(), True),
    T.StructField("product_id", T.IntegerType(), True),
    T.StructField("quantity", T.IntegerType(), True),
    T.StructField("unit_price", T.DoubleType(), True),
])

print("✅ All schemas defined")
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/09 20:41:17 WARN Utils: Your hostname, Wandaogo, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/09 20:41:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/09 20:41:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
✅ Spark 4.0.1 initialized
✅ All schemas defined
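
Note on the session config above: spark.sql.shuffle.partitions is explicitly set to 200 (the default), which pads every shuffle on this toy dataset with mostly empty tasks; the 200-partition Exchange nodes show up later in the plans. A minimal sketch of two alternatives, assuming the spark session created above (the values are illustrative):

# Either lower the shuffle parallelism for a small local run...
spark.conf.set("spark.sql.shuffle.partitions", "8")

# ...or let Adaptive Query Execution coalesce small shuffle partitions automatically.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")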
In [2]:
import pandas as pd
from datetime import datetime, timedelta
import random

print("📊 Generating test data...\n")

# 1. Customers
customers_data = {
    "customer_id": list(range(1, 11)),
    "name": ["Alice Martin", "Bob Dupont", "Caroline Lefevre", "David Laurent", 
             "Emma Rousseau", "Frank Moreau", "Gabrielle Petit", "Henry Bernard",
             "Isabelle Dubois", "Jean-Pierre Garnier"],
    "email": [f"customer{i}@email.com" for i in range(1, 11)],
    "created_at": [datetime(2024, 1, 1) + timedelta(days=i*10) for i in range(10)]
}
df_customers = pd.DataFrame(customers_data)
df_customers.to_csv(f"{base}lab2_customers.csv", index=False)
print(f"✅ lab2_customers.csv: {len(df_customers)} rows")

# 2. Brands
brands_data = {
    "brand_id": [1, 2, 3, 4, 5],
    "brand_name": ["TechCorp", "ElectroMax", "GadgetPlus", "ProTech", "InnovateLabs"]
}
df_brands = pd.DataFrame(brands_data)
df_brands.to_csv(f"{base}lab2_brands.csv", index=False)
print(f"✅ lab2_brands.csv: {len(df_brands)} rows")

# 3. Categories
categories_data = {
    "category_id": [1, 2, 3, 4, 5],
    "category_name": ["Smartphones", "Laptops", "Tablets", "Accessories", "Wearables"]
}
df_categories = pd.DataFrame(categories_data)
df_categories.to_csv(f"{base}lab2_categories.csv", index=False)
print(f"✅ lab2_categories.csv: {len(df_categories)} rows")

# 4. Products
products_data = {
    "product_id": list(range(1, 21)),
    "product_name": [
        "iPhone 15", "Samsung Galaxy S24", "Google Pixel 8", "OnePlus 12",
        "MacBook Pro", "Dell XPS 15", "HP Pavilion", "Lenovo ThinkPad",
        "iPad Air", "Samsung Tab S9", "Apple Watch Ultra", "Airpods Pro",
        "USB-C Cable", "Phone Case", "Screen Protector", "Wireless Charger",
        "Xiaomi Band 8", "Fitbit Charge 6", "AirTag", "MagSafe Mount"
    ],
    "brand_id": [1, 2, 1, 3, 1, 2, 2, 4, 1, 2, 1, 1, 5, 5, 5, 5, 3, 4, 1, 5],
    "category_id": [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 5, 5, 4, 4, 4, 4, 5, 5, 4, 4],
    "price": [999.99, 899.99, 799.99, 749.99, 1999.99, 1499.99, 699.99, 1299.99,
              699.99, 599.99, 799.99, 249.99, 19.99, 14.99, 9.99, 29.99,
              79.99, 149.99, 99.99, 39.99]
}
df_products = pd.DataFrame(products_data)
df_products.to_csv(f"{base}lab2_products.csv", index=False)
print(f"✅ lab2_products.csv: {len(df_products)} rows")

# 5. Orders
orders_data = {
    "order_id": list(range(1, 51)),
    "customer_id": [random.randint(1, 10) for _ in range(50)],
    "order_date": [datetime(2024, 6, 1) + timedelta(days=random.randint(0, 180)) for _ in range(50)]
}
df_orders = pd.DataFrame(orders_data)
df_orders.to_csv(f"{base}lab2_orders.csv", index=False)
print(f"✅ lab2_orders.csv: {len(df_orders)} rows")

# 6. Order Items
order_items_data = {
    "order_item_id": list(range(1, 101)),
    "order_id": [random.choice(df_orders["order_id"]) for _ in range(100)],
    "product_id": [random.randint(1, 20) for _ in range(100)],
    "quantity": [random.randint(1, 5) for _ in range(100)],
}
df_order_items = pd.DataFrame(order_items_data)

# Add unit_price from products
df_order_items = df_order_items.merge(df_products[["product_id", "price"]], on="product_id")
df_order_items["unit_price"] = df_order_items["price"]
df_order_items = df_order_items[["order_item_id", "order_id", "product_id", "quantity", "unit_price"]]
df_order_items.to_csv(f"{base}lab2_order_items.csv", index=False)
print(f"✅ lab2_order_items.csv: {len(df_order_items)} rows")

print("\n" + "="*60)
print("✅ All data generated successfully!")
print("="*60)
📊 Generating test data...

✅ lab2_customers.csv: 10 rows
✅ lab2_brands.csv: 5 rows
✅ lab2_categories.csv: 5 rows
✅ lab2_products.csv: 20 rows
✅ lab2_orders.csv: 50 rows
✅ lab2_order_items.csv: 100 rows

============================================================
✅ All data generated successfully!
============================================================
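
Since random is never seeded above, the orders and order items differ on every run, so the counts, GMV, and plan row counts that follow will not reproduce exactly. A small sketch of pinning the generator before the generation cell runs (the seed value is arbitrary):

import random

# Seed the RNG so reruns regenerate identical lab2_orders.csv / lab2_order_items.csv.
random.seed(42)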
In [3]:
print("\n" + "="*60)
print("📥 STEP 1: DATA INGESTION")
print("="*60)

# Load all the CSVs with explicit schemas
customers = spark.read \
    .schema(customers_schema) \
    .option("header","true") \
    .csv(f"{base}lab2_customers.csv")

brands = spark.read \
    .schema(brands_schema) \
    .option("header","true") \
    .csv(f"{base}lab2_brands.csv")

categories = spark.read \
    .schema(categories_schema) \
    .option("header","true") \
    .csv(f"{base}lab2_categories.csv")

products = spark.read \
    .schema(products_schema) \
    .option("header","true") \
    .csv(f"{base}lab2_products.csv")

orders = spark.read \
    .schema(orders_schema) \
    .option("header","true") \
    .csv(f"{base}lab2_orders.csv")

order_items = spark.read \
    .schema(order_items_schema) \
    .option("header","true") \
    .csv(f"{base}lab2_order_items.csv")

# Show the row counts
print("\n📊 Operational table row counts:\n")
for name, df in [("customers", customers), ("brands", brands), ("categories", categories),
                  ("products", products), ("orders", orders), ("order_items", order_items)]:
    count = df.count()
    print(f"   {name:15s}: {count:4d} rows")

print("\n" + "="*60)

# Data profiles
print("\n📈 Data profiles:\n")
print("customers:")
customers.show(3)
print("\norders:")
orders.show(3)
print("\norder_items:")
order_items.show(3)
============================================================
📥 STEP 1: DATA INGESTION
============================================================

📊 Operational table row counts:

   customers      :   10 rows
   brands         :    5 rows
   categories     :    5 rows
   products       :   20 rows
   orders         :   50 rows
   order_items    :  100 rows

============================================================

📈 Data profiles:

customers:
+-----------+----------------+-------------------+-------------------+
|customer_id|            name|              email|         created_at|
+-----------+----------------+-------------------+-------------------+
|          1|    Alice Martin|customer1@email.com|2024-01-01 00:00:00|
|          2|      Bob Dupont|customer2@email.com|2024-01-11 00:00:00|
|          3|Caroline Lefevre|customer3@email.com|2024-01-21 00:00:00|
+-----------+----------------+-------------------+-------------------+
only showing top 3 rows

orders:
+--------+-----------+-------------------+
|order_id|customer_id|         order_date|
+--------+-----------+-------------------+
|       1|          6|2024-10-01 00:00:00|
|       2|          9|2024-09-05 00:00:00|
|       3|          1|2024-06-23 00:00:00|
+--------+-----------+-------------------+
only showing top 3 rows

order_items:
+-------------+--------+----------+--------+----------+
|order_item_id|order_id|product_id|quantity|unit_price|
+-------------+--------+----------+--------+----------+
|            1|      49|         6|       4|   1499.99|
|            2|      17|         2|       2|    899.99|
|            3|      26|        18|       4|    149.99|
+-------------+--------+----------+--------+----------+
only showing top 3 rows
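
The CSV reader defaults to PERMISSIVE mode, so a row that does not match the explicit schema silently becomes NULLs rather than failing the job. A hedged sketch of a stricter re-read plus a cheap key check, reusing spark, orders_schema, and base from the cells above:

from pyspark.sql import functions as F

# Re-read one file in FAILFAST mode: malformed rows raise instead of turning into NULLs.
orders_strict = (spark.read
    .schema(orders_schema)
    .option("header", "true")
    .option("mode", "FAILFAST")
    .csv(f"{base}lab2_orders.csv"))

# Sanity check: the primary key parsed for every row.
assert orders_strict.filter(F.col("order_id").isNull()).count() == 0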
In [4]:
print("\n" + "="*60)
print("🔍 EXECUTION PLAN - INGESTION")
print("="*60)

# Simple ingestion plan
ingest = orders.join(order_items, "order_id").select("order_id").distinct()

print("\n📋 Spark plan (formatted):\n")
ingest.explain("formatted")

# Save the proof
with open("proof/plan_ingest.txt", "w") as f:
    f.write("=== INGESTION PLAN ===\n")
    f.write(f"Timestamp: {_dt.now()}\n\n")
    f.write(ingest._jdf.queryExecution().executedPlan().toString())

print("✅ Plan saved to proof/plan_ingest.txt")

# Also save a CSV summary of the ingestion row counts
with open("proof/ingestion_summary.csv", "w") as f:
    f.write("Operation,Rows,Details\n")
    f.write(f"Orders,{orders.count()},All orders\n")
    f.write(f"OrderItems,{order_items.count()},All order items\n")
    f.write(f"Join Result,{ingest.count()},Orders with at least one item\n")

print("✅ Summary saved to proof/ingestion_summary.csv")
============================================================
🔍 EXECUTION PLAN - INGESTION
============================================================

📋 Spark plan (formatted):

== Physical Plan ==
AdaptiveSparkPlan (11)
+- HashAggregate (10)
   +- Exchange (9)
      +- HashAggregate (8)
         +- Project (7)
            +- BroadcastHashJoin Inner BuildLeft (6)
               :- BroadcastExchange (3)
               :  +- Filter (2)
               :     +- Scan csv  (1)
               +- Filter (5)
                  +- Scan csv  (4)


(1) Scan csv 
Output [1]: [order_id#13]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int>

(2) Filter
Input [1]: [order_id#13]
Condition : isnotnull(order_id#13)

(3) BroadcastExchange
Input [1]: [order_id#13]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=257]

(4) Scan csv 
Output [1]: [order_id#17]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int>

(5) Filter
Input [1]: [order_id#17]
Condition : isnotnull(order_id#17)

(6) BroadcastHashJoin
Left keys [1]: [order_id#13]
Right keys [1]: [order_id#17]
Join type: Inner
Join condition: None

(7) Project
Output [1]: [order_id#13]
Input [2]: [order_id#13, order_id#17]

(8) HashAggregate
Input [1]: [order_id#13]
Keys [1]: [order_id#13]
Functions: []
Aggregate Attributes: []
Results [1]: [order_id#13]

(9) Exchange
Input [1]: [order_id#13]
Arguments: hashpartitioning(order_id#13, 200), ENSURE_REQUIREMENTS, [plan_id=262]

(10) HashAggregate
Input [1]: [order_id#13]
Keys [1]: [order_id#13]
Functions: []
Aggregate Attributes: []
Results [1]: [order_id#13]

(11) AdaptiveSparkPlan
Output [1]: [order_id#13]
Arguments: isFinalPlan=false


✅ Plan saved to proof/plan_ingest.txt
✅ Summary saved to proof/ingestion_summary.csv
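
The proof file is written from the private _jdf handle, which only captures executedPlan().toString(). If you want the exact formatted text printed above, one option (a sketch; the output filename below is made up) is to capture what explain("formatted") writes to stdout:

import contextlib, io

# explain() prints to stdout, so redirect it into a buffer and persist that.
buf = io.StringIO()
with contextlib.redirect_stdout(buf):
    ingest.explain("formatted")

with open("proof/plan_ingest_formatted.txt", "w") as f:
    f.write(buf.getvalue())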
In [5]:
print("\n" + "="*60)
print("🔑 STEP 2: SURROGATE KEY FUNCTION")
print("="*60)

def sk(cols):
    """
    Generate a stable, positive 64-bit surrogate key
    from natural key columns using xxhash64.
    """
    return F.abs(F.xxhash64(*[F.col(c) for c in cols]))

print("""
✅ sk() function defined:
   - Uses xxhash64 for a stable hash
   - Returns a positive value with abs()
   - Deterministic (same natural key = same SK)

Example:
  sk(["customer_id"]) → stable hash of customer_id
""")
============================================================
🔑 STEP 2: SURROGATE KEY FUNCTION
============================================================

✅ sk() function defined:
   - Uses xxhash64 for a stable hash
   - Returns a positive value with abs()
   - Deterministic (same natural key = same SK)

Example:
  sk(["customer_id"]) → stable hash of customer_id
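
Determinism of sk() is easy to spot-check. A minimal sketch, assuming the customers DataFrame and the sk helper defined above:

from pyspark.sql import functions as F

# Hashing the same natural key twice must give identical SKs,
# and distinct SKs must match distinct natural keys (no collisions on this data).
check = customers.select(
    sk(["customer_id"]).alias("sk1"),
    sk(["customer_id"]).alias("sk2"),
    "customer_id",
)
assert check.filter(F.col("sk1") != F.col("sk2")).count() == 0
assert check.select("sk1").distinct().count() == customers.select("customer_id").distinct().count()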

In [6]:
print("\n" + "="*60)
print("📐 STEP 3: BUILDING THE DIMENSIONS")
print("="*60)

# dim_customer
dim_customer = customers.select(
    sk(["customer_id"]).alias("customer_sk"),
    "customer_id",
    "name",
    "email",
    "created_at"
)

# dim_brand
dim_brand = brands.select(
    sk(["brand_id"]).alias("brand_sk"),
    "brand_id",
    "brand_name"
)

# dim_category
dim_category = categories.select(
    sk(["category_id"]).alias("category_sk"),
    "category_id",
    "category_name"
)

# dim_product
dim_product = products.select(
    sk(["product_id"]).alias("product_sk"),
    "product_id",
    "product_name",
    sk(["brand_id"]).alias("brand_sk"),
    sk(["category_id"]).alias("category_sk"),
    "price"
)

# Show the results
print("\n📊 Dimensions created:\n")
print(f"dim_customer:  {dim_customer.count()} rows")
dim_customer.show(3)

print(f"\ndim_brand:     {dim_brand.count()} rows")
dim_brand.show(3)

print(f"\ndim_category:  {dim_category.count()} rows")
dim_category.show(3)

print(f"\ndim_product:   {dim_product.count()} rows")
dim_product.show(3)

# Save the statistics
with open("proof/dimensions_summary.csv", "w") as f:
    f.write("Dimension,Rows,Columns\n")
    f.write(f"dim_customer,{dim_customer.count()},5\n")
    f.write(f"dim_brand,{dim_brand.count()},3\n")
    f.write(f"dim_category,{dim_category.count()},3\n")
    f.write(f"dim_product,{dim_product.count()},6\n")

print("\n✅ Dimension summary saved to proof/dimensions_summary.csv")
============================================================
📐 STEP 3: BUILDING THE DIMENSIONS
============================================================

📊 Dimensions created:

dim_customer:  10 rows
+-------------------+-----------+----------------+-------------------+-------------------+
|        customer_sk|customer_id|            name|              email|         created_at|
+-------------------+-----------+----------------+-------------------+-------------------+
|6698625589789238999|          1|    Alice Martin|customer1@email.com|2024-01-01 00:00:00|
|8420071140774656230|          2|      Bob Dupont|customer2@email.com|2024-01-11 00:00:00|
|6258084186791473711|          3|Caroline Lefevre|customer3@email.com|2024-01-21 00:00:00|
+-------------------+-----------+----------------+-------------------+-------------------+
only showing top 3 rows

dim_brand:     5 rows
+-------------------+--------+----------+
|           brand_sk|brand_id|brand_name|
+-------------------+--------+----------+
|6698625589789238999|       1|  TechCorp|
|8420071140774656230|       2|ElectroMax|
|6258084186791473711|       3|GadgetPlus|
+-------------------+--------+----------+
only showing top 3 rows

dim_category:  5 rows
+-------------------+-----------+-------------+
|        category_sk|category_id|category_name|
+-------------------+-----------+-------------+
|6698625589789238999|          1|  Smartphones|
|8420071140774656230|          2|      Laptops|
|6258084186791473711|          3|      Tablets|
+-------------------+-----------+-------------+
only showing top 3 rows

dim_product:   20 rows
+-------------------+----------+------------------+-------------------+-------------------+------+
|         product_sk|product_id|      product_name|           brand_sk|        category_sk| price|
+-------------------+----------+------------------+-------------------+-------------------+------+
|6698625589789238999|         1|         iPhone 15|6698625589789238999|6698625589789238999|999.99|
|8420071140774656230|         2|Samsung Galaxy S24|8420071140774656230|6698625589789238999|899.99|
|6258084186791473711|         3|    Google Pixel 8|6698625589789238999|6698625589789238999|799.99|
+-------------------+----------+------------------+-------------------+-------------------+------+
only showing top 3 rows

✅ Dimension summary saved to proof/dimensions_summary.csv
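
One thing the output makes visible: customer_id 1, brand_id 1, category_id 1, and product_id 1 all hash to 6698625589789238999, because sk() hashes only the integer value, not which entity it belongs to. That is harmless within a single dimension, but surrogate keys are not unique across dimensions, which can hide a join against the wrong dimension. A sketch of salting the hash with the entity name; sk_salted is a hypothetical helper, not part of the lab code, and if adopted the fact-table SKs would have to be salted the same way:

from pyspark.sql import functions as F

# Hypothetical variant of sk(): prepend an entity-name literal as a salt.
def sk_salted(entity, cols):
    return F.abs(F.xxhash64(F.lit(entity), *[F.col(c) for c in cols]))

dim_customer_salted = customers.select(
    sk_salted("customer", ["customer_id"]).alias("customer_sk"),
    "customer_id", "name", "email", "created_at",
)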
In [7]:
print("\n" + "="*60)
print("📅 STEP 4: DATE DIMENSION")
print("="*60)

# Extract the unique dates
dates = orders.select(F.to_date("order_date").alias("date")).distinct()

# Build dim_date
dim_date = dates.select(
    sk(["date"]).alias("date_sk"),
    F.col("date"),
    F.year("date").alias("year"),
    F.month("date").alias("month"),
    F.dayofmonth("date").alias("day"),
    F.date_format("date", "E").alias("dow"),  # Day of week
    F.quarter("date").alias("quarter"),
    F.weekofyear("date").alias("week_of_year")
)

print(f"\n📊 dim_date: {dim_date.count()} unique days\n")
dim_date.show(10, truncate=False)

# Date range statistics
min_date = orders.agg(F.min("order_date")).collect()[0][0]
max_date = orders.agg(F.max("order_date")).collect()[0][0]

print("\n📈 Date range:")
print(f"   Min date: {min_date}")
print(f"   Max date: {max_date}")

# Save
with open("proof/date_dimension_summary.csv", "w") as f:
    f.write("Metric,Value\n")
    f.write(f"Unique days,{dim_date.count()}\n")
    f.write(f"Min date,{min_date}\n")
    f.write(f"Max date,{max_date}\n")

print("\n✅ Date dimension summary saved")
============================================================
📅 STEP 4: DATE DIMENSION
============================================================

📊 dim_date: 45 unique days

+-------------------+----------+----+-----+---+---+-------+------------+
|date_sk            |date      |year|month|day|dow|quarter|week_of_year|
+-------------------+----------+----+-----+---+---+-------+------------+
|1307272247578339239|2024-08-27|2024|8    |27 |Tue|3      |35          |
|4822692307583801426|2024-11-02|2024|11   |2  |Sat|4      |44          |
|8872159615915572291|2024-10-24|2024|10   |24 |Thu|4      |43          |
|7593407256401673777|2024-09-10|2024|9    |10 |Tue|3      |37          |
|3799421252167961828|2024-08-05|2024|8    |5  |Mon|3      |32          |
|2795412895394378798|2024-10-25|2024|10   |25 |Fri|4      |43          |
|2857658719716137024|2024-07-18|2024|7    |18 |Thu|3      |29          |
|5690026203634146595|2024-08-29|2024|8    |29 |Thu|3      |35          |
|4425985170592395865|2024-08-07|2024|8    |7  |Wed|3      |32          |
|2425830812311755070|2024-07-17|2024|7    |17 |Wed|3      |29          |
+-------------------+----------+----+-----+---+---+-------+------------+
only showing top 10 rows

📈 Date range:
   Min date: 2024-06-14 00:00:00
   Max date: 2024-11-22 00:00:00

✅ Date dimension summary saved
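
dim_date only contains the 45 dates that actually occur in orders; a conventional date dimension is gap-free over the whole range, so days with no sales still exist for reporting. A sketch of building a full calendar from the min/max computed above (reusing sk, min_date, and max_date):

from pyspark.sql import functions as F

# Gap-free calendar between the observed min and max order dates.
calendar = spark.range(1).select(
    F.explode(F.sequence(
        F.lit(min_date).cast("date"),
        F.lit(max_date).cast("date"),
        F.expr("interval 1 day"))).alias("date"))

dim_date_full = calendar.select(
    sk(["date"]).alias("date_sk"),
    "date",
    F.year("date").alias("year"),
    F.month("date").alias("month"),
    F.dayofmonth("date").alias("day"),
    F.date_format("date", "E").alias("dow"),
    F.quarter("date").alias("quarter"),
    F.weekofyear("date").alias("week_of_year"),
)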
In [10]:
# Cleaner approach: join, then project immediately.
# Table aliases (also defined in the next cell) so this cell runs on its own.
oi = order_items.alias("oi")
p = products.alias("p")
o = orders.alias("o")
c = customers.alias("c")

df_fact = (oi
    .join(p, F.col("oi.product_id") == F.col("p.product_id"), "inner")
    .join(o, F.col("oi.order_id") == F.col("o.order_id"), "inner")
    .join(c, F.col("o.customer_id") == F.col("c.customer_id"), "inner")
)

# Project immediately to avoid column ambiguity
df_fact = df_fact.select(
    F.col("oi.order_id").alias("order_id"),
    F.col("oi.product_id").alias("product_id"),
    F.col("oi.quantity").alias("quantity"),
    F.col("oi.unit_price").alias("unit_price"),
    F.col("o.customer_id").alias("customer_id"),
    F.col("o.order_date").alias("order_date"),
    F.col("p.price").alias("product_price")
)

# No more ambiguity from here on
df_fact = (df_fact
    .withColumn("date", F.to_date("order_date"))
    .withColumn("date_sk", sk(["date"]))
    .withColumn("customer_sk", sk(["customer_id"]))
    .withColumn("product_sk", sk(["product_id"]))
    .withColumn("subtotal", F.col("quantity") * F.col("unit_price"))
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
)
In [11]:
print("\n" + "="*60)
print("📊 STEP 5: BUILDING THE FACT TABLE (CLEAN)")
print("="*60)

# Aliases
oi = order_items.alias("oi")
p = products.alias("p")
o = orders.alias("o")
c = customers.alias("c")

print("\n🔗 Join sequence:")
print("   order_items (oi)")
print("   → JOIN products (p)")
print("   → JOIN orders (o)")
print("   → JOIN customers (c)")

# STEP 1: Joins
df_joined = (oi
    .join(p, F.col("oi.product_id") == F.col("p.product_id"), "inner")
    .join(o, F.col("oi.order_id") == F.col("o.order_id"), "inner")
    .join(c, F.col("o.customer_id") == F.col("c.customer_id"), "inner")
)

# STEP 2: Project immediately to eliminate ambiguity
df_joined = df_joined.select(
    F.col("oi.order_id").alias("order_id"),
    F.col("oi.product_id").alias("product_id"),
    F.col("oi.quantity").alias("quantity"),
    F.col("oi.unit_price").alias("unit_price"),
    F.col("o.customer_id").alias("customer_id"),
    F.col("o.order_date").alias("order_date"),
    F.col("p.price").alias("product_price")
)

# STEP 3: Transformations
df_fact = (df_joined
    .withColumn("date", F.to_date("order_date"))
    .withColumn("date_sk", sk(["date"]))
    .withColumn("customer_sk", sk(["customer_id"]))
    .withColumn("product_sk", sk(["product_id"]))
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("unit_price", F.col("unit_price").cast("double"))
    .withColumn("subtotal", F.col("quantity") * F.col("unit_price"))
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .select(
        "order_id",
        "date_sk",
        "customer_sk",
        "product_sk",
        "quantity",
        "unit_price",
        "subtotal",
        "year",
        "month"
    )
)

print(f"\n✅ fact_sales built successfully: {df_fact.count()} rows\n")
df_fact.show(5)

# Statistics
print("\n📈 fact_sales statistics:")
fact_stats = df_fact.agg(
    F.count("order_id").alias("nombre_orders"),
    F.sum("quantity").alias("total_quantity"),
    F.sum("subtotal").alias("total_gmv"),
    F.avg("subtotal").alias("avg_order_value"),
    F.min("subtotal").alias("min_order"),
    F.max("subtotal").alias("max_order")
).collect()[0]

print(f"   Number of order lines: {int(fact_stats['nombre_orders'])}")
print(f"   Total quantity: {int(fact_stats['total_quantity'])}")
print(f"   Total GMV: ${fact_stats['total_gmv']:.2f}")
print(f"   Average line subtotal: ${fact_stats['avg_order_value']:.2f}")
print(f"   Min line subtotal: ${fact_stats['min_order']:.2f}")
print(f"   Max line subtotal: ${fact_stats['max_order']:.2f}")

# Save the stats
with open("proof/fact_sales_summary.csv", "w") as f:
    f.write("Metric,Value\n")
    f.write(f"Number of order lines,{int(fact_stats['nombre_orders'])}\n")
    f.write(f"Total quantity,{int(fact_stats['total_quantity'])}\n")
    f.write(f"Total GMV,${fact_stats['total_gmv']:.2f}\n")
    f.write(f"Average line subtotal,${fact_stats['avg_order_value']:.2f}\n")

print("\n✅ fact_sales summary saved to proof/fact_sales_summary.csv")
print("="*60)
============================================================
📊 STEP 5: BUILDING THE FACT TABLE (CLEAN)
============================================================

🔗 Join sequence:
   order_items (oi)
   → JOIN products (p)
   → JOIN orders (o)
   → JOIN customers (c)

✅ fact_sales built successfully: 100 rows

+--------+-------------------+-------------------+-------------------+--------+----------+------------------+----+-----+
|order_id|            date_sk|        customer_sk|         product_sk|quantity|unit_price|          subtotal|year|month|
+--------+-------------------+-------------------+-------------------+--------+----------+------------------+----+-----+
|      49|5690026203634146595|8420071140774656230| 233500712460350175|       4|   1499.99|           5999.96|2024|    8|
|      17|3271524206859782347|8420071140774656230|8420071140774656230|       2|    899.99|           1799.98|2024|   10|
|      26|6437972260669345058| 504019808641096632|5346497071442387076|       4|    149.99|            599.96|2024|   11|
|       2|8799929592043627498|2852032610340310743|6698625589789238999|       2|    999.99|           1999.98|2024|    9|
|      50|4575922205773423797|2786828215451145335|8285521376477742517|       3|   1299.99|3899.9700000000003|2024|   11|
+--------+-------------------+-------------------+-------------------+--------+----------+------------------+----+-----+
only showing top 5 rows

📈 fact_sales statistics:
   Number of order lines: 100
   Total quantity: 295
   Total GMV: $196872.05
   Average line subtotal: $1968.72
   Min line subtotal: $9.99
   Max line subtotal: $9999.95

✅ fact_sales summary saved to proof/fact_sales_summary.csv
============================================================
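
Before writing fact_sales it is worth confirming that every surrogate key in the fact table resolves to a dimension row; a left_anti join returns exactly the orphans. A minimal sketch using the DataFrames built above (expected result: 0 on this data, since the SKs are derived from the same natural keys):

# Fact rows whose customer_sk has no match in dim_customer.
orphan_customers = df_fact.join(dim_customer, "customer_sk", "left_anti").count()
print(f"fact rows without a matching dim_customer: {orphan_customers}")

# Same idea for the product dimension.
orphan_products = df_fact.join(dim_product, "product_sk", "left_anti").count()
print(f"fact rows without a matching dim_product: {orphan_products}")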
In [12]:
print("\n" + "="*60)
print("🔍 EXECUTION PLAN - FACT_SALES")
print("="*60 + "\n")

df_fact.explain("formatted")

# Save the plan
with open("proof/plan_fact_join.txt", "w") as f:
    f.write("=== FACT_SALES PLAN ===\n")
    f.write(f"Timestamp: {_dt.now()}\n\n")
    f.write(df_fact._jdf.queryExecution().executedPlan().toString())

print("\n✅ Plan saved to proof/plan_fact_join.txt")
============================================================
🔍 EXECUTION PLAN - FACT_SALES
============================================================

== Physical Plan ==
AdaptiveSparkPlan (19)
+- Project (18)
   +- Project (17)
      +- BroadcastHashJoin Inner BuildRight (16)
         :- Project (12)
         :  +- BroadcastHashJoin Inner BuildRight (11)
         :     :- Project (7)
         :     :  +- BroadcastHashJoin Inner BuildRight (6)
         :     :     :- Filter (2)
         :     :     :  +- Scan csv  (1)
         :     :     +- BroadcastExchange (5)
         :     :        +- Filter (4)
         :     :           +- Scan csv  (3)
         :     +- BroadcastExchange (10)
         :        +- Filter (9)
         :           +- Scan csv  (8)
         +- BroadcastExchange (15)
            +- Filter (14)
               +- Scan csv  (13)


(1) Scan csv 
Output [4]: [order_id#17, product_id#18, quantity#19, unit_price#20]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(product_id), IsNotNull(order_id)]
ReadSchema: struct<order_id:int,product_id:int,quantity:int,unit_price:double>

(2) Filter
Input [4]: [order_id#17, product_id#18, quantity#19, unit_price#20]
Condition : (isnotnull(product_id#18) AND isnotnull(order_id#17))

(3) Scan csv 
Output [1]: [product_id#8]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_products.csv]
PushedFilters: [IsNotNull(product_id)]
ReadSchema: struct<product_id:int>

(4) Filter
Input [1]: [product_id#8]
Condition : isnotnull(product_id#8)

(5) BroadcastExchange
Input [1]: [product_id#8]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=2150]

(6) BroadcastHashJoin
Left keys [1]: [product_id#18]
Right keys [1]: [product_id#8]
Join type: Inner
Join condition: None

(7) Project
Output [4]: [order_id#17, product_id#18, quantity#19, unit_price#20]
Input [5]: [order_id#17, product_id#18, quantity#19, unit_price#20, product_id#8]

(8) Scan csv 
Output [3]: [order_id#13, customer_id#14, order_date#15]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id), IsNotNull(customer_id)]
ReadSchema: struct<order_id:int,customer_id:int,order_date:timestamp>

(9) Filter
Input [3]: [order_id#13, customer_id#14, order_date#15]
Condition : (isnotnull(order_id#13) AND isnotnull(customer_id#14))

(10) BroadcastExchange
Input [3]: [order_id#13, customer_id#14, order_date#15]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=2154]

(11) BroadcastHashJoin
Left keys [1]: [order_id#17]
Right keys [1]: [order_id#13]
Join type: Inner
Join condition: None

(12) Project
Output [6]: [order_id#17, product_id#18, quantity#19, unit_price#20, customer_id#14, order_date#15]
Input [7]: [order_id#17, product_id#18, quantity#19, unit_price#20, order_id#13, customer_id#14, order_date#15]

(13) Scan csv 
Output [1]: [customer_id#0]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_customers.csv]
PushedFilters: [IsNotNull(customer_id)]
ReadSchema: struct<customer_id:int>

(14) Filter
Input [1]: [customer_id#0]
Condition : isnotnull(customer_id#0)

(15) BroadcastExchange
Input [1]: [customer_id#0]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=2158]

(16) BroadcastHashJoin
Left keys [1]: [customer_id#14]
Right keys [1]: [customer_id#0]
Join type: Inner
Join condition: None

(17) Project
Output [6]: [order_id#17, product_id#18, quantity#19, unit_price#20, customer_id#14, cast(order_date#15 as date) AS date#392]
Input [7]: [order_id#17, product_id#18, quantity#19, unit_price#20, customer_id#14, order_date#15, customer_id#0]

(18) Project
Output [9]: [order_id#17, abs(xxhash64(date#392, 42)) AS date_sk#393L, abs(xxhash64(customer_id#14, 42)) AS customer_sk#394L, abs(xxhash64(product_id#18, 42)) AS product_sk#395L, quantity#19, unit_price#20, (cast(quantity#19 as double) * unit_price#20) AS subtotal#398, year(date#392) AS year#399, month(date#392) AS month#400]
Input [6]: [order_id#17, product_id#18, quantity#19, unit_price#20, customer_id#14, date#392]

(19) AdaptiveSparkPlan
Output [9]: [order_id#17, date_sk#393L, customer_sk#394L, product_sk#395L, quantity#19, unit_price#20, subtotal#398, year#399, month#400]
Arguments: isFinalPlan=false



✅ Plan saved to proof/plan_fact_join.txt
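
The plan shows Spark picked BroadcastHashJoin on its own because every input is far below spark.sql.autoBroadcastJoinThreshold. On larger dimension tables the same strategy can be requested explicitly; a sketch using the broadcast hint on one of the joins above:

from pyspark.sql import functions as F

# Ask for a broadcast of the products side regardless of Spark's size estimate.
fact_hinted = order_items.join(F.broadcast(products), "product_id", "inner")
fact_hinted.explain("simple")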
In [13]:
print("\n" + "="*60)
print("💾 STEP 6: WRITING PARQUET OUTPUTS")
print("="*60)

base_out = "outputs/lab2"

# Create the output directory
os.makedirs(base_out, exist_ok=True)

# Write the dimensions
print("\n📝 Writing dimensions...")

(dim_customer.write
    .mode("overwrite")
    .parquet(f"{base_out}/dim_customer"))
print(f"   ✅ dim_customer → {base_out}/dim_customer")

(dim_brand.write
    .mode("overwrite")
    .parquet(f"{base_out}/dim_brand"))
print(f"   ✅ dim_brand → {base_out}/dim_brand")

(dim_category.write
    .mode("overwrite")
    .parquet(f"{base_out}/dim_category"))
print(f"   ✅ dim_category → {base_out}/dim_category")

(dim_product.write
    .mode("overwrite")
    .parquet(f"{base_out}/dim_product"))
print(f"   ✅ dim_product → {base_out}/dim_product")

(dim_date.write
    .mode("overwrite")
    .parquet(f"{base_out}/dim_date"))
print(f"   ✅ dim_date → {base_out}/dim_date")

# Write the fact table (partitioned by year, month)
print("\n📝 Writing the fact table (partitioned by year/month)...")
(df_fact.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(f"{base_out}/fact_sales"))
print(f"   ✅ fact_sales → {base_out}/fact_sales")

print("\n" + "="*60)
print("✅ All Parquet outputs written successfully!")
print("="*60)

# Check the output files
import subprocess
result = subprocess.run(f"ls -lhR {base_out}", shell=True, capture_output=True, text=True)
print("\n📂 Output structure:")
print(result.stdout)
============================================================
💾 STEP 6: WRITING PARQUET OUTPUTS
============================================================

📝 Writing dimensions...
   ✅ dim_customer → outputs/lab2/dim_customer
   ✅ dim_brand → outputs/lab2/dim_brand
   ✅ dim_category → outputs/lab2/dim_category
   ✅ dim_product → outputs/lab2/dim_product
   ✅ dim_date → outputs/lab2/dim_date

📝 Writing the fact table (partitioned by year/month)...
   ✅ fact_sales → outputs/lab2/fact_sales

============================================================
✅ All Parquet outputs written successfully!
============================================================

📂 Output structure:
outputs/lab2:
total 24K
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 dim_brand
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 dim_category
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 dim_customer
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 dim_date
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 dim_product
drwxr-xr-x 3 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 fact_sales

outputs/lab2/dim_brand:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo    0 Dec  9 20:49 _SUCCESS
-rw-r--r-- 1 bibawandaogo bibawandaogo 1.2K Dec  9 20:49 part-00000-fc4094ef-c911-499c-988e-18aeb0944a64-c000.snappy.parquet

outputs/lab2/dim_category:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo    0 Dec  9 20:49 _SUCCESS
-rw-r--r-- 1 bibawandaogo bibawandaogo 1.2K Dec  9 20:49 part-00000-1402ab54-95f4-4aec-9019-d4e590fa5d12-c000.snappy.parquet

outputs/lab2/dim_customer:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo    0 Dec  9 20:49 _SUCCESS
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.0K Dec  9 20:49 part-00000-79204bd6-e80c-4244-8b9e-8bdcef05a561-c000.snappy.parquet

outputs/lab2/dim_date:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo    0 Dec  9 20:49 _SUCCESS
-rw-r--r-- 1 bibawandaogo bibawandaogo 3.3K Dec  9 20:49 part-00000-e31640ee-0680-4931-b223-66cf148aa595-c000.snappy.parquet

outputs/lab2/dim_product:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo    0 Dec  9 20:49 _SUCCESS
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.7K Dec  9 20:49 part-00000-d65d61a9-9702-4815-8bdd-4fad64667a61-c000.snappy.parquet

outputs/lab2/fact_sales:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo    0 Dec  9 20:49 _SUCCESS
drwxr-xr-x 8 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 year=2024

outputs/lab2/fact_sales/year=2024:
total 24K
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 month=10
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 month=11
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 month=6
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 month=7
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 month=8
drwxr-xr-x 2 bibawandaogo bibawandaogo 4.0K Dec  9 20:49 month=9

outputs/lab2/fact_sales/year=2024/month=10:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.8K Dec  9 20:49 part-00000-6f995a37-e29f-4269-a02d-43c701036080.c000.snappy.parquet

outputs/lab2/fact_sales/year=2024/month=11:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.9K Dec  9 20:49 part-00000-6f995a37-e29f-4269-a02d-43c701036080.c000.snappy.parquet

outputs/lab2/fact_sales/year=2024/month=6:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.6K Dec  9 20:49 part-00000-6f995a37-e29f-4269-a02d-43c701036080.c000.snappy.parquet

outputs/lab2/fact_sales/year=2024/month=7:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.6K Dec  9 20:49 part-00000-6f995a37-e29f-4269-a02d-43c701036080.c000.snappy.parquet

outputs/lab2/fact_sales/year=2024/month=8:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.8K Dec  9 20:49 part-00000-6f995a37-e29f-4269-a02d-43c701036080.c000.snappy.parquet

outputs/lab2/fact_sales/year=2024/month=9:
total 4.0K
-rw-r--r-- 1 bibawandaogo bibawandaogo 2.8K Dec  9 20:49 part-00000-6f995a37-e29f-4269-a02d-43c701036080.c000.snappy.parquet
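
Since fact_sales is partitioned by year and month, a filter on those columns should prune whole directories instead of scanning everything. A quick sketch of reading the output back and checking the Scan node of the plan for partition filters (reusing base_out):

# Read the partitioned Parquet back and look for PartitionFilters in the Scan node.
fact_back = spark.read.parquet(f"{base_out}/fact_sales")
fact_back.filter("year = 2024 AND month = 9").explain("formatted")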

In [14]:
print("\n" + "="*60)
print("⚡ STEP 7: OPTIMIZATION - PROJECTION")
print("="*60)

import time

# ========== CASE A: JOIN THEN PROJECT (LATE PROJECTION) ==========
print("\n📊 CASE A: Join then project (late projection)")
print("-" * 60)

start_a = time.time()

df_a = (orders.join(order_items, "order_id")
            .join(products, "product_id")
            .groupBy(F.to_date("order_date").alias("d"))
            .agg(F.sum(F.col("quantity") * F.col("price")).alias("gmv")))

print("\n🔍 Spark plan (CASE A):\n")
df_a.explain("formatted")

count_a = df_a.count()
time_a = time.time() - start_a

print("\n📈 CASE A results:")
print(f"   Rows: {count_a}")
print(f"   Time: {time_a:.3f}s")

df_a.show()

# Save the plan
with open("proof/plan_case_a_late_projection.txt", "w") as f:
    f.write("=== CASE A: LATE PROJECTION ===\n")
    f.write(f"Timestamp: {_dt.now()}\n")
    f.write(f"Duration: {time_a:.3f}s\n\n")
    f.write(df_a._jdf.queryExecution().executedPlan().toString())

# ========== CASE B: PROJECT THEN JOIN (EARLY PROJECTION) ==========
print("\n📊 CASE B: Project then join (early projection)")
print("-" * 60)

start_b = time.time()

df_b = (orders.select("order_id", "order_date")
            .join(order_items.select("order_id", "product_id", "quantity"), "order_id")
            .join(products.select("product_id", "price"), "product_id")
            .groupBy(F.to_date("order_date").alias("d"))
            .agg(F.sum(F.col("quantity") * F.col("price")).alias("gmv")))

print("\n🔍 Spark plan (CASE B):\n")
df_b.explain("formatted")

count_b = df_b.count()
time_b = time.time() - start_b

print("\n📈 CASE B results:")
print(f"   Rows: {count_b}")
print(f"   Time: {time_b:.3f}s")

df_b.show()

# Save the plan
with open("proof/plan_case_b_early_projection.txt", "w") as f:
    f.write("=== CASE B: EARLY PROJECTION ===\n")
    f.write(f"Timestamp: {_dt.now()}\n")
    f.write(f"Duration: {time_b:.3f}s\n\n")
    f.write(df_b._jdf.queryExecution().executedPlan().toString())

# ========== COMPARISON ==========
print("\n" + "="*60)
print("📊 COMPARISON: CASE A vs CASE B")
print("="*60)

improvement = ((time_a - time_b) / time_a * 100) if time_a > 0 else 0

print(f"\nLate projection (A):  {time_a:.3f}s")
print(f"Early projection (B): {time_b:.3f}s")
print(f"Improvement:          {improvement:+.1f}%")

if time_b < time_a:
    print(f"\n✅ Early projection is {time_a/time_b:.2f}x faster (wall clock on this tiny dataset)!")
else:
    print("\n⚠️  Performance is similar (small dataset)")

# Save the metrics
with open("proof/projection_comparison.csv", "w") as f:
    f.write("Case,Approach,Time(s),Rows,Improvement(%)\n")
    f.write(f"A,Late projection,{time_a:.3f},{count_a},{0:.1f}\n")
    f.write(f"B,Early projection,{time_b:.3f},{count_b},{improvement:.1f}\n")

print("\n✅ Metrics saved to proof/projection_comparison.csv")
============================================================
⚡ STEP 7: OPTIMIZATION - PROJECTION
============================================================

📊 CASE A: Join then project (late projection)
------------------------------------------------------------

🔍 Spark plan (CASE A):

== Physical Plan ==
AdaptiveSparkPlan (16)
+- HashAggregate (15)
   +- Exchange (14)
      +- HashAggregate (13)
         +- Project (12)
            +- BroadcastHashJoin Inner BuildRight (11)
               :- Project (7)
               :  +- BroadcastHashJoin Inner BuildLeft (6)
               :     :- BroadcastExchange (3)
               :     :  +- Filter (2)
               :     :     +- Scan csv  (1)
               :     +- Filter (5)
               :        +- Scan csv  (4)
               +- BroadcastExchange (10)
                  +- Filter (9)
                     +- Scan csv  (8)


(1) Scan csv 
Output [2]: [order_id#13, order_date#15]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int,order_date:timestamp>

(2) Filter
Input [2]: [order_id#13, order_date#15]
Condition : isnotnull(order_id#13)

(3) BroadcastExchange
Input [2]: [order_id#13, order_date#15]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=2675]

(4) Scan csv 
Output [3]: [order_id#17, product_id#18, quantity#19]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id), IsNotNull(product_id)]
ReadSchema: struct<order_id:int,product_id:int,quantity:int>

(5) Filter
Input [3]: [order_id#17, product_id#18, quantity#19]
Condition : (isnotnull(order_id#17) AND isnotnull(product_id#18))

(6) BroadcastHashJoin
Left keys [1]: [order_id#13]
Right keys [1]: [order_id#17]
Join type: Inner
Join condition: None

(7) Project
Output [3]: [order_date#15, product_id#18, quantity#19]
Input [5]: [order_id#13, order_date#15, order_id#17, product_id#18, quantity#19]

(8) Scan csv 
Output [2]: [product_id#8, price#12]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_products.csv]
PushedFilters: [IsNotNull(product_id)]
ReadSchema: struct<product_id:int,price:double>

(9) Filter
Input [2]: [product_id#8, price#12]
Condition : isnotnull(product_id#8)

(10) BroadcastExchange
Input [2]: [product_id#8, price#12]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=2679]

(11) BroadcastHashJoin
Left keys [1]: [product_id#18]
Right keys [1]: [product_id#8]
Join type: Inner
Join condition: None

(12) Project
Output [3]: [quantity#19, price#12, cast(order_date#15 as date) AS _groupingexpression#558]
Input [5]: [order_date#15, product_id#18, quantity#19, product_id#8, price#12]

(13) HashAggregate
Input [3]: [quantity#19, price#12, _groupingexpression#558]
Keys [1]: [_groupingexpression#558]
Functions [1]: [partial_sum((cast(quantity#19 as double) * price#12))]
Aggregate Attributes [1]: [sum#559]
Results [2]: [_groupingexpression#558, sum#560]

(14) Exchange
Input [2]: [_groupingexpression#558, sum#560]
Arguments: hashpartitioning(_groupingexpression#558, 200), ENSURE_REQUIREMENTS, [plan_id=2684]

(15) HashAggregate
Input [2]: [_groupingexpression#558, sum#560]
Keys [1]: [_groupingexpression#558]
Functions [1]: [sum((cast(quantity#19 as double) * price#12))]
Aggregate Attributes [1]: [sum((cast(quantity#19 as double) * price#12))#557]
Results [2]: [_groupingexpression#558 AS d#544, sum((cast(quantity#19 as double) * price#12))#557 AS gmv#545]

(16) AdaptiveSparkPlan
Output [2]: [d#544, gmv#545]
Arguments: isFinalPlan=false



📈 CASE A results:
   Rows: 37
   Time: 1.089s
+----------+------------------+
|         d|               gmv|
+----------+------------------+
|2024-08-27|           3909.94|
|2024-10-24|5539.9400000000005|
|2024-11-02|           2324.92|
|2024-09-10|           5259.94|
|2024-08-05|           1399.98|
|2024-08-29|19749.789999999997|
|2024-08-07|             39.99|
|2024-11-05|15599.880000000001|
|2024-06-23|16699.910000000003|
|2024-08-15|           3559.91|
|2024-09-12|            949.97|
|2024-11-07|           3569.92|
|2024-09-25|13679.779999999999|
|2024-10-01|           1399.96|
|2024-09-21|           1859.95|
|2024-11-11|            819.94|
|2024-10-15|           6649.87|
|2024-09-09|            699.99|
|2024-10-28|2099.9700000000003|
|2024-09-05|          12979.89|
+----------+------------------+
only showing top 20 rows

📊 CASE B: Project then join (early projection)
------------------------------------------------------------

🔍 Spark plan (CASE B):

== Physical Plan ==
AdaptiveSparkPlan (16)
+- HashAggregate (15)
   +- Exchange (14)
      +- HashAggregate (13)
         +- Project (12)
            +- BroadcastHashJoin Inner BuildRight (11)
               :- Project (7)
               :  +- BroadcastHashJoin Inner BuildLeft (6)
               :     :- BroadcastExchange (3)
               :     :  +- Filter (2)
               :     :     +- Scan csv  (1)
               :     +- Filter (5)
               :        +- Scan csv  (4)
               +- BroadcastExchange (10)
                  +- Filter (9)
                     +- Scan csv  (8)


(1) Scan csv 
Output [2]: [order_id#13, order_date#15]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_orders.csv]
PushedFilters: [IsNotNull(order_id)]
ReadSchema: struct<order_id:int,order_date:timestamp>

(2) Filter
Input [2]: [order_id#13, order_date#15]
Condition : isnotnull(order_id#13)

(3) BroadcastExchange
Input [2]: [order_id#13, order_date#15]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=3299]

(4) Scan csv 
Output [3]: [order_id#17, product_id#18, quantity#19]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_order_items.csv]
PushedFilters: [IsNotNull(order_id), IsNotNull(product_id)]
ReadSchema: struct<order_id:int,product_id:int,quantity:int>

(5) Filter
Input [3]: [order_id#17, product_id#18, quantity#19]
Condition : (isnotnull(order_id#17) AND isnotnull(product_id#18))

(6) BroadcastHashJoin
Left keys [1]: [order_id#13]
Right keys [1]: [order_id#17]
Join type: Inner
Join condition: None

(7) Project
Output [3]: [order_date#15, product_id#18, quantity#19]
Input [5]: [order_id#13, order_date#15, order_id#17, product_id#18, quantity#19]

(8) Scan csv 
Output [2]: [product_id#8, price#12]
Batched: false
Location: InMemoryFileIndex [file:/home/bibawandaogo/data engineering 1/data/lab2_products.csv]
PushedFilters: [IsNotNull(product_id)]
ReadSchema: struct<product_id:int,price:double>

(9) Filter
Input [2]: [product_id#8, price#12]
Condition : isnotnull(product_id#8)

(10) BroadcastExchange
Input [2]: [product_id#8, price#12]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=3303]

(11) BroadcastHashJoin
Left keys [1]: [product_id#18]
Right keys [1]: [product_id#8]
Join type: Inner
Join condition: None

(12) Project
Output [3]: [quantity#19, price#12, cast(order_date#15 as date) AS _groupingexpression#599]
Input [5]: [order_date#15, product_id#18, quantity#19, product_id#8, price#12]

(13) HashAggregate
Input [3]: [quantity#19, price#12, _groupingexpression#599]
Keys [1]: [_groupingexpression#599]
Functions [1]: [partial_sum((cast(quantity#19 as double) * price#12))]
Aggregate Attributes [1]: [sum#600]
Results [2]: [_groupingexpression#599, sum#601]

(14) Exchange
Input [2]: [_groupingexpression#599, sum#601]
Arguments: hashpartitioning(_groupingexpression#599, 200), ENSURE_REQUIREMENTS, [plan_id=3308]

(15) HashAggregate
Input [2]: [_groupingexpression#599, sum#601]
Keys [1]: [_groupingexpression#599]
Functions [1]: [sum((cast(quantity#19 as double) * price#12))]
Aggregate Attributes [1]: [sum((cast(quantity#19 as double) * price#12))#598]
Results [2]: [_groupingexpression#599 AS d#591, sum((cast(quantity#19 as double) * price#12))#598 AS gmv#592]

(16) AdaptiveSparkPlan
Output [2]: [d#591, gmv#592]
Arguments: isFinalPlan=false



📈 CASE B results:
   Rows: 37
   Time: 0.764s
+----------+------------------+
|         d|               gmv|
+----------+------------------+
|2024-08-27|           3909.94|
|2024-10-24|5539.9400000000005|
|2024-11-02|           2324.92|
|2024-09-10|           5259.94|
|2024-08-05|           1399.98|
|2024-08-29|19749.789999999997|
|2024-08-07|             39.99|
|2024-11-05|15599.880000000001|
|2024-06-23|16699.910000000003|
|2024-08-15|           3559.91|
|2024-09-12|            949.97|
|2024-11-07|           3569.92|
|2024-09-25|13679.779999999999|
|2024-10-01|           1399.96|
|2024-09-21|           1859.95|
|2024-11-11|            819.94|
|2024-10-15|           6649.87|
|2024-09-09|            699.99|
|2024-10-28|2099.9700000000003|
|2024-09-05|          12979.89|
+----------+------------------+
only showing top 20 rows

============================================================
📊 COMPARISON: CASE A vs CASE B
============================================================

Late projection (A):  1.089s
Early projection (B): 0.764s
Improvement:          +29.8%

✅ Early projection is 1.42x faster (wall clock on this tiny dataset)!

✅ Metrics saved to proof/projection_comparison.csv
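
Worth noting: the formatted plans for Case A and Case B above have exactly the same shape, because Catalyst pushes column pruning down to the scans in Case A as well. The ~0.3 s gap on 100 rows is therefore dominated by JVM warm-up and scheduling noise rather than by where the projection was written. A sketch of a slightly fairer micro-benchmark that repeats each query (the run count is arbitrary):

import time

# Run each query several times and keep the best/mean wall-clock time.
def time_query(df, runs=5):
    durations = []
    for _ in range(runs):
        start = time.time()
        df.count()  # force execution
        durations.append(time.time() - start)
    return min(durations), sum(durations) / len(durations)

print("Case A (best, mean):", time_query(df_a))
print("Case B (best, mean):", time_query(df_b))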
In [15]:
print("\n" + "="*60)
print("📋 STEP 8: FINAL SUMMARY")
print("="*60)

summary_metrics = {
    "Timestamp": str(_dt.now()),
    "Spark Version": spark.version,
    "Total Customers": dim_customer.count(),
    "Total Brands": dim_brand.count(),
    "Total Categories": dim_category.count(),
    "Total Products": dim_product.count(),
    "Total Dates": dim_date.count(),
    "Total Fact Rows": df_fact.count(),
    "Total GMV": fact_stats['total_gmv'],
    "Avg Line Subtotal": fact_stats['avg_order_value'],
}

# Print the summary
print("\n📊 KEY METRICS:\n")
for key, value in summary_metrics.items():
    if isinstance(value, float):
        print(f"   {key:25s}: ${value:,.2f}")
    else:
        print(f"   {key:25s}: {value}")

# Save to CSV
with open("proof/lab2_metrics_final.csv", "w") as f:
    f.write("Metric,Value\n")
    for key, value in summary_metrics.items():
        f.write(f"{key},{value}\n")

print("\n✅ Metrics saved to proof/lab2_metrics_final.csv")

# List all the proof files
print("\n" + "="*60)
print("📁 GENERATED PROOF FILES:")
print("="*60)

proof_files = os.listdir("proof")
for i, file in enumerate(sorted(proof_files), 1):
    size = os.path.getsize(f"proof/{file}")
    print(f"   {i}. proof/{file} ({size:,} bytes)")

print("\n" + "="*60)

# Stop Spark
spark.stop()
print("✅ Spark session stopped")
print("\n🎉 LAB 2 PRACTICE COMPLETED SUCCESSFULLY!")
print("="*60)
============================================================
📋 STEP 8: FINAL SUMMARY
============================================================

📊 KEY METRICS:

   Timestamp                : 2025-12-09 20:51:27.883443
   Spark Version            : 4.0.1
   Total Customers          : 10
   Total Brands             : 5
   Total Categories         : 5
   Total Products           : 20
   Total Dates              : 45
   Total Fact Rows          : 100
   Total GMV                : $196,872.05
   Avg Line Subtotal        : $1,968.72

✅ Metrics saved to proof/lab2_metrics_final.csv

============================================================
📁 GENERATED PROOF FILES:
============================================================
   1. proof/date_dimension_summary.csv (92 bytes)
   2. proof/dimensions_summary.csv (92 bytes)
   3. proof/fact_sales_summary.csv (102 bytes)
   4. proof/ingestion_summary.csv (130 bytes)
   5. proof/lab2_metrics_final.csv (249 bytes)
   6. proof/plan_case_a_late_projection.txt (2,462 bytes)
   7. proof/plan_case_b_early_projection.txt (2,463 bytes)
   8. proof/plan_df.txt (1,535 bytes)
   9. proof/plan_fact_join.txt (3,221 bytes)
   10. proof/plan_formatted.txt (715 bytes)
   11. proof/plan_ingest.txt (1,317 bytes)
   12. proof/plan_rdd.txt (883 bytes)
   13. proof/projection_comparison.csv (116 bytes)

============================================================
✅ Spark session stopped

🎉 LAB 2 PRACTICE COMPLETED SUCCESSFULLY!
============================================================