Как описать потоковый конвейер с асинхронным API: пример спецификации AsyncAPI для RabbitMQ

спецификация AsyncAPI пример, RabbitMQ примеры курсы обучение, обучение аналитиков и разработчиков RabbitMQ, RabbitMQ AsyncAPI

Что такое AsyncAPI, как эта спецификация позволяет описать серверы и операции асинхронного обмена сообщениями с учетом специфики протокола. Практический пример проектирования потокового конвейера на RabbitMQ и спецификации AsyncAPI 3.0.

Что такое AsyncAPI

В прошлой статье про масштабирование потокового конвейера на RabbitMQ я упоминала AsyncAPI — спецификацию описания асинхронных API. Она аналогична OpenAPI (Swagger), но предназначена для проектирования асинхронных архитектур. Подобно OpenAPI, спецификация AsyncAPI тоже представляет собой YAML-файл, который описывает операции публикации и потребления данных, а также их заголовки и полезную нагрузку.

OpenAPI 3 и AsyncAPI 2
Сравнение структуры OpenAPI 3 и AsyncAPI 2

AsyncAPI реализует подход Spec First, описывая, какие сообщения приложения-продюсеры отправляют на сервер асинхронного обмена данными и как приложения-потребители могут считать эти данные, используя асинхронные протоколы, например, Kafka, AMQP, MQTT, WebSocket и пр.

Для передачи сообщений сервер, т.е. брокер сообщений, использует канал, куда приложение-продюсер публикует сообщение. Приложение-потребитель подписывается на канал, чтобы потреблять оттуда опубликованные продюсером сообщения. Клиенты, т.е. продюсеры и потребители сообщений используют протоколы, поддерживаемые сервером. Протокол – это набор правил, которые определяют способы обмена информацией между клиентами и серверами. Особенности протокола описываются в привязках (binding), специфичных для каждой технологии. В спецификации AsyncAPI привязки позволяют детализировать, как именно сообщения должны быть переданы по сети с учетом конкретного протокола. Например, для RabbitMQ используется протокол AMQP, привязки для которого включают очереди, обменники, обязательность доставки сообщений, параметры маршрутизации и пр. Как и в спецификации OpenAPI, в AsyncAPI также описывается схема полезной нагрузки и заголовков сообщений, передаваемых от продюсера к потребителю через каналы.

Асинхронная интеграция через брокер сообщений
Асинхронная интеграция через брокер сообщений

Постановка задачи: потоковый конвейер на RabbitMQ

В качестве примера рассмотрим задачу приема клиентских обращений в интернет-магазин. Предположим, в магазин могут поступить заявки на покупку продуктов от физических или юридических лиц, а также вопросы по оплате, доставке, работе с vip-клиентами и скидкам. Все эти обращения обрабатывают разные приложения-потребители. Используя возможности гибкой маршрутизации RabbitMQ, потоковый конвейер можно спроектировать следующим образом. Все обращения публикуются на обменник типа Fanout, к которому привязаны 2 обменника: типа Topic и типа Headers. Обменник типа Topic маршрутизирует только заявки на покупку продуктов в 2 очереди: очередь корпоративных заявок и очередь индивидуальных заявок. Обменник типа Headers маршрутизирует вопросы по темам в 3 разные очереди: очередь вопросов по оплате, очередь вопросов по доставке и очередь всех остальных вопросов.

Топология потокового конвейера на RabbitMQ
Топология потокового конвейера на RabbitMQ

В табличной форме топология этого конвейера будет выглядеть так:

Обменник Очередь
Название Тип Назначение Название Назначение Привязка
InputsFanoutExchange Fanout Распараллеливание входящих сообщений на обменники
AppsTopicExchange Topic Маршрутизация по шаблону ключа corp_apps_queue Прием корпоративных заявок от юрлиц routing key =app.company.*
ind_apps_queue Прием частных заявок от физлиц Routing key = app
QuestionsHeadersExchange Headers Маршрутизация по заголовкам сообщения delivery_questions_queue Вопосы по доставке subject:       question

theme:        delivery

x-match:     all

payment_questions_queue Вопросы по оплате subject:       question

theme:        payment

x-match:     all

other_questions_queue Все остальные вопросы (по VIP-статусу, персоналу, скидкам) subject:       question

theme:        vip

theme:        discount

theme:        staff

x-match:     all

Пример спецификации AsyncAPI для RabbitMQ

Поскольку AsyncAPI является стандартизованной спецификацией, она пишется по определенным правилам. В спецификации 3-ей версии могут присутствовать следующие разделы:

  • версия стандарта AsyncAPI (asyncapi);
  • общая информация(info), где указывается название (title), версия самой спецификации (version) и краткое описание этого документа (description);
  • серверы (servers) с указанием хоста (host), краткого описания (description), протокола (protocol) и тегов (tags) для каждого;
  • каналы (channels) для обмена сообщениями с адресом (address), названием (title), кратким описанием (description), сообщениями (messages) и привязками (bindings) с их специфическими параметрами. Например, для RabbitMQ это указание протокола amqp, параметры очереди или обменника
  • операции (operations) отправки и получения сообщений с использованием выбранного протокола, включая его специфические параметры. Например, для RabbitMQ это ключи маршрутизации, параметры доставки, автоматического подтверждения потребителя, приоритет.
  • компоненты (components) с описанием схем данных полезной нагрузки сообщений (schemas) и их заголовков (messageTraits).

В моей спецификации я опишу 3 сервера (для разработки, для тестирования и для производства), а также каналы для публикации и потребления сообщений. YAML-файл для вышеописанной постановки задачи выглядит так:

asyncapi: '3.0.0'
info:
  title: Прием клиентских обращений через RabbitMQ
  version: '1.0.0'
  description: AsyncAPI спецификация для приема клиентских обращений через RabbitMQ

servers:
 development:
  host: my_rmq_dev_host:my_port
  description: контур разработчика - брокер RabbitMQ. Порт 5672 или 5671 с TLS/SSL-шифрованием 
  protocol: amqp
  protocolVersion: 0-9-1
  tags:
    - name: "env:development"
      description: Среда для разработки и отладки
 staging:
  host: my_rmq_stage_host:my_port
  description: тестовый контур - брокер RabbitMQ. Порт 5672 или 5671 с TLS/SSL-шифрованием
  protocol: amqp
  protocolVersion: 0-9-1
  tags:
    - name: "env:staging"
      description: Тестовая среда для тестирования
 production:
  host: my_rmq_prod_host:my_port
  description: производственный контур - брокер RabbitMQ. Порт 5672 или 5671 с TLS/SSL-шифрованием
  protocol: amqp
  protocolVersion: 0-9-1
  tags:
    - name: "env:production"
      description: Производственная среда для промышленного использования


