Под капотом Apache Kafka: пишем продюсер и консумер к своему топику

интеграция информационных систем простыми словами для начинающих примеры курсы обучение, интеграция Apache Kafka примеры курсы обучение, основы архитектуры и интеграции информационных систем для бизнес-аналитика, архитектура информационных систем основы введение, Apache Kafka для аналитика краткий ликбез, обучение системных и бизнес-аналитиков, курсы системного и бизнес-анализа, Школа прикладного бизнес-анализа Учебный Центр Коммерсант

Сегодня я на практическом примере покажу, как именно реализуются принципы работы Apache Kafka при интеграции информационных систем. Для этого напишем и запустим в Google Colab 2 небольших Python-приложения. Продюсер будет публиковать сообщения в топик экземпляра Kafka, развернутого на облачной платформе upstash.com, а приложение-потребитель будет считывать и обрабатывать эти данные.

Постановка задачи и создание кластера Apache Kafka в облачной платформе Upstash

О том, что такое Apache Kafka и как эта распределенная платформа потоковой передачи событий используется для асинхронного общения между несколькими сервисами в проектах интеграции информационных систем, я писала здесь.

В качестве демонстрационного стенда возьмем облачную платформу Upstash (upstash.com), где можно на бесплатном плане развернуть собственный кластер Apache Kafka и подключиться к нему с помощью собственных клиентов через TCP и REST API через HTTP. Разумеется, бесплатный тариф имеет ряд ограничений: не более 10 000 команд в день и общий объем данных 256 МБ, а также отсутствие шифрования в REST API. Впрочем, этих возможностей более чем достаточно для нашего демонстрационного примера, чтобы познакомиться с этой технологией.

Для демонстрации возьмем пример из моего недавнего материала про JMS-брокер RabbitMQ. Приложением-продюсером, которое создает данные и отправляет их в Kafka, будет умный датчик с 2-мя видами измерений: температура и давление. Эти данные и обрабатывает считывает одно приложение-потребитель.

Kafka интеграция. интеграция информационных систем, архитектура потоковая передача событий, архитектура ИС для аналитика пример
Интеграция приложений с помощью Apache Kafka

Данные от умного датчика, т.е. полезная нагрузка сообщения от приложения-продюсера, отправляются в JSON-формате по следующей схеме:

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": {
    " device_id": {
      "type": "integer"
    },
    "measure": {
      "type": "string"
    },
    "value": {
      "type": "integer"
    }
  },
  "required": [
    " device_id",
    "measure",
    "value"
  ]
}

Определившись с решаемой задачей, приступим к технологиям ее реализации. Сперва создадим собственный кластер в облачном сервисе Upstash, выбрав бесплатный план. Также создадим топик Kafka, куда будут публиковаться сообщения от продюсера.

serverless free Upstah Kafka, бессерверная платформа Apache Kafka, облачный сервис Kafka
Создание кластера и топика Apache Kafka в Upstash

Подготовив инфраструктуру, можно перейти к созданию своих приложений: продюсера и потребителя. Как обычно, я буду писать код на Python и запускать его в Google Colab –интерактивной облачной среде. Как она работает, я ранее рассказывала здесь и здесь.

Основы архитектуры и интеграции информационных систем

Код курса
OAIS
Ближайшая дата курса
20 мая, 2024
Продолжительность
12 ак.часов
Стоимость обучения
27 000 руб.

Пишем приложение-продюсер

Благодаря отличной документации и визуализации возможностей платформы Upstash в ее пользовательском интерфейсе писать код вручную почти не нужно. Заготовки для продюсера и консумера с уже предзаполненными учетными данными для подключения к своему облачному инстансу Kafka можно взять на вкладке Details созданного кластера. А учетные данные нужно скопировать с вкладки Credentials.

serverless free Upstah Kafka, бессерверная платформа Apache Kafka, облачный сервис Kafka, Python consumer producer Kafka
Подключение Python-продюсера и консумера к кластеру Kafka на платформе Upstah

Теперь напишем Python-код своего приложения-продюсера, который будет каждые 3 секунды отправлять данные по температуре и давлению в Kafka. Причем измерения по давления будут отправляться в раздел 1, а измерения температуры – в раздел 2 одного и того же топика. Эти данные (полезная нагрузка) будут в формате JSON с кодировкой utf-8.

!pip install kafka-python 
import json
import random
import time

