In [11]:
import findspark, os
import sys, re
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import col
import time, platform
import psutil, resource
from IPython.core.magic import register_cell_magic
import numpy as np

DATA_PATH = "/home/bibawandaogo/data engineering 1/data/a1-brand.csv"

print("=" * 60)
print("DE1 - Lab 1: Word Count Assignment")
print("=" * 60)
print(f"\n📂 Data path: {DATA_PATH}")
print(f"✅ File exists: {os.path.exists(DATA_PATH)}")
print()
============================================================
DE1 - Lab 1: Word Count Assignment
============================================================

📂 Data path: /home/bibawandaogo/data engineering 1/data/a1-brand.csv
✅ File exists: True

In [12]:
# Cell 1: Time and Memory Measurement Tools
def _rss_bytes():
    """Get current memory usage in bytes"""
    return psutil.Process(os.getpid()).memory_info().rss

def _ru_maxrss_bytes():
    """Get peak memory usage"""
    ru = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if platform.system() == "Darwin":  # macOS
        return int(ru)
    else:  # Linux
        return int(ru) * 1024

@register_cell_magic
def timemem(line, cell):
    """Measure wall time and memory usage"""
    ip = get_ipython()
    rss_before = _rss_bytes()
    peak_before = _ru_maxrss_bytes()
    t0 = time.perf_counter()

    result = ip.run_cell(cell)

    t1 = time.perf_counter()
    rss_after = _rss_bytes()
    peak_after = _ru_maxrss_bytes()

    wall = t1 - t0
    rss_delta_mb = (rss_after - rss_before) / (1024*1024)
    peak_delta_mb = (peak_after - peak_before) / (1024*1024)

    print("\n" + "=" * 60)
    print(f"⏱️  Wall time: {wall:.3f} s")
    print(f"📊 RSS Δ: {rss_delta_mb:+.2f} MB")
    print(f"📈 Peak memory Δ: {peak_delta_mb:+.2f} MB")
    print("=" * 60)

    return result

print("✅ Time and memory measurement tools loaded")
✅ Time and memory measurement tools loaded
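Note: an IPython cell magic must be the very first line of its cell, before any comment or code; that is why the next cell's %%timemem fails with a UsageError. A minimal usage sketch of the magic defined above (the sum over a range is just a stand-in workload):

%%timemem
# the magic itself is line 1 of the cell; everything below runs as the timed cell body
total = sum(range(1_000_000))
print(total)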
In [13]:
# Cell 2: Initialize Spark Session
%%timemem

spark = (
    SparkSession.builder
    .appName("Assignment1")
    .master("local[*]")
    .config("spark.ui.showConsoleProgress", "true")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

sc = spark.sparkContext

print("\n✅ Spark Session Created")
print(f"Spark Version: {spark.version}")
print(f"Master: {sc.master}")
print(f"App Name: {sc.appName}")
UsageError: Line magic function `%%timemem` not found.
In [14]:
# Cell 2 (retry): Initialize Spark Session without %%timemem
# (the magic failed above because it was not on the first line of the cell)

import time

t0 = time.perf_counter()

spark = (
    SparkSession.builder
    .appName("Assignment1")
    .master("local[*]")
    .config("spark.ui.showConsoleProgress", "true")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

t1 = time.perf_counter()

sc = spark.sparkContext

print(f"\n✅ Spark Session Created in {t1-t0:.3f}s")
print(f"Spark Version: {spark.version}")
print(f"Master: {sc.master}")
print(f"App Name: {sc.appName}")
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/07 14:11:42 WARN Utils: Your hostname, Wandaogo, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/07 14:11:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/07 14:11:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
✅ Spark Session Created in 9.023s
Spark Version: 4.0.1
Master: local[*]
App Name: Assignment1
In [15]:
# Cell 3: RDD - Load Data

import time
t0 = time.perf_counter()

# Read the CSV file into an RDD
lines = sc.textFile(DATA_PATH)

# Count total lines
total_lines = lines.count()

t1 = time.perf_counter()

print(f"\n✅ Total lines in file: {total_lines}")
print(f"⏱️  Time: {t1-t0:.3f}s")

# Show first 3 lines
print("\n📄 First 3 lines:")
for i, line in enumerate(lines.take(3)):
    print(f"  {i}: {line[:100]}...")
✅ Total lines in file: 7262
⏱️  Time: 2.502s

📄 First 3 lines:
  0: "brand","description"...
  1: "a-case","a-case is a brand specializing in protective accessories for electronic devices, primarily...
  2: "a-derma","A-Derma is a French dermatological skincare brand specializing in products formulated for...
In [16]:
# Cell 4: RDD - Tokenize and Count Words

import time
t0 = time.perf_counter()

# Clean, tokenize, and count words
word_counts_rdd = (
    lines
    .map(lambda line: line.lower())  # Step 1: Lowercase
    .flatMap(lambda line: re.sub(r'[^a-z]', ' ', line).split())  # Step 2: Tokenize
    .filter(lambda word: len(word) >= 2)  # Step 3: Remove short tokens
    .map(lambda word: (word, 1))  # Step 4: Create (word, 1) pairs
    .reduceByKey(lambda a, b: a + b)  # Step 5: Sum counts
    .sortBy(lambda x: x[1], ascending=False)  # Step 6: Sort by count
    .collect()  # Step 7: Collect results
)

t1 = time.perf_counter()

print("\n" + "=" * 60)
print("🔤 Top 10 Words (RDD - WITH stopwords)")
print("=" * 60)
print(f"{'Word':<20} {'Count':>10}")
print("-" * 60)
for word, count in word_counts_rdd[:10]:
    print(f"{word:<20} {count:>10}")
print("-" * 60)
print(f"⏱️  Time: {t1-t0:.3f}s")
print("=" * 60)
============================================================
🔤 Top 10 Words (RDD - WITH stopwords)
============================================================
Word                      Count
------------------------------------------------------------
and                       16150
the                        9612
in                         7958
is                         7814
for                        6789
brand                      6476
its                        4241
to                         4026
of                         3382
with                       3099
------------------------------------------------------------
โฑ๏ธ  Time: 1.704s
============================================================
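Collecting the entire sorted vocabulary to the driver is fine for this small file, but on a larger corpus only the top results should come back. A sketch of the same pipeline using takeOrdered instead of sortBy + collect (top10_rdd is a hypothetical name; lines and re come from the cells above):

top10_rdd = (
    lines
    .map(lambda line: line.lower())
    .flatMap(lambda line: re.sub(r'[^a-z]', ' ', line).split())
    .filter(lambda word: len(word) >= 2)
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .takeOrdered(10, key=lambda kv: -kv[1])  # only 10 (word, count) pairs reach the driver
)
for word, count in top10_rdd:
    print(word, count)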
In [17]:
# Cell 5: DataFrame - Load Data

import time
t0 = time.perf_counter()

# Load CSV into DataFrame
df = spark.read.option("header", "true").option("escape", "\"").csv(DATA_PATH)

total_rows = df.count()

t1 = time.perf_counter()

print(f"\n✅ Total rows: {total_rows}")
print(f"⏱️  Time: {t1-t0:.3f}s")

print("\n📋 Schema:")
df.printSchema()

print("\n📊 Sample data (first 3 rows):")
df.show(3, truncate=80)
✅ Total rows: 7261
⏱️  Time: 4.046s

📋 Schema:
root
 |-- brand: string (nullable = true)
 |-- description: string (nullable = true)


📊 Sample data (first 3 rows):
+----------------------------------------------+--------------------------------------------------------------------------------+
|                                         brand|                                                                     description|
+----------------------------------------------+--------------------------------------------------------------------------------+
|                                        a-case|a-case is a brand specializing in protective accessories for electronic devic...|
|                                       a-derma|A-Derma is a French dermatological skincare brand specializing in products fo...|
|The brand is known for its use of Rhealba® Oat| a patented ingredient derived from oat plants cultivated under organic farmi...|
+----------------------------------------------+--------------------------------------------------------------------------------+
only showing top 3 rows
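The third sample row looks like a continuation of the a-derma description rather than a real record, which suggests some descriptions span more than one physical line or contain embedded quotes. If that is the case, a multi-line-aware read may parse them as single records; a sketch under that assumption (df_ml is a hypothetical name):

df_ml = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")   # allow quoted fields to span physical lines
    .option("quote", "\"")
    .option("escape", "\"")        # treat "" inside quoted fields as a literal quote
    .csv(DATA_PATH)
)
print(df_ml.count())  # compare with the 7261 rows reported above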
In [19]:
# Cell 6: DataFrame - Tokenize and Count Words

from pyspark.sql.functions import lower, regexp_replace, split, explode
import time

t0 = time.perf_counter()

# Clean, tokenize, and count words
word_counts_df = (
    df
    .select("description")
    .withColumn("description_lower", lower(col("description")))  # Lowercase
    .withColumn("description_cleaned", regexp_replace(
        col("description_lower"), 
        r"[^a-z]", 
        " "
    ))  # Replace non-letters
    .withColumn("tokens", split(col("description_cleaned"), r"\s+"))  # Split
    .select(explode(col("tokens")).alias("word"))  # Explode tokens
    .filter(F.length(col("word")) >= 2)  # Remove short tokens
    .groupBy("word")
    .count()
    .withColumnRenamed("count", "frequency")
    .orderBy(F.desc("frequency"))
)

t1 = time.perf_counter()
# NOTE: DataFrame transformations are lazy, so t1 - t0 only measures building
# the query plan; the actual computation runs when show() is called below.

print("\n" + "=" * 60)
print("🔤 Top 10 Words (DataFrame - WITH stopwords)")
print("=" * 60)
word_counts_df.limit(10).show(truncate=False)
print(f"⏱️  Time: {t1-t0:.3f}s")
print("=" * 60)
============================================================
🔤 Top 10 Words (DataFrame - WITH stopwords)
============================================================
+-----+---------+
|word |frequency|
+-----+---------+
|and  |13094    |
|the  |6895     |
|is   |6419     |
|in   |6351     |
|for  |5530     |
|brand|5196     |
|its  |3304     |
|to   |3155     |
|of   |2692     |
|known|2509     |
+-----+---------+

โฑ๏ธ  Time: 0.235s
============================================================
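Because DataFrame transformations are lazy, the 0.235 s reported above covers only query-plan construction; the actual scan, tokenization, and aggregation run inside show(). To time the full job, the measurement has to wrap an action; a sketch (top10_rows is a hypothetical name):

t0 = time.perf_counter()
top10_rows = word_counts_df.limit(10).collect()   # action: triggers the whole pipeline
t1 = time.perf_counter()
print(f"Full DataFrame word-count job: {t1 - t0:.3f}s")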
                                                                                
In [20]:
# Cell 7: Compare RDD vs DataFrame Results

print("\n" + "=" * 60)
print("📊 COMPARISON: RDD vs DataFrame")
print("=" * 60)

print("\n🔴 RDD Top 5:")
for word, count in word_counts_rdd[:5]:
    print(f"  {word}: {count}")

print("\n🔵 DataFrame Top 5:")
df_top5 = word_counts_df.limit(5).collect()
for row in df_top5:
    print(f"  {row.word}: {row.frequency}")

print("\n✅ Are they the same?")
print("""
Not exactly. The rankings are very similar, but the counts differ because
the two pipelines do not see the same text:
1. The RDD pipeline tokenizes every raw CSV line in full, including the header
   row, the brand column, and any text the CSV parser would discard.
2. The DataFrame pipeline tokenizes only what the CSV reader parses into the
   description column.
3. Both apply the same cleaning (lowercase, strip non-letters, drop tokens
   shorter than 2 characters) and the same descending sort by frequency.

API difference:
- RDD: low-level API, transformations written by hand
- DataFrame: high-level API, the Catalyst optimizer plans execution
- Performance: DataFrames are typically faster thanks to that optimization
""")
============================================================
📊 COMPARISON: RDD vs DataFrame
============================================================

🔴 RDD Top 5:
  and: 16150
  the: 9612
  in: 7958
  is: 7814
  for: 6789

🔵 DataFrame Top 5:
  and: 13094
  the: 6895
  is: 6419
  in: 6351
  for: 5530

✅ Are they the same?

Not exactly. The rankings are very similar, but the counts differ because
the two pipelines do not see the same text:
1. The RDD pipeline tokenizes every raw CSV line in full, including the header
   row, the brand column, and any text the CSV parser would discard.
2. The DataFrame pipeline tokenizes only what the CSV reader parses into the
   description column.
3. Both apply the same cleaning (lowercase, strip non-letters, drop tokens
   shorter than 2 characters) and the same descending sort by frequency.

API difference:
- RDD: low-level API, transformations written by hand
- DataFrame: high-level API, the Catalyst optimizer plans execution
- Performance: DataFrames are typically faster thanks to that optimization
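
For a closer apples-to-apples check, the RDD pipeline can be fed the same parsed description column instead of raw CSV lines, for example via df.rdd. A sketch under that assumption (rdd_desc_counts is a hypothetical name; df and re come from the cells above):

rdd_desc_counts = (
    df.rdd
    .map(lambda row: (row["description"] or "").lower())          # description column only
    .flatMap(lambda text: re.sub(r"[^a-z]", " ", text).split())   # same cleaning as before
    .filter(lambda word: len(word) >= 2)
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .takeOrdered(10, key=lambda kv: -kv[1])
)
for word, count in rdd_desc_counts:
    print(word, count)

With identical input text, the two APIs should agree on the counts.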

In [21]:
# Cell 8: Remove Stopwords

from pyspark.ml.feature import StopWordsRemover
import time

t0 = time.perf_counter()

# Get list of English stopwords
stopwords_list = StopWordsRemover().getStopWords()
print(f"✅ Loaded {len(stopwords_list)} stopwords")
print(f"   Examples: {stopwords_list[:10]}")

# Remove stopwords from our word counts
word_counts_no_stop = (
    word_counts_df
    .filter(~col("word").isin(stopwords_list))  # Filter OUT stopwords
    .orderBy(F.desc("frequency"))
)

t1 = time.perf_counter()

print("\n" + "=" * 60)
print("🔤 Top 10 Words (WITHOUT stopwords)")
print("=" * 60)
word_counts_no_stop.limit(10).show(truncate=False)
print(f"⏱️  Time: {t1-t0:.3f}s")
print("=" * 60)
✅ Loaded 181 stopwords
   Examples: ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your']

============================================================
🔤 Top 10 Words (WITHOUT stopwords)
============================================================
+------------+---------+
|word        |frequency|
+------------+---------+
|brand       |5196     |
|known       |2509     |
|products    |2459     |
|primarily   |2100     |
|market      |1873     |
|range       |1688     |
|recognized  |1482     |
|including   |1452     |
|specializing|1390     |
|often       |1247     |
+------------+---------+

โฑ๏ธ  Time: 0.632s
============================================================
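Filtering the already-counted words with isin() works; an alternative, more conventional pattern is to apply StopWordsRemover as a transformer on the token arrays before exploding them. A sketch under that assumption (tokens_df, remover, and filtered_counts are hypothetical names):

from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import lower, regexp_replace, split, explode, col, length, desc

# Drop null descriptions so the transformer never sees a null token array
tokens_df = (
    df.na.drop(subset=["description"])
      .select(split(regexp_replace(lower(col("description")), r"[^a-z]", " "), r"\s+").alias("tokens"))
)
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
filtered_counts = (
    remover.transform(tokens_df)
    .select(explode(col("filtered")).alias("word"))
    .filter(length(col("word")) >= 2)
    .groupBy("word")
    .count()
    .orderBy(desc("count"))
)
filtered_counts.show(10, truncate=False)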
                                                                                
In [22]:
# Cell 9: Save Results to CSV

import os
import time

t0 = time.perf_counter()

# Create output directory
os.makedirs("output", exist_ok=True)

# Save top 10 WITH stopwords
word_counts_df.limit(10).coalesce(1).write.mode("overwrite").option("header", "true").csv("output/top10_words")

# Save top 10 WITHOUT stopwords
word_counts_no_stop.limit(10).coalesce(1).write.mode("overwrite").option("header", "true").csv("output/top10_noStopWords")

t1 = time.perf_counter()

print("\n✅ Results saved!")
print("   📁 output/top10_words/")
print("   📁 output/top10_noStopWords/")
print(f"⏱️  Time: {t1-t0:.3f}s")

# Show what was saved
print("\n📄 Top 10 (WITH stopwords):")
word_counts_df.limit(10).show()

print("\n📄 Top 10 (WITHOUT stopwords):")
word_counts_no_stop.limit(10).show()
✅ Results saved!
   📁 output/top10_words/
   📁 output/top10_noStopWords/
⏱️  Time: 2.019s

📄 Top 10 (WITH stopwords):
+-----+---------+
| word|frequency|
+-----+---------+
|  and|    13094|
|  the|     6895|
|   is|     6419|
|   in|     6351|
|  for|     5530|
|brand|     5196|
|  its|     3304|
|   to|     3155|
|   of|     2692|
|known|     2509|
+-----+---------+


📄 Top 10 (WITHOUT stopwords):
+------------+---------+
|        word|frequency|
+------------+---------+
|       brand|     5196|
|       known|     2509|
|    products|     2459|
|   primarily|     2100|
|      market|     1873|
|       range|     1688|
|  recognized|     1482|
|   including|     1452|
|specializing|     1390|
|       often|     1247|
+------------+---------+
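Note that Spark writes each result as a directory containing a part-*.csv file plus a _SUCCESS marker rather than a single named file. For a result as small as a top-10, one option is to collect it and write a plain CSV from the driver; a sketch (the output file name is a hypothetical choice):

import csv

rows = word_counts_no_stop.limit(10).collect()
with open("output/top10_noStopWords.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["word", "frequency"])                          # header row
    writer.writerows((r["word"], r["frequency"]) for r in rows)     # ten data rows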

In [23]:
# Cell 10: Performance Notes and Environment

import subprocess
import psutil

print("\n" + "=" * 60)
print("🖥️  ENVIRONMENT AND PERFORMANCE NOTES")
print("=" * 60)

print(f"\n🐍 Python: {sys.version}")

try:
    java_version = subprocess.check_output(["java", "-version"], stderr=subprocess.STDOUT).decode().splitlines()[0]
    print(f"☕ Java: {java_version}")
except (OSError, subprocess.CalledProcessError):
    print("☕ Java: Not found")

print(f"⚡ Spark: {spark.version}")
print(f"💻 Platform: {platform.platform()}")

mem = psutil.virtual_memory()
print("\n📊 System Memory:")
print(f"   Total: {mem.total / (1024**3):.2f} GB")
print(f"   Available: {mem.available / (1024**3):.2f} GB")

print("\n" + "=" * 60)
print("✅ PERFORMANCE RECOMMENDATIONS")
print("=" * 60)
print("""
1. ✅ Use DataFrame built-ins (explode, regexp_replace)
   - Faster than Python UDFs
   - The Catalyst optimizer can optimize the whole plan

2. ✅ Avoid Python UDFs for tokenization
   - Serializing rows between the JVM and Python is slow on large datasets
   - Spark SQL functions run natively in the JVM

3. ✅ Tune spark.sql.shuffle.partitions
   - The default of 200 is more than a small local job needs; a handful suffices
   - Increase it for large datasets on a cluster

4. ✅ Cache intermediate results wisely
   - Use .cache() for DataFrames that are reused several times
   - Avoid caching one-off results

5. ✅ Monitor via the Spark UI
   - http://localhost:4040
   - Watch task execution and shuffle sizes
""")
============================================================
🖥️  ENVIRONMENT AND PERFORMANCE NOTES
============================================================

🐍 Python: 3.10.18 (main, Jun  5 2025, 13:14:17) [GCC 11.2.0]
☕ Java: openjdk version "21.0.9" 2025-10-21
⚡ Spark: 4.0.1
💻 Platform: Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.39

📊 System Memory:
   Total: 7.61 GB
   Available: 2.27 GB

============================================================
✅ PERFORMANCE RECOMMENDATIONS
============================================================

1. ✅ Use DataFrame built-ins (explode, regexp_replace)
   - Faster than Python UDFs
   - The Catalyst optimizer can optimize the whole plan

2. ✅ Avoid Python UDFs for tokenization
   - Serializing rows between the JVM and Python is slow on large datasets
   - Spark SQL functions run natively in the JVM

3. ✅ Tune spark.sql.shuffle.partitions
   - The default of 200 is more than a small local job needs; a handful suffices
   - Increase it for large datasets on a cluster

4. ✅ Cache intermediate results wisely
   - Use .cache() for DataFrames that are reused several times
   - Avoid caching one-off results

5. ✅ Monitor via the Spark UI
   - http://localhost:4040
   - Watch task execution and shuffle sizes
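
As a concrete illustration of recommendation 4, word_counts_df is reused by the comparison, stopword-filtering, and save cells, so caching it once would avoid recomputing the tokenization for every action. A minimal sketch:

word_counts_df.cache()       # keep the counted words in memory after the first action
word_counts_df.count()       # materialize the cache
# ... reuse word_counts_df in later cells ...
word_counts_df.unpersist()   # release the memory when finished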

In [24]:
# Cell 11: Cleanup

spark.stop()
print("\n✅ Spark session stopped")
print("=" * 60)
print("🎉 Lab 1 Complete!")
print("=" * 60)
✅ Spark session stopped
============================================================
🎉 Lab 1 Complete!
============================================================
In [ ]: