Найти в Дзене

Оценка прогнозирование прожароопасности и риска затопления маршрутов на основе методов машинного обучения с использованием MLFLOW

Аннотация В работе представлен программный комплекс на языке Python для анализа пространственно-временных данных маршрутов и оценки рисков природных опасностей (пожары, затопления). Реализован класс-агент ModelCreator, который: загружает данные из базы SQLite; проводит предобработку признаков; обучает несколько моделей классификации (ExtraTreesClassifier, MLPClassifier, GradientBoostingClassifier, LogisticRegression); выбирает лучшую модель по метрике точности; выполняет поиск гиперпараметров для улучшения качества; сохраняет модели с использованием системы логирования mlflow; строит прогноз на несколько лет вперёд с учётом роста температуры; визуализирует прогнозы на карте с помощью folium; поддерживает дообучение моделей на новых данных и контроль дрейфа данных с помощью библиотеки evidently. Введение Рост частоты экстремальных природных явлений (лесные пожары, затопления) требует разработки инструментов анализа и прогноза рисков на транспортных и туристических маршрутах. В условиях
Оглавление

Аннотация

В работе представлен программный комплекс на языке Python для анализа пространственно-временных данных маршрутов и оценки рисков природных опасностей (пожары, затопления). Реализован класс-агент ModelCreator, который:

  • загружает данные из базы SQLite;
  • проводит предобработку признаков;
  • обучает несколько моделей классификации (ExtraTreesClassifier, MLPClassifier, GradientBoostingClassifier, LogisticRegression);
  • выбирает лучшую модель по метрике точности;
  • выполняет поиск гиперпараметров для улучшения качества;
  • сохраняет модели с использованием системы логирования mlflow;
  • строит прогноз на несколько лет вперёд с учётом роста температуры;
  • визуализирует прогнозы на карте с помощью folium;
  • поддерживает дообучение моделей на новых данных и контроль дрейфа данных с помощью библиотеки evidently.

Введение

Рост частоты экстремальных природных явлений (лесные пожары, затопления) требует разработки инструментов анализа и прогноза рисков на транспортных и туристических маршрутах. В условиях соревнований по компетенциям в области анализа данных и разработки программных решений важно иметь прототип системы, которая:

  1. Хранит и обрабатывает пространственно-временные данные (маршруты, точки, метеорологические параметры).
  2. Обучает ансамблевые и нейросетевые модели для классификации риска.
  3. Позволяет прогнозировать изменение риска при росте температуры.
  4. Обеспечивает удобную визуализацию и сопровождение жизненного цикла моделей.

В данной работе описывается реализация такого комплекса в виде класса ModelCreator.

Используемые библиотеки и инструменты

Основу системы составляют библиотеки для:

  • обработки табличных данных (pandas, sqlite3);
  • машинного обучения (scikit-learn, imblearn);
  • логирования и управления версиями моделей (mlflow);
  • контроля дрейфа данных (evidently);
  • картографической визуализации (folium).

Импорт библиотек

import pandas as pd # Библиотека для работы с табличными данными

import folium # Библиотека для визуализации карт

import warnings # Библиотека для отключения предупреждений и ошибок

import sqlite3 # Библиотека для работы с базами данных

# Библиотека для реализации машинного обучения

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

from sklearn.linear_model import LogisticRegression, Perceptron, SGDClassifier

from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, train_test_split, StratifiedKFold, cross_val_score, KFold

from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier, ExtraTreesClassifier

from sklearn.preprocessing import StandardScaler

from sklearn.utils.class_weight import compute_class_weight

from sklearn.neural_network import MLPClassifier

from sklearn.neighbors import KNeighborsClassifier

from sklearn.tree import DecisionTreeClassifier

# Библиотека для обработки данных

from imblearn.over_sampling import SMOTE, BorderlineSMOTE

from imblearn.pipeline import make_pipeline, Pipeline

# Библиотека для логирования моделей

import mlflow

import mlflow.sklearn

from mlflow.tracking import MlflowClient

from mlflow.models import infer_signature

# Библиотека для контроля дрейфа данных

import evidently

from evidently.report import Report

from evidently.metric_preset import DataDriftPreset

Отключение предупреждений

Для удобства восприятия и во избежание лишнего вывода предупреждений используется глобальное отключение варнингов

warnings.filterwarnings("ignore")

Архитектура программного комплекса

