Автоматическая генерация спецификации асинхронного обмена сообщениями AsyncAPI для RabbitMQ с помощью Python-библиотеки FastStream в VSCode и в Google Colab.
Что такое FastStream и при чем здесь AsyncAPI
Я уже несколько раз рассказывала про AsyncAPI — спецификацию описания асинхронных API, которая похожа на OpenAPI (Swagger), но для асинхронных архитектур. А здесь показывала пример разработки такой спецификации для брокера сообщений RabbitMQ. Однако, на практике вручную писать большой YAML- или JSON-файл с операциями публикации и потребления данных, а также их заголовками и полезной нагрузкой сообщений, довольно утомительно. К счастью, эту работу можно автоматизировать, получив готовую спецификацию в виде JSON-файл, сгенерированный с помощью Python-библиотеки FastStream.
FastStream — это простой, но довольно мощный Python-фреймворк для разработки асинхронных потоковых приложений, которые обрабатывают сообщения из различных популярных брокеров: RabbitMQ, Kafka, NATS и др. Подобно тому как FastAPI и Flask позволяют сгенерировать спецификацию OpenAPI из кода веб-приложения с REST-архитектурой, FastStream создает документацию AsyncAPI с помощью Python-декораторов. А встроенная валидация данных и аннотирование их типов обеспечивается использованием библиотеки Pydantic, которая автоматически проверяет соответствие данных заявленным типам при создании схем данных. Асинхронность и высокая скорость работы FastStream реализуется благодаря библиотеке асинхронного программирования asyncio и компилятора Cython, который преобразует Python-код в C-код для последующей компиляции. Все функции-обработчики сообщений в FastStream объявляются с помощью асинхронного объявления async def, что позволяет обрабатывать множество сообщений параллельно, без блокировки. Поэтому FastStream может эффективно управлять большим количеством одновременных соединений и задач, не блокируя поток исполнения.
Познакомившись с FastStream, далее рассмотрим, как использовать эту библиотеку для автогенерации спецификации AsyncAPI для RabbitMQ.
В качестве примера, как обычно, возьмем интернет магазин, куда приложение-продюсер публикует клиентские обращения, которые могут быть заявками на покупку продуктов от физических или юридических лиц, а также вопросы по оплате, доставке, работе с vip-клиентами и скидкам. Все эти обращения обрабатывают разные приложения-обработчики. Топология потокового конвейера на RabbitMQ для этого примера выглядит следующим образом:
- все обращения публикуются на обменник типа Fanout, который реплицирует входящие сообщения на 2 обменника: Topic и Headers;
- обменник типа Topic маршрутизирует только заявки на покупку продуктов в 2 очереди: очередь корпоративных заявок и очередь индивидуальных заявок;
- обменник типа Headers маршрутизирует вопросы по темам в 3 разные очереди: очередь вопросов по оплате, очередь вопросов по доставке и очередь всех остальных вопросов.
Определение и визуализация структуры данных
Для визуализации структур данных и самих спецификаций в среде разработки нужно установить соответствующие плагины: PlantUML и AsyncAPI Preview. О них я недавно рассказывала здесь. Создадим текстовый файл для отображения схем полезной нагрузки сообщений, публикуемых в RabbitMQ. Предположим, приложение-продюсер публикует в брокер следующие сообщения:
- Request – базовая структура данных для всех типов обращений, т.е. вопросов и заявок от корпоративных и частных клиентов на покупку товаров. В этой структуре данные есть следующие обязательные поля:
- moment — момент времени обращения с типом данных datetime;
- name — имя заявителя (строка);
- subject — тема обращения (строка);
- content — содержание обращения (строка);
- CorporateRequest – корпоративная заявка на покупку, которая наследует все поля класса Request и дополнительно содержит обязательное строковое поле inn для хранения ИНН организации;
- PrivateRequest – заявка на покупку от частного лица, которая наследует все поля класса Request и дополнительно содержит обязательное строковое поле phone_number с номером телефона клиента и целочисленное поле для хранения его возраста age;
- QuestionRequest – обращение с вопросом, которое наследует все поля класса Request и дополнительно содержит обязательное целочисленное поле priority для обозначения приоритета обращения.
Скрипт PlantUML:
@startuml class Request { +moment : datetime [required] +name : string [required] +subject : string [required] +content : string [required] } note top of Request Базовый класс обращения end note class CorporateRequest { +inn : string [required] } note top of CorporateRequest Корпоративная заявка на покупку end note class PrivateRequest { +phone_number: string [required] +age: int [required] } note bottom of PrivateRequest Частная заявка на покупку end note class QuestionRequest { +priority: int [required] } note top of QuestionRequest Обращение с вопросом end note Request <|-- CorporateRequest Request <|-- PrivateRequest Request <|-- QuestionRequest @enduml
Разобравшись со схемами полезной нагрузки, далее напишем Python-скрипт для автогенерации спецификации AsyncAPI.
Автогенерация спецификации AsyncAPI с библиотекой FastStream
Чтобы избежать конфликта зависимостей, при разработке на Python на локальном компьютере надо использовать виртуальные среды или работать в интерактивной веб-среде типа Google Colab. При работе в локальном редакторе VSCode надо сперва создать виртуальную среду и установить в ней библиотеки. Как создать виртуальную среду, я показывала здесь. Установим библиотеки в терминале уже созданной и настроенной виртуальной среды:
pip install faststream[rabbbit] pip install faststream[cli] pip install asyncio
При работе в Google Colab помимо faststream[rabbbit] надо установить библиотеку nest_asyncio, которая позволяет повторно использовать существующий цикл асинхронных событий asyncio внутри уже запущенного:
!pip install faststream[rabbbit] !pip install nest_asyncio
Чтобы сохранить список всех библиотек, установленных в текущей виртуальной среде и их версий, можно запустить в терминале команду
pip freeze > requirements.txt
Она создать файл requirements.txt, куда будут записаны все установленные через менеджер пакетов библиотеки. Это позволит потом воссоздать среду исполнения в другом окружении, например, на другом сервере.
Для моего примера код файла topology.py с описанием сообщений, т.е. структуры данных полезной нагрузки и метаданных, а также топологии самого потокового конвейера с использованием FastStream выглядит так:
#импорт модулей import asyncio import logging import json import random import time from datetime import datetime from dataclasses import dataclass, asdict from typing import Union, Any, Dict, Sequence, Literal from typing import List from faststream import FastStream from faststream.rabbit import ExchangeType, RabbitBroker, RabbitExchange, RabbitQueue # Конструктор RabbitBroker broker = RabbitBroker("amqps://user:password@host:1111/vhost") #Определение обменников exch_1 = RabbitExchange("InputsFanoutExchange", type=ExchangeType.FANOUT) exch_2 = RabbitExchange("AppsTopicExchange", type=ExchangeType.TOPIC, bind_to=exch_1, routing_key="app.#") exch_3 = RabbitExchange("QuestionsHeadersExchange", type=ExchangeType.HEADERS,bind_arguments={"subject": "question", "theme": "delivery", "x-match": "all"}) #Определение очередей queue_1 = RabbitQueue("InputsQueue") queue_2 = RabbitQueue("CorpAppsQueue", routing_key="app.company.*") queue_3 = RabbitQueue("IndAppsQueue", routing_key="app") queue_4 = RabbitQueue("DeliveryQueue", bind_arguments={"subject": "question", "theme": "delivery", "x-match": "all"}) queue_5 = RabbitQueue("PaymentQueue", bind_arguments={"subject": "question", "theme": "payment", "x-match": "all"}) queue_6 = RabbitQueue("OtherQuestionsQueue", bind_arguments={"subject": "question", "theme": "vip", "x-match": "all"}) # Инициализация FastStream приложения app = FastStream(broker) ########### Определение классов полезной нагрузки сообщений############# # Базовый класс для запросов @dataclass class RequestData: moment: datetime name: str subject: str content: str # Класс для корпоративных запросов @dataclass class CorporateRequest(RequestData): inn: str # Класс для частных запросов @dataclass class PrivateRequest(RequestData): phone_number: str age: int # Класс для заявок с темой "question" @dataclass class QuestionRequest(RequestData): priority: int # Варианты ключей маршрутизации corp = random.choice([1, 0]) if corp == 1: name = 'company.name' routing_keys = [f'app.company.{name}', 'question'] else: routing_keys = ['app', 'question'] # Случайный выбор одного из ключей маршрутизации subject = random.choice(routing_keys) # Варианты заголовков маршрутизации questions = ['payment', 'delivery', 'discount', 'vip', 'staff'] theme = random.choice(questions) # Варианты схем публикуемых сообщений RequestSchema = Union[CorporateRequest, PrivateRequest, QuestionRequest] # Публикация сообщений в exch_1 @broker.publisher(exchange=exch_1, routing_key=subject, headers={'subject': subject, 'theme':theme}) async def publish_request(request: RequestSchema): logging.info(f"Публикация запроса: {asdict(request)}") return request # Подписка на очередь queue_1 @broker.subscriber(queue=queue_1, exchange=exch_1) async def handle_inputs_queue(request: RequestSchema): logging.info(f"Получен запрос из InputsQueue: {asdict(request)}") # Подписка на очереди, связанные с exch_2 @broker.subscriber(queue=queue_2, exchange=exch_2) async def handle_corp_apps_queue(request: CorporateRequest): logging.info(f"Получен корпоративный запрос из CorpAppsQueue: {asdict(request)}") @broker.subscriber(queue=queue_3, exchange=exch_2) async def handle_ind_apps_queue(request: PrivateRequest): logging.info(f"Получен частный запрос из IndAppsQueue: {asdict(request)}") # Подписка на очереди, связанные с exch_3 @broker.subscriber(queue=queue_4, exchange=exch_3, consume_args={"subject": "question", "theme": "delivery", "x-match": "all"}) async def handle_delivery_queue(request: QuestionRequest): logging.info(f"Получен запрос по доставке из DeliveryQueue: {asdict(request)}") @broker.subscriber(queue=queue_5, exchange=exch_3, consume_args={"subject": "question", "theme": "payment", "x-match": "all"}) async def handle_payment_queue(request: QuestionRequest): logging.info(f"Получен платежный запрос из PaymentQueue: {asdict(request)}") @broker.subscriber(queue=queue_6, exchange=exch_3, consume_args={"subject": "question", "theme": {theme}, "x-match": "all"}) async def handle_other_questions_queue(request: QuestionRequest): logging.info(f"Получен VIP запрос из OtherQuestionsQueue: {asdict(request)}")
Чтобы сгенерировать из этого кода спецификацию AsyncAPI, надо выполнить в терминале команду FastStream с указанием имени файла и потокового приложения. В моем примере это выглядит так:
faststream docs gen topology:app
Эта команда запускает генератор документации FastStream, который анализирует топологию RabbitMQ, описанную в приложении, т.е. объекте app в py-файле topology, и создает структурированную документацию по очередям, обменникам, маршрутам и обработчикам сообщений.
При отсутствии ошибок JSON-файл сгенерированной спецификации AsyncAPI можно просмотреть сразу в VSCode с помощью расширения asyncapi-preview, которое надо установить заранее.
При работе в Google Colab надо скопировать содержимое сгенерированного JSON-файла и просмотреть в AsyncAPI Studio или редакторе Swagger.
Как обычно, примеры кода доступны в моем Github:
- для VSCode https://github.com/AnnaVichugova/API_specifications/tree/main/RabbitMQ
- для Google Colab https://github.com/AnnaVichugova/API_specifications/blob/main/autogenerate_FastStream_RabbitMQ
Надеюсь, что этот небольшой пример поможет вам познакомиться с библиотекой FastStream и ее возможностями по автогенерации спецификации AsyncAPI для брокера сообщений RabbitMQ.
Подробнее изучить все рассмотренные темы, а также другие технологии разработки архитектуры и интеграции информационных систем вы сможете на моих курсах Школы прикладного бизнес-анализа и проектирования информационных систем в нашем лицензированном учебном центре обучения и повышения квалификации системных и бизнес-аналитиков в Москве:
- Проектирование потокового конвейера на RabbitMQ с разработкой спецификации AsyncAPI
- Основы архитектуры и интеграции информационных систем