Как именно JMS-брокер RabbitMQ обеспечивает взаимодействие разных приложений: смотрим на примере Python-сервисов, запущенных в Google Colab для публикации и потребления сообщений из очередей облачной платформы cloudamqp.com.
Постановка задачи
Хотя системный, и, тем более, бизнес-аналитик и не пишет исходный код, некоторые архитектурные концепции, нужные в работе на проектах интеграции информационных систем, можно понять, заглянув под капот технологии. Поэтому сегодня посмотрим, как именно JMS-брокер сообщений RabbitMQ обеспечивает взаимодействие разных приложений. Для этого я написала 2 небольших Python-сервиса в Goggle Colab для публикации и потребления сообщений из очередей RabbitMQ, развернутых на облачной платформе cloudamqp.com.
Напомню, Google Colab – это интерактивная среда для написания и интерпретации Python-кода. Как она работает, я ранее писала здесь. Свой экземпляр брокера сообщений RabbitMQ развернем в облачной платформе cloudamqp.com с бесплатным планом. Как она устроена, и какие возможности предоставляет пользователю, смотрите в прошлой статье.
Для демонстрации возьмем пример из моего недавнего материала про RabbitMQ и немного упростим его. Пусть приложением-продюсером, которое создает данные и отправляет их в RabbitMQ, будет умный датчик с 2-мя видами измерений: температура и давление. Хотя эти данные и считывает одно приложение-потребитель, предположим, что оно обрабатывает их по-разному, в разных функциях. Чтобы маршрутизировать сообщения разных измерений в разные очереди temperature и pressure, промаркируем сообщение с разными измерениями ключом маршрутизации (temperature и pressure) соответственно и будем принимать их на прямой тип обменника (Direct), который обеспечивает простую маршрутизацию по точному значению ключа.
Пусть данные от умного датчика, т.е. приложения-продюсера, отправляются в JSON-формате по следующей схеме
{ "$schema": "http://json-schema.org/draft-04/schema#", "type": "object", "properties": { " device_id": { "type": "integer" }, "measure": { "type": "string" }, "value": { "type": "integer" } }, "required": [ " device_id", "measure", "value" ] }
Чтобы не заполнять данные по измерениям вручную, в Python-скрипте для продюсера будем использовать оператор автоматической генерации случайных значений (random).
Подготовка RabbitMQ и создание приложения-продюсера
Теперь подготовим свой облачный экземпляр RabbitMQ. Создадим обменник прямого типа под названием sensor_exchange и 2 очереди: temperature и pressure.
Далее напишем код приложения продюсера в интерактивном блокноте Google Colab. Это Python-приложение отправляет JSON-сообщение в обменник RabbitMQ со случайными значениями температуры или давления каждые 3 секунды:
!pip install pika import pika import json import random import time # Connect to RabbitMQ server connection = pika.BlockingConnection(pika.URLParameters('здесь_должна_быть_URL-адрес_вашего_экзампляра_RabbitMQ:порт_5671_или_5672/виртуальный_хост_вашего_экземпляра')) channel = connection.channel() # Declare an exchange channel.exchange_declare(exchange='sensor_exchange', exchange_type='direct') # Prepare a list of divices devices =[random.randint(0, 100) for i in range(100)] # Prepare a list of possible routing keys routing_keys = ['pressure', 'temperature'] while True: # Prepare random message in JSON format measure=random.choice(routing_keys) data = {'device': random.choice(devices), 'measure': measure, 'value': random.randint(0,100)} message = json.dumps(data) # Send the message to the exchange channel.basic_publish(exchange='sensor_exchange', routing_key=measure, body=message) print(f' [x] Sent {message}') # Sleep for 3 seconds time.sleep(3) channel.close() connection.close()
В Google Colab этот код лучше распределить по 3-м ячейкам:
- в 1-ой ячейке будет установка и импорт Python-библиотек для работы с RabbitMQ (pika), а также форматом JSON, операторами random и времени;
- во 2-ой ячейке код подключения к RabbitMQ и публикации сообщений;
- в 3-ей ячейке закрытие канала и активного подключения.
Запустим на исполнение этот код, выполнив последовательно первые 2 ячейки, и увидим вывод сгенерированных и отправленных RabbitMQ сообщений в области отладки в Colab:
Основы архитектуры и интеграции информационных систем
Код курса
OAIS
Ближайшая дата курса
20 января, 2025
Продолжительность
16 ак.часов
Стоимость обучения
36 000 руб.
Посмотрим в GUI-интерфейсы платформы cloudamqp, как сообщения, отправленные на один обменник, попадают в разные очереди по ключу маршрутизации:
Также в GUI-интерфейсе платформы можно посмотреть статистику по активным подключениям, открытым каналам и общий обзор опубликованных и считанных сообщений.
Потребление сообщений
Теперь напишем код приложения-потребителя, которое не просто потребляет данные из двух разных очередей RabbitMQ, но и обрабатывает принятые значения измерений, маркируя состояние устройства отметкой OK или ошибкой ERROR:
!pip install pika import pika import json import random from time import sleep # Connect to RabbitMQ server connection = pika.BlockingConnection(pika.URLParameters('здесь_должна_быть_URL-адрес_вашего_экзампляра_RabbitMQ:порт_5671_или_5672/виртуальный_хост_вашего_экземпляра')) channel = connection.channel() # Declare an exchange channel.exchange_declare(exchange='sensor_exchange', exchange_type='direct') temperature_queue = channel.queue_declare(queue='temperature') pressure_queue = channel.queue_declare(queue='pressure') channel.queue_bind(exchange='sensor_exchange', queue=temperature_queue.method.queue, routing_key='temperature') channel.queue_bind(exchange='sensor_exchange', queue=pressure_queue.method.queue, routing_key='pressure') def on_temperature_message(ch, method, properties, body): #print("Температура ", json.loads(body)) dictDataTemperature = json.loads(body) temperature_state = 'состояние температуры' temperature_value=int(dictDataTemperature['value']) if ((temperature_value) <= 80) and ((temperature_value) >= 20) : temperature_state = 'OK' else: temperature_state = 'ERROR' print("Устройство № ", dictDataTemperature["device"], "состояние температуры", temperature_state, "значение ", temperature_value) def on_pressure_message(ch, method, properties, body): #print("Pressure: ", json.loads(body)) dictDataPressure = json.loads(body) pressure_state = 'состояние давления' pressure_value=int(dictDataPressure['value']) if ((pressure_value) <= 50) and ((pressure_value) >= 10) : pressure_state = 'OK' else: pressure_state = 'ERROR' print("Устройство № ", dictDataPressure["device"], "состояние давления", pressure_state, "значение ", pressure_value) print('Waiting for messages. To exit press CTRL+C') channel.basic_consume(queue=temperature_queue.method.queue, on_message_callback=on_temperature_message, auto_ack=True) channel.basic_consume(queue=pressure_queue.method.queue, on_message_callback=on_pressure_message, auto_ack=True) channel.start_consuming() channel.close() connection.close()
Аналогично продюсеру, разделим его на 3 ячейки в Google Colabб сохранив в отдельном блокноте. Чтобы потребление сообщений работало, следует параллельно запустить их публикацию. Поэтому будет 2 разных ipynb-файла. Посмотрим, как работает потребление сообщений из 2-х очередей.
При публикации и потреблении сообщений пропускную способность обменника можно посмотреть в GUI платформы cloudamqp:
Используемый обменник sensor_exchange имеет отличные от нуля значения скоростей публикации и потребления сообщений.
Весь код, приведенный в этой статье, работает и вы можете повторить все действия, настроив свой облачный экземпляр брокера сообщений и подставив нужные учетные данные. Надеюсь, после этого краткого ликбеза стали понятные принципы работы этого популярного JMS-брокера сообщений, который часто используется для интеграции ИС и асинхронного взаимодействия микросервисов в виде приложений-потребителей и продюсеров. Повторить подобный пример для Apache Kafka можно, прочитав мою новую статью.
Основы архитектуры и интеграции информационных систем
Код курса
OAIS
Ближайшая дата курса
20 января, 2025
Продолжительность
16 ак.часов
Стоимость обучения
36 000 руб.
Подробнее познакомиться со всеми рассмотренными темами, а также другими основами архитектуры и интеграции информационных систем вы сможете на курсах Школы прикладного бизнес-анализа в нашем лицензированном учебном центре обучения и повышения квалификации системных и бизнес-аналитиков в Москве: