Что такое AsyncAPI, как эта спецификация позволяет описать серверы и операции асинхронного обмена сообщениями с учетом специфики протокола. Практический пример проектирования потокового конвейера на RabbitMQ и спецификации AsyncAPI 3.0.
Что такое AsyncAPI
В прошлой статье про масштабирование потокового конвейера на RabbitMQ я упоминала AsyncAPI — спецификацию описания асинхронных API. Она аналогична OpenAPI (Swagger), но предназначена для проектирования асинхронных архитектур. Подобно OpenAPI, спецификация AsyncAPI тоже представляет собой YAML-файл, который описывает операции публикации и потребления данных, а также их заголовки и полезную нагрузку.

AsyncAPI реализует подход Spec First, описывая, какие сообщения приложения-продюсеры отправляют на сервер асинхронного обмена данными и как приложения-потребители могут считать эти данные, используя асинхронные протоколы, например, Kafka, AMQP, MQTT, WebSocket и пр.
Для передачи сообщений сервер, т.е. брокер сообщений, использует канал, куда приложение-продюсер публикует сообщение. Приложение-потребитель подписывается на канал, чтобы потреблять оттуда опубликованные продюсером сообщения. Клиенты, т.е. продюсеры и потребители сообщений используют протоколы, поддерживаемые сервером. Протокол – это набор правил, которые определяют способы обмена информацией между клиентами и серверами. Особенности протокола описываются в привязках (binding), специфичных для каждой технологии. В спецификации AsyncAPI привязки позволяют детализировать, как именно сообщения должны быть переданы по сети с учетом конкретного протокола. Например, для RabbitMQ используется протокол AMQP, привязки для которого включают очереди, обменники, обязательность доставки сообщений, параметры маршрутизации и пр. Как и в спецификации OpenAPI, в AsyncAPI также описывается схема полезной нагрузки и заголовков сообщений, передаваемых от продюсера к потребителю через каналы.

Постановка задачи: потоковый конвейер на RabbitMQ
В качестве примера рассмотрим задачу приема клиентских обращений в интернет-магазин. Предположим, в магазин могут поступить заявки на покупку продуктов от физических или юридических лиц, а также вопросы по оплате, доставке, работе с vip-клиентами и скидкам. Все эти обращения обрабатывают разные приложения-потребители. Используя возможности гибкой маршрутизации RabbitMQ, потоковый конвейер можно спроектировать следующим образом. Все обращения публикуются на обменник типа Fanout, к которому привязаны 2 обменника: типа Topic и типа Headers. Обменник типа Topic маршрутизирует только заявки на покупку продуктов в 2 очереди: очередь корпоративных заявок и очередь индивидуальных заявок. Обменник типа Headers маршрутизирует вопросы по темам в 3 разные очереди: очередь вопросов по оплате, очередь вопросов по доставке и очередь всех остальных вопросов.

