Найти тему
In Data We Trust

Подключение Airflow к Yandex Managed Service for ClickHouse

Оглавление

Для запуска SQL скриптов в Clichouse из Airflow я использую airflow-clickhouse-plugin. Установка:

Установка airflow-clickhouse-plugin

pip install airflow-clickhouse-plugin

Чтобы Airflow мог подключиться к Yandex Managed Service for ClickHouse потребуется сгенерировать SSL сертификаты. На примере Windows PoserShell:

mkdir -Force $HOME\.yandex; `
curl.exe
https://storage.yandexcloud.net/cloud-certs/RootCA.pem `
--output $HOME\.yandex\RootCA.crt; `
curl.exe
https://storage.yandexcloud.net/cloud-certs/IntermediateCA.pem `
--output $HOME\.yandex\IntermediateCA.crt; `
Import-Certificate `
-FilePath $HOME\.yandex\RootCA.crt `
-CertStoreLocation cert:\CurrentUser\Root; `
Import-Certificate `
-FilePath $HOME\.yandex\IntermediateCA.crt `
-CertStoreLocation cert:\CurrentUser\Root

И поместить эти сертификаты на сервер в папку /usr/local/share/ca-certificates/Yandex/.

У нас Airflow разворачивается при помощи Docker. Сертификат скопированы в образ:

А в Dockerfile прописаны команды:

RUN pip install airflow-clickhouse-plugin

COPY ./IntermediateCA.crt /usr/local/share/ca-certificates/Yandex/

COPY ./RootCA.crt /usr/local/share/ca-certificates/Yandex/

Создание подключения к ClickHouse в Airflow

A Airflow надо создать новое подключение (Admin > Connections).

  • Connection Id — название подключения, которое будет использоваться в DAG.
  • Connection Type — тип подключения Sqlite.
  • Host — FQDN хост из консоли управления, указанные по нажатию кнопки "Подключиться" на странице кластера.
  • Login — логин для подключения.
  • Password — пароль логина.
  • Port — 9440
  • Extra:
{
"ca_certs": "/usr/local/share/ca-certificates/Yandex/RootCA.crt",
"secure": true,
"verify": true
}
Готовое соединение с Clickhouse в Airflow
Готовое соединение с Clickhouse в Airflow

Пример DAG с подключением к Clickhouse

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator

default_args = {
'owner': 'rfathutdinov',
'catchup': False,
'description': """
Пример DAG с подключением к Clickhouse
"""
}


with DAG(dag_id='ch_test',
schedule=None,
start_date=days_ago(0),
tags=['clickhouse', 'example'],
default_args=default_args
) as dag:

clickhouse_example_task = ClickHouseOperator(
task_id='clickhouse_example_task',
sql='SELECT version();',
clickhouse_conn_id='clickhouse_datago',
)

clickhouse_example_task

Пример запуска DAG:

-3