Источник: Nuances of Programming
Чтобы эффективно управлять взаимодействием между микросервисами, нужна подходящая архитектура API. Не бойтесь создавать новые микросервисы, максимально разделяя их по функциональности. Например, вместо одного сервиса уведомлений можно создать отдельные для электронной почты, SMS и мобильных push-уведомлений.
Чтобы следовать дальнейшим шагам, нужен шлюз API, который управляет запросами, обрабатывает маршрутизацию на балансирующие нагрузку серверы и ограничивает несанкционированный доступ.
Типы взаимодействий
- Синхронный протокол. Один из примеров — HTTP. Клиент отправляет запрос сервису и ожидает ответа. Важной особенностью протокола (HTTP/HTTPS) является синхронность. Дальнейшее исполнение клиентского кода возможно только после получения ответа от сервера HTTP.
- Асинхронный протокол. Пример — AMQP (поддерживается многими операционными системами и облачными средами). Код клиента или отправитель сообщения обычно не ожидают ответа. Они просто отправляют сообщение в сервис передачи сообщений, такой как RabbitMQ или Kafka (если используется управляемая событиями архитектура).
Недостатки синхронного протокола
- Если добавлять новые микросервисы, которые взаимодействуют друг с другом, то использование конечных точек в коде вызовет беспорядок. Особенно в тех случаях, когда нужно передать в конечную точку экстренную информацию, например токен авторизации.
- Ответа придется ожидать некоторое время, а при его отсутствии выполняются повторные вызовы, что снижает производительность.
- Если сервис приема данных не работает или не может обработать запрос, выполняется процедура ожидания. Например, пользователь размещает заказ в интернет-магазине. В сервис доставки отправляется запрос на отправку заказа, но если он не работает, то заказ теряется. Как отправить тот же заказ в сервис доставки после ее активации?
- Принимающая сторона может быть временами перегружена запросами. В этом случае нужен буфер для ожидания до тех пор, пока получатель не освободится.
Чтобы решить эти проблемы, можно использовать промежуточный сервис, известный как «брокер сообщений» (message broker), который управляет связью между двумя микросервисами.
В качестве брокера сообщений часто применяют RabbitMQ. А если вы используете облако Azure в качестве хостинг-провайдера, то подойдет и Служебная шина Azure.
Как использовать RabbitMQ для связи между микросервисами
Рассмотрим случай, когда отправитель хочет послать сообщение нескольким сервисам. Структура RabbitMQ представлена на рисунке ниже.
Когда Publisher отправляет сообщение, оно принимается Exchange и отправляется в целевые очереди. Сообщение остается в очереди до тех пор, пока получатель не получит и не обработает его.
Типы обмена (Exchange)
Direct exchange передает сообщения в очереди на основе ключа маршрутизации. Этот тип используется по умолчанию.
Fanout exchange передает сообщения во все очереди.
Header Exchange определяет целевую очередь по заголовкам сообщений.
Topic exchange аналогичен direct exchange, но маршрутизация выполняется в соответствии с шаблоном. Вместо фиксированного ключа маршрутизации здесь используются символы подстановки.
Предположим, что у нас есть следующие шаблоны маршрутизации:
- order.logs.customer;
- order.logs.international;
- order.logs.customer.electronics;
- order.logs.international.electronics.
Шаблон маршрутизации order.*.*.electronics соответствует только тем ключам маршрутизации, где первое слово — order, а четвертое — electronics.
Шаблон маршрутизации order.logs.customer.# соответствует любым ключам маршрутизации, начинающимся с order.logs.customer.
Реализация RabbitMQ
Установка
Установите RabbitMQ. После этого сервис RabbitMQ будет запущен на http://localhost:15672/. Для входа в систему введите guest в поле для имени пользователя и пароля. Теперь вам доступна вся статистика.
Создание сервиса отправления
После запуска RabbitMQ создайте два консольных приложения.
- Sender отправляет сообщения в RabbitMQ.
- Receiver принимает сообщения от RabbitMQ.
Добавьте пакет RabbitMQ.Client в оба приложения.
using System;
using RabbitMQ.Client;
using System.Text;
class Send
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
Приведенный выше код создаст соединение с RabbitMQ и очередь hello, а также передаст сообщение в очередь.
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
class Receive
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
А этот код создаст соединение с RabbitMQ и очередь (если она еще не создана), а также зарегистрирует обработчик, который примет и обработает сообщение.
После запуска приложений отправителя и получателя можно увидеть очередь, созданную на портале RabbitMQ, и всплеск на графике, отмечающий получение нового сообщения. На портале можно увидеть, какой сервис ожидает обработки сообщений, и добавить еще один экземпляр этого сервиса для балансировки нагрузки.
Поначалу работа RabbitMQ проходит гладко. Однако при увеличении сложности и наличии множества конечных точек, вызывающих другие сервисы, может возникать путаница.
Вскоре вы обнаружите, что создаете оболочку вокруг драйвера, чтобы уменьшить объем создаваемого кода. Например, при каждом вызове конечной точки другого сервиса нужно предоставлять токен аутентификации. Затем вы обнаружите, что нужно иметь дело с Ack и Nack (acknowledgment и negative acknowledgment), и создадите для этого простой API. В конце концов придется заняться подозрительными сообщениями, которые искажены и вызывают исключения.
Для взаимодействия со всеми этими рабочими процессами можно использовать NServiceBus. Рассмотрим структуру проекта:
С учетом этой архитектуры, конечная точка ClientUI отправляет команду PlaceOrder на конечную точку Sales. В результате с использованием шаблона publish/subscribe конечная точка Sales опубликует событие OrderPlaced, которое будет принято конечной точкой Billing.
Конфигурация NserviceBus
class Program
{
static async Task Main(string[] args)
{
await CreateHostBuilder(args).RunConsoleAsync();
}
public static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder(args)
.UseNServiceBus(context =>
{
var endpointConfiguration = new EndpointConfiguration("Sales");
//configure transport - configure where your message will be published/saved
//you can configure it for RabbitMq, Azure Queue, Amazon SQS or any other cloud provider
endpointConfiguration.UseTransport<LearningTransport>();
endpointConfiguration.SendFailedMessagesTo("error"); //When a message fails processing it will be forwarded here.
endpointConfiguration.AuditProcessedMessagesTo("audit"); //All messages received by an endpoint will be forwarded to the audit queue.
return endpointConfiguration;
});
}
}
Затем отправьте сообщение с помощью объекта IMessageSession.
public class HomeController : Controller
{
static int messagesSent;
private readonly ILogger<HomeController> _log;
private readonly IMessageSession _messageSession;
public HomeController(IMessageSession messageSession, ILogger<HomeController> logger)
{
_messageSession = messageSession;
_log = logger;
}
[HttpPost]
public async Task<ActionResult> PlaceOrder()
{
string orderId = Guid.NewGuid().ToString().Substring(0, 8);
var command = new PlaceOrder { OrderId = orderId };
// Send the command
await _messageSession.Send(command)
.ConfigureAwait(false);
_log.LogInformation($"Sending PlaceOrder, OrderId = {orderId}");
dynamic model = new ExpandoObject();
model.OrderId = orderId;
model.MessagesSent = Interlocked.Increment(ref messagesSent);
return View(model);
}
}
В завершение добавьте обработчик для приема и обработки сообщений.
public class PlaceOrderHandler :
IHandleMessages<PlaceOrder>
{
static readonly ILog log = LogManager.GetLogger<PlaceOrderHandler>();
static readonly Random random = new Random();
public Task Handle(PlaceOrder message, IMessageHandlerContext context)
{
log.Info($"Received PlaceOrder, OrderId = {message.OrderId}");
return Task.CompletedTask;
}
}
Это базовая реализация NServiceBus и RabbitMQ.
Заключение
Для обмена данными между сервисами не стоит применять синхронной протокол. Используйте в этом случае RabbitMQ для временного хранения сообщений при доставке из источника в пункт назначения. А NserviceBus пригодится для разделения кода приложения и брокера сообщений, а также для управления длительно выполняющимися запросами.
Читайте также:
Перевод статьи Irfan Yusanif: Best practices to communicate between microservices