channels:
  publish_app:
    address: input_exchange
    title: Публикация обращений
    description: канал для публикации обращений от клиентов (заявок на покупку и вопросов)
    messages:
      question:
        $ref: '#/components/messages/question'
      ind_app:
        $ref: '#/components/messages/ind_app'
      corp_app:
        $ref: '#/components/messages/corp_app'        
    servers:
      - $ref: '#/servers/development'
      - $ref: '#/servers/staging'
      - $ref: '#/servers/production'      
    bindings:
      amqp:
        is: routingKey
        exchange:
          name: InputsFanoutExchange
          type: fanout
          durable: true
          autoDelete: false
          vhost: my_rmq_vhost
          description: входной обменник типа Fanout 
        bindingVersion: 0.3.0
  consume_ind_apps:
    address: ind_apps_queue
    description: очередь индивидуальных заявок
    messages:
      ind_app:
        $ref: '#/components/messages/ind_app'    
    servers:
      - $ref: '#/servers/development'
      - $ref: '#/servers/staging'
      - $ref: '#/servers/production'      
    bindings:            
      amqp:
        is: queue
        queue:
          name: ind_apps_queue
          durable: true
          exclusive: false
          autoDelete: false
          vhost: my_rmq_vhost
        bindingVersion: 0.3.0
  
  consume_corp_apps:
    address: corp_apps_queue
    description: очередь корпоративных заявок
    messages:
      corp_app:
        $ref: '#/components/messages/corp_app'        
    servers:
      - $ref: '#/servers/development'
      - $ref: '#/servers/staging'
      - $ref: '#/servers/production'  
    bindings:            
      amqp:
        is: queue
        queue:
          name: corp_apps_queue
          durable: true
          exclusive: false
          autoDelete: false
          vhost: my_rmq_vhost
        bindingVersion: 0.3.0        
  
  consume_payment_questions:
    address: payment_questions_queue
    description: очередь вопросов по оплате
    messages:
      payment_question:
        $ref: '#/components/messages/payment_question'
        traits:
        - $ref: '#/components/messageTraits/payment_question_header'    
    servers:
      - $ref: '#/servers/development'
      - $ref: '#/servers/staging'
      - $ref: '#/servers/production'      
    bindings:            
      amqp:
        is: queue
        queue:
          name: payment_questions_queue
          durable: true
          exclusive: false
          autoDelete: false
          vhost: my_rmq_vhost
        bindingVersion: 0.3.0

  consume_delivery_questions:
    address: delivery_questions_queue
    description: очередь вопросов по доставке
    messages:
      payment_question:
        $ref: '#/components/messages/delivery_question'
        traits:
        - $ref: '#/components/messageTraits/delivery_question_header'    
    servers:
      - $ref: '#/servers/development'
      - $ref: '#/servers/staging'
      - $ref: '#/servers/production'      
    bindings:            
      amqp:
        is: queue
        queue:
          name: delivery_questions_queue
          durable: true
          exclusive: false
          autoDelete: false
          vhost: my_rmq_vhost
        bindingVersion: 0.3.0

  consume_other_questions:
    address: other_questions_queue
    description: очередь всех остальных вопросов
    messages:
      payment_question:
        $ref: '#/components/messages/other_question'
        traits:
        - $ref: '#/components/messageTraits/other_question_header'    
    servers:
      - $ref: '#/servers/development'
      - $ref: '#/servers/staging'
      - $ref: '#/servers/production'      
    bindings:            
      amqp:
        is: queue
        queue:
          name: other_questions_queue
          durable: true
          exclusive: false
          autoDelete: false
          vhost: my_rmq_vhost
        bindingVersion: 0.3.0


operations:
  publish_apps:
    channel: 
      $ref: '#/channels/publish_app'
    action: send  
    bindings:
      amqp:
        cc: ['app', 'app.company.{company_name}', 'question']
        deliveryMode: 2
        mandatory: true
        timestamp: true
        ack: false
        bindingVersion: 0.3.0
  
  consume_ind_apps:
    channel: 
      $ref: '#/channels/consume_ind_apps'
    action: receive  
    bindings:
      amqp:
        cc: ['app']
        priority: 2
        deliveryMode: 2
        mandatory: true
        timestamp: true
        ack: false
        bindingVersion: 0.3.0
  consume_corp_apps:
    channel: 
      $ref: '#/channels/consume_corp_apps'
    action: receive  
    bindings:
      amqp:
        cc: ['app.company.{company_name}']
        priority: 1
        deliveryMode: 2
        mandatory: true
        timestamp: true
        ack: false
        bindingVersion: 0.3.0        
  
  consume_payment_questions:
    channel: 
      $ref: '#/channels/consume_payment_questions'
    action: receive  
    bindings:
      amqp:
        cc: ['question']
        priority: 3
        deliveryMode: 2
        mandatory: true
        timestamp: true
        ack: false
        bindingVersion: 0.3.0  

  consume_delivery_questions:
    channel: 
      $ref: '#/channels/consume_delivery_questions'
    action: receive  
    bindings:
      amqp:
        cc: ['question']
        priority: 3
        deliveryMode: 2
        mandatory: true
        timestamp: true
        ack: false
        bindingVersion: 0.3.0  

  consume_other_questions:
    channel: 
      $ref: '#/channels/consume_other_questions'
    action: receive  
    bindings:
      amqp:
        cc: ['question']
        priority: 3
        deliveryMode: 2
        mandatory: true
        timestamp: true
        ack: false
        bindingVersion: 0.3.0  

components:
  messages:
    question:
      name: question
      title: вопрос
      description: вопросы клиентов магазина
      contentType: application/json
      traits:
        - $ref: '#/components/messageTraits/question_header'       
      payload:
        $ref: '#/components/schemas/question'

    payment_question:
      name: payment_question
      title: вопрос по оплате
      description: вопросы клиентов по оплате
      contentType: application/json
      traits:
        - $ref: '#/components/messageTraits/payment_question_header'       
      payload:
        $ref: '#/components/schemas/payment_question'

    delivery_question:
      name: delivery_question
      title: вопрос по доставке
      description: вопросы клиентов по доставке
      contentType: application/json
      traits:
        - $ref: '#/components/messageTraits/delivery_question_header'       
      payload:
        $ref: '#/components/schemas/delivery_question'        

    other_question:
      name: other_question
      title: другие вопросы
      description:  все остальные вопросы клиентов (по вип-обслуживанию, скидкам, работе с персоналом)
      contentType: application/json
      traits:
        - $ref: '#/components/messageTraits/other_question_header'       
      payload:
        $ref: '#/components/schemas/other_question'  

    ind_app:
      name: ind_app
      title: заявки на покупку от физлиц
      description: индивидуальные заявки (от физлиц)
      contentType: application/json
      traits:
        - $ref: '#/components/messageTraits/ind_app_header'
      payload:
        $ref: '#/components/schemas/ind_app'

    corp_app:
      name: corp_app
      title: заявки на покупку от юрлиц
      summary: корпоративные заявки (от юрлиц)
      contentType: application/json
      traits:
        - $ref: '#/components/messageTraits/corp_app_header'
      payload:
        $ref: '#/components/schemas/corp_app'

  schemas:
    question:
      type: object
      description: вопрос по работе магазина
      properties:
        id:
          type: string
          description: идентификатор обращения
          example: q-x-xx-xx-xxxx
        name:
          type: string
          description: имя отправителя          
        content:
          type: string
          description: содержимое вопроса             
        sentAt:
          $ref: '#/components/schemas/sentAt'

    payment_question:
      type: object
      description: вопрос по оплате
      properties:
        id:
          type: string
          description: идентификатор обращения
          example: q-x-xx-xx-xxxx
        name:
          type: string
          description: имя отправителя          
        content:
          type: string
          description: содержимое вопроса             
        sentAt:
          $ref: '#/components/schemas/sentAt'

    delivery_question:
      type: object
      description: вопрос по доставке
      properties:
        id:
          type: string
          description: идентификатор обращения
          example: q-x-xx-xx-xxxx
        name:
          type: string
          description: имя отправителя          
        content:
          type: string
          description: содержимое вопроса             
        sentAt:
          $ref: '#/components/schemas/sentAt'

    other_question:
      type: object
      description: другой вопрос
      properties:
        id:
          type: string
          description: идентификатор обращения
          example: q-x-xx-xx-xxxx
        name:
          type: string
          description: имя отправителя          
        content:
          type: string
          description: содержимое вопроса             
        sentAt:
          $ref: '#/components/schemas/sentAt'

    ind_app:
      type: object
      description: индивидуальная заявка на покупку продукта
      properties:
        id:
          type: string
          description: идентификатор индивидуальной заявки
          example: ia-x-xx-xxx-xxx
        client_name:
          type: string
          description: имя клиента
          example: Василь Василич          
        content:
          $ref: '#/components/schemas/appContent'
        sentAt:
          $ref: '#/components/schemas/sentAt'          

    corp_app:
      type: object
      description: корпоративная заявка на покупку продукта
      properties:
        id:
          type: string
          description: идентификатор корпоративной заявки
          example: ca-x-xx-xxx-xxx
        company_name:
          type: string
          description: название компании
          example: ООО "Ромашка"         
        content:
          $ref: '#/components/schemas/appContent'
        sentAt:
          $ref: '#/components/schemas/sentAt'
    sentAt:
      type: string
      format: date-time
      description: Дата и время отправки сообщения

    appContent:
      type: array
      description: Перечень продуктов и их количество, которые хочет купить покупатель
      minItems: 1
      maxItems: 10
      items:
        type: object
        properties:
          product:
            type: string
            description: название продукта
            enum:
              - яблоки
              - сыр
              - рыба
              - чай
              - хлеб
              - кофе
              - печенье
          quantity:
            type: integer
            description: количество единиц продукта
            minimum: 1
            maximum: 10
            
  messageTraits:
    question_header:
      headers:
        type: object
        description: Заголовок сообщения-вопроса
        properties:
          subject:
            type: string
            enum:
                - question
            description: предмет обращения - вопрос    
          theme:
            type: string
            description: тип вопроса 
            enum:
                - payment
                - delivery
                - vip
                - discount
                - staff

    payment_question_header:
      headers:
        type: object
        description: Заголовок сообщения-вопроса по оплате
        properties:
          subject:
            type: string
            enum:
                - question
            description: предмет обращения - вопрос по оплате    
          theme:
            type: string
            description: вопрос по оплате
            enum:
                - payment     

    delivery_question_header:
      headers:
        type: object
        description: Заголовок сообщения-вопроса по доставке
        properties:
          subject:
            type: string
            enum:
                - question
            description: предмет обращения - вопрос по доставке    
          theme:
            type: string
            description: вопрос по доставке 
            enum:
                - delivery   

    other_question_header:
      headers:
        type: object
        description: Заголовок сообщения-вопроса
        properties:
          subject:
            type: string
            enum:
                - question
            description: предмет обращения - вопрос    
          theme:
            type: string
            description: тип вопроса 
            enum:
                - vip
                - discount
                - staff

    ind_app_header:
      headers:
        type: object
        description: Заголовок индивидуальной заявки на покупку (просто app)
        enum:
          - app
        properties:
          subject:
            type: string
            description: заголовок индивидуальной заявки (просто app)

    corp_app_header:
      headers:
        type: object
        description: Заголовок корпоративной заявки на покупку (app.company.{company_name})
        properties:
          subject:
            type: string
            description: заголовок корпоративной заявки на покупкe c названием компании
            enum:
              - app.company.{company_name}
            example: app.company.oooromashka  
            parameters:
            company_name:
              description: название компании
              location: $message.headers

Если в спецификации нет ошибок, редактор для разработки асинхронных спецификаций https://studio.asyncapi.com/, доступный без регистрации и смс, сразу отобразит внешний вид, похожий на Swagger UI.

Разработка спецификации AsyncAPI в AsyncAPIStudio
Разработка спецификации AsyncAPI в AsyncAPIStudio

Также эта спецификация доступна в моем Github по ссылке: https://github.com/AnnaVichugova/API_specifications/blob/main/AsyncAPI_RabbitMQ

Надеюсь, что этот пример поможет вам разобраться с особенностями описания асинхронных API и спецификой брокера сообщений RabbitMQ.

Подробнее познакомиться со всеми рассмотренными темами, а также другими основами архитектуры и интеграции информационных систем вы сможете на курсах Школы прикладного бизнес-анализа и проектирования информационных систем в нашем лицензированном учебном центре обучения и повышения квалификации системных и бизнес-аналитиков в Москве:

 

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Полезные ссылки

  1. https://www.asyncapi.com/docs/reference/specification/v3.0.0
  2. https://github.com/asyncapi/bindings/blob/master/amqp/README.md
  3. https://studio.asyncapi.com
  4. https://bigdataschool.ru/blog/asyncapi-specification-for-kafka-practical-example.html