Роль посредника при интеграции информационных систем может играть не только Apache Kafka, о чем я рассказывала здесь, но и JMS-брокеры. Наиболее популярным из них является RabbitMQ. Разберем на практическом примере, как он работает и что нужно аналитику знать об этом ПО, чтобы корректно разрабатывать требования, а также описывать межсистемное взаимодействие в интеграционных проектах и микросервисной архитектуре.
Что такое RabbitMQ и как он работает
RabbitMQ – это брокер сообщений с открытым исходным кодом, который маршрутизирует сообщения, отправленные приложением-продюсером (producer) согласно принципам протокола AMQP (Advanced Message Queuing Protocol). В отличие от Apache Kafka, RabbitMQ не просто принимает сообщение от продюсера, а доставляет его приложению-потребителю (consumer). Более того, RabbitMQ работает по модели «умный сервер, тупой клиент», позволяя реализовать сложную логику маршрутизации сообщения благодаря нескольким типам обменников (exchanger), которые отправляют сообщения в очереди (queue).
RabbitMQ работает следующим образом:
- Приложение-продюсер отправляет сообщение конкретному обменнику;
- Обменник маршрутизирует полученное сообщение его в одну или несколько очередей, в зависимости от типа самого обменника, правилам его привязки к очереди (binding) и значению ключа маршрутизации в сообщении (routing key);
- Очередь хранит ссылку на полученное сообщение, которое физически может хранится в оперативной памяти или на диске, в зависимости от свойств самой очереди;
- Когда приложение-потребитель готово получить сообщение из очереди, брокер копирует его по ссылке из очереди и отправляет копию сообщения потребителю;
- Получив сообщение, приложение-потребитель отправляет брокеру подтверждение об успешном получении данных;
- После получения подтверждения брокер удаляет копию сообщения из очереди;
- Наконец, брокер удаляет само исходное сообщение из очереди, очищая место в оперативной памяти или на жестком диске.
В проектах межсистемного взаимодействия с RabbitMQ аналитику важно определить способ маршрутизации сообщений между интегрируемыми приложениями, что задается типом обменника. Чтобы кратко рассказать, чем они отличаются друг от друга и в каких случаях стоит выбирать тот или другой, я составила таблицу.
Тип обменника | Принцип работы | Сценарий применения |
Direct | Прямая отправка сообщений в одну или несколько очередей с совпадающим значением ключа маршрутизации | Когда есть точно известный ключ, по значению которого отдельные приложения-потребители должны получить подходящие сообщения |
Fanout | Все сообщения отправляются во все очереди независимо от ключа маршрутизации | Когда все приложения-потребители должны быстро получать все сообщения |
Headers | Маршрутизация по нескольким атрибутам, заданным в заголовке сообщения. Ключ маршрутизации игнорируется | Когда правила маршрутизации сообщения в очереди сложнее, чем просто по ключу, например, формат данных, комбинация полей и пр. |
Topic | Сообщение отправляется в конкретные очереди по значению ключа маршрутизации, заданного по шаблону | Когда ключ маршрутизации сложный и поток сообщений надо разделить по разным приложениям-потребителям |
Можно создать собственный обменник, комбинируя несколько разных типов между собой, чтобы еще более повысить гибкость маршрутизации сообщений и задать сложный маршрут их распределения по приложениям-потребителям.
Обменник отправляет сообщение в очередь согласно правилам привязки. Очереди в RabbitMQ работают по принципу FIFO (First In – First Out) и бывают 2-х видов:
- временные (autoDelete) — создаются при подключении первого клиента и удаляются при отсутствии клиентских подключений;
- постоянные(durable) — сохраняют свое состояние и восстанавливаются после перезапуска брокера. Это более надежный вид очередей.
Каждая очередь в RabbitMQ должна иметь название (name), а также ряд других обязательных и опциональных свойств, которые определяют поведение очереди. Например, эксклюзивная очередь используется только одним соединением и удаляется при его закрытии, подобно очереди с автоматическим удалением (Auto-Delete). Эксклюзивная очередь предполагает, что сообщения из нее получает только одно приложение-потребитель. Также в свойствах очереди можно задать ее тип (классическая Classic, реплицируемая Quorum или потоковая Stream) и максимальную длину, время жизни (TTL, Time To Live), приоритет и правила репликации.
Чтобы понять, как все эти концепции RabbitMQ работают на самом деле, рассмотрим практический пример.
Основы архитектуры и интеграции информационных систем
Код курса
OAIS
Ближайшая дата курса
20 января, 2025
Продолжительность
16 ак.часов
Стоимость обучения
36 000 руб.
Практический пример
Предположим, у нас есть 2 продюсера, которые создают данные и отправляют их в RabbitMQ. Пусть это будут приложения, которые снимают показания с датчиков температуры (Producer 1) и давления (Producer 2) на нескольких устройствах. Данные по температуре обрабатывает приложение-потребитель Обработчик температуры (Consumer1), а данные по давлению – Обработчик давления (Consumer 2).
Приложение-продюсер по температуре отправляет в RabbitMQ данные в следующей JSON-структуре:
{ "measuring": "temperature", "timestamp": "2023-01-21T02:05:55.000Z ", "device": [ { "device_id": 4123, "value": 69.67, "status": "OK" }, { "device_id": 587, "value": 19.67, "status": "ERROR" }, { "device_id": 1524, "value": 123, "status": "ERROR" }, { "device_id": 97, "value": 70, "status": "OK" } ] }
Аналогичная структура данных приходит от приложения-продюсера по давлению:
{ "measuring": "pressure", "timestamp": "2023-01-21T02:05:55.000Z ", "device": [ { "device_id": 4123, "value": 9.67, "status": "OK" }, { "device_id": 587, "value": 1.03, "status": "ERROR" }, { "device_id": 1524, "value": 6.34, "status": "OK" }, { " device_id": 97, "value": 0.21, "status": "ERROR" } ] }
Чтобы направлять сообщения разным приложениям-потребителям, проще всего выбрать ключ маршрутизации pressure или temperature и маркировать этим ключом каждое отправляемое от соответствующего продюсера сообщение. Такую простую маршрутизацию по точному значению ключа обеспечит прямой тип обменника (Direct).
Для демонстрации возьмем облачную платформу cloudamqp.com (https://www.cloudamqp.com/), которая позволяет создать собственный экземпляр RabbitMQ с бесплатным планом. Зарегистрировавшись на платформе, создадим новый инстанс, назвав его и выбрав подходящий тарифный план.
Мой инстанс rabbitmq_demo называется Anna, имеет имя хоста dingo с бесплатным тарифным планом Little Lemur и находится в амазоновском датацентре.
Кликнув на имя своего инстанса, можно увидеть все его параметры – некоторые из них понадобятся при настройке репликации между брокерами.
Далее можно переходить в панель управления RabbitMQ, которая в моем случае располагается по URL-адресу https://dingo.rmq.cloudamqp.com/.
Сперва выберем подходящий тип обменника на вкладке Exchanges. В случае рассматриваемого примера это прямой обменник, т.е. amq.direct. Можно создать собственный обменник, используя существующий типы или комбинировав их между собой. Изначально платформа cloudmq предоставляет 7 обменников на основе 4-х стандартных типов.
Чтобы привязать обменник к очереди, надо сперва создать ее. Для этого перейдем на вкладку очередей (Queues). Создадим 2 очереди: pressure – для сообщений с данными по давлению и temperature – для данных по температуре.
Свойства каждой очереди опишем в таблице:
Очередь | Смысл | Тип | Приложение-Потребитель | Эксклюзивная | Постоянная | Авто-удаление | Хранение данных | Длина (число сообщений) | Приоритет |
Pressure | Хранит данные по давлению | classic | Обработчик давления | да | Да | Нет | В памяти | 100 | 5 |
Temperature | Хранит данные по температуре | classic | Обработчик температуры | Да | да | нет | В памяти | 100 | 3 |
Весь перечень очередей показан на вкладке Queues. Сейчас все они находятся в «холостом» состоянии (idle), т.к. на текущий момент приложения-продюсеры не публикуют в них сообщения. Также каждая очередь развернута в кластере c с высокой доступностью, что отмечено маркером HA (High Availability). Из-за отсутствия приложений-потребителей нет никаких сведений о подтверждении получения сообщений (ack).
Далее вернемся на вкладку Exchanges и привяжем обменник к очереди, задав ключ маршрутизации. В случае рассматриваемого примера выбираем обменник amq.direct и привязываем к очереди pressure, написав в поле Routing key значение pressure и нажав кнопку Bind. Аналогично проделаем для temperature.
Затем опубликуем несколько сообщений в этот обменник, задав каждому ключ маршрутизации. Дополнительно сообщению можно задать свойства, например, приоритет, время жизни и пр. В проектах интеграции и описания информационных систем это удобно сделать в табличном виде.
Также в реальных проектах межсистемной интеграции и микросервисного взаимодействия с применением RabbitMQ аналитик может определить следующие свойства сообщения:
- тип контента (content_type), например, application/json;
- кодировка содержимого (content_encoding), например, gzip;
- приоритет (priority);
- время жизни (expiration);
- идентификатор сообщения (message_id);
- отметка времени (timestamp);
- тип (type) — произвольная строка, которая помогает приложениям-продюсерам сообщать, что это за сообщение. RabbitMQ не проверяет и не использует это поле, оно нужно для использования и интерпретации приложениями и плагинами.
- идентификатор пользователя (user_id), приложения (app_id) и кластера (cluster_id).
После публикации сообщений можно посмотреть, как меняется поведение очереди.
Хотя в реальных проектах RabbitMQ доставляет очереди приложениям-потребителям программным образом, сейчас мы не будем писать собственное приложение, а посмотрим содержимое очереди в GUi платформы cloudmq на вкладке Queues, нажав кнопку Get Messages. Предположим, хотим просмотреть 10 сообщений из тех, что находятся в очереди.
Если нужно настроить репликацию, т.е. копирование данных между брокерами, в RabbitMQ есть плагин Shovel. Его можно использовать для балансировки нагрузки на очередь или когда нужно перенаправлять сообщения с одного брокера на другой. Динамические перенаправления определяются с использованием параметров среды выполнения, а статические задаются в файле advanced.config. Создадим 1 динамическое перенаправление на вкладке Admin, в разделе Shovel Management.
После создания этого появятся сведения о динамической репликации данных между брокерами.
Разумеется, в этой короткой статье рассмотрены не все аспекты работы с RabbitMQ. Однако, надеюсь, она помогла аналитикам познакомиться с этим промежуточным ПО, которое часто используется для интеграции ИС и асинхронного взаимодействия микросервисов как приложений-потребителей и продюсеров. Более подробно с примерами программного кода о работе RabbitMQ читайте в здесь и здесь.
Основы архитектуры и интеграции информационных систем
Код курса
OAIS
Ближайшая дата курса
20 января, 2025
Продолжительность
16 ак.часов
Стоимость обучения
36 000 руб.
Подробнее познакомиться со всеми рассмотренными темами, а также другими основами архитектуры и интеграции информационных систем вы сможете на курсах Школы прикладного бизнес-анализа в нашем лицензированном учебном центре обучения и повышения квалификации системных и бизнес-аналитиков в Москве:
- Основы архитектуры и интеграции информационных систем
- Разработка ТЗ на информационную систему по ГОСТ и SRS
Источники