В табличной форме топология этого конвейера будет выглядеть так:
| Обменник | Очередь | ||||
| Название | Тип | Назначение | Название | Назначение | Привязка |
| InputsFanoutExchange | Fanout | Распараллеливание входящих сообщений на обменники | |||
| AppsTopicExchange | Topic | Маршрутизация по шаблону ключа | corp_apps_queue | Прием корпоративных заявок от юрлиц | routing key =app.company.* |
| ind_apps_queue | Прием частных заявок от физлиц | Routing key = app | |||
| QuestionsHeadersExchange | Headers | Маршрутизация по заголовкам сообщения | delivery_questions_queue | Вопосы по доставке | subject: question
theme: delivery x-match: all |
| payment_questions_queue | Вопросы по оплате | subject: question
theme: payment x-match: all |
|||
| other_questions_queue | Все остальные вопросы (по VIP-статусу, персоналу, скидкам) | subject: question
theme: vip theme: discount theme: staff x-match: all |
|||
Пример спецификации AsyncAPI для RabbitMQ
Поскольку AsyncAPI является стандартизованной спецификацией, она пишется по определенным правилам. В спецификации 3-ей версии могут присутствовать следующие разделы:
- версия стандарта AsyncAPI (asyncapi);
- общая информация(info), где указывается название (title), версия самой спецификации (version) и краткое описание этого документа (description);
- серверы (servers) с указанием хоста (host), краткого описания (description), протокола (protocol) и тегов (tags) для каждого;
- каналы (channels) для обмена сообщениями с адресом (address), названием (title), кратким описанием (description), сообщениями (messages) и привязками (bindings) с их специфическими параметрами. Например, для RabbitMQ это указание протокола amqp, параметры очереди или обменника
- операции (operations) отправки и получения сообщений с использованием выбранного протокола, включая его специфические параметры. Например, для RabbitMQ это ключи маршрутизации, параметры доставки, автоматического подтверждения потребителя, приоритет.
- компоненты (components) с описанием схем данных полезной нагрузки сообщений (schemas) и их заголовков (messageTraits).
В моей спецификации я опишу 3 сервера (для разработки, для тестирования и для производства), а также каналы для публикации и потребления сообщений. YAML-файл для вышеописанной постановки задачи выглядит так:
asyncapi: '3.0.0'
info:
title: Прием клиентских обращений через RabbitMQ
version: '1.0.0'
description: AsyncAPI спецификация для приема клиентских обращений через RabbitMQ
servers:
development:
host: my_rmq_dev_host:my_port
description: контур разработчика - брокер RabbitMQ. Порт 5672 или 5671 с TLS/SSL-шифрованием
protocol: amqp
protocolVersion: 0-9-1
tags:
- name: "env:development"
description: Среда для разработки и отладки
staging:
host: my_rmq_stage_host:my_port
description: тестовый контур - брокер RabbitMQ. Порт 5672 или 5671 с TLS/SSL-шифрованием
protocol: amqp
protocolVersion: 0-9-1
tags:
- name: "env:staging"
description: Тестовая среда для тестирования
production:
host: my_rmq_prod_host:my_port
description: производственный контур - брокер RabbitMQ. Порт 5672 или 5671 с TLS/SSL-шифрованием
protocol: amqp
protocolVersion: 0-9-1
tags:
- name: "env:production"
description: Производственная среда для промышленного использования
channels:
publish_app:
address: input_exchange
title: Публикация обращений
description: канал для публикации обращений от клиентов (заявок на покупку и вопросов)
messages:
question:
$ref: '#/components/messages/question'
ind_app:
$ref: '#/components/messages/ind_app'
corp_app:
$ref: '#/components/messages/corp_app'
servers:
- $ref: '#/servers/development'
- $ref: '#/servers/staging'
- $ref: '#/servers/production'
bindings:
amqp:
is: routingKey
exchange:
name: InputsFanoutExchange
type: fanout
durable: true
autoDelete: false
vhost: my_rmq_vhost
description: входной обменник типа Fanout
bindingVersion: 0.3.0
consume_ind_apps:
address: ind_apps_queue
description: очередь индивидуальных заявок
messages:
ind_app:
$ref: '#/components/messages/ind_app'
servers:
- $ref: '#/servers/development'
- $ref: '#/servers/staging'
- $ref: '#/servers/production'
bindings:
amqp:
is: queue
queue:
name: ind_apps_queue
durable: true
exclusive: false
autoDelete: false
vhost: my_rmq_vhost
bindingVersion: 0.3.0
consume_corp_apps:
address: corp_apps_queue
description: очередь корпоративных заявок
messages:
corp_app:
$ref: '#/components/messages/corp_app'
servers:
- $ref: '#/servers/development'
- $ref: '#/servers/staging'
- $ref: '#/servers/production'
bindings:
amqp:
is: queue
queue:
name: corp_apps_queue
durable: true
exclusive: false
autoDelete: false
vhost: my_rmq_vhost
bindingVersion: 0.3.0
consume_payment_questions:
address: payment_questions_queue
description: очередь вопросов по оплате
messages:
payment_question:
$ref: '#/components/messages/payment_question'
traits:
- $ref: '#/components/messageTraits/payment_question_header'
servers:
- $ref: '#/servers/development'
- $ref: '#/servers/staging'
- $ref: '#/servers/production'
bindings:
amqp:
is: queue
queue:
name: payment_questions_queue
durable: true
exclusive: false
autoDelete: false
vhost: my_rmq_vhost
bindingVersion: 0.3.0
consume_delivery_questions:
address: delivery_questions_queue
description: очередь вопросов по доставке
messages:
payment_question:
$ref: '#/components/messages/delivery_question'
traits:
- $ref: '#/components/messageTraits/delivery_question_header'
servers:
- $ref: '#/servers/development'
- $ref: '#/servers/staging'
- $ref: '#/servers/production'
bindings:
amqp:
is: queue
queue:
name: delivery_questions_queue
durable: true
exclusive: false
autoDelete: false
vhost: my_rmq_vhost
bindingVersion: 0.3.0
consume_other_questions:
address: other_questions_queue
description: очередь всех остальных вопросов
messages:
payment_question:
$ref: '#/components/messages/other_question'
traits:
- $ref: '#/components/messageTraits/other_question_header'
servers:
- $ref: '#/servers/development'
- $ref: '#/servers/staging'
- $ref: '#/servers/production'
bindings:
amqp:
is: queue
queue:
name: other_questions_queue
durable: true
exclusive: false
autoDelete: false
vhost: my_rmq_vhost
bindingVersion: 0.3.0
operations:
publish_apps:
channel:
$ref: '#/channels/publish_app'
action: send
bindings:
amqp:
cc: ['app', 'app.company.{company_name}', 'question']
deliveryMode: 2
mandatory: true
timestamp: true
ack: false
bindingVersion: 0.3.0
consume_ind_apps:
channel:
$ref: '#/channels/consume_ind_apps'
action: receive
bindings:
amqp:
cc: ['app']
priority: 2
deliveryMode: 2
mandatory: true
timestamp: true
ack: false
bindingVersion: 0.3.0
consume_corp_apps:
channel:
$ref: '#/channels/consume_corp_apps'
action: receive
bindings:
amqp:
cc: ['app.company.{company_name}']
priority: 1
deliveryMode: 2
mandatory: true
timestamp: true
ack: false
bindingVersion: 0.3.0
consume_payment_questions:
channel:
$ref: '#/channels/consume_payment_questions'
action: receive
bindings:
amqp:
cc: ['question']
priority: 3
deliveryMode: 2
mandatory: true
timestamp: true
ack: false
bindingVersion: 0.3.0
consume_delivery_questions:
channel:
$ref: '#/channels/consume_delivery_questions'
action: receive
bindings:
amqp:
cc: ['question']
priority: 3
deliveryMode: 2
mandatory: true
timestamp: true
ack: false
bindingVersion: 0.3.0
consume_other_questions:
channel:
$ref: '#/channels/consume_other_questions'
action: receive
bindings:
amqp:
cc: ['question']
priority: 3
deliveryMode: 2
mandatory: true
timestamp: true
ack: false
bindingVersion: 0.3.0
components:
messages:
question:
name: question
title: вопрос
description: вопросы клиентов магазина
contentType: application/json
traits:
- $ref: '#/components/messageTraits/question_header'
payload:
$ref: '#/components/schemas/question'
payment_question:
name: payment_question
title: вопрос по оплате
description: вопросы клиентов по оплате
contentType: application/json
traits:
- $ref: '#/components/messageTraits/payment_question_header'
payload:
$ref: '#/components/schemas/payment_question'
delivery_question:
name: delivery_question
title: вопрос по доставке
description: вопросы клиентов по доставке
contentType: application/json
traits:
- $ref: '#/components/messageTraits/delivery_question_header'
payload:
$ref: '#/components/schemas/delivery_question'
other_question:
name: other_question
title: другие вопросы
description: все остальные вопросы клиентов (по вип-обслуживанию, скидкам, работе с персоналом)
contentType: application/json
traits:
- $ref: '#/components/messageTraits/other_question_header'
payload:
$ref: '#/components/schemas/other_question'
ind_app:
name: ind_app
title: заявки на покупку от физлиц
description: индивидуальные заявки (от физлиц)
contentType: application/json
traits:
- $ref: '#/components/messageTraits/ind_app_header'
payload:
$ref: '#/components/schemas/ind_app'
corp_app:
name: corp_app
title: заявки на покупку от юрлиц
summary: корпоративные заявки (от юрлиц)
contentType: application/json
traits:
- $ref: '#/components/messageTraits/corp_app_header'
payload:
$ref: '#/components/schemas/corp_app'
schemas:
question:
type: object
description: вопрос по работе магазина
properties:
id:
type: string
description: идентификатор обращения
example: q-x-xx-xx-xxxx
name:
type: string
description: имя отправителя
content:
type: string
description: содержимое вопроса
sentAt:
$ref: '#/components/schemas/sentAt'
payment_question:
type: object
description: вопрос по оплате
properties:
id:
type: string
description: идентификатор обращения
example: q-x-xx-xx-xxxx
name:
type: string
description: имя отправителя
content:
type: string
description: содержимое вопроса
sentAt:
$ref: '#/components/schemas/sentAt'
delivery_question:
type: object
description: вопрос по доставке
properties:
id:
type: string
description: идентификатор обращения
example: q-x-xx-xx-xxxx
name:
type: string
description: имя отправителя
content:
type: string
description: содержимое вопроса
sentAt:
$ref: '#/components/schemas/sentAt'
other_question:
type: object
description: другой вопрос
properties:
id:
type: string
description: идентификатор обращения
example: q-x-xx-xx-xxxx
name:
type: string
description: имя отправителя
content:
type: string
description: содержимое вопроса
sentAt:
$ref: '#/components/schemas/sentAt'
ind_app:
type: object
description: индивидуальная заявка на покупку продукта
properties:
id:
type: string
description: идентификатор индивидуальной заявки
example: ia-x-xx-xxx-xxx
client_name:
type: string
description: имя клиента
example: Василь Василич
content:
$ref: '#/components/schemas/appContent'
sentAt:
$ref: '#/components/schemas/sentAt'
corp_app:
type: object
description: корпоративная заявка на покупку продукта
properties:
id:
type: string
description: идентификатор корпоративной заявки
example: ca-x-xx-xxx-xxx
company_name:
type: string
description: название компании
example: ООО "Ромашка"
content:
$ref: '#/components/schemas/appContent'
sentAt:
$ref: '#/components/schemas/sentAt'
sentAt:
type: string
format: date-time
description: Дата и время отправки сообщения
appContent:
type: array
description: Перечень продуктов и их количество, которые хочет купить покупатель
minItems: 1
maxItems: 10
items:
type: object
properties:
product:
type: string
description: название продукта
enum:
- яблоки
- сыр
- рыба
- чай
- хлеб
- кофе
- печенье
quantity:
type: integer
description: количество единиц продукта
minimum: 1
maximum: 10
messageTraits:
question_header:
headers:
type: object
description: Заголовок сообщения-вопроса
properties:
subject:
type: string
enum:
- question
description: предмет обращения - вопрос
theme:
type: string
description: тип вопроса
enum:
- payment
- delivery
- vip
- discount
- staff
payment_question_header:
headers:
type: object
description: Заголовок сообщения-вопроса по оплате
properties:
subject:
type: string
enum:
- question
description: предмет обращения - вопрос по оплате
theme:
type: string
description: вопрос по оплате
enum:
- payment
delivery_question_header:
headers:
type: object
description: Заголовок сообщения-вопроса по доставке
properties:
subject:
type: string
enum:
- question
description: предмет обращения - вопрос по доставке
theme:
type: string
description: вопрос по доставке
enum:
- delivery
other_question_header:
headers:
type: object
description: Заголовок сообщения-вопроса
properties:
subject:
type: string
enum:
- question
description: предмет обращения - вопрос
theme:
type: string
description: тип вопроса
enum:
- vip
- discount
- staff
ind_app_header:
headers:
type: object
description: Заголовок индивидуальной заявки на покупку (просто app)
enum:
- app
properties:
subject:
type: string
description: заголовок индивидуальной заявки (просто app)
corp_app_header:
headers:
type: object
description: Заголовок корпоративной заявки на покупку (app.company.{company_name})
properties:
subject:
type: string
description: заголовок корпоративной заявки на покупкe c названием компании
enum:
- app.company.{company_name}
example: app.company.oooromashka
parameters:
company_name:
description: название компании
location: $message.headers
Если в спецификации нет ошибок, редактор для разработки асинхронных спецификаций https://studio.asyncapi.com/, доступный без регистрации и смс, сразу отобразит внешний вид, похожий на Swagger UI.

Также эта спецификация доступна в моем Github по ссылке: https://github.com/AnnaVichugova/API_specifications/blob/main/AsyncAPI_RabbitMQ
Надеюсь, что этот пример поможет вам разобраться с особенностями описания асинхронных API и спецификой брокера сообщений RabbitMQ. А как получить подобную спецификацию автоматически, сгенерировав ее с помощью библиотеки FastStream, читайте в моей новой статье.
Подробнее все эти и другие связанные темы по проектированию информационных систем разбираются на моих курсах Школы прикладного бизнес-анализа и проектирования информационных систем в нашем лицензированном учебном центре обучения и повышения квалификации системных и бизнес-аналитиков в Москве:
- Проектирование потокового конвейера на RabbitMQ с разработкой спецификации AsyncAPI
- Основы архитектуры и интеграции информационных систем
Полезные ссылки


