Как сгенерировать спецификацию AsyncAPI с Python-библиотекой FastStream: описание потокового конвейера для RabbitMQ

RabbitMQ для аналитика, проектирование потокового конвейера на RabbitMQ, генерация спецификации AsyncAPI, библиотека FastStream пример

Автоматическая генерация спецификации асинхронного обмена сообщениями AsyncAPI для RabbitMQ с помощью Python-библиотеки FastStream в VSCode и в Google Colab.

Что такое FastStream и при чем здесь AsyncAPI

Я уже несколько раз рассказывала про AsyncAPI — спецификацию описания асинхронных API, которая похожа на OpenAPI (Swagger), но для асинхронных архитектур. А здесь показывала пример разработки такой спецификации для брокера сообщений RabbitMQ. Однако, на практике вручную писать большой YAML- или JSON-файл с операциями публикации и потребления данных, а также их заголовками и полезной нагрузкой сообщений, довольно утомительно. К счастью, эту работу можно автоматизировать, получив готовую спецификацию в виде JSON-файл, сгенерированный с помощью Python-библиотеки FastStream.

FastStream — это простой, но довольно мощный Python-фреймворк для разработки асинхронных потоковых приложений, которые обрабатывают сообщения из различных популярных брокеров:  RabbitMQ, Kafka, NATS и др. Подобно тому как FastAPI и Flask позволяют сгенерировать спецификацию OpenAPI из кода веб-приложения с REST-архитектурой, FastStream создает документацию AsyncAPI с помощью Python-декораторов. А встроенная валидация данных и аннотирование их типов обеспечивается использованием библиотеки Pydantic, которая автоматически проверяет соответствие данных заявленным типам при создании схем данных. Асинхронность и высокая скорость работы FastStream реализуется благодаря библиотеке асинхронного программирования asyncio и компилятора Cython, который преобразует Python-код в C-код для последующей компиляции. Все функции-обработчики сообщений в FastStream объявляются с помощью асинхронного объявления async def, что позволяет обрабатывать множество сообщений параллельно, без блокировки. Поэтому FastStream может эффективно управлять большим количеством одновременных соединений и задач, не блокируя поток исполнения.

Познакомившись с FastStream, далее рассмотрим, как использовать эту библиотеку для автогенерации спецификации AsyncAPI для RabbitMQ.

В качестве примера, как обычно, возьмем интернет магазин, куда приложение-продюсер публикует клиентские обращения, которые могут быть заявками на покупку продуктов от физических или юридических лиц, а также вопросы по оплате, доставке, работе с vip-клиентами и скидкам. Все эти обращения обрабатывают разные приложения-обработчики. Топология потокового конвейера на RabbitMQ для этого примера выглядит следующим образом:

  • все обращения публикуются на обменник типа Fanout, который реплицирует входящие сообщения на 2 обменника: Topic и Headers;
  • обменник типа Topic маршрутизирует только заявки на покупку продуктов в 2 очереди: очередь корпоративных заявок и очередь индивидуальных заявок;
  • обменник типа Headers маршрутизирует вопросы по темам в 3 разные очереди: очередь вопросов по оплате, очередь вопросов по доставке и очередь всех остальных вопросов.
Топология потокового конвейера на RabbitMQ
Топология потокового конвейера на RabbitMQ

Определение и визуализация структуры данных

Для визуализации структур данных и самих спецификаций в среде разработки нужно установить соответствующие плагины: PlantUML и AsyncAPI Preview. О них я недавно рассказывала здесь. Создадим текстовый файл для отображения схем полезной нагрузки сообщений, публикуемых в RabbitMQ. Предположим, приложение-продюсер публикует в брокер следующие сообщения:

  • Request – базовая структура данных для всех типов обращений, т.е. вопросов и заявок от корпоративных и частных клиентов на покупку товаров. В этой структуре данные есть следующие обязательные поля:
    • moment — момент времени обращения с типом данных datetime;
    • name — имя заявителя (строка);
    • subject — тема обращения (строка);
    • content — содержание обращения (строка);
  • CorporateRequest – корпоративная заявка на покупку, которая наследует все поля класса Request и дополнительно содержит обязательное строковое поле inn для хранения ИНН организации;
  • PrivateRequest – заявка на покупку от частного лица, которая наследует все поля класса Request и дополнительно содержит обязательное строковое поле phone_number с номером телефона клиента и целочисленное поле для хранения его возраста age;
  • QuestionRequest – обращение с вопросом, которое наследует все поля класса Request и дополнительно содержит обязательное целочисленное поле priority для обозначения приоритета обращения.
Диаграмма классов PlantUML
Диаграмма классов PlantUML

Скрипт PlantUML:

@startuml

class Request {
    +moment : datetime [required]
    +name : string [required]
    +subject : string [required]
    +content : string [required]
}

note top of Request
  Базовый класс обращения
end note

class CorporateRequest {
    +inn : string [required]
}
note top of CorporateRequest
  Корпоративная заявка на покупку
end note

class PrivateRequest {
    +phone_number:  string [required]
    +age: int [required]
}
note bottom of PrivateRequest
  Частная заявка на покупку
end note

class QuestionRequest {
    +priority: int [required]
}
note top of QuestionRequest
  Обращение с вопросом
end note

Request <|-- CorporateRequest 
Request <|-- PrivateRequest 
Request <|-- QuestionRequest

@enduml

Разобравшись со схемами полезной нагрузки, далее напишем Python-скрипт для автогенерации спецификации AsyncAPI.

Автогенерация спецификации AsyncAPI с библиотекой FastStream

Чтобы избежать конфликта зависимостей, при разработке на Python на локальном компьютере надо использовать виртуальные среды или работать в интерактивной веб-среде типа Google Colab. При работе в локальном редакторе VSCode надо сперва создать виртуальную среду и установить в ней библиотеки. Как создать виртуальную среду, я показывала здесь. Установим библиотеки в терминале уже созданной и настроенной виртуальной среды:

pip install faststream[rabbbit] 
pip install faststream[cli] 
pip install asyncio
Установка библиотек
Установка библиотек

При работе в Google Colab помимо faststream[rabbbit] надо установить библиотеку nest_asyncio, которая позволяет повторно использовать существующий цикл асинхронных событий asyncio внутри уже запущенного:

!pip install faststream[rabbbit] 
!pip install nest_asyncio

Чтобы сохранить список всех библиотек, установленных в текущей виртуальной среде и их версий, можно запустить в терминале команду

pip freeze > requirements.txt

Она создать файл requirements.txt, куда будут записаны все установленные через менеджер пакетов библиотеки. Это позволит потом воссоздать среду исполнения в другом окружении, например, на другом сервере.

Для моего примера код файла topology.py с описанием сообщений, т.е. структуры данных полезной нагрузки и метаданных, а также топологии самого потокового конвейера с использованием FastStream выглядит так:

#импорт модулей
import asyncio
import logging
import json
import random
import time
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Union, Any, Dict, Sequence, Literal
from typing import List
from faststream import FastStream
from faststream.rabbit import ExchangeType, RabbitBroker, RabbitExchange, RabbitQueue

# Конструктор RabbitBroker
broker = RabbitBroker("amqps://user:password@host:1111/vhost")

#Определение обменников
exch_1 = RabbitExchange("InputsFanoutExchange", type=ExchangeType.FANOUT)
exch_2 = RabbitExchange("AppsTopicExchange", type=ExchangeType.TOPIC, bind_to=exch_1, routing_key="app.#")
exch_3 = RabbitExchange("QuestionsHeadersExchange", type=ExchangeType.HEADERS,bind_arguments={"subject": "question", "theme": "delivery", "x-match": "all"})

#Определение очередей
queue_1 = RabbitQueue("InputsQueue")
queue_2 = RabbitQueue("CorpAppsQueue", routing_key="app.company.*")
queue_3 = RabbitQueue("IndAppsQueue", routing_key="app")
queue_4 = RabbitQueue("DeliveryQueue", bind_arguments={"subject": "question", "theme": "delivery", "x-match": "all"})
queue_5 = RabbitQueue("PaymentQueue", bind_arguments={"subject": "question", "theme": "payment", "x-match": "all"})
queue_6 = RabbitQueue("OtherQuestionsQueue", bind_arguments={"subject": "question", "theme": "vip", "x-match": "all"})

# Инициализация FastStream приложения
app = FastStream(broker)

########### Определение классов полезной нагрузки сообщений#############
# Базовый класс для запросов
@dataclass
class RequestData:
    moment: datetime
    name: str
    subject: str
    content: str

# Класс для корпоративных запросов
@dataclass
class CorporateRequest(RequestData):
    inn: str

# Класс для частных запросов
@dataclass
class PrivateRequest(RequestData):
    phone_number: str
    age: int