from kafka import KafkaProducer
# Connect to Kafka server
producer = KafkaProducer(
  bootstrap_servers=['URL-адрес_вашего_инстанса_Kafka:порт_обычно_9092'],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username='имя_вашего пользователя',
  sasl_plain_password='пароль_вашего_пользователя',
  value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Prepare a list of devices
devices =[random.randint(0, 100) for i in range(100)]

# Prepare a list of possible routing keys
measures = ['pressure', 'temperature']

while True:
    # Prepare random message in JSON format
    measure=random.choice(measures)
    data = {'device': random.choice(devices), 'measure': measure, 'value': random.randint(0,100)}
    message = json.dumps(data)
    if measure=='pressure':
     partition_key = 1
    else:
     partition_key = 2

    print(f' [x] partition {partition_key}')  
    print(f' [x] Message {message}')
    
    # produce json messages
    future = producer.send('anna_demo_topic', value=data, partition=partition_key)

    record_metadata = future.get(timeout=60)
    print(f' [x] Sent {record_metadata}')

    # Sleep for 3 seconds
    time.sleep(3)

# close producer
producer.close()

В Google Colab этот код лучше распределить по 3-м ячейкам:

  • в 1-ой ячейке будет установка и импорт Python-библиотек для работы с Kafka (kafka-python), а также форматом JSON, операторами random и времени;
  • во 2-ой ячейке код подключения к Kafka и публикации сообщений;
  • в 3-ей ячейке закрытие активного подключения и продюсера.
Kafka продюсер на Python в Google Colab
Python-код приложения продюсера в Google Colab для публикации сообщений в топик Kafka на платформе Upstash

Для публикации сообщений последовательно запустим первые 2 ячейки и увидим, как дополнительно к полезной нагрузке, т.е. исходному JSON-сообщению с данными о номере устройства, измерении (температура или давление) и значении этого измерения, продюсер добавляет в сообщение название топика, раздел, позицию смещения, отметку времени и другие служебные метаданные.

Kafka продюсер на Python в Google Colab
Публикация сообщений в топик Kafka

Далее напишем код приложения-потребителя и посмотрим, как все работает.

Основы архитектуры и интеграции информационных систем

Код курса
OAIS
Ближайшая дата курса
20 мая, 2024
Продолжительность
12 ак.часов
Стоимость обучения
27 000 руб.

Пишем потребитель

Пусть наше приложение-потребитель не просто потребляет данные из разделов топика Kafka под названием anna_demo_topic, но и обрабатывает принятые значения измерений, маркируя состояние устройства отметкой OK или ошибкой ERROR. Для этого придется в цикле приема сообщений выполнить парсинг полученной полезной нагрузки и написать проверку значений этого JSON.

!pip install kafka-python 
import json
import random
import time

from kafka import KafkaConsumer
import json
from json import loads
 
consumer = KafkaConsumer(
  bootstrap_servers=['URL-адрес_вашего_инстанса_Kafka:порт(обычно_9092)'],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username='имя_вашего пользователя',
  sasl_plain_password='пароль_вашего_пользователя',
  group_id='$group',
  auto_offset_reset='earliest'
)

consumer.subscribe(['anna_demo_topic'])
for message in consumer:
  print (message)
  payload=message.value.decode("utf-8")
  data=json.loads(payload)

  device_state=''
  device_measure=''
  if (data['measure']== 'pressure'):
    device_measure='pressure'
    if ((data['value']<= 50) and (data['value'] >= 10)):
      device_state='OK'
    else:
      device_state='ERROR'
  else:
    device_measure='temperature'
    if ((data['value']<= 80) and (data['value'] >= 20)):
      device_state='OK'
    else:
      device_state='ERROR'
  print("Устройство № ", data["device"], device_measure, device_state, "значение ", data['value'])

#unsubscribe and close  consumer
consumer.unsubscribe()
consumer.close()

Аналогично продюсеру, разделим этот код на 3 ячейки в Google Colab, сохранив в отдельном блокноте. Чтобы потребление сообщений работало, следует параллельно запустить их публикацию. Поэтому будет 2 разных ipynb-файла. Посмотрим, как работает потребление сообщений, выполнив первые 2 ячейки вышеприведенного Python-кода.

Kafka потребитель на Python в Google Colab
Потребление и обработка сообщений из топика Kafka

Посмотреть, как меняются данные об опубликованные и потребленных сообщениях, можно в GUI-платформы Upstash.

бессерверный сервис Kafka бесплатно облако Upstash
Статистика по опубликованным и потребленным сообщениям в GUI Upstash

Как этот код модифицирован для записи данных об ошибках измерений в резидентную NoSQL-СУБД Redis, читайте в моей новой статье.

В заключение посмотрим, как получить сведения о своем Kafka-кластере через REST API этой облачной платформы. Следует простой найти нужный метод в документации и отправить запрос в Postman или другом инструменте тестирования REST API. Например, GET-запрос к конечной точке /consumers с базовой аутентификацией по логину и паролю позволяет узнать про потребителей в группе потребителей, название инстанса в кластере Kafka, топик и количество разделов в нем. О том, что такое конечная точка, а также про HTTP-запросы я писала здесь и здесь.

Postman REST API Kafka примеры курсы обучение
Обращение к своему инстансу Kafka через REST API платформы Upstash в Postman

В заключение отмечу, что далеко не на каждом проекте аналитику нужно знать все эти подробности, с которыми взаимодействует архитектор, разработчик и администратор Kafka. Однако, именно такая демонстрация, проделанная собственноручно с помощью открытых инструментов позволяет понять основные принципы этой замечательной технологии, чтобы использовать ее в качестве средства интеграции информационных систем.

Основы архитектуры и интеграции информационных систем

Код курса
OAIS
Ближайшая дата курса
20 мая, 2024
Продолжительность
12 ак.часов
Стоимость обучения
27 000 руб.

Тест по Apache Kafka и RabbitMQ для аналитика

Как это выполнить все это на практике, а также познакомиться с другими важными аспектами архитектуры и интеграции информационных систем, вы узнаете на курсах Школы прикладного бизнес-анализа в нашем лицензированном учебном центре обучения и повышения квалификации системных и бизнес-аналитиков в Москве:

 

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Добавить комментарий

Поиск по сайту

Напишите нам, мы онлайн!