Найти в Дзене

РАЗРАБОТКА РАСПРЕДЕЛЕННОЙ ИНФОРМАЦИОННОЙ СИСТЕМЫ ДЛЯ МОНИТОРИНГА И ПРОГНОЗИРОВАНИЯ ГЕОКЛИМАТИЧЕСКИХ РИСКОВ

В работе рассматривается архитектура и программная реализация веб-ориентированной системы для оценки безопасности туристических маршрутов. Комплекс состоит из двух модулей: серверной части (API на базе FastAPI), обеспечивающей инференс моделей машинного обучения и генерацию временных рядов (ARIMA), и клиентской части (интерфейс на Dash), реализующей загрузку GPX-треков и геоинформационную визуализацию прогнозов. Эффективное управление рисками на природных территориях требует интеграции разнородных данных: метеорологических сводок, географических особенностей местности и исторических данных о происшествиях. Для обеспечения доступности сложных прогностических моделей конечным пользователям необходима клиент-серверная архитектура, где вычислительно затратные операции вынесены в API, а взаимодействие с пользователем осуществляется через интуитивный веб-интерфейс. Разработанное решение базируется на микросервисном подходе: Серверная логика сосредоточена в модуле main.py. Ключевой особенност
Оглавление

В работе рассматривается архитектура и программная реализация веб-ориентированной системы для оценки безопасности туристических маршрутов. Комплекс состоит из двух модулей: серверной части (API на базе FastAPI), обеспечивающей инференс моделей машинного обучения и генерацию временных рядов (ARIMA), и клиентской части (интерфейс на Dash), реализующей загрузку GPX-треков и геоинформационную визуализацию прогнозов.

Введение

Эффективное управление рисками на природных территориях требует интеграции разнородных данных: метеорологических сводок, географических особенностей местности и исторических данных о происшествиях. Для обеспечения доступности сложных прогностических моделей конечным пользователям необходима клиент-серверная архитектура, где вычислительно затратные операции вынесены в API, а взаимодействие с пользователем осуществляется через интуитивный веб-интерфейс.

Архитектура системы

Разработанное решение базируется на микросервисном подходе:

  1. Сервер приложений (Backend): реализован на Python с использованием фреймворка FastAPI. Отвечает за загрузку предобученных моделей, сбор данных из OSM (OpenStreetMap) и Meteostat, а также выполнение прогнозов.
  2. Клиентское приложение (Frontend): реализовано на базе Dash и Plotly. Отвечает за парсинг пользовательских файлов, отправку запросов к API и рендеринг интерактивных карт.

Реализация серверной части (Backend)

Серверная логика сосредоточена в модуле main.py. Ключевой особенностью является асинхронная загрузка моделей при старте приложения для минимизации задержек при обработке запросов.

Инициализация и управление жизненным циклом

Используется механизм lifespan для предварительной загрузки моделей классификации (пожары, наводнения, эвакуация) в оперативную память.

# Импорт нужных библиотек
import time
from contextlib import asynccontextmanager
import joblib
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
import datetime
import pandas as pd
import osmnx as ox
from meteostat import Point, Daily
import sqlite3
from statsmodels.tsa.arima.model import ARIMA
from tqdm import tqdm

models = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Загрузка моделей и сохранение их в словарь models
"""
model_fire = joblib.load(r'C:\Users\User\Prfiki\best_model_fire.pkl')
model_flooding = joblib.load(r'C:\Users\User\Prfiki\best_model_flooding.pkl')
model_common_danger = joblib.load(r'C:\Users\User\Prfiki\best_model_common_danger.pkl')
model_evacuation = joblib.load(r'C:\Users\User\Prfiki\best_model_evacuation.pkl')

models["model_fire"] = model_fire
models["model_flooding"] = model_flooding
models["model_common_danger"] = model_common_danger
models["model_evacuation"] = model_evacuation
yield
print("API выключено")

# Инициализация FastAPI приложения
app = FastAPI(lifespan=lifespan)

Схемы данных и извлечение геопространственных признаков

Для обогащения данных используются методы геокодинга через библиотеку osmnx. Функция check_features анализирует окружение точки (наличие воды, леса, строений) в радиусе 500 метров.

# Создание схем
class CoordAndDate(BaseModel):
longitude: float
latitude: float
date: datetime.date

class TrackPredictData(BaseModel):
start_time: datetime.date
end_time: datetime.date
track_id: int

def check_features(lat: float, lon: float, radius=500):
"""
Создание приватного метода, возвращающего объекты в области 500 метров от точки
"""
point = (lat, lon)
tags = {
"natural": ["water", "wood"],
"waterway": True,
"building": True
}

features = {"water": 0, "forest": 0, "buildings": 0}
try:
gdf = ox.features.features_from_point(point, dist=radius, tags=tags)
indices = gdf.index.tolist()
for i in range(len(indices)):
row = gdf.iloc[i]
if 'natural' in row.index and pd.notna(row['natural']):
if row['natural'] == 'water':
features['water'] += 1
elif row['natural'] == 'wood':
features['forest'] += 1
if 'waterway' in row.index and pd.notna(row['waterway']):
features['water'] += 1
if 'building' in row.index and pd.notna(row['building']):
features['buildings'] += 1
except:
pass
return features

Агрегация данных для предиктивной модели

Функция data_generate формирует вектор признаков, включая климатические параметры (температуру) запрашиваемой даты.

def data_generate(input: CoordAndDate):
"""
Дополнение данных для загрузки их в модели
"""
prediction_date = pd.DataFrame()
prediction_date["time"] = [pd.Timestamp(input.date)]
prediction_date["time"] = prediction_date["time"].astype("int64")
prediction_date["latitude"] = [input.latitude]
prediction_date["longitude"] = [input.longitude]

location = Point(input.latitude, input.longitude)
try:
prediction_date["temperature"] = Daily(location, pd.Timestamp(input.date)).fetch()["tavg"].values[0]
except Exception as e:
prediction_date["temperature"] = 12

features = check_features(input.latitude, input.longitude)
prediction_date["water_feature"] = features["water"]
prediction_date["forest_feature"] = features["forest"]
prediction_date["buildings_feature"] = features["buildings"]

return prediction_date

API Эндпоинты

Реализованы два ключевых метода: точечная оценка уровня опасности и комплексный прогноз для трека с использованием модели ARIMA для предсказания климатических изменений

@app.post("/predict/danger_level", summary="Предсказание уровня опасности по координатам и дате")
async def danger_level_predict(data: CoordAndDate):
"""
Реализация первого API запроса, который определяет уровень опасности на точке в определенное время
"""
prediction_date = data_generate(input=data)
danger_pred = models["model_common_danger"].predict(prediction_date)

return {"Danger_Level_Predict": int(danger_pred),
"Decoding": "0 - безопасная зона\n1 - Зона со сложностями в эвакуации\n2 - Зона с опасностью затопления\n"
"3 - Зона с опасностью затопления и сложностью эвакуации\n4 - Зона с опасностью пожара\n"
"5 - Зона с опасностью пожара и сложностью эвакуации"}
@app.post("/predict/track_pred")
async def track_predict(data: TrackPredictData):
"""
Реализация второго API запроса, который определяет уровень пожароопаности, опасности затопления и сложно
эвакуации в опредленный промежуток времени
"""
predictions_answer = {}
track_id = data.track_id
start_time = data.start_time
end_time = data.end_time

con = sqlite3.connect(r"C:\Users\User\Prfiki\test.db")
df = pd.read_sql("SELECT * FROM clear_data_№2", con=con)
df["time"] = df["time"].astype("datetime64[ns]")
df = df[df["track_id"] == track_id]

for i in tqdm(range(len(df.index.tolist()))):
temperatures = []
point_delta_temperature = pd.DataFrame()
df_row = df.iloc[i]

# Генерация исторического ряда для ARIMA
date_list = pd.Series(pd.date_range(end=start_time, periods=5, freq='1YE'), name="time")
point_delta_temperature = pd.concat([point_delta_temperature, date_list])

for k in range(len(point_delta_temperature.index.tolist())):
location = Point(df_row["latitude"], df_row["longitude"])
temperatures.append(Daily(location, point_delta_temperature.iloc[k]["time"]).fetch()["tavg"].values[0])

temperatures = pd.Series(temperatures, name="temperature")
point_delta_temperature["temperature"] = temperatures
point_delta_temperature.set_index("time", inplace=True)
point_delta_temperature.index.freq = 'YE-DEC'

# Прогнозирование температуры
model = ARIMA(point_delta_temperature, order=(4, 1, 1))
results = model.fit()

delta_time = round((end_time - start_time).days / 360)
result = results.forecast(steps=delta_time).reset_index().drop("index", axis=1)

# Подготовка данных для классификаторов
point_delta_temperature = pd.Series(pd.date_range(start=point_delta_temperature.iloc[-1].name,
periods=delta_time + 1, freq='1YE', inclusive='right'),
name="time")

a = pd.concat([df_row.drop(
["common_danger", "cluster", "cluster2", "elevation", "evacuation", "fire", "flooding", "place_type",
"point_id", "steps", "track_id", "time"], axis=0)] * len(point_delta_temperature.index.tolist()), axis=1,
ignore_index=True).T

point_delta_temperature = pd.concat([point_delta_temperature, a], axis=1)
point_delta_temperature["time"] = point_delta_temperature["time"].astype("int64")
point_delta_temperature["temperature"] = result

model_fire_predictions = models["model_fire"].predict(point_delta_temperature)
model_flooding_predictions = models["model_flooding"].predict(point_delta_temperature)
model_evacuation_predictions = models["model_evacuation"].predict(point_delta_temperature)

predictions_answer[f"point_{i}"] = [{"fire_predictions": model_fire_predictions.tolist(),
"flooding_predictions": model_flooding_predictions.tolist(),
"evacuation_predictions": model_evacuation_predictions.tolist()}]

return predictions_answer

if __name__ == "__main__":
uvicorn.run("main:app", reload=True)

Реализация клиентской части (Frontend)

Интерфейс пользователя реализован в файле Application_2.py с использованием библиотеки Dash. Он обеспечивает интерактивное взаимодействие с картой и отправку данных на сервер.

Макет приложения и парсинг файлов

Интерфейс включает элементы загрузки файлов (dcc.Upload), выбора дат и карты. Функция gpx_parse извлекает координаты маршрута.

# Импорт нужных библиотек
import datetime
import io
import sqlite3
from datetime import date
import requests
import base64, io
from dash import Dash, html, dcc, callback, Output, Input, dash, State
import dash_leaflet as dl
import folium
import gpxpy
import plotly.express as px
import pandas as pd
from tqdm import tqdm

# Инициализация Dash-приложения с поддержкой асинхронности
app = Dash(use_async=True)

# Загрузка данные из базы данных
con = sqlite3.connect(r"C:\Users\User\Prfiki\test.db")
df = pd.read_sql("SELECT * FROM clear_data_№2", con)

def track_count(df: pd.DataFrame):
"""
Создает список маршрутов для выпадающего списка
"""
list_choose = []
for i in range(len(df["track_id"].unique())):
list_choose.append(f"Маршрут №{i + 1}")
return list_choose

def gpx_parse(gpx_file):
"""
Парсит GPX-файл и извлекает точки маршрута
"""
gpx = gpxpy.parse(gpx_file)
point_list = []
point_id = 0
for track in gpx.tracks:
for segment in track.segments:
for point in segment.points:
point_list.append((point.latitude, point.longitude))
point_id += 1
return point_list

# Определение макета приложения
app.layout = [
html.H1(children='Определение уровня опасности точки по координате и дате', style={'textAlign': 'center', 'display': 'block'}),
dcc.Upload(id='get_gpx_file', children=html.Div(["Drag and Drop или ", html.A("выбери GPX")]), multiple=False),
dcc.DatePickerSingle(display_format="YYYY.DD.MM", date=date(2023, 10, 15),
style={'display': 'block', "alignItems": "center"}, id="input_date"),
html.Iframe(id="out_api_1", style={"width": "100%", "height": "600px", "border": "none"}),

html.H1(children='Прогноз пожарной опасности или затоплений на заданный период, оценка сложности эвакуации.',
style={'textAlign': 'center', 'display': 'block'}),
dcc.DatePickerRange(display_format="YYYY", style={'display': 'block'}, id="input_range_date"),
dcc.Dropdown(track_count(df), style={'display': 'block'}, id="input_track_id"),
html.Div(id="loading_bar", style={'white-space': 'pre-line', "font-size": "20px"}),
html.Iframe(id="out_api_2", style={"width": "100%", "height": "600px", "border": "none"})
]

Взаимодействие с API: Анализ точек GPX

Коллбэк api_1 обрабатывает загруженный GPX-файл, отправляет координаты в метод danger_level_predict и отображает результаты на карте folium.

@app.callback(Output("out_api_1", "children"),
Input("get_gpx_file", "contents"),
State("get_gpx_file", "filename"),
Input("input_date", "date"),
prevent_initial_call=True)
def api_1(contents, filename, date):
"""
Обрабатывает GPX файл и получает прогноз уровня опасности для каждой точки, визуализируя все на карте
"""
_, content_string = contents.split(",", 1)
decoded = base64.b64decode(content_string)
text = decoded.decode("utf-8", errors="replace")
point_list = gpx_parse(io.StringIO(text))

points = []
count = 0
for i in tqdm(point_list):
data = {
"longitude": i[0],
"latitude": i[1],
"date": date
}

response = requests.post("http://127.0.0.1:8000/predict/danger_level", json=data)

points.append({"name": f"Точка №{count}", "coords": (i[1], i[0]),
f"info": f"Уровень опасности на точке: {response.json()['Danger_Level_Predict']}"})
count += 1

m = folium.Map(location=points[0]["coords"], zoom_start=12)
folium.PolyLine(
locations=[p['coords'] for p in points],
color='blue',
weight=3,
opacity=0.7
).add_to(m)

for point in points:
folium.Marker(
location=point['coords'],
popup=f"{point['name']}{point['info']}",
tooltip=point['name'],
icon=folium.Icon(color='red', icon='info-sign')
).add_to(m)

return m.get_root().render()

Взаимодействие с API: Прогноз по маршруту

Коллбэк api_2 запрашивает комплексный прогноз для выбранного трека за указанный период, используя метод track_predict.

@app.callback(Output("out_api_2", "srcDoc"),
Input("input_range_date", "start_date"),
Input("input_range_date", "end_date"),
Input("input_track_id", "value"),
prevent_initial_call=True,
running=[Output("loading_bar", "children"), "Загрузка", ""],)
def api_2(start_date, end_date, track_id):
"""
Обрабатывает GPX файл и получает прогноз сложности эвакуации, пажароопасности и опасности затопления для каждой
точки на заданный период, визуализируя все на карте
"""
if start_date is None or end_date is None or track_id is None:
return dash.no_update

if datetime.datetime.strptime(end_date, "%Y-%m-%d") - datetime.timedelta(days=365) < datetime.datetime.strptime(start_date, "%Y-%m-%d"):
return "Разница между началом и концом должна быть год или больше"

track_id = int(track_id[9:])

data = {
"start_time": start_date,
"end_time": end_date,
"track_id": track_id
}

response = requests.post("http://127.0.0.1:8000/predict/track_pred", json=data)

track_list = df[df["track_id"] == track_id]
points = []

for k, v in response.json().items():
row = track_list.iloc[int(k.split("_")[1])]
lat = row["latitude"]
lon = row["longitude"]

for dangerous in v:
points.append({'coords': [lat, lon], 'name': f'Точка №{k.split("_")[1]}',
'info': f'Изменение пажароопасности: {dangerous["fire_predictions"]}\n'
f'Изменение опасности затопления {dangerous["flooding_predictions"]}\n'
f'Изменение сложности эвакуации: {dangerous["evacuation_predictions"]}'})

m = folium.Map(location=points[0]["coords"], zoom_start=12)
folium.PolyLine(
locations=[p['coords'] for p in points],
color='blue',
weight=3,
opacity=0.7
).add_to(m)

for point in points:
folium.Marker(
location=point['coords'],
popup=f"{point['name']}{point['info']}",
tooltip=point['name'],
icon=folium.Icon(color='red', icon='info-sign')
).add_to(m)

return m.get_root().render()

if __name__ == '__main__':
app.run(debug=True)

Заключение

Представленная система демонстрирует эффективный подход к интеграции методов машинного обучения в пользовательские приложения. Разделение на серверную и клиентскую части позволяет масштабировать решение и обеспечивать высокую скорость отклика, а использование современных библиотек визуализации (Folium, Dash Leaflet) делает результаты анализа доступными для широкого круга пользователей.