import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from dotenv import load_dotenv
import pandas as pd
load_dotenv()
DATA_DIR = os.environ["DATA_DIR"]
OLIST_DIR = os.path.join(DATA_DIR, "olist")
spark = (
    SparkSession.builder
    .appName("olist-01-intro")
    .master("local[*]")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
Customer Analytics with Olist, Part 1: Data Setup
Let’s explore the Olist datasets. Olist is a Brazilian marketplace aggregator where small businesses sell products online.
The Brazilian E-Commerce dataset is a snapshot of Olist’s marketplace from the customer side. The dataset records what happened after someone clicked “buy”: the order, what was in it, how it was paid for, how long delivery took, and what the customer thought of it (the review). Nine tables, ~100k orders, Sep 2016-Aug 2018.
The Marketing Funnel dataset covers the seller side: how sellers got onto the platform. To sell on Olist, a business had to fill out a contact form (becoming a “Marketing Qualified Lead”, MQL), get called by a sales rep, go through a consultancy, and either sign up or drop out. The dataset has two tables recording the B2B sales pipeline: 8k MQLs who requested contact between June 2017 and June 2018 to become Olist sellers, and the deals that closed.
Both datasets share a seller_id key, making it possible to trace a path from a marketing lead through seller onboarding to the orders and reviews that seller generates. Once a closed deal (marketing funnel) becomes a seller (e-commerce), you can follow them and answer questions like “do sellers who came in through organic search perform better than paid search sellers?”
Let’s load all eleven tables, map the schema and relationships, and run a first-pass data quality check. For dataset download instructions, see Downloading Datasets with the Kaggle API.
Why PySpark?
The Olist dataset is small enough to fit in memory on a laptop, so pandas would be faster and lighter. The main reason to use PySpark here is to practise thinking in lazily evaluated, partitioned DataFrames.
We’ll run PySpark in local mode with all available cores (local[*]), which gives the full API with no cluster overhead.
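To make "lazily evaluated" concrete, here is a plain-Python analogy built on generators. It is not Spark code: the names read_rows and log are invented for illustration, and Spark's query optimizer and partitioning are not modelled. The sketch only shows the core idea that describing a pipeline does no work until a result is requested.

```python
log = []  # records when rows are actually read


def read_rows():
    """Hypothetical data source that logs every row it produces."""
    for i in range(5):
        log.append(f"read row {i}")
        yield {"order_id": i, "price": 10.0 * i}


# "Transformation": describing the filter reads nothing yet.
pipeline = (row for row in read_rows() if row["price"] >= 20.0)
reads_before_action = len(log)  # 0, no work has happened

# "Action": materializing the result pulls rows through the pipeline.
result = list(pipeline)
reads_after_action = len(log)  # 5, every source row was read once

print(reads_before_action, reads_after_action, len(result))
```

In PySpark the same split applies: calls such as filter or select only extend the plan, while calls such as count or collect trigger execution.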
Setup
We load all eleven tables into one dict:
files = {
    "orders": "olist_orders_dataset.csv",
    "customers": "olist_customers_dataset.csv",
    "order_items": "olist_order_items_dataset.csv",
    "order_payments": "olist_order_payments_dataset.csv",
    "order_reviews": "olist_order_reviews_dataset.csv",
    "products": "olist_products_dataset.csv",
    "sellers": "olist_sellers_dataset.csv",
    "geolocation": "olist_geolocation_dataset.csv",
    "category_translation": "product_category_name_translation.csv",
    "mql": "olist_marketing_qualified_leads_dataset.csv",
    "closed_deals": "olist_closed_deals_dataset.csv",
}
dfs = {
    name: spark.read.csv(os.path.join(OLIST_DIR, fname), header=True, inferSchema=True)
    for name, fname in files.items()
}
orders, customers, order_items = dfs["orders"], dfs["customers"], dfs["order_items"]
order_payments, order_reviews = dfs["order_payments"], dfs["order_reviews"]
products, sellers, geolocation = dfs["products"], dfs["sellers"], dfs["geolocation"]
category_translation = dfs["category_translation"]
mql, closed_deals = dfs["mql"], dfs["closed_deals"]
The Eleven Tables
| table | rows | columns |
|---|---|---|
| orders | 99,441 | 8 |
| customers | 99,441 | 5 |
| order_items | 112,650 | 7 |
| order_payments | 103,886 | 5 |
| order_reviews | 104,162 | 7 |
| products | 32,951 | 9 |
| sellers | 3,095 | 4 |
| geolocation | 1,000,163 | 5 |
| category_translation | 71 | 2 |
| mql | 8,000 | 4 |
| closed_deals | 842 | 14 |
Table Relationships
The seller_id column in closed_deals is the bridge between the two datasets.
orders is the hub of the e-commerce schema. sellers is the bridge point to the funnel: closed_deals records which MQL became which seller, so joining closed_deals → sellers → order_items → orders traces a lead all the way through to individual customer transactions.
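The join path described above can be illustrated on a few toy rows. This is a plain-Python sketch with invented ids and prices, not the PySpark code used in this series. It also assumes one closed deal per seller, which keeps the lookup a simple dict.

```python
# Toy rows; every id and value below is invented for illustration.
closed_deals = [
    {"mql_id": "m1", "seller_id": "s1"},
    {"mql_id": "m2", "seller_id": "s2"},  # closed a deal but never sold anything
]
order_items = [
    {"order_id": "o1", "seller_id": "s1", "price": 50.0},
    {"order_id": "o2", "seller_id": "s1", "price": 20.0},
    {"order_id": "o3", "seller_id": "s9", "price": 99.0},  # seller not from the funnel
]
orders = {"o1": "delivered", "o2": "canceled", "o3": "delivered"}  # order_id -> status


def trace_leads(closed_deals, order_items, orders):
    """Inner-join closed_deals -> order_items on seller_id, then -> orders on order_id."""
    seller_to_mql = {d["seller_id"]: d["mql_id"] for d in closed_deals}
    traced = []
    for item in order_items:
        mql_id = seller_to_mql.get(item["seller_id"])
        if mql_id is not None and item["order_id"] in orders:
            traced.append({
                "mql_id": mql_id,
                "seller_id": item["seller_id"],
                "order_id": item["order_id"],
                "order_status": orders[item["order_id"]],
                "price": item["price"],
            })
    return traced


traced = trace_leads(closed_deals, order_items, orders)
for row in traced:
    print(row)
```

Only sellers that appear on both sides survive the inner join, so funnel-level metrics computed this way describe the subset of sellers that came through the recorded pipeline and made at least one sale.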
customer_id vs customer_unique_id
customers has both customer_id and customer_unique_id. The customer_id is order-scoped — a repeat buyer gets a new customer_id for each order. The customer_unique_id is the stable person identifier. Getting this wrong inflates the apparent customer count and breaks retention analysis.
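A small plain-Python sketch shows how the two identifiers diverge. The three rows are invented; the real table has the same two columns.

```python
from collections import Counter

# Toy customers table: one row per order-scoped customer_id (ids are invented).
customers = [
    {"customer_id": "c1", "customer_unique_id": "u1"},
    {"customer_id": "c2", "customer_unique_id": "u1"},  # same person, second order
    {"customer_id": "c3", "customer_unique_id": "u2"},
]

# Counting customer_id counts orders, not people.
naive_customer_count = len({row["customer_id"] for row in customers})

# Counting customer_unique_id counts people.
true_customer_count = len({row["customer_unique_id"] for row in customers})

# Repeat buyers are the unique ids that appear on more than one row.
rows_per_person = Counter(row["customer_unique_id"] for row in customers)
repeat_buyers = sorted(uid for uid, n in rows_per_person.items() if n > 1)

print(naive_customer_count, true_customer_count, repeat_buyers)
```

Retention and repeat-purchase metrics should therefore group by customer_unique_id, with customer_id used only as the join key to orders.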
geolocation is a reference table joined on zip prefix. It has multiple lat/lng entries per prefix, so joins need aggregation or deduplication.
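One way to deduplicate before joining is to collapse each prefix to a single centroid. The sketch below does this in plain Python on invented coordinates. Averaging is only one possible choice (taking the first entry or the median would also work), and in PySpark the same idea would be a groupBy on the prefix with an average of the lat/lng columns.

```python
from collections import defaultdict
from statistics import mean

# Toy geolocation rows; prefixes and coordinates are invented for illustration.
geolocation = [
    {"zip_code_prefix": "01037", "lat": -23.5456, "lng": -46.6393},
    {"zip_code_prefix": "01037", "lat": -23.5458, "lng": -46.6391},
    {"zip_code_prefix": "20040", "lat": -22.9068, "lng": -43.1729},
]


def centroid_per_prefix(rows):
    """Collapse multiple lat/lng entries per zip prefix into one mean point."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["zip_code_prefix"]].append((row["lat"], row["lng"]))
    return {
        prefix: (mean(lat for lat, _ in points), mean(lng for _, lng in points))
        for prefix, points in grouped.items()
    }


centroids = centroid_per_prefix(geolocation)
print(len(geolocation), "rows ->", len(centroids), "prefixes")
```

After this step each prefix matches at most one row, so joining customers or sellers to the result cannot multiply rows.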
%%{init: {"themeVariables": {"fontSize": "1.4em"}}}%%
erDiagram
orders {
string order_id PK
string customer_id FK
string order_status
timestamp order_purchase_timestamp
timestamp order_approved_at
timestamp order_delivered_carrier_date
timestamp order_delivered_customer_date
timestamp order_estimated_delivery_date
}
customers {
string customer_id PK
string customer_unique_id
string customer_zip_code_prefix
string customer_city
string customer_state
}
order_items {
string order_id FK
int order_item_id
string product_id FK
string seller_id FK
timestamp shipping_limit_date
float price
float freight_value
}
order_payments {
string order_id FK
int payment_sequential
string payment_type
int payment_installments
float payment_value
}
order_reviews {
string review_id PK
string order_id FK
int review_score
string review_comment_title
string review_comment_message
timestamp review_creation_date
timestamp review_answer_timestamp
}
products {
string product_id PK
string product_category_name FK
int product_name_lenght
int product_description_lenght
int product_photos_qty
int product_weight_g
int product_length_cm
int product_height_cm
int product_width_cm
}
sellers {
string seller_id PK
string seller_zip_code_prefix
string seller_city
string seller_state
}
geolocation {
string geolocation_zip_code_prefix
float geolocation_lat
float geolocation_lng
string geolocation_city
string geolocation_state
}
category_translation {
string product_category_name PK
string product_category_name_english
}
mql {
string mql_id PK
date first_contact_date
string landing_page_id
string origin
}
closed_deals {
string mql_id FK
string seller_id FK
string sdr_id
string sr_id
date won_date
string business_type
string lead_type
string lead_behaviour_profile
boolean has_company
boolean has_gtin
string average_stock
string business_segment
float declared_product_catalog_size
float declared_monthly_revenue
}
orders ||--o{ order_items : "order_id"
orders ||--o{ order_payments : "order_id"
orders ||--o{ order_reviews : "order_id"
orders }o--|| customers : "customer_id"
order_items }o--|| products : "product_id"
order_items }o--|| sellers : "seller_id"
products }o--o| category_translation : "category_name"
customers }o--o{ geolocation : "zip_code_prefix"
sellers }o--o{ geolocation : "zip_code_prefix"
closed_deals }o--|| sellers : "seller_id"
closed_deals }o--o| mql : "mql_id"
Schema
A table's schema can be inspected with printSchema(), which shows column names, data types, and nullability. Here is the output for orders as one example:
root
|-- order_id: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- order_status: string (nullable = true)
|-- order_purchase_timestamp: timestamp (nullable = true)
|-- order_approved_at: timestamp (nullable = true)
|-- order_delivered_carrier_date: timestamp (nullable = true)
|-- order_delivered_customer_date: timestamp (nullable = true)
|-- order_estimated_delivery_date: timestamp (nullable = true)
Data Quality
Missing Values
def null_summary(df, name, n_rows):
    # Count nulls in every column with a single pass over the table.
    null_counts = (
        df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
        .collect()[0]
        .asDict()
    )
    # Keep only columns that actually contain nulls.
    return [
        {"table": name, "column": col, "nulls": cnt, "pct": round(cnt / n_rows * 100, 1)}
        for col, cnt in null_counts.items()
        if cnt > 0
    ]
# row_counts is used below but not defined in the visible code;
# it is assumed here to be a per-table row count.
row_counts = {name: df.count() for name, df in dfs.items()}
all_nulls = []
for name, df in dfs.items():
    all_nulls.extend(null_summary(df, name, row_counts[name]))
null_df = pd.DataFrame(all_nulls, columns=["table", "column", "nulls", "pct"])
(
    null_df
    .sort_values(["table", "nulls"], ascending=[True, False])
    .style.format({"nulls": "{:,}", "pct": "{:.1f}%"})
    .hide(axis="index")
)
| table | column | nulls | pct |
|---|---|---|---|
| closed_deals | has_company | 779 | 92.5% |
| closed_deals | has_gtin | 778 | 92.4% |
| closed_deals | average_stock | 776 | 92.2% |
| closed_deals | declared_product_catalog_size | 773 | 91.8% |
| closed_deals | lead_behaviour_profile | 177 | 21.0% |
| closed_deals | business_type | 10 | 1.2% |
| closed_deals | lead_type | 6 | 0.7% |
| closed_deals | business_segment | 1 | 0.1% |
| mql | origin | 60 | 0.8% |
| order_reviews | review_comment_title | 92,157 | 88.5% |
| order_reviews | review_comment_message | 63,079 | 60.6% |
| order_reviews | review_answer_timestamp | 8,785 | 8.4% |
| order_reviews | review_creation_date | 8,764 | 8.4% |
| order_reviews | review_score | 2,380 | 2.3% |
| order_reviews | order_id | 2,236 | 2.1% |
| order_reviews | review_id | 1 | 0.0% |
| orders | order_delivered_customer_date | 2,965 | 3.0% |
| orders | order_delivered_carrier_date | 1,783 | 1.8% |
| orders | order_approved_at | 160 | 0.2% |
| products | product_category_name | 610 | 1.9% |
| products | product_name_lenght | 610 | 1.9% |
| products | product_description_lenght | 610 | 1.9% |
| products | product_photos_qty | 610 | 1.9% |
| products | product_weight_g | 2 | 0.0% |
| products | product_length_cm | 2 | 0.0% |
| products | product_height_cm | 2 | 0.0% |
| products | product_width_cm | 2 | 0.0% |
A few things stand out:
- orders: order_approved_at, order_delivered_carrier_date, and order_delivered_customer_date are null for orders that were cancelled or never shipped. These are structurally valid nulls, not data errors. When computing delivery-time metrics, we filter on order_status == 'delivered'.
- order_reviews: review_comment_title (88.5% null) and review_comment_message (60.6% null) are mostly empty because reviews are often score-only. Any text analysis must account for the sparse coverage. The nulls in review_id, order_id, and review_score are more suspicious: key columns should never be empty, so these rows are likely fragments of multi-line comments that were split during CSV parsing. Review counts from this load should be treated as approximate.
- products: 610 products (1.9%) are missing product_category_name, the name and description lengths, and the photo count. Only 2 products are missing their physical dimensions (weight, length, height, width).
- closed_deals: has_company, has_gtin, average_stock, and declared_product_catalog_size are each null in about 92% of rows, and lead_behaviour_profile in 21%. These are optional fields filled in during the sales consultancy and should be treated as sparse signals, not reliable features.
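The null table lists nulls in order_reviews.review_id and order_reviews.order_id, which is the pattern a line-based CSV read produces when a quoted comment contains a line break. Whether that is the cause here is an assumption, but the mechanism is easy to reproduce with the standard library. The sample text below is invented.

```python
import csv
import io

# Invented sample: the second review has a line break inside a quoted field.
raw = (
    "review_id,review_score,review_comment_message\n"
    'r1,5,"great product"\n'
    'r2,1,"arrived late\nand damaged"\n'
)

# Naive approach: treat every physical line as one record.
naive_records = raw.strip().split("\n")[1:]
fragment_fields = naive_records[2].split(",")  # the orphaned tail of review r2

# CSV-aware approach: quoted line breaks stay inside the field.
parsed_records = list(csv.DictReader(io.StringIO(raw)))

print(len(naive_records), "naive records vs", len(parsed_records), "parsed records")
print(fragment_fields)
```

The orphaned fragment has a single field, so every other column in that "row" would be null. Spark's CSV reader also splits records on line breaks by default. Passing multiLine=True (usually together with escape='"') to spark.read.csv is the option to try for order_reviews, after which the row and null counts should be re-checked.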
Duplicate Keys
def duplicate_count(df, key_col):
    # Number of key values that occur on more than one row.
    return (
        df.groupBy(key_col)
        .count()
        .filter(F.col("count") > 1)
        .count()
    )
checks = {
    "orders.order_id": duplicate_count(orders, "order_id"),
    "customers.customer_id": duplicate_count(customers, "customer_id"),
    "products.product_id": duplicate_count(products, "product_id"),
    "sellers.seller_id": duplicate_count(sellers, "seller_id"),
    "mql.mql_id": duplicate_count(mql, "mql_id"),
    "closed_deals.mql_id": duplicate_count(closed_deals, "mql_id"),
}
pd.DataFrame(
    [(k, v) for k, v in checks.items()],
    columns=["key", "duplicate_groups"],
).style.hide(axis="index")
| key | duplicate_groups |
|---|---|
| orders.order_id | 0 |
| customers.customer_id | 0 |
| products.product_id | 0 |
| sellers.seller_id | 0 |
| mql.mql_id | 0 |
| closed_deals.mql_id | 0 |
Primary keys are clean across the board. The dataset notes say that a seller can appear in multiple MQL records (different landing pages), so closed_deals.mql_id is not guaranteed to be unique by design, although it is unique in this snapshot. closed_deals.seller_id is the join key to the e-commerce data.
Date Ranges
orders_range = orders.agg(
    F.min("order_purchase_timestamp").alias("min"),
    F.max("order_purchase_timestamp").alias("max"),
).collect()[0]
mql_range = mql.agg(
    F.min("first_contact_date").alias("min"),
    F.max("first_contact_date").alias("max"),
).collect()[0]
pd.DataFrame([
    {"dataset": "orders (purchase)", "from": orders_range["min"], "to": orders_range["max"]},
    {"dataset": "mql (first contact)", "from": mql_range["min"], "to": mql_range["max"]},
]).style.hide(axis="index")
| dataset | from | to |
|---|---|---|
| orders (purchase) | 2016-09-04 22:15:19 | 2018-10-17 18:30:18 |
| mql (first contact) | 2017-06-14 | 2018-05-31 |
| year | orders |
|---|---|
| 2016 | 329 |
| 2017 | 45,101 |
| 2018 | 54,011 |
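The e-commerce data is described as covering Sep 2016–Aug 2018, although the latest purchase timestamp in the table above is 2018-10-17. MQL first-contact dates run from 14 Jun 2017 to 31 May 2018, so the overlap window is roughly one year.
The 329 orders from 2016 represent the marketplace’s early months and are too thin for seasonality analysis. 2018 is also incomplete, since the data ends in the autumn, which leaves 2017 as the only full calendar year.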
Summary
unique_customers = customers.select("customer_unique_id").distinct().count()
mql_count = row_counts["mql"]
closed_count = row_counts["closed_deals"]
summary = pd.DataFrame([
    ("Total orders", f"{row_counts['orders']:,}"),
    ("Delivered orders", f"{orders.filter(F.col('order_status') == 'delivered').count():,}"),
    ("Unique customers", f"{unique_customers:,}"),
    ("Unique sellers", f"{row_counts['sellers']:,}"),
    ("Unique products", f"{row_counts['products']:,}"),
    ("Product categories (PT)", f"{products.select('product_category_name').distinct().count():,}"),
    ("E-commerce date range", "Sep 2016 – Aug 2018"),
    ("MQLs", f"{mql_count:,}"),
    ("Closed deals", f"{closed_count:,}"),
    ("Funnel date range", "Jun 2017 – Jun 2018"),
], columns=["metric", "value"])
summary.style.hide(axis="index")
| metric | value |
|---|---|
| Total orders | 99,441 |
| Delivered orders | 96,478 |
| Unique customers | 96,096 |
| Unique sellers | 3,095 |
| Unique products | 32,951 |
| Product categories (PT) | 74 |
| E-commerce date range | Sep 2016 – Aug 2018 |
| MQLs | 8,000 |
| Closed deals | 842 |
| Funnel date range | Jun 2017 – Jun 2018 |
The e-commerce data shows a very low repeat-purchase rate: 99,441 orders across 96,096 unique customers, or about 1.03 orders per customer. At most 3,345 customers (roughly 3.5%) can have ordered more than once. Most customers bought once and never returned, which is structurally different from subscription or high-frequency retail data. This shapes how we interpret retention metrics: “churned” means something different here.
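The repeat-purchase claim follows directly from two numbers in the summary table. A short worked calculation, using only those figures:

```python
# Figures taken from the summary table above.
total_orders = 99_441
unique_customers = 96_096

# Every repeat buyer contributes at least one order beyond their first,
# so the surplus of orders over customers caps the number of repeat buyers.
extra_orders = total_orders - unique_customers
orders_per_customer = total_orders / unique_customers
max_repeat_share = extra_orders / unique_customers

print(f"extra orders:        {extra_orders:,}")             # 3,345
print(f"orders per customer: {orders_per_customer:.3f}")    # 1.035
print(f"repeat buyers:       at most {max_repeat_share:.1%}")  # at most 3.5%
```

This is an upper bound, not the actual rate: customers with three or more orders use up several surplus orders each, so the true share of repeat buyers is lower.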
The funnel data tells the seller side: 8,000 MQLs, of which 842 (about 10.5%) closed a deal and could become sellers visible in the e-commerce data.