...

Альтернативные обменники и очереди недоставленных сообщений в RabbitMQ

RabbitMQ, очереди сообщений, асинхронная интеграция, топология потокового конвейера, брокеры сообщений для аналитика, обучение системных аналитиков, Школа приклданого бизнес-анализа Учебный центр Коммерсант

Зачем нужны альтернативные обменники в RabbitMQ, где настроить очередь недоставленных сообщений и как этот JMS-брокер обеспечивает альтернативную маршрутизацию. Пример топологии потокового конвейера и пара Python-сервисов, запущенных в Google Colab для публикации и потребления сообщений из очередей облачной платформы cloudamqp.com.

Постановка задачи и проектирование потокового конвейера на RabbitMQ

В отличие от Apache Kafka, JMS-брокер сообщений RabbitMQ реализует подход «умный сервер, тупой клиент», позволяя облегчить код потребителя за счет настроек самой платформы. Например, можно направлять сообщения, которые не смог обработать потребитель, в специализированный обменник-сборщик недоставленных сообщений. К этому обменнику будет привязана очередь недоставленных сообщений, на которую подписывается обработчик.

Аналогично можно настроить сбор сообщений, неотправленных в очередь из-за несоответствия правилам привязки. Например, если приложение-продюсер отправляет в обменник типа direct, headers или topic сообщение, которое не соответствует правилам привязки очереди к обменнику, оно будет отклонено. А, поскольку обменник не накапливает сообщения, а только маршрутизирует их, отклонение означает фактическую потерю данных.

Избежать этого можно, привязав к обменнику не только очередь с правилами привязки, но и альтернативный обменник, связанный с очередью для накопления отклоненных сообщений.

Чтобы продемонстрировать, как это работает, я написала и запустила в интерактивной среде Google Colab 2 небольших Python-сервиса для публикации и потребления сообщений из очередей RabbitMQ.  Экземпляр брокера сообщений RabbitMQ у меня развернут в облачной платформе cloudamqp.com с бесплатным планом. Как она устроена, и какие возможности предоставляет пользователю, смотрите в прошлой статье.

Рассмотрим пример, когда на обменник типа direct, который маршрутизирует сообщения в очередь по точно заданному ключу маршрутизации, приложение-продюсер отправляет сообщения. Причем ключ маршрутизации не всегда устанавливается верно. Кроме того, иногда полезная нагрузка сообщения формируется не в том виде, на который рассчитывает приложение-потребитель.

альтернативный обменник RabbitMQ, потоковый конвейер RabbitMQ
Потоковый конвейер на RabbitMQ с альтернативным обменником

Чтобы реализовать это поведение, надо создать в GUI serverless-платформы RabbitMQ необходимые обменники и очереди. «Правильным» обменником, который принимает правильные сообщения у меня является обменник с названием REX типа direct. К нему привязана очередь right_queue по точно заданному ключу маршрутизации rkey. Альтернативным обменником для REX является обменник DLEX типа fanout, который просто перенаправляет сообщения в привязанную очередь dl_queue без проверки каких-либо правил. Помимо привязки альтернативного обменника к самому обменнику, я также привязала к очереди right_queue альтернативный обменник DLEX с альтернативным ключом dlkey. Это значит, что ключ маршрутизации будет переназначаться, если сообщение маршрутизируется на обменник DLEX из очереди right_queue. Такое возможно, если сообщение не может быть обработано приложением-потребителем, истек срок жизни сообщения или достигнут предел очереди.

Конфигурации обменников и очередей
Конфигурации обменников и очередей

BPMN-диаграмма процессов работы с сообщениями выглядит так.

BPMN, RabbitMQ, публикация и потребление сообщений
BPMN-диаграмма процессов публикации и потребления сообщений с RabbitMQ

Далее рассмотрим реализацию Python-сервисов публикации и потребления сообщений по этой топологии потокового конвейера.

Основы архитектуры и интеграции информационных систем

Код курса
OAIS
Ближайшая дата курса
20 января, 2025
Продолжительность
16 ак.часов
Стоимость обучения
36 000 руб.

Пример реализации Python-сервисов потокового конвейера

Код приложения-продюсера, который каждые 3 секунды формирует и публикует в RabbitMQ сообщение, выглядит так:

################################### новая ячейка Colab ####################################################
#установка библиотеки
!pip install pika

#импорт модулей
import pika
import json
import random
from datetime import datetime
import time
import os
################################### новая ячейка Colab ####################################################
# Подключение к серверу RabbitMQ server в облачной платформе cloudamqp.com
connection = pika.BlockingConnection(pika.URLParameters('amqps://username:password@v-host.rmq.cloudamqp.com:port/username'))#учетные данные
channel = connection.channel()

# объявление обменника
exch='REX'
exch_type='direct'
alt_exh = 'DLEX'
channel.exchange_declare(exchange=exch, exchange_type=exch_type, durable=True, arguments={'alternate-exchange': alt_exh})

number=0 #начальный номер события

#бесконечный цикл публикации данных
while True:
  start_time = time.time()  # запоминаем время начала отправки сообщения
  #подготовка данных для публикации в JSON-формате
  producer_publish_time = time.strftime("%m/%d/%Y %H:%M:%S", time.localtime(start_time))

  number=number+1

  #задаем ключ маршрутизации (верный или для dlq)
  OK = random.choice([1,0])
  if OK==1:
    rk='rkey'
    content=f'ВСЕ НОРМАЛЬНО, произошло событие номер {number}'
    #создаем полезную нагрузку в JSON
    data = {'producer_publish_time': producer_publish_time,'content': content}
  else :
    rk=''
    content='НЕКОРРЕКТНОЕ измерение'
    data = {'event_time': producer_publish_time,'content': content}

  message = json.dumps(data)

  #отправка сообщения в обменник RabbitMQ с ключом маршрутизации и свойствами (заголовок)
  channel.basic_publish(exchange=exch, routing_key=rk, body=message)

  #вывод отладочной информации
  print(f' ключ маршрутизации {rk} Сообщение {number} отправлено {message}')

  #повтор через 3 секунды
  time.sleep(3)
################################### новая ячейка Colab ####################################################
#закрываем канал и соединение
channel.close()
connection.close()

Код приложения-потребителя, который считывает данные из «правильной» очереди right_queue и записывает данные в Google-таблицу, выглядит так:

################################### новая ячейка Colab ####################################################
#установка библиотек
!pip install pika

#импорт модулей
import pika
import json
import random
import requests
from datetime import datetime

#импорт модулей для GS
from google.colab import auth
auth.authenticate_user()
import gspread
from google.auth import default
creds, _ = default()
################################### новая ячейка Colab ####################################################
# Подключение к серверу RabbitMQ server в облачной платформе cloudamqp.com
connection = pika.BlockingConnection(pika.URLParameters('amqps://username:password@v-host.rmq.cloudamqp.com:port/username'))#учетные данные
channel = connection.channel()

#объявление очереди RQM
dlrk='dlkey'
alt_exh = 'DLEX'

my_queue='right_queue'
questions_queue = channel.queue_declare(queue=my_queue, arguments={'x-dead-letter-exchange': alt_exh, 'x-dead-letter-routing-key': dlrk})

#Google Sheets Autentificate
gc = gspread.authorize(creds)

#Открытие заранее созданного файла Гугл-таблицы по идентификатору (взять из его URL, например, у меня это https://docs.google.com/spreadsheets/d/1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM)
sh = gc.open_by_key('1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM')
wks = sh.worksheet("test_0") #в какой лист гугл-таблиц будем записывать

#начальный номер строки для записи данных
x=1

def on_inputs_message(ch, method, properties, body):
    global x

    try:
        # распаковка сообщения
        data = json.loads(body)

        # парсинг сообщения
        producer_publish_time = data['producer_publish_time']
        content = data['content']

        now = datetime.now()
        consuming_time = now.strftime("%m/%d/%Y %H:%M:%S")

        # вывод распарсенных данных в консоль
        print(f'{json.dumps(data)}')

        # обновление данных в Google Sheets
        print(x)
        x += 1
        wks.update_cell(x, 1, producer_publish_time)
        wks.update_cell(x, 2, consuming_time)
        wks.update_cell(x, 3, content)

    except Exception as e:
        # запись ошибок в лог-файл на Google Диске
        error_str = f"Error: {str(e)}, Value: {data}\n"
        with open("dlq.txt", "a") as f:
            f.write(error_str)
        print(f"Error: {str(e)}")

#потребляем данные из RabbitMQ
while True:
  print('Waiting for messages. To exit press CTRL+C')
  #привязка к очереди headers-обменника RQM
  channel.basic_consume(queue=questions_queue.method.queue, on_message_callback=on_inputs_message, auto_ack=True)
  channel.start_consuming()
################################### новая ячейка Colab ####################################################
#закрываем канал и соединение
channel.close()
connection.close()

В коде потребителя команды обработки считанного сообщения заключены в конструкцию try-except. Это предупреждает сбой потребления из-за исключений, возникающих, если схема данных потребленного сообщения не совпадает с заложенным алгоритмом синтаксического разбора,. Такие сообщения записываются в текстовый лог-файл dlq.txt, расположенный в рабочей директории сеансового пространства среды Google Colab. В данном примере этот файл остается пустым по той причине, что в очередь потребления right_queue сообщения с «неправильно» структурой данных просто-напросто не попадают из-за алгоритма формирования полезной нагрузки точно в соответствии с ключом маршрутизации маршрутизации.

Результат работы представлен на картинке.

Потребление и накопление сообщений, очереди сообщений, RabbitMQ
Потребление и накопление сообщений

Весь код, приведенный в этой статье, работает. Можно повторить все действия, настроив свой облачный экземпляр брокера сообщений  и подставив нужные учетные данные. Надеюсь, после этого краткого ликбеза стали понятные принципы работы RabbitMQ, что часто используется для интеграции ИС и асинхронного взаимодействия микросервисов в виде приложений-потребителей и продюсеров.

Основы архитектуры и интеграции информационных систем

Код курса
OAIS
Ближайшая дата курса
20 января, 2025
Продолжительность
16 ак.часов
Стоимость обучения
36 000 руб.

Подробнее познакомиться со всеми рассмотренными темами, а также другими основами архитектуры и интеграции информационных систем вы сможете на курсах Школы прикладного бизнес-анализа в нашем лицензированном учебном центре обучения и повышения квалификации системных и бизнес-аналитиков в Москве:

 

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Добавить комментарий