Análisis de Ventas y Stock con PySpark y SQL: Descubriendo Tendencias en el Comercio Minorista
En el mundo del comercio minorista, la toma de decisiones basada en datos es clave para optimizar estrategias de ventas y mejorar la gestión del inventario. En este artículo, exploraremos cómo usar Apache Spark con PySpark y SQL para analizar datos de ventas y stock de productos, identificando patrones clave en la compra de productos, la facturación por tienda y la disponibilidad de inventario.
📊 Configuración del Entorno con PySpark
Para este análisis, utilizamos PySpark, una potente herramienta para el procesamiento de datos a gran escala. La sesión de Spark se configura con:
pythonCopyEditfrom pyspark.sql import SparkSession
from pyspark.sql.functions import desc, count, col, row_number, avg, when, sum as sum, asc, rank
from pyspark.sql.window import Window
# Inicializar sesión de Spark
spark = SparkSession.builder.appName("ActGrupal").getOrCreate()
Los datos de ventas y stock se leen desde archivos CSV y JSON. En este caso, el dataset de compras se encuentra en el siguiente enlace de GitHub:
🔗 purchases.json en GitHub
pythonCopyEditdfventa = spark.read.json("ruta_a_purchases.json")
dfventa.createOrReplaceTempView("venta")
dfstock = spark.read.option("header", "true").option("inferSchema", "true").csv("ruta_a_stock.csv")
dfstock.createOrReplaceTempView("stock")
🔎 1. ¿Cuáles son los 10 productos más comprados?
Este análisis nos permite identificar los productos más populares según la cantidad de compras registradas.
🔹 Con PySpark API
pythonCopyEditventasProductoDF = dfventa.groupBy("product_id").count()
top10Productos = ventasProductoDF.orderBy(desc("count")).limit(10)
top10Productos.show()
🔹 Con SQL
sqlCopyEditSELECT product_id, COUNT(*) as total_ventas
FROM venta
GROUP BY product_id
ORDER BY total_ventas DESC
LIMIT 10;
🔍 Beneficio: Conocer los productos más vendidos ayuda a optimizar el inventario y planificar estrategias de marketing para maximizar ganancias.
📈 2. Porcentaje de Compra por Tipo de Producto
Este análisis permite conocer qué categorías de productos tienen mayor participación en las ventas.
🔹 Con PySpark API
pythonCopyEdittotalVentas = dfventa.count()
ventasPorTipoProducto = dfventa.groupBy("item_type").agg(count("*").alias("cuenta_ventas"))
ventasPorTipoProducto = ventasPorTipoProducto.withColumn("porcentaje", (col("cuenta_ventas") / totalVentas) * 100)
ventasPorTipoProducto.show()
🔹 Con SQL
sqlCopyEditSELECT item_type, COUNT(*) as cuenta_ventas, (COUNT(*) / (SELECT COUNT(*) FROM venta) * 100) as porcentaje
FROM venta
GROUP BY item_type;
🔍 Beneficio: Conocer la distribución de ventas por categoría permite mejorar la estrategia de compras y stock.
🏆 3. Los 3 Productos Más Comprados por Tipo de Producto
Este análisis nos ayuda a entender qué productos destacan en cada categoría.
🔹 Con PySpark API
pythonCopyEditventasPorProductoYTipo = dfventa.groupBy("item_type", "product_id").count().withColumnRenamed("count", "cuenta_ventas")
especificacionVentana = Window.partitionBy("item_type").orderBy(col("cuenta_ventas").desc())
top3ProductosPorTipo = ventasPorProductoYTipo.withColumn("rango", row_number().over(especificacionVentana)).filter(col("rango") <= 3)
top3ProductosPorTipo.show()
🔹 Con SQL
sqlCopyEditSELECT item_type, product_id, cuenta_ventas, rango
FROM (
SELECT item_type, product_id, COUNT(*) as cuenta_ventas,
ROW_NUMBER() OVER (PARTITION BY item_type ORDER BY COUNT(*) DESC) as rango
FROM venta
GROUP BY item_type, product_id
) AS ranked_ventas
WHERE rango <= 3;
🔍 Beneficio: Permite definir estrategias específicas para cada categoría, como promociones o ajustes de precios.
💰 4. Productos con Precios Superiores al Promedio
Este análisis identifica los productos premium en función de su precio medio.
🔹 Con PySpark API
pythonCopyEditprecioMedio = dfventa.agg(avg("price").alias("precio_promedio")).first()["precio_promedio"]
productosCaros = dfventa.filter(col("price") > precioMedio).orderBy(desc("price"))
productosCaros.show()
🔹 Con SQL
sqlCopyEditSELECT *, price
FROM venta
WHERE price > (SELECT AVG(price) FROM venta)
ORDER BY price DESC;
🔍 Beneficio: Ayuda a segmentar productos de alto valor y definir estrategias de marketing específicas.
🏪 5. Tienda con Más Ventas y Mayor Facturación
Aquí analizamos cuál tienda vende más productos y cuál genera más ingresos.
🔹 Con PySpark API
pythonCopyEdittop_tienda_ventas = dfventa.groupBy("shop_id").count().orderBy(desc("count")).show(1)
top_tienda_facturacion = dfventa.groupBy("shop_id").agg(sum("price").alias("total_facturado")).orderBy(desc("total_facturado")).show(1)
🔹 Con SQL
sqlCopyEditSELECT shop_id, COUNT(*) AS total_productos
FROM venta
GROUP BY shop_id
ORDER BY total_productos DESC
LIMIT 1;
SELECT shop_id, SUM(price) AS total_facturado
FROM venta
GROUP BY shop_id
ORDER BY total_facturado DESC
LIMIT 1;
🔍 Beneficio: Permite identificar las tiendas con mejor rendimiento y replicar su estrategia en otras ubicaciones.
🌍 6. Análisis Geográfico de Ventas y Uso de Métodos de Pago
Dividimos el mundo en 5 áreas geográficas según la longitud y analizamos dónde se usa más PayPal.
🔹 Con PySpark API
pythonCopyEditdf_ventas = dfventa.withColumn("Area",
when((col("location.lon") < -108), "Area1")
.when((col("location.lon") < -36), "Area2")
.when((col("location.lon") < 36), "Area3")
.when((col("location.lon") < 108), "Area4")
.otherwise("Area5"))
uso_paypal = df_ventas.filter(col("payment_type") == "paypal").groupBy("Area").count().orderBy(desc("count"))
uso_paypal.show(1)
🔹 Con SQL
sqlCopyEditSELECT Area, COUNT(*) as total_paypal
FROM (
SELECT *,
CASE
WHEN location.lon < -108 THEN 'Area1'
WHEN location.lon < -36 THEN 'Area2'
WHEN location.lon < 36 THEN 'Area3'
WHEN location.lon < 108 THEN 'Area4'
ELSE 'Area5'
END AS Area
FROM venta
) WHERE payment_type = 'paypal'
GROUP BY Area
ORDER BY total_paypal DESC
LIMIT 1;
🔍 Beneficio: Ayuda a adaptar las estrategias de pago y comercio electrónico en diferentes regiones.
📉 7. Productos con Stock Insuficiente
Este análisis muestra qué productos no tienen suficiente inventario para cubrir la demanda.
🔹 Con PySpark API
pythonCopyEditdfcruce = dfventa.groupBy("product_id").count().join(dfstock, on="product_id", how="left")
dfcruce = dfcruce.withColumn("sin_stock", when(col("quantity") < col("count"), True).otherwise(False))
dfcruce.filter(col("sin_stock") == True).show()
🔍 Beneficio: Evita pérdidas de ventas por falta de inventario y optimiza la reposición de productos.
🚀 Conclusión
Gracias a PySpark y SQL, podemos obtener información clave para optimizar la gestión de ventas y stock en tiendas físicas y en línea. Si te interesa más sobre este análisis, revisa el dataset completo en GitHub:
🔗 purchases.json en GitHub
Publicar comentario