ZASQL_PYTHON Telegram 393
💫 Spark для аналитика (ч. 1)

Раньше в 💙 я работал немного со Spark, большая часть задач спокойно решалась внутри одной БД или выгрузкой в pandas.

Сейчас для моих задач Spark - это необходимость, чтобы не падал JupyterHub по оперативной памяти: все вычисления выполняются распределённо на кластере с большим объёмом ресурсов. Но это не волшебная таблетка, т.к. важно следить за тем, как используются ресурсы, грамотно настраивать Spark-приложения и оптимизировать запросы. На самом деле подход к работе с ресурсами здесь другой, и есть ряд ограничений, о которых расскажу в следующих постах 🙃

🥳 Как я использую сейчас

1. Собираю данные из разных источников
В реальных задачах часто нужно объединять сразу несколько источников: выгрузки из разных баз, parquet и тд. Пока всё влезает в pandas - норм, но когда данных слишком много, pandas начинает падать. Spark позволяет легко подтянуть все необходимые источники и собрать их в одну большую таблицу, не заботясь об ограничениях памяти.

2. Выполняю тяжёлые вычисления и агрегации
После того как все данные собраны, начинаются подсчеты метрик по большим объёмам данных. Здесь Spark выигрывает за счёт распределённых вычислений: вся тяжёлая работа идёт на кластере, а не на ноутбуке. Как только нужные агрегаты посчитаны, можно забрать результат и уже дальше анализировать, строить графики и т.д.

😍 Spark реально может работать дольше, чем pandas, если данных немного. Всё из-за архитектуры: Spark каждый раз поднимает распределённую систему, разбивает задачи на части, отправляет их на кластер и только потом собирает результат. Pandas же считает всё в оперативке и на небольших данных это быстрее почти всегда.

🔽Ниже прикрепляю основные функции для работы в Spark, которые я использую для решения задач аналитики

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count

# запускаем Spark-сессию, тут еще можно закопаться в настройки приложения (если будет много 🐳, выложу)

spark = SparkSession.builder.appName("zasql_python").getOrCreate() # название приложения может быть произвольным

# читаем csv и кучу источников

df_csv = spark.read.csv("file.csv", header=True, inferSchema=True)
df_parquet = spark.read.parquet("file.parquet")
df_json = spark.read.json("file.json")

# джойним таблицы между собой

df_joined = df_csv.join(df_parquet, on="user_id", how="inner")

# фильтруем данные

df_filtered = df_joined.filter(df_joined["is_active"] == 1)

# применяем агрегирующие функции, считаем сумму строчек, среднее значение по заказам

df_grouped = (
df_filtered
.groupBy("country")
.agg(
count("*").alias("users_count"),
avg("order_sum").alias("avg_order_sum")
)
)

df_pandas = df_grouped.toPandas()


🐼 Очень похоже на pandas + можно в любой момент пересесть на spark.sql и писать в sql-формате любой запрос.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("zasql_python_sql").getOrCreate() # произвольное название приложения, должно быть другим, если запускаем параллельно

df_orders = spark.read.parquet("orders.parquet") # читаем в Spark DataFrame первый источник источник
df_users = spark.read.csv("users.csv", header=True, inferSchema=True) # читаем в Spark DataFrame второй источник

df_orders.createOrReplaceTempView("orders") # создаем темповые таблицы заказов
df_users.createOrReplaceTempView("users") # создаем темповые таблицы юзеров

# теперь читаем тут в sql-формате

query = """
SELECT
u.country,
COUNT(DISTINCT o.user_id) AS active_users,
AVG(o.order_sum) AS avg_order_sum
FROM orders o
JOIN users u ON o.user_id = u.user_id
WHERE o.is_active = 1
GROUP BY u.country
ORDER BY avg_order_sum DESC
"""

result = spark.sql(query) # читаем в spark.sql, результат тот же получаем, но в SQL-формате
result.show() # показать значения, но можно перевести и в pandas, но ресурсов много сожрет


Spark спасает, когда надо соединить и обработать десятки миллионов строк из разных источников, и обычный pandas падает по памяти, ядро умирает.

Ставьте
🐳, если хотите продолжение истории про Spark 💫
Please open Telegram to view this post
VIEW IN TELEGRAM
🐳158🔥1493👌2



tgoop.com/zasql_python/393
Create:
Last Update:

💫 Spark для аналитика (ч. 1)

Раньше в 💙 я работал немного со Spark, большая часть задач спокойно решалась внутри одной БД или выгрузкой в pandas.

Сейчас для моих задач Spark - это необходимость, чтобы не падал JupyterHub по оперативной памяти: все вычисления выполняются распределённо на кластере с большим объёмом ресурсов. Но это не волшебная таблетка, т.к. важно следить за тем, как используются ресурсы, грамотно настраивать Spark-приложения и оптимизировать запросы. На самом деле подход к работе с ресурсами здесь другой, и есть ряд ограничений, о которых расскажу в следующих постах 🙃

🥳 Как я использую сейчас

1. Собираю данные из разных источников
В реальных задачах часто нужно объединять сразу несколько источников: выгрузки из разных баз, parquet и тд. Пока всё влезает в pandas - норм, но когда данных слишком много, pandas начинает падать. Spark позволяет легко подтянуть все необходимые источники и собрать их в одну большую таблицу, не заботясь об ограничениях памяти.

2. Выполняю тяжёлые вычисления и агрегации
После того как все данные собраны, начинаются подсчеты метрик по большим объёмам данных. Здесь Spark выигрывает за счёт распределённых вычислений: вся тяжёлая работа идёт на кластере, а не на ноутбуке. Как только нужные агрегаты посчитаны, можно забрать результат и уже дальше анализировать, строить графики и т.д.

😍 Spark реально может работать дольше, чем pandas, если данных немного. Всё из-за архитектуры: Spark каждый раз поднимает распределённую систему, разбивает задачи на части, отправляет их на кластер и только потом собирает результат. Pandas же считает всё в оперативке и на небольших данных это быстрее почти всегда.

🔽Ниже прикрепляю основные функции для работы в Spark, которые я использую для решения задач аналитики

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count

# запускаем Spark-сессию, тут еще можно закопаться в настройки приложения (если будет много 🐳, выложу)

spark = SparkSession.builder.appName("zasql_python").getOrCreate() # название приложения может быть произвольным

# читаем csv и кучу источников

df_csv = spark.read.csv("file.csv", header=True, inferSchema=True)
df_parquet = spark.read.parquet("file.parquet")
df_json = spark.read.json("file.json")

# джойним таблицы между собой

df_joined = df_csv.join(df_parquet, on="user_id", how="inner")

# фильтруем данные

df_filtered = df_joined.filter(df_joined["is_active"] == 1)

# применяем агрегирующие функции, считаем сумму строчек, среднее значение по заказам

df_grouped = (
df_filtered
.groupBy("country")
.agg(
count("*").alias("users_count"),
avg("order_sum").alias("avg_order_sum")
)
)

df_pandas = df_grouped.toPandas()


🐼 Очень похоже на pandas + можно в любой момент пересесть на spark.sql и писать в sql-формате любой запрос.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("zasql_python_sql").getOrCreate() # произвольное название приложения, должно быть другим, если запускаем параллельно

df_orders = spark.read.parquet("orders.parquet") # читаем в Spark DataFrame первый источник источник
df_users = spark.read.csv("users.csv", header=True, inferSchema=True) # читаем в Spark DataFrame второй источник

df_orders.createOrReplaceTempView("orders") # создаем темповые таблицы заказов
df_users.createOrReplaceTempView("users") # создаем темповые таблицы юзеров

# теперь читаем тут в sql-формате

query = """
SELECT
u.country,
COUNT(DISTINCT o.user_id) AS active_users,
AVG(o.order_sum) AS avg_order_sum
FROM orders o
JOIN users u ON o.user_id = u.user_id
WHERE o.is_active = 1
GROUP BY u.country
ORDER BY avg_order_sum DESC
"""

result = spark.sql(query) # читаем в spark.sql, результат тот же получаем, но в SQL-формате
result.show() # показать значения, но можно перевести и в pandas, но ресурсов много сожрет


Spark спасает, когда надо соединить и обработать десятки миллионов строк из разных источников, и обычный pandas падает по памяти, ядро умирает.

Ставьте
🐳, если хотите продолжение истории про Spark 💫

BY Заскуль питона (Data Science)


Share with your friend now:
tgoop.com/zasql_python/393

View MORE
Open in Telegram


Telegram News

Date: |

In the “Bear Market Screaming Therapy Group” on Telegram, members are only allowed to post voice notes of themselves screaming. Anything else will result in an instant ban from the group, which currently has about 75 members. How to create a business channel on Telegram? (Tutorial) How to create a business channel on Telegram? (Tutorial) Don’t publish new content at nighttime. Since not all users disable notifications for the night, you risk inadvertently disturbing them. Ng Man-ho, a 27-year-old computer technician, was convicted last month of seven counts of incitement charges after he made use of the 100,000-member Chinese-language channel that he runs and manages to post "seditious messages," which had been shut down since August 2020.
from us


Telegram Заскуль питона (Data Science)
FROM American