Всем привет. Сегодня я хочу рассказать вам, как реализовать SSE (отправляемые сервером события) на Java.
Что такое Server Sent Events?
SSE — это технология, которая позволяет передавать данные с сервера на клиент в рамках одного HTTP-соединения в одном направлении.
Давайте создадим Spring Boot приложение использую Spring initializer.
Идем на сайт https://start.spring.io
Выбираем следующие параметры проект:
Project Type — Maven
Language — Java
Spring Boot version — последняя доступная (На момент написания – это Spring Boot 3.0.1)
Добавляем зависимость Spring Web
Нажимаем Generate. Сохраняем сгенерированный шаблон проекта и открываем в своей IDE. В моем случае это IntelliJ IDEA.
Давайте представим, что у нас есть система, которая отображает изменение котировок акций. Соответственно, при изменении котировок мы должны отправлять обновленную информацию клиентам. Создадим простой класс, отражающий информацию об акции.
Этот класс будет содержать только одно поле — цена акции.
Теперь давайте создадим класс, который будет симулировать изменение цены акции.
У нас есть зависимость от ApplicationEventPublisher внедренная через конструктор (1).
Экземпляр класса Random для генерации случайной цены акции (2).
Мы будем генерировать изменение цены в отдельном потоке через случайные промежутки времени. Для этого создадим пул потоков, содержащий один поток, который может планировать выполнение задач с заданной задержкой или выполнять их периодически (3).
В конструкторе нашего класса давайте запустим поток генерации случайной цены акции (4).
В методе changePrice мы получаем случайное значение цены (5), публикуем событие Stock для всех подписчиков (6) и планируем создание следующего значения со случайной задержкой (7).
Как я упоминал выше, у нас есть зависимость класса от ApplicationEventPublisher.
Spring "из коробки" предоставляет простой механизм для обработки событий, который уменьшает связность компонентов системы. Событие, происходящее в одной точке приложения, может быть перехвачено и обработано в любой другой части приложения благодаря таким сущностям, как publisher и eventListener.
Далее, создадим контроллер для обработки запросов от клиентов.
В Spring 4.2 появился новый класс, ResponseBodyEmitter, который используется в качестве возвращаемого типа в контроллерах Spring Web MVC для асинхронных запросов. ResponseBodyEmitter можно использовать для отправки нескольких объектов, где каждый объект записывается с помощью совместимого HttpMessageConverter. SseEmitter расширяет ResponseBodyEmitter и позволяет отправлять множество сообщений в ответ на один запрос, как того требует протокол SSE.
Давайте рассмотрим этот класс поближе:
Создадим Set, в котором мы будем хранить подключенных клиентов (2.1). Создадим метод контроллера, с помощью которого клиент будет регистрироваться на сервере для получения уведомлений об изменении цен (2.2). В этом методе мы создаем новый SseEmitter, чтобы в будущем отправлять с помощью него уведомления, и регистрируем два обработчика, onTimeout и onError, которые используются для того, чтобы при возникновении ошибки или таймаута удалить клиента из списка зарегистрированных клиентов.
Обработчик сообщений (2.3). Этот метод имеет две аннотации @Async и @EventListener. Аннотация @EventListener используется для указания на то, что метод является обработчиком событий. Тип событий, которые он обрабатывает, определяется типом аргумента, в данном случае Stock. По умолчанию обработчик вызывается синхронно. Однако мы можем легко сделать его асинхронным, добавив аннотацию @Async. Аннотирование метода с помощью @Async заставит его выполняться в отдельном потоке. Другими словами, вызывающая сторона не будет ждать завершения работы вызванного метода. Наш метод принимает новое событие со значением цены акции и асинхронно отправляет его всем клиентам в формате JSON (2.4). Если при отправке произошла ошибка, мы сохраняем соответствующий SseEmitter, а затем удаляем их из списка активных клиентов (2.5).
Для того чтобы Spring распознавал методы, помеченные аннотацией @Async, и запускал их в фоновом пуле потоков, необходимо добавить аннотацию @EnableAsync над классом конфигурации (3.1)
Теперь давайте создадим простой пользовательский интерфейс, чтобы увидеть, что у нас получилось в итоге.
Добавим файл index.html в наш проект по пути src/main/resources/static
И напишем простой код для отображения изменений в котировках акций:
Чтобы начать получать данные, мы создаем новый EventSource("/stocks-stream") (4.1).
Браузер установит соединение с /stocks-stream и будет держать его открытым в ожидании события.
По умолчанию объект EventSource генерирует 3 события:
- message - полученное сообщение, доступно как e.data.
- open - соединение открыто.
- error – ошибка соединения, например, сервер вернул статус 500
В обработчике событий message мы считываем полученную котировку и создаем новый элемент списка (4.2)
Наконец, давайте запустим наше приложение и посмотрим, что у нас получилось.
После запуска приложения нам нужно перейти по адресу http://localhost:8080, и мы увидим примерно следующее изображение.
Посмотрев на этот вывод, можно задаться вопросом - «почему периодически соединение закрывается и открывается снова»?
Это хороший вопрос, и хорошо, что эти сообщения есть в выводе - это демонстрирует, что SSE поддерживает автоматическое восстановление соединения после таймаута, например, из-за проблем с сетью.
В своем проекте мы можем установить необходимое значение для свойства spring.mvc.async.request-timeout (количество времени до завершения обработки асинхронных запросов. Если это значение не задано, используется таймаут по умолчанию) в файле конфигурации.
В моем случае это свойство как раз не установлено, поэтому используется значение по умолчанию. Для Apache Tomcat/10.1.4 используемого в текущем проекте - 30 секунд (см. атрибут asyncTimeout). Если мы, например, изменим это свойство на 90 секунд, то увидим соответствующие изменения.
Спасибо за внимание.
Весь код примеров доступен по ссылке GitHub