# Класс для заявок с темой "question"
@dataclass
class QuestionRequest(RequestData):
    priority: int

# Варианты ключей маршрутизации
corp = random.choice([1, 0])

if corp == 1:
    name = 'company.name'
    routing_keys = [f'app.company.{name}', 'question']
else:
    routing_keys = ['app', 'question']

# Случайный выбор одного из ключей маршрутизации
subject = random.choice(routing_keys)

# Варианты заголовков маршрутизации
questions = ['payment', 'delivery', 'discount', 'vip', 'staff']
theme = random.choice(questions)

# Варианты схем публикуемых сообщений
RequestSchema = Union[CorporateRequest, PrivateRequest, QuestionRequest]

# Публикация сообщений в exch_1
@broker.publisher(exchange=exch_1, routing_key=subject, headers={'subject': subject, 'theme':theme})
async def publish_request(request: RequestSchema):
    logging.info(f"Публикация запроса: {asdict(request)}")
    return request

# Подписка на очередь queue_1
@broker.subscriber(queue=queue_1, exchange=exch_1)
async def handle_inputs_queue(request: RequestSchema):
    logging.info(f"Получен запрос из InputsQueue: {asdict(request)}")

# Подписка на очереди, связанные с exch_2
@broker.subscriber(queue=queue_2, exchange=exch_2)
async def handle_corp_apps_queue(request: CorporateRequest):
    logging.info(f"Получен корпоративный запрос из CorpAppsQueue: {asdict(request)}")

@broker.subscriber(queue=queue_3, exchange=exch_2)
async def handle_ind_apps_queue(request: PrivateRequest):
    logging.info(f"Получен частный запрос из IndAppsQueue: {asdict(request)}")

# Подписка на очереди, связанные с exch_3
@broker.subscriber(queue=queue_4, exchange=exch_3, consume_args={"subject": "question", "theme": "delivery", "x-match": "all"})
async def handle_delivery_queue(request: QuestionRequest):
    logging.info(f"Получен запрос по доставке из DeliveryQueue: {asdict(request)}")

@broker.subscriber(queue=queue_5, exchange=exch_3, consume_args={"subject": "question", "theme": "payment", "x-match": "all"})
async def handle_payment_queue(request: QuestionRequest):
    logging.info(f"Получен платежный запрос из PaymentQueue: {asdict(request)}")

@broker.subscriber(queue=queue_6, exchange=exch_3, consume_args={"subject": "question", "theme": {theme}, "x-match": "all"})
async def handle_other_questions_queue(request: QuestionRequest):
    logging.info(f"Получен VIP запрос из OtherQuestionsQueue: {asdict(request)}")

Чтобы сгенерировать из этого кода спецификацию AsyncAPI, надо выполнить в терминале команду FastStream с указанием имени файла и потокового приложения. В моем примере это выглядит так:

faststream docs gen topology:app

Эта команда запускает генератор документации FastStream, который анализирует топологию RabbitMQ, описанную в приложении, т.е. объекте app в py-файле topology, и создает структурированную документацию по очередям, обменникам, маршрутам и обработчикам сообщений.

Генерация спецификации AsyncAPI
Генерация спецификации AsyncAPI

При отсутствии ошибок JSON-файл сгенерированной спецификации AsyncAPI можно просмотреть сразу в VSCode с помощью расширения asyncapi-preview, которое надо установить заранее.

Просмотр сгенерированной спецификации AsyncAPI в VSCode с расширением asyncapi-preview
Просмотр сгенерированной спецификации AsyncAPI в VSCode с расширением asyncapi-preview

При работе в Google Colab надо скопировать содержимое сгенерированного JSON-файла и просмотреть в AsyncAPI Studio или редакторе Swagger.

Просмотр сгенерированной спецификации AsyncAPI в онлайн-редакторе Swagger
Просмотр сгенерированной спецификации AsyncAPI в онлайн-редакторе Swagger

Как обычно, примеры кода доступны в моем Github:

Надеюсь, что этот небольшой пример поможет вам познакомиться с библиотекой FastStream и ее возможностями по автогенерации спецификации AsyncAPI для брокера сообщений RabbitMQ.

Подробнее изучить все рассмотренные темы, а также другие технологии разработки архитектуры и интеграции информационных систем вы сможете на моих курсах Школы прикладного бизнес-анализа и проектирования информационных систем в нашем лицензированном учебном центре обучения и повышения квалификации системных и бизнес-аналитиков в Москве:

 

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.