Что такое AsyncAPI, как эта спецификация позволяет описать серверы и операции асинхронного обмена сообщениями с учетом специфики протокола. Практический пример проектирования потокового конвейера на RabbitMQ и спецификации AsyncAPI 3.0.
Что такое AsyncAPI
В прошлой статье про масштабирование потокового конвейера на RabbitMQ я упоминала AsyncAPI — спецификацию описания асинхронных API. Она аналогична OpenAPI (Swagger), но предназначена для проектирования асинхронных архитектур. Подобно OpenAPI, спецификация AsyncAPI тоже представляет собой YAML-файл, который описывает операции публикации и потребления данных, а также их заголовки и полезную нагрузку.
AsyncAPI реализует подход Spec First, описывая, какие сообщения приложения-продюсеры отправляют на сервер асинхронного обмена данными и как приложения-потребители могут считать эти данные, используя асинхронные протоколы, например, Kafka, AMQP, MQTT, WebSocket и пр.
Для передачи сообщений сервер, т.е. брокер сообщений, использует канал, куда приложение-продюсер публикует сообщение. Приложение-потребитель подписывается на канал, чтобы потреблять оттуда опубликованные продюсером сообщения. Клиенты, т.е. продюсеры и потребители сообщений используют протоколы, поддерживаемые сервером. Протокол – это набор правил, которые определяют способы обмена информацией между клиентами и серверами. Особенности протокола описываются в привязках (binding), специфичных для каждой технологии. В спецификации AsyncAPI привязки позволяют детализировать, как именно сообщения должны быть переданы по сети с учетом конкретного протокола. Например, для RabbitMQ используется протокол AMQP, привязки для которого включают очереди, обменники, обязательность доставки сообщений, параметры маршрутизации и пр. Как и в спецификации OpenAPI, в AsyncAPI также описывается схема полезной нагрузки и заголовков сообщений, передаваемых от продюсера к потребителю через каналы.
Постановка задачи: потоковый конвейер на RabbitMQ
В качестве примера рассмотрим задачу приема клиентских обращений в интернет-магазин. Предположим, в магазин могут поступить заявки на покупку продуктов от физических или юридических лиц, а также вопросы по оплате, доставке, работе с vip-клиентами и скидкам. Все эти обращения обрабатывают разные приложения-потребители. Используя возможности гибкой маршрутизации RabbitMQ, потоковый конвейер можно спроектировать следующим образом. Все обращения публикуются на обменник типа Fanout, к которому привязаны 2 обменника: типа Topic и типа Headers. Обменник типа Topic маршрутизирует только заявки на покупку продуктов в 2 очереди: очередь корпоративных заявок и очередь индивидуальных заявок. Обменник типа Headers маршрутизирует вопросы по темам в 3 разные очереди: очередь вопросов по оплате, очередь вопросов по доставке и очередь всех остальных вопросов.
В табличной форме топология этого конвейера будет выглядеть так:
Обменник | Очередь | ||||
Название | Тип | Назначение | Название | Назначение | Привязка |
InputsFanoutExchange | Fanout | Распараллеливание входящих сообщений на обменники | |||
AppsTopicExchange | Topic | Маршрутизация по шаблону ключа | corp_apps_queue | Прием корпоративных заявок от юрлиц | routing key =app.company.* |
ind_apps_queue | Прием частных заявок от физлиц | Routing key = app | |||
QuestionsHeadersExchange | Headers | Маршрутизация по заголовкам сообщения | delivery_questions_queue | Вопосы по доставке | subject: question
theme: delivery x-match: all |
payment_questions_queue | Вопросы по оплате | subject: question
theme: payment x-match: all |
|||
other_questions_queue | Все остальные вопросы (по VIP-статусу, персоналу, скидкам) | subject: question
theme: vip theme: discount theme: staff x-match: all |
Пример спецификации AsyncAPI для RabbitMQ
Поскольку AsyncAPI является стандартизованной спецификацией, она пишется по определенным правилам. В спецификации 3-ей версии могут присутствовать следующие разделы:
- версия стандарта AsyncAPI (asyncapi);
- общая информация(info), где указывается название (title), версия самой спецификации (version) и краткое описание этого документа (description);
- серверы (servers) с указанием хоста (host), краткого описания (description), протокола (protocol) и тегов (tags) для каждого;
- каналы (channels) для обмена сообщениями с адресом (address), названием (title), кратким описанием (description), сообщениями (messages) и привязками (bindings) с их специфическими параметрами. Например, для RabbitMQ это указание протокола amqp, параметры очереди или обменника
- операции (operations) отправки и получения сообщений с использованием выбранного протокола, включая его специфические параметры. Например, для RabbitMQ это ключи маршрутизации, параметры доставки, автоматического подтверждения потребителя, приоритет.
- компоненты (components) с описанием схем данных полезной нагрузки сообщений (schemas) и их заголовков (messageTraits).
В моей спецификации я опишу 3 сервера (для разработки, для тестирования и для производства), а также каналы для публикации и потребления сообщений. YAML-файл для вышеописанной постановки задачи выглядит так:
asyncapi: '3.0.0' info: title: Прием клиентских обращений через RabbitMQ version: '1.0.0' description: AsyncAPI спецификация для приема клиентских обращений через RabbitMQ servers: development: host: my_rmq_dev_host:my_port description: контур разработчика - брокер RabbitMQ. Порт 5672 или 5671 с TLS/SSL-шифрованием protocol: amqp protocolVersion: 0-9-1 tags: - name: "env:development" description: Среда для разработки и отладки staging: host: my_rmq_stage_host:my_port description: тестовый контур - брокер RabbitMQ. Порт 5672 или 5671 с TLS/SSL-шифрованием protocol: amqp protocolVersion: 0-9-1 tags: - name: "env:staging" description: Тестовая среда для тестирования production: host: my_rmq_prod_host:my_port description: производственный контур - брокер RabbitMQ. Порт 5672 или 5671 с TLS/SSL-шифрованием protocol: amqp protocolVersion: 0-9-1 tags: - name: "env:production" description: Производственная среда для промышленного использования channels: publish_app: address: input_exchange title: Публикация обращений description: канал для публикации обращений от клиентов (заявок на покупку и вопросов) messages: question: $ref: '#/components/messages/question' ind_app: $ref: '#/components/messages/ind_app' corp_app: $ref: '#/components/messages/corp_app' servers: - $ref: '#/servers/development' - $ref: '#/servers/staging' - $ref: '#/servers/production' bindings: amqp: is: routingKey exchange: name: InputsFanoutExchange type: fanout durable: true autoDelete: false vhost: my_rmq_vhost description: входной обменник типа Fanout bindingVersion: 0.3.0 consume_ind_apps: address: ind_apps_queue description: очередь индивидуальных заявок messages: ind_app: $ref: '#/components/messages/ind_app' servers: - $ref: '#/servers/development' - $ref: '#/servers/staging' - $ref: '#/servers/production' bindings: amqp: is: queue queue: name: ind_apps_queue durable: true exclusive: false autoDelete: false vhost: my_rmq_vhost bindingVersion: 0.3.0 consume_corp_apps: address: corp_apps_queue description: очередь корпоративных заявок messages: corp_app: $ref: '#/components/messages/corp_app' servers: - $ref: '#/servers/development' - $ref: '#/servers/staging' - $ref: '#/servers/production' bindings: amqp: is: queue queue: name: corp_apps_queue durable: true exclusive: false autoDelete: false vhost: my_rmq_vhost bindingVersion: 0.3.0 consume_payment_questions: address: payment_questions_queue description: очередь вопросов по оплате messages: payment_question: $ref: '#/components/messages/payment_question' traits: - $ref: '#/components/messageTraits/payment_question_header' servers: - $ref: '#/servers/development' - $ref: '#/servers/staging' - $ref: '#/servers/production' bindings: amqp: is: queue queue: name: payment_questions_queue durable: true exclusive: false autoDelete: false vhost: my_rmq_vhost bindingVersion: 0.3.0 consume_delivery_questions: address: delivery_questions_queue description: очередь вопросов по доставке messages: payment_question: $ref: '#/components/messages/delivery_question' traits: - $ref: '#/components/messageTraits/delivery_question_header' servers: - $ref: '#/servers/development' - $ref: '#/servers/staging' - $ref: '#/servers/production' bindings: amqp: is: queue queue: name: delivery_questions_queue durable: true exclusive: false autoDelete: false vhost: my_rmq_vhost bindingVersion: 0.3.0 consume_other_questions: address: other_questions_queue description: очередь всех остальных вопросов messages: payment_question: $ref: '#/components/messages/other_question' traits: - $ref: '#/components/messageTraits/other_question_header' servers: - $ref: '#/servers/development' - $ref: '#/servers/staging' - $ref: '#/servers/production' bindings: amqp: is: queue queue: name: other_questions_queue durable: true exclusive: false autoDelete: false vhost: my_rmq_vhost bindingVersion: 0.3.0 operations: publish_apps: channel: $ref: '#/channels/publish_app' action: send bindings: amqp: cc: ['app', 'app.company.{company_name}', 'question'] deliveryMode: 2 mandatory: true timestamp: true ack: false bindingVersion: 0.3.0 consume_ind_apps: channel: $ref: '#/channels/consume_ind_apps' action: receive bindings: amqp: cc: ['app'] priority: 2 deliveryMode: 2 mandatory: true timestamp: true ack: false bindingVersion: 0.3.0 consume_corp_apps: channel: $ref: '#/channels/consume_corp_apps' action: receive bindings: amqp: cc: ['app.company.{company_name}'] priority: 1 deliveryMode: 2 mandatory: true timestamp: true ack: false bindingVersion: 0.3.0 consume_payment_questions: channel: $ref: '#/channels/consume_payment_questions' action: receive bindings: amqp: cc: ['question'] priority: 3 deliveryMode: 2 mandatory: true timestamp: true ack: false bindingVersion: 0.3.0 consume_delivery_questions: channel: $ref: '#/channels/consume_delivery_questions' action: receive bindings: amqp: cc: ['question'] priority: 3 deliveryMode: 2 mandatory: true timestamp: true ack: false bindingVersion: 0.3.0 consume_other_questions: channel: $ref: '#/channels/consume_other_questions' action: receive bindings: amqp: cc: ['question'] priority: 3 deliveryMode: 2 mandatory: true timestamp: true ack: false bindingVersion: 0.3.0 components: messages: question: name: question title: вопрос description: вопросы клиентов магазина contentType: application/json traits: - $ref: '#/components/messageTraits/question_header' payload: $ref: '#/components/schemas/question' payment_question: name: payment_question title: вопрос по оплате description: вопросы клиентов по оплате contentType: application/json traits: - $ref: '#/components/messageTraits/payment_question_header' payload: $ref: '#/components/schemas/payment_question' delivery_question: name: delivery_question title: вопрос по доставке description: вопросы клиентов по доставке contentType: application/json traits: - $ref: '#/components/messageTraits/delivery_question_header' payload: $ref: '#/components/schemas/delivery_question' other_question: name: other_question title: другие вопросы description: все остальные вопросы клиентов (по вип-обслуживанию, скидкам, работе с персоналом) contentType: application/json traits: - $ref: '#/components/messageTraits/other_question_header' payload: $ref: '#/components/schemas/other_question' ind_app: name: ind_app title: заявки на покупку от физлиц description: индивидуальные заявки (от физлиц) contentType: application/json traits: - $ref: '#/components/messageTraits/ind_app_header' payload: $ref: '#/components/schemas/ind_app' corp_app: name: corp_app title: заявки на покупку от юрлиц summary: корпоративные заявки (от юрлиц) contentType: application/json traits: - $ref: '#/components/messageTraits/corp_app_header' payload: $ref: '#/components/schemas/corp_app' schemas: question: type: object description: вопрос по работе магазина properties: id: type: string description: идентификатор обращения example: q-x-xx-xx-xxxx name: type: string description: имя отправителя content: type: string description: содержимое вопроса sentAt: $ref: '#/components/schemas/sentAt' payment_question: type: object description: вопрос по оплате properties: id: type: string description: идентификатор обращения example: q-x-xx-xx-xxxx name: type: string description: имя отправителя content: type: string description: содержимое вопроса sentAt: $ref: '#/components/schemas/sentAt' delivery_question: type: object description: вопрос по доставке properties: id: type: string description: идентификатор обращения example: q-x-xx-xx-xxxx name: type: string description: имя отправителя content: type: string description: содержимое вопроса sentAt: $ref: '#/components/schemas/sentAt' other_question: type: object description: другой вопрос properties: id: type: string description: идентификатор обращения example: q-x-xx-xx-xxxx name: type: string description: имя отправителя content: type: string description: содержимое вопроса sentAt: $ref: '#/components/schemas/sentAt' ind_app: type: object description: индивидуальная заявка на покупку продукта properties: id: type: string description: идентификатор индивидуальной заявки example: ia-x-xx-xxx-xxx client_name: type: string description: имя клиента example: Василь Василич content: $ref: '#/components/schemas/appContent' sentAt: $ref: '#/components/schemas/sentAt' corp_app: type: object description: корпоративная заявка на покупку продукта properties: id: type: string description: идентификатор корпоративной заявки example: ca-x-xx-xxx-xxx company_name: type: string description: название компании example: ООО "Ромашка" content: $ref: '#/components/schemas/appContent' sentAt: $ref: '#/components/schemas/sentAt' sentAt: type: string format: date-time description: Дата и время отправки сообщения appContent: type: array description: Перечень продуктов и их количество, которые хочет купить покупатель minItems: 1 maxItems: 10 items: type: object properties: product: type: string description: название продукта enum: - яблоки - сыр - рыба - чай - хлеб - кофе - печенье quantity: type: integer description: количество единиц продукта minimum: 1 maximum: 10 messageTraits: question_header: headers: type: object description: Заголовок сообщения-вопроса properties: subject: type: string enum: - question description: предмет обращения - вопрос theme: type: string description: тип вопроса enum: - payment - delivery - vip - discount - staff payment_question_header: headers: type: object description: Заголовок сообщения-вопроса по оплате properties: subject: type: string enum: - question description: предмет обращения - вопрос по оплате theme: type: string description: вопрос по оплате enum: - payment delivery_question_header: headers: type: object description: Заголовок сообщения-вопроса по доставке properties: subject: type: string enum: - question description: предмет обращения - вопрос по доставке theme: type: string description: вопрос по доставке enum: - delivery other_question_header: headers: type: object description: Заголовок сообщения-вопроса properties: subject: type: string enum: - question description: предмет обращения - вопрос theme: type: string description: тип вопроса enum: - vip - discount - staff ind_app_header: headers: type: object description: Заголовок индивидуальной заявки на покупку (просто app) enum: - app properties: subject: type: string description: заголовок индивидуальной заявки (просто app) corp_app_header: headers: type: object description: Заголовок корпоративной заявки на покупку (app.company.{company_name}) properties: subject: type: string description: заголовок корпоративной заявки на покупкe c названием компании enum: - app.company.{company_name} example: app.company.oooromashka parameters: company_name: description: название компании location: $message.headers
Если в спецификации нет ошибок, редактор для разработки асинхронных спецификаций https://studio.asyncapi.com/, доступный без регистрации и смс, сразу отобразит внешний вид, похожий на Swagger UI.
Также эта спецификация доступна в моем Github по ссылке: https://github.com/AnnaVichugova/API_specifications/blob/main/AsyncAPI_RabbitMQ
Надеюсь, что этот пример поможет вам разобраться с особенностями описания асинхронных API и спецификой брокера сообщений RabbitMQ.
Подробнее познакомиться со всеми рассмотренными темами, а также другими основами архитектуры и интеграции информационных систем вы сможете на курсах Школы прикладного бизнес-анализа и проектирования информационных систем в нашем лицензированном учебном центре обучения и повышения квалификации системных и бизнес-аналитиков в Москве:
Полезные ссылки