Центральным элементом является класс ModelCreator, реализующий типичный жизненный цикл модели:

  1. Загрузка и хранение исходных данных.
  2. Предобработка и оптимизация признаков.
  3. Обучение и сравнение базовых моделей.
  4. Тонкая настройка (RandomizedSearchCV) для лучшей модели.
  5. Кросс-валидация.
  6. Долгосрочный прогноз с учётом изменения температуры.
  7. Визуализация прогнозов на карте.
  8. Дообучение на новых данных с контролем дрейфа.
  9. Чтение логов из mlflow.

Инициализация логирования и класса ModelCreator

class ModelCreator:

# Подключение логов
mlflow.set_tracking_uri("sqlite:///mlflow.db")
mlflow.set_experiment("Clean_Experiment")
mlflow.sklearn.autolog(max_tuning_runs=10, silent=True)

def __init__(self, sql_path: str):
"""
Определение некоторых переменных и импорт базы данных
"""
self.predict_table = pd.DataFrame()
try:
con = sqlite3.connect(sql_path.replace("\\", "\\\\"))
self.data = pd.read_sql("SELECT * FROM clear_data_№2", con=con)
except Exception as e:
print(f"Ошибка в загрузке данных: {e}")

Здесь задаются:

  • адрес хранилища экспериментов mlflow (локальная база SQLite);
  • имя эксперимента;
  • включается автологирование параметров и метрик моделей.

Предобработка и выбор признаков

Перед обучением моделей выполняется оптимизация набора признаков: удаляются столбцы, которые либо слабо влияют на задачу, либо дублируют информацию.

Оптимизация данных

def data_optimization(self):
"""
Очистка датасета и его подготовка перед использованием в моделях.
"""
try:
self.opt_data = self.data.drop(["elevation", "steps", "cluster", "cluster2", "common_danger"], axis=1)
except Exception as e:
...

Метод формирует self.opt_data, который далее используется для разбиения на обучающую и тестовую выборки.

Обучение базовых моделей и выбор лучшей

На подготовленных данных обучаются четыре различные модели:

  • ExtraTreesClassifier;
  • MLPClassifier;
  • GradientBoostingClassifier;
  • LogisticRegression.

Для каждой модели используется конвейер (Pipeline) со стандартизацией признаков и балансировкой классов методом SMOTE.

Обучение и логирование моделей

def fit_model(self, target_column: str):
"""
Разделение датасета на выборки и обучение моделей
"""
self.target_column = target_column
self.X = self.opt_data.drop(self.target_column, axis=1)
self.y = self.opt_data[self.target_column]
self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
self.X, self.y, test_size=0.33, random_state=42
)

with mlflow.start_run(run_name="ExtraTreesClassifier"):
# Подбор параметров для модели ExtraTreesClassifier
SC = make_pipeline(
StandardScaler(),
SMOTE(sampling_strategy='auto', random_state=None, k_neighbors=5),
ExtraTreesClassifier()
)

# Обучение модели ExtraTreesClassifier
SC.fit(self.X_train, self.y_train)

# Получение результатов для метрик модели ExtraTreesClassifier
SC_pred = SC.predict(self.X_test)
SC_pred_proba = SC.predict_proba(self.X_test)[:, 1]
signature = infer_signature(self.X_train[:100], SC_pred)
mlflow.sklearn.log_model(
SC,
"ExtraTreesClassifier_Model",
signature=signature,
registered_model_name="ExtraTreesClassifier"
)

with mlflow.start_run(run_name="MLPClassifier"):
# Подбор параметров для модели RandomForestClassifier
MLPC = make_pipeline(
StandardScaler(),
SMOTE(sampling_strategy='auto', random_state=None, k_neighbors=5),
MLPClassifier(max_iter=1000, tol=1e-4)
)

# Обучение модели RandomForestClassifier
MLPC.fit(self.X_train, self.y_train)

# Получение результатов для метрик модели RandomForestClassifier
MLPC_pred = MLPC.predict(self.X_test)
MLPC_pred_proba = MLPC.predict_proba(self.X_test)[:, 1]
signature = infer_signature(self.X_train[:100], MLPC_pred)
mlflow.sklearn.log_model(
MLPC,
"MLPClassifier_Model",
signature=signature,
registered_model_name="MLPClassifier"
)

with mlflow.start_run(run_name="GradientBoostingClassifier"):
# Подбор параметров для модели GradientBoostingClassifier
GB = make_pipeline(
StandardScaler(),
SMOTE(sampling_strategy='auto', random_state=None, k_neighbors=5),
GradientBoostingClassifier()
)

# Обучение модели GradientBoostingClassifier
GB.fit(self.X_train, self.y_train)

# Получение результатов для метрик модели GradientBoostingClassifier
GB_pred = GB.predict(self.X_test)
GB_pred_proba = GB.predict_proba(self.X_test)[:, 1]
signature = infer_signature(self.X_train[:100], GB_pred)
mlflow.sklearn.log_model(
GB,
"GradientBoostingClassifier_Model",
signature=signature,
registered_model_name="GradientBoostingClassifier"
)

with mlflow.start_run(run_name="LogisticRegression"):
# Подбор параметров для модели LogisticRegression
LR = make_pipeline(
StandardScaler(),
SMOTE(sampling_strategy='auto', random_state=None, k_neighbors=5),
LogisticRegression(max_iter=500)
)

# Обучение модели LogisticRegression
LR.fit(self.X_train, self.y_train)

# Получение результатов для метрик модели LogisticRegression
LR_pred = LR.predict(self.X_test)
LR_pred_proba = LR.predict_proba(self.X_test)[:, 1]
signature = infer_signature(self.X_train[:100], LR_pred)
mlflow.sklearn.log_model(
LR,
"LogisticRegression_Model",
signature=signature,
registered_model_name="LogisticRegression"
)

print(f"Были обучены 4 разные модели со следующими результатами:\n\n"
f"ExtraTreesClassifier:\n"
f"Точность: {accuracy_score(self.y_test, SC_pred):.2f};\n"
f"Полнота: {recall_score(self.y_test, SC_pred, average='weighted'):.2f};\n"
f"F1-мера: {f1_score(self.y_test, SC_pred, average='weighted'):.2f};\n\n"
f"MLPClassifier:\n"
f"Точность: {accuracy_score(self.y_test, MLPC_pred):.2f};\n"
f"Полнота: {recall_score(self.y_test, MLPC_pred, average='weighted'):.2f};\n"
f"F1-мера: {f1_score(self.y_test, MLPC_pred, average='weighted'):.2f};\n\n"
f"GradientBoostingClassifier:\n"
f"Точность: {accuracy_score(self.y_test, GB_pred):.2f};\n"
f"Полнота: {recall_score(self.y_test, GB_pred, average='weighted'):.2f};\n"
f"F1-мера: {f1_score(self.y_test, GB_pred, average='weighted'):.2f};\n\n"
f"LogisticRegression:\n"
f"Точность: {accuracy_score(self.y_test, LR_pred):.2f};\n"
f"Полнота: {recall_score(self.y_test, LR_pred, average='weighted'):.2f};\n"
f"F1-мера: {f1_score(self.y_test, LR_pred, average='weighted'):.2f};\n\n")

# Создание словаря со всеми моделями и их результатом точности. Выбор модели с максимальным результатом
models_result = {
"ExtraTreesClassifier": accuracy_score(self.y_test, SC_pred),
"MLPClassifier": accuracy_score(self.y_test, MLPC_pred),
"GradientBoostingClassifier": accuracy_score(self.y_test, GB_pred),
"LogisticRegression": accuracy_score(self.y_test, LR_pred)
}
self.best_model = max(models_result, key=models_result.get)
print(f"Самая лучшая модель: {self.best_model}")

Улучшение лучшей модели (RandomizedSearchCV)

После выбора лучшей модели выполняется поиск гиперпараметров с использованием RandomizedSearchCV. В зависимости от того, какая модель признана лучшей, строится соответствующий конвейер.

Подбор гиперпараметров

def improving_best_model(self):
"""
Подбор параметров для лучшей подели
"""

if self.best_model == "MLPClassifier":
self.MLPC = Pipeline([
("ss", StandardScaler()),
("smote", SMOTE()),
("model", MLPClassifier())
])
param_grid = {
"smote__k_neighbors": [2, 3, 4, 5, 6, 8, 10],
"model__activation": ["identity", "logistic", "tanh", "relu"],
"model__solver": ["lbfgs", "sgd", "adam"],
"model__alpha": [0.0001, 0.001, 0.01, 0.1, 1.0, 0.00001],
"model__learning_rate": ["constant", "invscaling", "adaptive"],
"model__max_iter": [200, 300, 500],
"model__learning_rate_init": [0.0001, 0.001, 0.01, 0.1, 1.0, 0.00001]
}
GS = RandomizedSearchCV(self.MLPC, param_grid, cv=3, n_iter=100, n_jobs=-1)
GS.fit(self.X_train, self.y_train)
self.best_model = GS.best_estimator_
mlflow.sklearn.log_model(
self.best_model,
f"best_model_{self.target_column}",
input_example=self.X_train[:5],
registered_model_name=f"best_model_{self.target_column} "
)
return print(f"Оценка качества лучшей модели: {self.best_model.score(self.X_test, self.y_test)}")

