Telegram Web
"Промежуточный ход" или как можно креативно оптимизировать расчёт витрины 2/2

🔸 И решения для этой проблемы я нашёл два, одно чудеснее другого:
* или рекурсивно вычислять связи, проверяя правила и цепляя запись к мастер-ключу всё более общего кластера
* или сгенерировать все возможные комбинации связей для записи по правилам, а потом уже схлопнуть кластер по общему правилу

Первое было отброшено из-за взрывного роста стадий в движке Trino -- с минимальной указанной вложенностью улетало за 450.
Второе даже работало, пока не вылезла ещё одна особенность данных на источнике:
ПО ОДНОМУ ПРАВИЛУ ВОЗМОЖНЫ ТЫСЯЧИ СОВПАДЕНИЙ, т.к. под "ИНН" tax_id могут скрываться компании, объединения и прочие юридические сущности.

Получаем классический row explosion, когда сотни тысяч записей превращаются в сотни миллионов и миллиарды. Упираемся в лимит с другой стороны. Тупик?

🔸 И тут аналитику нашей команды приходит в голову светлая идея:
* таких "tax_id" с кучей дублей не больше десятка на каждого клиента
* дак давайте сначала отбросим их, посчитаем остальные связи как обычно
* а потом в отдельном потоке посчитаем простым group by + min() и присоединим к остальным через union

🔸 Результат: падение по памяти после 25 минут натужной работы железа сменилось 3.5 минутами в обычном режиме. С полным сохранением логики алгоритма. Красота.

А какие у тебя были случаи успешной (или не очень) оптимизации в сфере DE?
#вести_с_полей

Собранные грабли на пути интеграции Spark + MinIO + Jupyter + Airflow в Docker Compose 1/?

Привет! Поделюсь с тобой историей того, как я отлаживал локальное окружение для DE задач. Серия постов будет выходить по паре штук несколько дней, взбодримся немного после затишья.

Это не будет готовым решением "бери и используй", хочу показать личную боль борьбы со всякими девопсовыми историями. Ну и может подкину пару идей "а что ещё стоит сразу проверить", прежде чем собирать все грабли самому.

🔸 Начало: наивная попытка

Стянул из докерхаба bitnami/spark:3.5.4 - думал, одна из последних версий будет работать из коробки. У образа миллионы скачиваний, и bitnami вроде сильный и известный бренд в докер-кругах. Написал простой код для чтения файла из MinIO:

spark = SparkSession.builder \
.appName("Test") \
.getOrCreate()

df = spark.read.csv("s3a://bucket/file.csv")


Запустил spark-submit и получил:
ClassNotFoundException: com.amazonaws.services.s3.AmazonS3Client


Оказалось, что базовый образ Spark не содержит драйверов для работы с S3-совместимыми хранилищами. Стало понятно, что нужно добавлять JAR'ы вручную. Хотя S3 это вроде как промышленный стандарт в последние годы, странно что до сих пор не включили в ядро...
Собранные грабли на пути интеграции Spark + MinIO + Jupyter + Airflow в Docker Compose 2/?

🔸 Первые грабли: jar'ы (пакеты с классами для Java приложений)

Быстро нагуглил, что нужны два драйвера:
• aws-java-sdk-bundle - для работы с AWS API (о господи какой он большой, мне же не нужны все возможные классы для AWS)
• hadoop-aws - для интеграции Hadoop с S3

Скачал последние версии aws-java-sdk-bundle-1.12.783.jar и hadoop-aws-3.4.1.jar. Логика была простая: "новее = лучше".

Скачал их курлом в Dockerfile в /opt/bitnami/spark/jars/ по совету из офф. репо, пересобрал контейнер. Запустил тот же код. Новая ошибка:
java.lang.NoSuchMethodError: org.apache.hadoop.fs.s3a.Retries.translateException


Выяснилось, что не все версии драйверов совместимы с выбранной версией Spark.

*Глубоко вздыхаю*. Ну да, и почему я каждый раз верю, что все проблемы уже позади, инструменты стали простыми и удобными, а я уже получил весь нужный опыт.

Что же встретится на пути дальше? Намёк в меме перед первым постом :)
#вести_с_полей

Собранные грабли на пути интеграции Spark + MinIO + Jupyter + Airflow в Docker Compose 3/?

🔸 Урок про версии или взгляд питониста на джава зависимости

Итак, вчера остановились на несовместимости версий. Было ли это очевидно со стороны, когда видишь сотни строк исключений и каких-то строк в духе "класс не найден"? Вообще нет, скорее было похоже на то что я просто пропустил какой-то ещё jar. И это меня окончательно увело с тропинки "полный бандл слишком много весит, выкину-ка я из него всё лишнее и соберу только нужное для S3 сам".

Перекопав несколько десятков страниц форумов и документации Spark, определил версии так:

1. Нашёл версию Hadoop в офф. репо spark: https://github.com/apache/spark/blob/v3.5.4/pom.xml#L125 . pom.xml это что-то вроде requirements.txt из Python мира

2. Для версии 3.3.4 нашёл версию hadoop-aws: https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/3.3.4

3. Внизу на той же странице в разделе "Compile Dependencies" вышел на версию aws-java-sdk-bundle: https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bundle/1.12.262

Как находить совместимые зависимости для соседних версий bundle'а, если они не указаны явно на сайте, я не понял. Допускаю, что "предыдущая зависимость сохраняется, пока не указана новая". И что это решается установкой maven локально и сборкой своего pom.xml файла, но это было бы как-то слишком для первой итерации инфры.

Для Spark 3.5.4 рабочими оказались:
• aws-java-sdk-bundle-1.12.262
• hadoop-aws-3.3.4

Пересобрал с правильными версиями и ура, сессия прошла ошибки ненайденных классов. Код наконец-то запустился, но это ещё не значит что он работает :)
Собранные грабли на пути интеграции Spark + MinIO + Jupyter + Airflow в Docker Compose 4/?

🔸 MinIO подключение, которое подключается

Итак, простейший код спарк сессии компилировался, но MinIO недоступен. Настало время изучать конфигурацию спарк сессии для S3-совместимых хранилищ!

Нашёл такие ключевые конфиги для работы с MinIO:

spark = SparkSession.builder \
.master("spark://spark-master:7077") \ # SPARK_MASTER_URL
.config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \ # service name : port
.config("spark.hadoop.fs.s3a.access.key", "minio") \ # MINIO_ROOT_USER
.config("spark.hadoop.fs.s3a.secret.key", "password") \ # MINIO_ROOT_PASSWORD
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
.getOrCreate()


После этого сессия создалась и файлик был считан.

Знаешь ещё полезные параметры? Пиши в комменты!

А мне пора добавлять системе интерактивности и "уходить в Jupyter". И хотелось бы ещё UI заставить работать, о чём и будет следующая пара постов. Продолжу небольшую традицию анонсировать мемной картинкой грядущие проблемы :)
Собранные грабли на пути интеграции Spark + MinIO + Jupyter + Airflow в Docker Compose 5/?

Приключения дата инженера на поле девопса продолжаются.

🔸 Jupyter + UI проблемы в History Server с логами воркеров и Driver UI

Базовый функционал получилось поднять довольно быстро. По первым ссылкам в инете нашёл, что достаточно доустановить на образ спарка с джарниками jupyter jupyterlab boto3 py4j pyspark==3.5.4 и прописать SPARK_HOME и PYTHONPATH. Ещё по одной короткой инструкции из сети через генерацию ~/.jupyter/jupyter_notebook_config.py сбросил авторизацию.

В принципе, код сессии работает в Jupyter, ячейки бегают и даже что-то возвращают. Но в History server (Spark Master UI) при клике на "Application Detail UI" получаю ссылку вида http://a7b8c9d:4040 вместо localhost:4040. И ссылки на stdout, stderr воркеров не работают.

Но подожди, это же популярный образ от крупной компании, миллионы скачиваний... Почему настолько базовый функционал не запускается из коробки?! Тут я начинаю крепко задумываться о том, это я очень многого не понимаю или все вокруг борются с теми же проблемами и не желаются на это.

Причина в том, что внутри Docker сети каждый контейнер имеет свой hostname, который недоступен извне. Браузер не может подключиться к случайному container_id. А спарк не настроен по умолчанию на работу внутри докер сети, которая пробрасывается на локалхост через другие порты.
Собранные грабли на пути интеграции Spark + MinIO + Jupyter + Airflow в Docker Compose 6/?

Полное решение я не нашёл, частично поможет следующее:

