In [11]:
import findspark, os
import sys, re
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import col
import time, platform
import psutil, resource
from IPython.core.magic import register_cell_magic
import numpy as np
DATA_PATH = "/home/bibawandaogo/data engineering 1/data/a1-brand.csv"
print("=" * 60)
print("DE1 – Lab 1: Word Count Assignment")
print("=" * 60)
print(f"\nData path: {DATA_PATH}")
print(f"File exists: {os.path.exists(DATA_PATH)}")
print()
============================================================
DE1 – Lab 1: Word Count Assignment
============================================================

Data path: /home/bibawandaogo/data engineering 1/data/a1-brand.csv
File exists: True
In [12]:
# Cell 1: Time and Memory Measurement Tools
def _rss_bytes():
    """Get current memory usage in bytes"""
    return psutil.Process(os.getpid()).memory_info().rss

def _ru_maxrss_bytes():
    """Get peak memory usage"""
    ru = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if platform.system() == "Darwin":  # macOS
        return int(ru)
    else:  # Linux
        return int(ru) * 1024
@register_cell_magic
def timemem(line, cell):
    """Measure wall time and memory usage"""
    ip = get_ipython()
    rss_before = _rss_bytes()
    peak_before = _ru_maxrss_bytes()
    t0 = time.perf_counter()
    result = ip.run_cell(cell)
    t1 = time.perf_counter()
    rss_after = _rss_bytes()
    peak_after = _ru_maxrss_bytes()
    wall = t1 - t0
    rss_delta_mb = (rss_after - rss_before) / (1024 * 1024)
    peak_delta_mb = (peak_after - peak_before) / (1024 * 1024)
    print("\n" + "=" * 60)
    print(f"Wall time: {wall:.3f} s")
    print(f"RSS Δ: {rss_delta_mb:+.2f} MB")
    print(f"Peak memory Δ: {peak_delta_mb:+.2f} MB")
    print("=" * 60)
    return result

print("Time and memory measurement tools loaded")
Time and memory measurement tools loaded
In [13]:
# Cell 2: Initialize Spark Session
%%timemem
spark = (
SparkSession.builder
.appName("Assignment1")
.master("local[*]")
.config("spark.ui.showConsoleProgress", "true")
.config("spark.sql.shuffle.partitions", "200")
.getOrCreate()
)
sc = spark.sparkContext
print("\nSpark Session Created")
print(f"Spark Version: {spark.version}")
print(f"Master: {sc.master}")
print(f"App Name: {sc.appName}")
UsageError: Line magic function `%%timemem` not found.
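Note: the UsageError appears because an IPython cell magic such as %%timemem must be the very first line of its cell; with the comment placed above it, %%timemem is parsed as an unknown line magic. A minimal sketch of the intended form of the cell, reusing the same builder settings as above:

%%timemem
# Cell 2: Initialize Spark Session (timed by the custom magic)
spark = (
    SparkSession.builder
    .appName("Assignment1")
    .master("local[*]")
    .config("spark.ui.showConsoleProgress", "true")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)
sc = spark.sparkContext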
In [14]:
# Cell 2: Initialize Spark Session (without timemem)
import time
t0 = time.perf_counter()
spark = (
SparkSession.builder
.appName("Assignment1")
.master("local[*]")
.config("spark.ui.showConsoleProgress", "true")
.config("spark.sql.shuffle.partitions", "200")
.getOrCreate()
)
t1 = time.perf_counter()
sc = spark.sparkContext
print(f"\nSpark Session Created in {t1-t0:.3f}s")
print(f"Spark Version: {spark.version}")
print(f"Master: {sc.master}")
print(f"App Name: {sc.appName}")
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/07 14:11:42 WARN Utils: Your hostname, Wandaogo, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/07 14:11:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/07 14:11:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark Session Created in 9.023s
Spark Version: 4.0.1
Master: local[*]
App Name: Assignment1
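If the startup warnings above are too noisy, the log level can be raised once the session exists; setLogLevel is a standard SparkContext method (an optional tweak, not part of the assignment):

# Optional: reduce log chatter for the rest of the notebook
sc.setLogLevel("ERROR")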
In [15]:
# Cell 3: RDD - Load Data
import time
t0 = time.perf_counter()
# Read the CSV file into an RDD
lines = sc.textFile(DATA_PATH)
# Count total lines
total_lines = lines.count()
t1 = time.perf_counter()
print(f"\nTotal lines in file: {total_lines}")
print(f"Time: {t1-t0:.3f}s")
# Show first 3 lines
print("\nFirst 3 lines:")
for i, line in enumerate(lines.take(3)):
    print(f"  {i}: {line[:100]}...")
Total lines in file: 7262
Time: 2.502s

First 3 lines:
  0: "brand","description"...
  1: "a-case","a-case is a brand specializing in protective accessories for electronic devices, primarily...
  2: "a-derma","A-Derma is a French dermatological skincare brand specializing in products formulated for...
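As an optional diagnostic (an assumption that it is worth checking here, not a required step), the partitioning of the input RDD can be inspected before the heavier transformations; getNumPartitions() is a standard RDD method:

# How many partitions did Spark create for this small file?
print(f"Input RDD partitions: {lines.getNumPartitions()}")
# For ~7k lines a couple of partitions is plenty; repartitioning would only add a shuffle.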
In [16]:
# Cell 4: RDD - Tokenize and Count Words
import time
t0 = time.perf_counter()
# Clean, tokenize, and count words
word_counts_rdd = (
lines
.map(lambda line: line.lower()) # Step 1: Lowercase
.flatMap(lambda line: re.sub(r'[^a-z]', ' ', line).split()) # Step 2: Tokenize
.filter(lambda word: len(word) >= 2) # Step 3: Remove short tokens
.map(lambda word: (word, 1)) # Step 4: Create (word, 1) pairs
.reduceByKey(lambda a, b: a + b) # Step 5: Sum counts
.sortBy(lambda x: x[1], ascending=False) # Step 6: Sort by count
.collect() # Step 7: Collect results
)
t1 = time.perf_counter()
print("\n" + "=" * 60)
print("Top 10 Words (RDD - WITH stopwords)")
print("=" * 60)
print(f"{'Word':<20} {'Count':>10}")
print("-" * 60)
for word, count in word_counts_rdd[:10]:
    print(f"{word:<20} {count:>10}")
print("-" * 60)
print(f"Time: {t1-t0:.3f}s")
print("=" * 60)
============================================================
Top 10 Words (RDD - WITH stopwords)
============================================================
Word                      Count
------------------------------------------------------------
and                       16150
the                        9612
in                         7958
is                         7814
for                        6789
brand                      6476
its                        4241
to                         4026
of                         3382
with                      3099
------------------------------------------------------------
Time: 1.704s
============================================================
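A small variant worth noting (a sketch, not required by the assignment): when only the top N words are needed, RDD.takeOrdered avoids sorting and collecting the full vocabulary on the driver:

# Hypothetical variant: fetch only the top 10 instead of collecting every (word, count) pair
top10 = (
    lines
    .flatMap(lambda line: re.sub(r'[^a-z]', ' ', line.lower()).split())
    .filter(lambda w: len(w) >= 2)
    .map(lambda w: (w, 1))
    .reduceByKey(lambda a, b: a + b)
    .takeOrdered(10, key=lambda kv: -kv[1])  # top-N without a full sortBy + collect
)
print(top10)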
In [17]:
# Cell 5: DataFrame - Load Data
import time
t0 = time.perf_counter()
# Load CSV into DataFrame
df = spark.read.option("header", "true").option("escape", "\"").csv(DATA_PATH)
total_rows = df.count()
t1 = time.perf_counter()
print(f"\nTotal rows: {total_rows}")
print(f"Time: {t1-t0:.3f}s")
print("\nSchema:")
df.printSchema()
print("\nSample data (first 3 rows):")
df.show(3, truncate=80)
Total rows: 7261
Time: 4.046s

Schema:
root
 |-- brand: string (nullable = true)
 |-- description: string (nullable = true)

Sample data (first 3 rows):
+----------------------------------------------+--------------------------------------------------------------------------------+
|                                         brand|                                                                     description|
+----------------------------------------------+--------------------------------------------------------------------------------+
|                                        a-case|a-case is a brand specializing in protective accessories for electronic devic...|
|                                       a-derma|A-Derma is a French dermatological skincare brand specializing in products fo...|
|The brand is known for its use of Rhealba® Oat| a patented ingredient derived from oat plants cultivated under organic farmi...|
+----------------------------------------------+--------------------------------------------------------------------------------+
only showing top 3 rows
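The third sample row ("The brand is known for its use of Rhealba® Oat" landing in the brand column) suggests that some descriptions contain newlines inside quoted fields, which the default line-based parser splits into extra rows. A sketch of a stricter read, assuming the file uses conventional double-quote escaping (header, multiLine, quote and escape are all standard Spark CSV options):

# Hypothetical stricter read: tolerate quoted newlines and doubled quotes inside fields
df_strict = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")  # allow newlines inside quoted fields
    .option("quote", "\"")
    .option("escape", "\"")
    .csv(DATA_PATH)
)
print(df_strict.count())  # compare with the 7261 rows reported above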
In [19]:
# Cell 6: DataFrame - Tokenize and Count Words
from pyspark.sql.functions import lower, regexp_replace, split, explode
import time
t0 = time.perf_counter()
# Clean, tokenize, and count words
word_counts_df = (
df
.select("description")
.withColumn("description_lower", lower(col("description"))) # Lowercase
.withColumn("description_cleaned", regexp_replace(
col("description_lower"),
r"[^a-z]",
" "
)) # Replace non-letters
.withColumn("tokens", split(col("description_cleaned"), r"\s+")) # Split
.select(explode(col("tokens")).alias("word")) # Explode tokens
.filter(F.length(col("word")) >= 2) # Remove short tokens
.groupBy("word")
.count()
.withColumnRenamed("count", "frequency")
.orderBy(F.desc("frequency"))
)
t1 = time.perf_counter()
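# Note: transformations are lazy, so t1 - t0 measures only plan construction; the actual scan and aggregation run inside show() below.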
print("\n" + "=" * 60)
print("Top 10 Words (DataFrame - WITH stopwords)")
print("=" * 60)
word_counts_df.limit(10).show(truncate=False)
print(f"Time: {t1-t0:.3f}s")
print("=" * 60)
============================================================
Top 10 Words (DataFrame - WITH stopwords)
============================================================
+-----+---------+
|word |frequency|
+-----+---------+
|and  |13094    |
|the  |6895     |
|is   |6419     |
|in   |6351     |
|for  |5530     |
|brand|5196     |
|its  |3304     |
|to   |3155     |
|of   |2692     |
|known|2509     |
+-----+---------+

Time: 0.235s
============================================================
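To see what the Catalyst optimizer does with this pipeline (relevant to the comparison in the next cell), the optimized plan can be printed; explain() is a standard DataFrame method and the mode argument is available in Spark 3.0+:

# Inspect the logical/physical plan Catalyst builds for the word-count pipeline
word_counts_df.explain(mode="formatted")  # "simple", "extended" and "cost" also work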
In [20]:
# Cell 7: Compare RDD vs DataFrame Results
print("\n" + "=" * 60)
print("COMPARISON: RDD vs DataFrame")
print("=" * 60)
print("\nRDD Top 5:")
for word, count in word_counts_rdd[:5]:
    print(f"  {word}: {count}")
print("\nDataFrame Top 5:")
df_top5 = word_counts_df.limit(5).collect()
for row in df_top5:
    print(f"  {row.word}: {row.frequency}")
print("\nAre they the same?")
print("""
NO - the rankings agree, but the counts differ, because the two pipelines
do not read the same text:
1. The RDD version tokenizes the raw CSV lines, so it also counts the header,
   the brand column, and text from rows split by embedded newlines/quotes.
2. The DataFrame version parses the CSV first and counts words only in the
   "description" column, so its totals are lower (e.g. "and": 13094 vs 16150).
API difference:
- RDD: low-level API, manual transformations
- DataFrame: high-level API, the Catalyst optimizer plans the execution
- Performance: DataFrame is typically faster thanks to that optimization
""")
============================================================
COMPARISON: RDD vs DataFrame
============================================================

RDD Top 5:
  and: 16150
  the: 9612
  in: 7958
  is: 7814
  for: 6789

DataFrame Top 5:
  and: 13094
  the: 6895
  is: 6419
  in: 6351
  for: 5530

Are they the same?

NO - the rankings agree, but the counts differ, because the two pipelines
do not read the same text:
1. The RDD version tokenizes the raw CSV lines, so it also counts the header,
   the brand column, and text from rows split by embedded newlines/quotes.
2. The DataFrame version parses the CSV first and counts words only in the
   "description" column, so its totals are lower (e.g. "and": 13094 vs 16150).
API difference:
- RDD: low-level API, manual transformations
- DataFrame: high-level API, the Catalyst optimizer plans the execution
- Performance: DataFrame is typically faster thanks to that optimization
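To confirm that the gap comes from the input text rather than from the APIs, the RDD logic can be rerun over the parsed description column only (a sketch that reuses df and the tokenization rules defined above):

# Hypothetical check: apply the RDD word count to the parsed "description" column only
desc_rdd = df.select("description").rdd.map(lambda row: row.description or "")
aligned_top5 = (
    desc_rdd
    .flatMap(lambda text: re.sub(r'[^a-z]', ' ', text.lower()).split())
    .filter(lambda w: len(w) >= 2)
    .map(lambda w: (w, 1))
    .reduceByKey(lambda a, b: a + b)
    .takeOrdered(5, key=lambda kv: -kv[1])
)
print(aligned_top5)  # should now track the DataFrame top 5 closely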
In [21]:
# Cell 8: Remove Stopwords
from pyspark.ml.feature import StopWordsRemover
import time
t0 = time.perf_counter()
# Get list of English stopwords
stopwords_list = StopWordsRemover().getStopWords()
print(f"Loaded {len(stopwords_list)} stopwords")
print(f" Examples: {stopwords_list[:10]}")
# Remove stopwords from our word counts
word_counts_no_stop = (
word_counts_df
.filter(~col("word").isin(stopwords_list)) # Filter OUT stopwords
.orderBy(F.desc("frequency"))
)
t1 = time.perf_counter()
print("\n" + "=" * 60)
print("Top 10 Words (WITHOUT stopwords)")
print("=" * 60)
word_counts_no_stop.limit(10).show(truncate=False)
print(f"Time: {t1-t0:.3f}s")
print("=" * 60)
Loaded 181 stopwords
  Examples: ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your']

============================================================
Top 10 Words (WITHOUT stopwords)
============================================================
+------------+---------+
|word        |frequency|
+------------+---------+
|brand       |5196     |
|known       |2509     |
|products    |2459     |
|primarily   |2100     |
|market      |1873     |
|range       |1688     |
|recognized  |1482     |
|including   |1452     |
|specializing|1390     |
|often       |1247     |
+------------+---------+

Time: 0.632s
============================================================
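For reference, the same filtering can be done with the StopWordsRemover transformer itself instead of an isin() filter; a sketch assuming the imports from the cells above (lower, regexp_replace, split, explode, col, F):

# Hypothetical alternative: run StopWordsRemover on an array column, then explode and count
from pyspark.ml.feature import StopWordsRemover
tokens_df = df.select(
    split(regexp_replace(lower(col("description")), r"[^a-z]", " "), r"\s+").alias("tokens")
)
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
filtered_counts = (
    remover.transform(tokens_df)
    .select(explode(col("filtered")).alias("word"))
    .filter(F.length("word") >= 2)
    .groupBy("word").count()
    .orderBy(F.desc("count"))
)
filtered_counts.show(10, truncate=False)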
In [22]:
# Cell 9: Save Results to CSV
import os
import time
t0 = time.perf_counter()
# Create output directory
os.makedirs("output", exist_ok=True)
# Save top 10 WITH stopwords
word_counts_df.limit(10).coalesce(1).write.mode("overwrite").option("header", "true").csv("output/top10_words")
# Save top 10 WITHOUT stopwords
word_counts_no_stop.limit(10).coalesce(1).write.mode("overwrite").option("header", "true").csv("output/top10_noStopWords")
t1 = time.perf_counter()
print("\nResults saved!")
print("  output/top10_words/")
print("  output/top10_noStopWords/")
print(f"Time: {t1-t0:.3f}s")
# Show what was saved
print("\nTop 10 (WITH stopwords):")
word_counts_df.limit(10).show()
print("\nTop 10 (WITHOUT stopwords):")
word_counts_no_stop.limit(10).show()
Results saved!
  output/top10_words/
  output/top10_noStopWords/
Time: 2.019s

Top 10 (WITH stopwords):
+-----+---------+
| word|frequency|
+-----+---------+
|  and|    13094|
|  the|     6895|
|   is|     6419|
|   in|     6351|
|  for|     5530|
|brand|     5196|
|  its|     3304|
|   to|     3155|
|   of|     2692|
|known|     2509|
+-----+---------+

Top 10 (WITHOUT stopwords):
+------------+---------+
|        word|frequency|
+------------+---------+
|       brand|     5196|
|       known|     2509|
|    products|     2459|
|   primarily|     2100|
|      market|     1873|
|       range|     1688|
|  recognized|     1482|
|   including|     1452|
|specializing|     1390|
|       often|     1247|
+------------+---------+
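Because of coalesce(1), each output is a directory holding a single part-*.csv file plus a _SUCCESS marker. If a flat file name is needed for submission, a small copy step works; the exact part-file naming pattern is an assumption about Spark's default CSV writer:

import glob, shutil

# Hypothetical post-processing: copy the single part file to a plain CSV path
for out_dir, target in [("output/top10_words", "output/top10_words.csv"),
                        ("output/top10_noStopWords", "output/top10_noStopWords.csv")]:
    part = glob.glob(os.path.join(out_dir, "part-*.csv"))[0]
    shutil.copy(part, target)
    print(f"Copied {part} -> {target}")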
In [23]:
# Cell 10: Performance Notes and Environment
import subprocess
import psutil
print("\n" + "=" * 60)
print("ENVIRONMENT AND PERFORMANCE NOTES")
print("=" * 60)
print(f"\nPython: {sys.version}")
try:
    java_version = subprocess.check_output(["java", "-version"], stderr=subprocess.STDOUT).decode().splitlines()[0]
    print(f"Java: {java_version}")
except Exception:
    print("Java: Not found")
print(f"Spark: {spark.version}")
print(f"Platform: {platform.platform()}")
mem = psutil.virtual_memory()
print("\nSystem Memory:")
print(f"  Total: {mem.total / (1024**3):.2f} GB")
print(f"  Available: {mem.available / (1024**3):.2f} GB")
print("\n" + "=" * 60)
print("PERFORMANCE RECOMMENDATIONS")
print("=" * 60)
print("""
1. Use DataFrame built-ins (explode, regexp_replace)
   - Better than Python UDFs
   - The Catalyst optimizer plans the execution
2. Avoid Python UDFs for tokenization
   - Too slow on large datasets
   - Spark SQL functions are faster
3. Keep shuffle partitions modest
   - Default: 200 (more than a small local job needs)
   - Lower it for local runs, increase it on large clusters
4. Cache intermediate results wisely
   - Use .cache() for reused DataFrames
   - Avoid caching one-time operations
5. Monitor via Spark UI
   - http://localhost:4040
   - Watch task execution and shuffle
""")
============================================================
ENVIRONMENT AND PERFORMANCE NOTES
============================================================

Python: 3.10.18 (main, Jun 5 2025, 13:14:17) [GCC 11.2.0]
Java: openjdk version "21.0.9" 2025-10-21
Spark: 4.0.1
Platform: Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.39

System Memory:
  Total: 7.61 GB
  Available: 2.27 GB

============================================================
PERFORMANCE RECOMMENDATIONS
============================================================

1. Use DataFrame built-ins (explode, regexp_replace)
   - Better than Python UDFs
   - The Catalyst optimizer plans the execution
2. Avoid Python UDFs for tokenization
   - Too slow on large datasets
   - Spark SQL functions are faster
3. Keep shuffle partitions modest
   - Default: 200 (more than a small local job needs)
   - Lower it for local runs, increase it on large clusters
4. Cache intermediate results wisely
   - Use .cache() for reused DataFrames
   - Avoid caching one-time operations
5. Monitor via Spark UI
   - http://localhost:4040
   - Watch task execution and shuffle
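Expanding on recommendations 3 and 4 with a short sketch (the value 8 is an arbitrary local-mode choice, not something taken from the assignment):

# Hypothetical tuning for a small local run
spark.conf.set("spark.sql.shuffle.partitions", "8")  # fewer shuffle tasks for tiny data

# Cache a DataFrame that several actions reuse, then release it
word_counts_df.cache()
word_counts_df.count()       # materializes the cache
word_counts_df.show(10)      # now served from memory
word_counts_df.unpersist()   # free executor memory when done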
In [24]:
# Cell 11: Cleanup
spark.stop()
print("\nSpark session stopped")
print("=" * 60)
print("Lab 1 Complete!")
print("=" * 60)
Spark session stopped
============================================================
Lab 1 Complete!
============================================================
In [ ]: