Для запуска SQL скриптов в Clichouse из Airflow я использую airflow-clickhouse-plugin. Установка:
Установка airflow-clickhouse-plugin
pip install airflow-clickhouse-plugin
Чтобы Airflow мог подключиться к Yandex Managed Service for ClickHouse потребуется сгенерировать SSL сертификаты. На примере Windows PoserShell:
mkdir -Force $HOME\.yandex; `
curl.exe https://storage.yandexcloud.net/cloud-certs/RootCA.pem `
--output $HOME\.yandex\RootCA.crt; `
curl.exe https://storage.yandexcloud.net/cloud-certs/IntermediateCA.pem `
--output $HOME\.yandex\IntermediateCA.crt; `
Import-Certificate `
-FilePath $HOME\.yandex\RootCA.crt `
-CertStoreLocation cert:\CurrentUser\Root; `
Import-Certificate `
-FilePath $HOME\.yandex\IntermediateCA.crt `
-CertStoreLocation cert:\CurrentUser\Root
И поместить эти сертификаты на сервер в папку /usr/local/share/ca-certificates/Yandex/.
У нас Airflow разворачивается при помощи Docker. Сертификат скопированы в образ:
А в Dockerfile прописаны команды:
RUN pip install airflow-clickhouse-plugin
COPY ./IntermediateCA.crt /usr/local/share/ca-certificates/Yandex/
COPY ./RootCA.crt /usr/local/share/ca-certificates/Yandex/
Создание подключения к ClickHouse в Airflow
A Airflow надо создать новое подключение (Admin > Connections).
- Connection Id — название подключения, которое будет использоваться в DAG.
- Connection Type — тип подключения Sqlite.
- Host — FQDN хост из консоли управления, указанные по нажатию кнопки "Подключиться" на странице кластера.
- Login — логин для подключения.
- Password — пароль логина.
- Port — 9440
- Extra:
{
"ca_certs": "/usr/local/share/ca-certificates/Yandex/RootCA.crt",
"secure": true,
"verify": true
}
Пример DAG с подключением к Clickhouse
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator
default_args = {
'owner': 'rfathutdinov',
'catchup': False,
'description': """
Пример DAG с подключением к Clickhouse
"""
}
with DAG(dag_id='ch_test',
schedule=None,
start_date=days_ago(0),
tags=['clickhouse', 'example'],
default_args=default_args
) as dag:
clickhouse_example_task = ClickHouseOperator(
task_id='clickhouse_example_task',
sql='SELECT version();',
clickhouse_conn_id='clickhouse_datago',
)
clickhouse_example_task
Пример запуска DAG: