Найти Π² Π”Π·Π΅Π½Π΅
CODERIKK

πŸ”Ή ΠŸΠ»Π°Π½ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ ETL: расписания ΠΈ зависимости

πŸ”Ή Как Π·Π°ΠΏΡƒΡΠΊΠ°Ρ‚ΡŒ ETL-ΠΏΠ°ΠΉΠΏΠ»Π°ΠΉΠ½ ΠΏΠΎ Ρ€Π°ΡΠΏΠΈΡΠ°Π½ΠΈΡŽ ΠΈ ΡƒΡ‡ΠΈΡ‚Ρ‹Π²Π°Ρ‚ΡŒ зависимости? πŸ”Έ Π‘ΡƒΡ‚ΡŒ: Π½ΡƒΠΆΠ½Ρ‹ рСгулярныС запуски, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π΄Π°Π½Π½Ρ‹Π΅ Π±Ρ‹Π»ΠΈ свСТими, ΠΈ ΠΊΠΎΠ½Ρ‚Ρ€ΠΎΠ»ΡŒ зависимостСй, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π·Π°Π΄Π°Ρ‡ΠΈ Π½Π΅ ΠΏΠ΅Ρ€Π΅ΠΊΡ€Ρ‹Π²Π°Π»ΠΈΡΡŒ ΠΈ Π½Π΅ ΠΏΡ€ΠΈΠ²ΠΎΠ΄ΠΈΠ»ΠΈ ΠΊ нСконсистСнтности Π΄Π°Π½Π½Ρ‹Ρ…. Π‘Π΅Π· этого β€” Π΄ΡƒΠ±Π»ΠΈ, Π³ΠΎΠ½ΠΊΠΈ ΠΈ Π½Π΅ΠΏΠΎΠ»Π½Ρ‹Π΅ Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ. πŸ”Έ cron β€” это расписаниС ΠΏΠΎ Π²Ρ€Π΅ΠΌΠ΅Π½ΠΈ. ΠŸΡ€ΠΈΠΌΠ΅Ρ€: 0 2 * * * запускаСт Π² 02:00. ΠŸΠΎΠ΄Ρ…ΠΎΠ΄ΠΈΡ‚ для простой пСриодичности, Π½ΠΎ cron Π½Π΅ Π·Π½Π°Π΅Ρ‚ ΠΎ порядкС Π·Π°Π΄Π°Ρ‡ ΠΈ Π½Π΅ управляСт retry. πŸ”Έ DAG (Directed Acyclic Graph β€” Π½Π°ΠΏΡ€Π°Π²Π»Π΅Π½Π½Ρ‹ΠΉ Π°Ρ†ΠΈΠΊΠ»ΠΈΡ‡Π½Ρ‹ΠΉ Π³Ρ€Π°Ρ„) ΠΌΠΎΠ΄Π΅Π»ΠΈΡ€ΡƒΠ΅Ρ‚ зависимости ΠΌΠ΅ΠΆΠ΄Ρƒ Π·Π°Π΄Π°Ρ‡Π°ΠΌΠΈ. Airflow ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ DAG’ы (ΠΈΠ»ΠΈ ΠΏΠΎ простому – Даг’и): ΠΎΠ½ ставит Π·Π°Π΄Π°Ρ‡ΠΈ Π² ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ, слСдит Π·Π° состояниСм, Π΄Π΅Π»Π°Π΅Ρ‚ Ρ€Π΅Ρ‚Ρ€Π°ΠΈ ΠΈ сСнсоры, ΡƒΠΌΠ΅Π΅Ρ‚ backfill. from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG('etl', start_date=datetime(2023,1,1), schedule_interval='0 2 * * *') as dag: extract = BashOperator(task_id='extract', bash_command='echo extract') transform = BashOperator(task_id='transform', bash_

πŸ”Ή ΠŸΠ»Π°Π½ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ ETL: расписания ΠΈ зависимости

πŸ”Ή Как Π·Π°ΠΏΡƒΡΠΊΠ°Ρ‚ΡŒ ETL-ΠΏΠ°ΠΉΠΏΠ»Π°ΠΉΠ½ ΠΏΠΎ Ρ€Π°ΡΠΏΠΈΡΠ°Π½ΠΈΡŽ ΠΈ ΡƒΡ‡ΠΈΡ‚Ρ‹Π²Π°Ρ‚ΡŒ зависимости?

πŸ”Έ Π‘ΡƒΡ‚ΡŒ: Π½ΡƒΠΆΠ½Ρ‹ рСгулярныС запуски, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π΄Π°Π½Π½Ρ‹Π΅ Π±Ρ‹Π»ΠΈ свСТими, ΠΈ ΠΊΠΎΠ½Ρ‚Ρ€ΠΎΠ»ΡŒ зависимостСй, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π·Π°Π΄Π°Ρ‡ΠΈ Π½Π΅ ΠΏΠ΅Ρ€Π΅ΠΊΡ€Ρ‹Π²Π°Π»ΠΈΡΡŒ ΠΈ Π½Π΅ ΠΏΡ€ΠΈΠ²ΠΎΠ΄ΠΈΠ»ΠΈ ΠΊ нСконсистСнтности Π΄Π°Π½Π½Ρ‹Ρ…. Π‘Π΅Π· этого β€” Π΄ΡƒΠ±Π»ΠΈ, Π³ΠΎΠ½ΠΊΠΈ ΠΈ Π½Π΅ΠΏΠΎΠ»Π½Ρ‹Π΅ Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ.

πŸ”Έ cron β€” это расписаниС ΠΏΠΎ Π²Ρ€Π΅ΠΌΠ΅Π½ΠΈ. ΠŸΡ€ΠΈΠΌΠ΅Ρ€: 0 2 * * * запускаСт Π² 02:00. ΠŸΠΎΠ΄Ρ…ΠΎΠ΄ΠΈΡ‚ для простой пСриодичности, Π½ΠΎ cron Π½Π΅ Π·Π½Π°Π΅Ρ‚ ΠΎ порядкС Π·Π°Π΄Π°Ρ‡ ΠΈ Π½Π΅ управляСт retry.

πŸ”Έ DAG (Directed Acyclic Graph β€” Π½Π°ΠΏΡ€Π°Π²Π»Π΅Π½Π½Ρ‹ΠΉ Π°Ρ†ΠΈΠΊΠ»ΠΈΡ‡Π½Ρ‹ΠΉ Π³Ρ€Π°Ρ„) ΠΌΠΎΠ΄Π΅Π»ΠΈΡ€ΡƒΠ΅Ρ‚ зависимости ΠΌΠ΅ΠΆΠ΄Ρƒ Π·Π°Π΄Π°Ρ‡Π°ΠΌΠΈ. Airflow ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ DAG’ы (ΠΈΠ»ΠΈ ΠΏΠΎ простому – Даг’и): ΠΎΠ½ ставит Π·Π°Π΄Π°Ρ‡ΠΈ Π² ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ, слСдит Π·Π° состояниСм, Π΄Π΅Π»Π°Π΅Ρ‚ Ρ€Π΅Ρ‚Ρ€Π°ΠΈ ΠΈ сСнсоры, ΡƒΠΌΠ΅Π΅Ρ‚ backfill.

from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime

with DAG('etl', start_date=datetime(2023,1,1), schedule_interval='0 2 * * *') as dag:

extract = BashOperator(task_id='extract', bash_command='echo extract')

transform = BashOperator(task_id='transform', bash_command='echo transform')

load = BashOperator(task_id='load', bash_command='echo load')

extract >> transform >> load

πŸ”Έ ΠŸΡ€Π°ΠΊΡ‚ΠΈΠΊΠ°: для простых ΠΎΠ΄Π½ΠΎΡˆΠ°Π³ΠΎΠ²Ρ‹Ρ… Π·Π°Π΄Π°Ρ‡ β€” cron; для pipeline с зависимостями β€” Airflow/DAG; Π·Π°Π΄Π°Π²Π°ΠΉΡ‚Π΅ catchup=False ΠΈ max_active_runs=1, Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΈΠ·Π±Π΅ΠΆΠ°Ρ‚ΡŒ Π½Π°ΠΊΠ»Π°Π΄ΠΎΠΊ.

πŸ“š cron для простого расписания, DAG+Airflow для контроля зависимостСй ΠΈ надёТности.

#CODERIKK #Etl #Junior

➑️ ΠœΡ‹ Π² Telegram - Π‘Π΅Ρ‚ΠΊΠ΅ - Π”Π·Π΅Π½

Π‘ΡƒΠ΄Ρƒ Ρ€Π°Π΄ вашСй Ρ€Π΅Π°ΠΊΡ†ΠΈΠΈ Π·Π΄Π΅ΡΡŒβ¬‡οΈ