1. Добавить в SparkSession кода приложения:
.config("spark.driver.bindAddress", "0.0.0.0") \
.config("spark.ui.port", "4040")


2. В docker-compose для jupyter сервиса вывести наружу 4040 порт:
ports:
- "4040:4040" # для Driver UI


3. Указать, чтобы выбранные порты для мастера и воркеров совпадали для "внутренней" и "внешней" сетей. И добавить пару переменных окружения. Например для мастера:
environment:
SPARK_MASTER_WEBUI_PORT: 9090 # переопределяет стандартный порт из контейнера
SPARK_PUBLIC_DNS: localhost # подменяет возвращаемую из UI ссылку
ports:
- "9090:9090"


4. Environment variable для воркера (для 2го воркера 9092 и т.д.):
environment:
SPARK_WORKER_WEBUI_PORT: 9091
SPARK_PUBLIC_DNS: localhost
ports:
- "9091:9091"


Теперь все worker ссылки в UI корректно указывают на localhost:[9091, 9092]. Для Application Detail UI работает прямой переход по localhost:4040, но клик через UI всё равно возвращает адрес контейнера...

Поделись идеями или решением, как можно заставить Spark Master UI возвращать localhost:4040 для Application Detail UI так, чтобы воркеры не перезапускались в цикле (теряя связь с драйвером и уходя в статус EXITED).

p.s. Кто догадается, о чём будет следующая пара постов?
Собранные грабли на пути интеграции Spark + MinIO + Jupyter + Airflow в Docker Compose 7/?

Верно, из названия пока не затронули только Airflow.

🔸 Airflow интеграция

При переходе к Airflow появилась новая проблема: SparkSubmitOperator не видит JAR'ы, установленные в образе Spark. И вообще не видит спарк.

Оказывается, для обращения к спарку нужно в сам контейнер воркеров airflow (ну или в общий образ в случае standalone) установить локально джаву и клиент спарка со всеми джарниками. Иначе у меня SparkSubmitOperator отказывался общаться с кластером. Может, это я здесь чего-то не понял, и можно всё сильно проще делать?

Но теперь я точно понял, почему существующие сборки в сети используют один базовый образ и от него уже наследуют остальные docker image. Мне пришлось изрядно накопипастить, чтобы сохранить простоту сборки через Docker compose и не плодить .sh скрипты:

  build:
context: ./airflow
dockerfile: Dockerfile


p.s. Нашёл, что есть поддержка multi-stage build через ключевое слово target: https://stackoverflow.com/questions/53093487/multi-stage-build-in-docker-compose

Выглядит как хороший повод вернуться и опять всё немного сломать, чтобы через время починить и стало немного лучше ;)
Собранные грабли на пути интеграции Spark + MinIO + Jupyter + Airflow в Docker Compose 8/8

🔸 Вместо заключения

Из меня получился бы не очень успешный девопс, если бы большая часть задач по работе была про инфру. Как бы ни казалось всё просто и понятно вначале, сколько бы ни существовало готовых сборок в сети, всё равно уходят десятки часов на отладку работы и интеграции сервисов, нахождение совместимых версий зависимостей и скармливание этого всего Docker compose для воспроизводимости.

Но это всё получается преодолеть, особенно если отбрасывать в сторону сопротивление и читать официальную документацию. Ну и форумы, а не только общаться с GPT — объяснения живых людей пока на порядок надёжнее и понятнее потуг ИскИн'а (AI).

Самое забавное, что даже использование IaC технологий вроде Docker не гарантирует воспроизводимости. Потому что доступ к репозиториям могут закрыть, сервис может переехать на другой хост, кто-то может грязно пользоваться SemVer и пихать обновления в ту же версию или эту версию в принципе могут удалить из индекса, и она перестанет определяться по сети. Ну и кто-то наверняка заметил, что в одном из прошлых постов у пайтон пакетов я закрепил версию только для pyspark ;)

Так что буду разбираться с локальным кэшированием джарников и других зависимостей через Artifact Registry вроде nexus и размещать готовые версии образов в приватном аналоге dockerhub.

У меня пока всё, но не прощаюсь. А тебе знакома боль настройки локального (или даже продакшен) окружения? Делись историями в комментах ;)
#реклама #рекомендация

Про семантический слой и LLM-боты к данным 1/2

🔸 В последнее время всё чаще мелькают дискуссии вокруг Self-Service BI и чат-интерфейсов к данным. Мол, скоро забудем дашборды и будем кверить данные прямо в чате.
Регулярно встречаю статьи от Хабра до Медиума, где кто-то показывает, как легко можно сделать Text-to-SQL агента и общаться с базой на естественном языке. Обычно в качестве примера демонстрируют, как модели ловко справляются с запросом в духе «Выручка за последний год».

Но в реальной жизни никто не приходит в БД с вопросом “Посчитай выручку за месяц”, не учитывая возвраты, бонусы и особенности учёта у разных подразделений. Простой Text-to-SQL агент без понимания бизнес-логики бесполезен.

Мне это чем-то напоминает известный мем про
select * from some_ideal_clean_and_pristine.table_that_you_think_exist


🔸 Другая проблема в том, что LLM падки на галлюцинации и генерацию цифр, которые не существуют в реальности.
Чтобы сделать надёжный llm-интерфейс к данным, используют четыре подхода:
1. structured output generation — сначала модель выдаёт проверяемый код (SQL, Python), а уже потом текст
2. strict rules enforcement — автоматическая валидация запросов по заранее прописанным бизнес-правилам.
3. system prompt enhancements — промпты, обогащённые метаданными (источник, период, ключевые переменные)
4. semantic layer — словари синонимов, кастомные правила и онтологии, переводящие «человеческий» запрос в точный термин БД
Про семантический слой и LLM-боты к данным 2/2

🔸 Разберем отдельно семантический слой. Он в чём-то объединяет все описанные выше техники:
— Вывод структурированных данных прежде, чем формировать текстовый ответ (SQL-запросы или код всегда проверяемы),
— Жёсткое соблюдение встроенных правил (минимальные объёмы выборки, фиксированные временные интервалы),
— Улучшенные системные промпты, обогащённые метаданными (источник, период, важные переменные),

Поэтому роль инженеров данных сместится с написания SQL-запросов на управление метаданным, промпт-инжиниринг и обучение моделей. Вместо ручного создания ETL-пайплайнов они будут использовать LLM-агентов и дорабатывать решения с их помощью в рамках существующей экосистемы.

В конце концов не стоит забывать, что мы пришли ко всему этому с SQL, который был создан с мыслью о том, чтобы те, кому нужны данные, могли сами писать запросы.

Если интересна эта тема, подписывайтесь на канал про семантический слой от команды Synmetrix: @semanticlayer
#вести_с_полей

🌺 .airflowignore или как в нашей команде ускорили обновление дагов на порядок

🔸 В Airflow есть аналог .gitignore — конфигурационный файлик, который нужно поместить в корень DAG_FOLDER.

В нашем сетапе, как и у многих, есть генератор дагов, который на основе кода и конфигов складывает .py файлы в отдельную директорию и потом синхронизирует их через гит. Для запуска дага этот код нужен, а для парсинга и генерации дагов — нет, т.е. структура из тасок и их зависимостей не меняется.

🔸 В Airflow есть отдельный компонент scheduler, одна из задач которого — парсить даги и записывать их структуру в бэкенд-базу. В итоге это всё и попадает в UI, и определяет последовательность запуска.

У нас есть сотни .py и связанных с ними файлов для дата-приложений. По этим файлам и вложенным директориям бедный шедулер пробегался в каждом цикле — и конечно же он не успевал это делать за адекватное время — цикл загрузки обновлений дага выходил далеко за 5 минут. После добавления в корень файла .airflowignore с одной строкой всё начало "летать".

.*


Под капотом используется re.findall(), т.е. у нас пропускаются все файлы, где всё равно нет ни одного дага.

Проверь, может и на твоём проекте пригодится ;)
Please open Telegram to view this post
VIEW IN TELEGRAM
🔸 Ещё один мок-собес вышел, на этот раз с Junior DE

Знаю, что прошлое видео с джуном многим зашло -- welcome. В этот раз пришёл крепкий джун, верю в успешность его карьеры :)

И напоминаю, что не обязательно расти до его уровня, прежде чем идти на собеседования самим. Пробуйте, ошибайтесь, делайте выводы, пробуйте снова

https://youtu.be/mMrjKEgBWXI
2025/05/29 08:19:13
Back to Top
HTML Embed Code: