...

Пишем продюсер и консумер для RabbitMQ на Python в Google Colab

RabbitMQ producer and consumer Python Google Colab пример, интеграция информационных систем простыми словами для начинающих примеры курсы обучение, интеграция RabbitMQ примеры курсы обучение, основы архитектуры и интеграции информационных систем для бизнес-аналитика, архитектура информационных систем основы введение, RabbitMQ для аналитика краткий ликбез, обучение системных и бизнес-аналитиков, курсы системного и бизнес-анализа, Школа прикладного бизнес-анализа Учебный Центр Коммерсант

Как именно JMS-брокер RabbitMQ обеспечивает взаимодействие разных приложений: смотрим на примере Python-сервисов, запущенных в Google Colab для публикации и потребления сообщений из очередей облачной платформы cloudamqp.com.

Постановка задачи

Хотя системный, и, тем более, бизнес-аналитик и не пишет исходный код, некоторые архитектурные концепции, нужные в работе на проектах интеграции информационных систем, можно понять, заглянув под капот технологии. Поэтому сегодня посмотрим, как именно JMS-брокер сообщений RabbitMQ обеспечивает взаимодействие разных приложений. Для этого я написала 2 небольших Python-сервиса в Goggle Colab для публикации и потребления сообщений из очередей RabbitMQ, развернутых на облачной платформе cloudamqp.com.

Напомню, Google Colab – это интерактивная среда для написания и интерпретации Python-кода. Как она работает, я ранее писала здесь. Свой экземпляр брокера сообщений RabbitMQ развернем в облачной платформе cloudamqp.com с бесплатным планом. Как она устроена, и какие возможности предоставляет пользователю, смотрите в прошлой статье.

Для демонстрации возьмем пример из моего недавнего материала про RabbitMQ и немного упростим его. Пусть приложением-продюсером, которое создает данные и отправляет их в RabbitMQ, будет умный датчик с 2-мя видами измерений: температура и давление. Хотя эти данные и считывает одно приложение-потребитель, предположим, что оно обрабатывает их по-разному, в разных функциях. Чтобы маршрутизировать сообщения разных измерений в разные очереди temperature и pressure, промаркируем сообщение с разными измерениями ключом маршрутизации (temperature и pressure) соответственно и будем принимать их на прямой тип обменника (Direct), который обеспечивает простую маршрутизацию по точному значению ключа.

RabbitMQ пример курсы обучение для аналитика
Публикация и потребление сообщений из RabbitMQ: пример

 Пусть данные от умного датчика, т.е. приложения-продюсера, отправляются в JSON-формате по следующей схеме

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    " device_id": {
      "type": "integer"
    },
    "measure": {
      "type": "string"
    },
    "value": {
      "type": "integer"
    }
  },
  "required": [
    " device_id",
    "measure",
    "value"
  ]
}

Чтобы не заполнять данные по измерениям вручную, в Python-скрипте для продюсера будем использовать оператор автоматической генерации случайных значений (random).

Подготовка RabbitMQ и создание приложения-продюсера

Теперь подготовим свой облачный экземпляр RabbitMQ. Создадим обменник прямого типа под названием sensor_exchange и 2 очереди: temperature и pressure.

обменник и очереди RabbitMQ
Создаем обменник и очереди в облачном экземпляре RabbitMQ

Далее напишем код приложения продюсера в интерактивном блокноте Google Colab. Это Python-приложение отправляет JSON-сообщение в обменник RabbitMQ со случайными значениями температуры или давления каждые 3 секунды:

!pip install pika
import pika
import json
import random
import time

# Connect to RabbitMQ server
connection = pika.BlockingConnection(pika.URLParameters('здесь_должна_быть_URL-адрес_вашего_экзампляра_RabbitMQ:порт_5671_или_5672/виртуальный_хост_вашего_экземпляра'))
channel = connection.channel()

# Declare an exchange
channel.exchange_declare(exchange='sensor_exchange', exchange_type='direct')

# Prepare a list of divices
devices =[random.randint(0, 100) for i in range(100)]

# Prepare a list of possible routing keys
routing_keys = ['pressure', 'temperature']

while True:
    # Prepare random message in JSON format
    measure=random.choice(routing_keys)
    data = {'device': random.choice(devices), 'measure': measure, 'value': random.randint(0,100)}
    message = json.dumps(data)

    # Send the message to the exchange
    channel.basic_publish(exchange='sensor_exchange', routing_key=measure, body=message)
    print(f' [x] Sent {message}')

    # Sleep for 3 seconds
    time.sleep(3)

channel.close()
connection.close()

В Google Colab этот код лучше распределить по 3-м ячейкам:

  • в 1-ой ячейке будет установка и импорт Python-библиотек для работы с RabbitMQ (pika), а также форматом JSON, операторами random и времени;
  • во 2-ой ячейке код подключения к RabbitMQ и публикации сообщений;
  • в 3-ей ячейке закрытие канала и активного подключения.
RQM producer Python Google Colab
Python-код продюсера для RQM в Google Colab

Запустим на исполнение этот код, выполнив последовательно первые 2 ячейки, и увидим вывод сгенерированных и отправленных RabbitMQ сообщений в области отладки в Colab:

producer Python Google Colab
Публикация сообщений

Основы архитектуры и интеграции информационных систем

Код курса
OAIS
Ближайшая дата курса
20 января, 2025
Продолжительность
16 ак.часов
Стоимость обучения
36 000 руб.

Посмотрим в GUI-интерфейсы платформы cloudamqp, как сообщения, отправленные на один обменник, попадают в разные очереди по ключу маршрутизации:

cloudamqp RabbitMQ
Публикация сообщений в GUI-интерфейсе облачной платформы cloudamqp

Также в GUI-интерфейсе платформы можно посмотреть статистику по активным подключениям, открытым каналам и общий обзор опубликованных и считанных сообщений.

Потребление сообщений

Теперь напишем код приложения-потребителя, которое не просто потребляет данные из двух разных очередей RabbitMQ, но и обрабатывает принятые значения измерений, маркируя состояние устройства отметкой OK или ошибкой ERROR:

!pip install pika
import pika
import json
import random
from time import sleep

# Connect to RabbitMQ server
connection = pika.BlockingConnection(pika.URLParameters('здесь_должна_быть_URL-адрес_вашего_экзампляра_RabbitMQ:порт_5671_или_5672/виртуальный_хост_вашего_экземпляра'))
channel = connection.channel()

# Declare an exchange
channel.exchange_declare(exchange='sensor_exchange', exchange_type='direct')

temperature_queue = channel.queue_declare(queue='temperature')
pressure_queue = channel.queue_declare(queue='pressure')

channel.queue_bind(exchange='sensor_exchange', queue=temperature_queue.method.queue, routing_key='temperature')
channel.queue_bind(exchange='sensor_exchange', queue=pressure_queue.method.queue, routing_key='pressure')

def on_temperature_message(ch, method, properties, body):
   #print("Температура ", json.loads(body))
   dictDataTemperature = json.loads(body)
   temperature_state = 'состояние температуры'
   temperature_value=int(dictDataTemperature['value'])
   if ((temperature_value) <= 80) and ((temperature_value) >= 20) :
     temperature_state = 'OK'
   else:
     temperature_state = 'ERROR'
   print("Устройство № ", dictDataTemperature["device"], "состояние температуры", temperature_state, "значение ", temperature_value)
   
def on_pressure_message(ch, method, properties, body):
   #print("Pressure: ", json.loads(body))
   dictDataPressure = json.loads(body)
   pressure_state = 'состояние давления'
   pressure_value=int(dictDataPressure['value'])
   if ((pressure_value) <= 50) and ((pressure_value) >= 10) :
     pressure_state = 'OK'
   else:
     pressure_state = 'ERROR'
   print("Устройство № ", dictDataPressure["device"], "состояние давления", pressure_state, "значение ", pressure_value)

print('Waiting for messages. To exit press CTRL+C')

channel.basic_consume(queue=temperature_queue.method.queue, on_message_callback=on_temperature_message, auto_ack=True)
channel.basic_consume(queue=pressure_queue.method.queue, on_message_callback=on_pressure_message, auto_ack=True)
channel.start_consuming()


channel.close()
connection.close()

Аналогично продюсеру, разделим его на 3 ячейки в Google Colabб сохранив в отдельном блокноте. Чтобы потребление сообщений работало, следует параллельно запустить их публикацию. Поэтому будет 2 разных ipynb-файла. Посмотрим, как работает потребление сообщений из 2-х очередей.

Google Colab Python-consumer RabbitMQ
Работа потребителя сообщений в Google Colab

При публикации и потреблении сообщений пропускную способность обменника можно посмотреть в GUI платформы cloudamqp:

exchange cloudamqp, обменник RabbitMQ
Обменники в cloudamqp

Используемый обменник sensor_exchange имеет отличные от нуля значения скоростей публикации и потребления сообщений.

Весь код, приведенный в этой статье, работает и вы можете повторить все действия, настроив свой облачный экземпляр брокера сообщений  и подставив нужные учетные данные. Надеюсь, после этого краткого ликбеза стали понятные принципы работы этого популярного JMS-брокера сообщений, который часто используется для интеграции ИС и асинхронного взаимодействия микросервисов в виде приложений-потребителей и продюсеров. Повторить подобный пример для Apache Kafka можно, прочитав мою новую статью.

Основы архитектуры и интеграции информационных систем

Код курса
OAIS
Ближайшая дата курса
20 января, 2025
Продолжительность
16 ак.часов
Стоимость обучения
36 000 руб.

Тест по Apache Kafka и RabbitMQ для аналитика

Подробнее познакомиться со всеми рассмотренными темами, а также другими основами архитектуры и интеграции информационных систем вы сможете на курсах Школы прикладного бизнес-анализа в нашем лицензированном учебном центре обучения и повышения квалификации системных и бизнес-аналитиков в Москве:

 

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Добавить комментарий