elif self.best_model == "ExtraTreesClassifier":
self.ETC = Pipeline([
("ss", StandardScaler()),
("smote", SMOTE()),
("model", ExtraTreesClassifier())
])
param_grid = {
"smote__k_neighbors": [2, 3, 4, 5, 6, 8, 10],
"model__n_estimators": [100, 200, 400, 800, 1000],
"model__criterion": ["gini", "entropy", "log_loss"],
"model__bootstrap": [True, False],
"model__class_weight": ["balanced", "balanced_subsample"]
}
GS = RandomizedSearchCV(self.ETC, param_grid, cv=3, n_iter=100, n_jobs=-1)
GS.fit(self.X_train, self.y_train)
self.best_model = GS.best_estimator_
mlflow.sklearn.log_model(
self.best_model,
f"best_model_{self.target_column}",
input_example=self.X_train[:5],
registered_model_name=f"best_model_{self.target_column}"
)
return print(f"Оценка качества лучшей модели: {self.best_model.score(self.X_test, self.y_test)}")

elif self.best_model == "GradientBoostingClassifier":
self.GBC = Pipeline([
("ss", StandardScaler()),
("smote", SMOTE()),
("model", GradientBoostingClassifier())
])
param_grid = {
"smote__k_neighbors": [2, 3, 4, 5, 6, 8, 10],
"model__loss": ["log_loss", "exponential"],
"model__learning_rate": [0.1, 0.001, 0.0001, 1],
"model__n_estimators": [100, 200, 400, 800, 1000],
"model__criterion": ["friedman_mse", "squared_error"],
"model__max_depth": [3, 6, 10],
"model__max_features": ["sqrt", "log2"]
}
GS = RandomizedSearchCV(self.GBC, param_grid, cv=3, n_iter=100, n_jobs=-1)
GS.fit(self.X_train, self.y_train)
self.best_model = GS.best_estimator_
mlflow.sklearn.log_model(
self.best_model,
f"best_model_{self.target_column}",
input_example=self.X_train[:5],
registered_model_name=f"best_model_{self.target_column}"
)
return print(f"Оценка качества лучшей модели: {self.best_model.score(self.X_test, self.y_test)}")

elif self.best_model == "LogisticRegression":
self.LR = Pipeline([
("ss", StandardScaler()),
("smote", SMOTE()),
("model", LogisticRegression())
])
param_grid = {
"smote__k_neighbors": [2, 3, 4, 5, 6, 8, 10],
"model__penalty": ["l1", "l2", "elasticnet", None],
# здесь далее в ноутбуке задаются остальные параметры:
# "dual": [True, False], "C": [...], и т.д.
}
GS = RandomizedSearchCV(self.LR, param_grid, cv=3, n_iter=100, n_jobs=-1)
GS.fit(self.X_train, self.y_train)
self.best_model = GS.best_estimator_
mlflow.sklearn.log_model(
self.best_model,
f"best_model_{self.target_column}",
input_example=self.X_train[:5],
registered_model_name=f"best_model_{self.target_column}"
)
return print(f"Оценка качества лучшей модели: {self.best_model.score(self.X_test, self.y_test)}")

Кросс-валидация и доступ к лучшей модели

Для оценки устойчивости модели используется стратифицированная k-fold кросс-валидация.

Кросс-валидация

def cvtest_best_model(self, n_splits: int):
"""
Оценка модели кросс-валидацией
"""
try:
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
scores = cross_val_score(self.best_model, self.X, self.y, cv=skf, scoring='accuracy')
return scores
except Exception as e:
print(f"Ошибка: {e}")

Получение ссылки на модель

def get_best_model(self):
"""
Получение лучшей модели
"""
return self.best_model

Долгосрочный прогноз рисков с учётом изменения температуры

Отдельный блок отвечает за прогноз на несколько лет вперёд. Предполагается, что среднее изменение температуры Земли составляет порядка delta = 0.002 градуса в год. На этой основе формируется таблица с прогнозами по пожарам и затоплениям.

Формирование прогноза

def predictions(self, years: int, delta = 0.002):
"""
Получение предсказаний на указанное количество лет вперед
(значение дельта рекомендуется 0.002, так как это среднее изменение температуры за год по Земле)
"""
try:
pred_data = self.opt_data
self.predict_table = pd.concat([self.predict_table, pred_data["temperature"]], axis=1)
fire_model = mlflow.pyfunc.load_model(f"models:/best_model_fire/latest")
flooding_model = mlflow.pyfunc.load_model(f"models:/best_model_flooding/latest")
except Exception as e:
print(f"Ошибка: {e}")

