tgoop.com/rzv_de/304
Create:
Last Update:
Last Update:
Чтобы распарсить SQL, нужно всего лишь написать правильную регулярку. Правда? (Часть 2) 2/2
Тогда при помощи такого кода мы можем найти все ссылки на таблицы-источники и заменить их на {{ source() }}
макрос
import re
from sqllineage.runner import LineageRunner
DEFAULT_SCHEMA = "stg"
runner = LineageRunner(sql, dialect="trino") # added dialect
result_sql = sql
for table in runner.source_tables:
table_str = str(table)
parts = table_str.split(".")
table_name = parts[-1]
# Determine schema
if parts[-2] == "<default>":
schema_in_sql = DEFAULT_SCHEMA
# Match both standalone table and table
pattern = rf"(from|join)\s+{re.escape(table_name)}"
else:
schema_in_sql = parts[-2]
# Match both schema.table and schema.table
pattern = rf"(from|join)\s+{re.escape(schema_in_sql)}\.{re.escape(table_name)}"
# Find all matches with case insensitivity
for match in re.finditer(pattern, result_sql, re.IGNORECASE):
original = match.group(0)
keyword = match.group(1) # FROM or JOIN
replacement = f"{keyword} {{{{ source('{schema_in_sql}', '{table_name}') }}}}"
# Replace just this specific occurrence
result_sql = result_sql.replace(original, replacement, 1)
print(result_sql)
Обрати внимание, что корректно обработаны названия CTE, случаи с упоминанием таблицы без схемы.
p.s. без регулярок не обошлось, потому что результат парсинга не связан с исходным SQL кодом, и для замены нужно найти верное место. А может и я не до конца разобрался :)
BY rzv Data Engineering
Share with your friend now:
tgoop.com/rzv_de/304