for i in range(years):
pred_data[f"temperature"] = pred_data["temperature"] + delta * i
fire_pred = fire_model.predict(pred_data.drop("fire", axis=1))
flooding_pred = flooding_model.predict(pred_data.drop("flooding", axis=1))
self.predict_table[f"temperature_{i}"] = pred_data[f"temperature"]
self.predict_table[f"fire_pred_{i}"] = fire_pred
self.predict_table[f"flooding_pred_{i}"] = flooding_pred
return pd.DataFrame(self.predict_table)

Визуализация прогнозов на карте

Для анализа маршрутов и точек используется интерактивная карта folium. В каждой точке отображается информация о температуре, прогнозе пожароопасности и опасности затопления через заданное количество лет.

Визуализация маршрута

def predict_visualization(self, track: int, year: int):
"""
Визуализация заранее полученных предсказаний на указанном маршруте
"""
points = []
for i in range(len(self.data.index.tolist())):
try:
row = self.data[self.data["track_id"] == track].iloc[i]
except Exception as e:
break
index = row.name
try:
pred_row = self.predict_table.loc[int(index)]
except Exception as e:
print(f"Ошибка: {e}")
break

points.append({
'coords': [row["latitude"], row["longitude"]],
'name': f'Точка №{index}',
'info': (
f'Температура изначально:{pred_row["temperature"]}\n'
f'Темпиратура через {year} года:{pred_row[f"temperature_{year}"]}\n'
f'Пожароопасность: {pred_row[f"fire_pred_{year}"]}'
f'\nОпасность затопления:{pred_row[f"flooding_pred_{year}"]}'
)
})
try:
m = folium.Map(location=points[0]["coords"], zoom_start=12)
folium.PolyLine(
locations=[p['coords'] for p in points],
color='blue',
weight=3,
opacity=0.7
).add_to(m)
for point in points:
folium.Marker(
location=point['coords'],
popup=f"""
**{point['name']}**
{point['info']}""",
tooltip=point['name'],
icon=folium.Icon(color='red', icon='info-sign')
).add_to(m)
return m
except Exception as e:
print(f"Ошибка: {e}")

Дообучение модели на новых данных и контроль дрейфа

Система поддерживает сценарий поступления новых данных (например, обновлённые маршруты или измерения). В этом случае возможно:

  1. Выявить дрейф данных (изменение распределений признаков).
  2. Дообучить существующую модель на объединённом датасете.
  3. Сохранить обновлённую модель в mlflow.

Дообучение и отчёт о дрейфе

def further_training_model(self, new_data_path: str, name_table: str, model_name: str):
"""
Дообучение модели на новом датасете
"""
try:
model = mlflow.sklearn.load_model(f"models:/{model_name}/latest")
target_column = model_name.split("_")[2]
except Exception as e:
print(f"Ошибка в загрузке модели!\n{e}")

try:
con = sqlite3.connect(new_data_path.replace("\\", "\\\\"))
new_data = pd.read_sql(f"SELECT * FROM {name_table}", con=con) # Чтение базы данных
except Exception as e:
print(f"Ошибка в загрузке данных: {e}")

report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=self.data, current_data=new_data)

self.data = pd.concat([self.data, new_data], axis=0, ignore_index=True)
new_data = self.data.drop(["elevation", "steps", "cluster", "cluster2", "common_danger"], axis=1)
X = new_data.drop(target_column, axis=1)
y = new_data[target_column]

model.fit(X, y)

mlflow.sklearn.log_model(
model,
f"best_model_{target_column}",
input_example=X[:5],
registered_model_name=f"best_model_{target_column}"
)
return report

Анализ логов экспериментов

Для обзора обученных и зарегистрированных моделей используется чтение логов mlflow.

Чтение логов

def read_log(self):
"""
Чтение логов моделей
"""
model_log = mlflow.search_logged_models(experiment_ids=None)
return model_log[model_log["name"] != "model"]

Заключение

В работе реализован полный цикл разработки и эксплуатации моделей машинного обучения для оценки рисков природных опасностей на маршрутах:

  • проведена предобработка данных и отбор признаков;
  • обучены и сравнены несколько моделей классификации;
  • реализован автоматический выбор лучшей модели с последующей тонкой настройкой гиперпараметров;
  • обеспечено логирование и версионирование моделей в mlflow;
  • разработан механизм долгосрочного прогноза с учётом роста температуры;
  • реализована визуализация на карте и функционал дообучения на новых данных с контролем дрейфа.

Полученный код и описанная архитектура могут быть использованы как основа для создания производственных систем мониторинга и прогноза природных рисков, а также как демонстрация владения современным стеком инструментов анализа данных в рамках чемпионатов профессионального мастерства.