Habr

Аналитика событий на опасном производстве, или зачем «Цифровому рабочему» Kafka, Esper и Clickhouse

Привет, Хабр! Я Алексей Коняев. Последние пару лет участвую в развитии платформы «Цифровой рабочий» в роли ведущего java-разработчика.

Представьте, что вы приехали на экскурсию на завод. Там огромная территория, и вы вместе с гидом передвигаетесь на машине. Он рассказывает: «Посмотрите направо, здесь новое здание литейного цеха, а вот слева старое здание, которое скоро должны снести…» Как вдруг через минуту это старое здание взрывают! Гид, конечно, в шоке, да и вы тоже, но, к счастью, всё обошлось. Спрашивается, какого черта машина с экскурсантами оказалась в месте проведения взрывных работ?! И наш «Цифровой рабочий» на этот вопрос тоже не ответит, но он поможет вовремя предупредить всех заинтересованных лиц о том, что в геозоне, где сейчас проводятся опасные работы, появились посторонние в машине местного гида.

Если в двух словах, то система позволяет предупреждать опасные ситуации на производстве благодаря носимым устройствам Outdoor/Indoor-навигации  и  видеоаналитике. «Цифровой рабочий» может определять местоположение, физическое состояние или опасное поведение людей,  строить различную аналитику, в том числе realtime, и выполнять «разбор полётов», т.е. воспроизводить историю событий, чтобы можно было выяснить, что привело к нежелательной ситуации.

Дальше расскажу про архитектуру нашей системы, как мы используем Kafka, Esper и Clickhouse и на какие грабли уже наступили.

Вообще делать свой продукт – это очень интересно, особенно когда ощущаешь пользу от его применения на промышленных объектах. Там бывает очень опасно работать, но люди привыкли рисковать и, как говорят наши заказчики, иногда теряют чувство самосохранения.

Пример из жизни «Цифрового рабочего» на пилоте в нефтянке. Работник залез на стремянку, чтобы затянуть вентиль газовой трубы. Резьба вдруг сорвалась, он не удержался и, упав с высоты 5 метров, от удара потерял сознание. А вентиль при этом открылся, и газ начал поступать в помещение. Но датчики падения и удара, которые встроены в носимое устройство (у него был модуль на каске) передали сигналы на сервер. Там эти два сигнала были обработаны на платформе, и алгоритм выявления внештатных ситуаций сформировал событие-тревогу. Оно было передано оператору, а также другим сотрудникам, которые в тот момент были недалеко от пострадавшего и смогли быстро прийти на помощь.

Архитектура

Когда мы только начинали проектировать «Цифровой рабочий», решили пойти по пути Событийно-ориентированной архитектуры. Рассуждали так: всё начинается с носимых устройств, или, как мы их называем, «меток». Они, по сути, просто умеют передавать с определённой частотой какую-то телеметрию или, другими словами, информацию об изменении своего внутреннего состояния. И вот это изменение состояния метки – и есть входное для системы событие.

Далее, нам нужно уметь обрабатывать поток этих входных событий, причём это должен быть не просто последовательный конвейер, а параллельная обработка разными модулями, которые в процессе работы будут порождать новые события, обрабатываемые другими модулями, и т.д. В итоге какие-то события будут передаваться в UI, чтобы отрисовывать перемещение объектов на карте.

В качестве шины или слоя передачи событий выбрали Apache Kafka. На тот момент Kafka уже зарекомендовала себя как зрелый и надежный продукт (мы начинали с версии 2.0.0). Кроме того, выбирали только среди Open-source решений, чтобы удовлетворить требованиям импортозамещения. А с функциональной точки зрения в Kafkа нам понравилась возможность независимо подключать различных консьюмеров к одному и тому же топику, возможность прочитать события из топика ещё раз за период в прошлом, механизм стриминговой обработки Kafka Streams, ну и, конечно, масштабируемость благодаря партиционированию топиков.

Архитектура системы включает следующие компоненты:

  • Есть различные носимые устройства (метки) и системы позиционирования, которые уже существуют на рынке и которые умеют работать с теми или иными девайсами.

  • Для каждой такой системы позиционирования, с которой мы решили интегрироваться, у нас есть свой модуль Адаптер, который публикует в Kafka события с телеметрией от меток.

  • Далее эти события обрабатываются Транслятором, который выполняет первичную обработку (связывание метки с сотрудником, вычисление геозоны, в которой находится сейчас метка и др.).

  • Модуль Complex Event Processing-а (CEP-процессор) обрабатывает события, которые порождает Транслятор; здесь мы занимаемся выявлением внештатных ситуаций, анализируя различные типы событий, в том числе от разных меток.

  • В UI поступают события как от Транслятора (для отрисовки перемещения сотрудников), так и от CEP-процессора (отображение алертов).

  • Для хранения справочных данных, как то список меток, сотрудников, геозон и пр., используем реляционную БД – PostgreSQL.

  • А для хранения данных, по которым строится аналитика – ClickHouse.

  • Но в ClickHouse напрямую никто не ходит – для этого используется модуль Reports, который выполняет обработку запросов к аналитическим данным (запросы на обновление данных виджетов на аналитической панели в UI, запросы на формирование различных отчётов и др.).

  • И ещё есть файловое хранилище S3, где мы храним файлы 3D-моделей и файлы сформированных отчётов.

Ну а теперь давайте расскажу поподробнее про все модули системы, как они устроены внутри.

Интегрируемся со всеми: адаптеры

Основная задача Адаптера – взаимодействие с системой позиционирования через её API для того, чтобы получать информацию о координатах метки и значения встроенных в метку датчиков. Например, заряд аккумулятора, статус нажатия тревожной кнопки, статус датчика падения и неподвижности, температура, влажность, уровень CO2 и др.

Системы позиционирования – это не наша разработка, а уже существующие на рынке системы, которые, как правило, умеют взаимодействовать с одним каким-то конкретным девайсом. Мы же, подключая через адаптеры такие системы, получаем возможность использовать в «Цифровом рабочем» большой ассортимент разных меток.

Хотя для некоторых меток мы всё-таки запилили систему позиционирования сами, когда не удавалось найти существующего решения или хотелось поэкспериментировать. А один из экспериментов привел к тому, кто мы разработали свою метку с креплением на каску:

Ещё одна важная особенность адаптеров – это сложность с их масштабированием. Когда меток много, то важно успевать обрабатывать информацию от всех, и решение в лоб – поставить несколько адаптеров. Но сделать это не всегда просто, т.к. мы можем упереться в ограничения API системы позиционирования, например, когда адаптер с определённой периодичностью дергает рест-апи и получает статус сразу всех меток.

Поэтому основное требование к адаптеру – быть максимально производительным, т.е. получил данные в чужом формате, преобразовал к нашему внутреннему и отправил событие в Kafka. Эти события, которые порождает адаптер, мы называем «Сырые».

Сырые события бывают такие:

  • TagMovement – перемещение метки, содержит координаты;

  • TagInfo – телеметрия со значениями датчиков метки;

  • TagAlert – события нажатия тревожной кнопки, события падения и удара.

Разделяй и властвуй: топики Kafka

Отдельно хочу остановиться на топиках.

Для каждого типа события в «Цифровом рабочем» используется свой отдельный топик. Такой подход позволяет:

  1. Индивидуально настраивать retention для каждого топика. Так, например, для сырых событий ретеншн – всего 1 сутки, потому что эти события практически сразу будут обработаны, и хранить их долго не нужно (но 1 сутки всё-таки храним на случай сбоя).

  2. Использовать надстройку над Kafka, которую мы сделали, чтобы модули, которым нужно читать или писать из Kafka, могли просто подписываться на события заданного типа, или просто публиковать события, не задумываясь о топике, который определяется автоматически.

  3. Реализовать автоматическую сборку топологии Kafka Streams процессоров – если вкратце, то работает она так:

    3.1. в прикладном модуле нужно объявить процессоры, указав тип «входного» события и тип «выходного»;

    3.2. также можно указать, будет ли данный процессор использовать состояние;

    3.3. все процессоры – это Spring Bean-ы;

    3.4. при запуске приложения сборщик топологии находит все процессоры и собирает их в граф, «стыкуя» процессоры так, чтобы тип «выхода» одного подходил под тип «входа» другого.

Примерно так выглядит топология процессоров Транслятора, о котором речь пойдет ниже:

Шеф-повар: транслятор

Транслятор – это модуль, который готовит «сырые» события, поступающие от адаптеров. При этом то, какой конкретно адаптер был источником события, на этом этапе обработки уже не важно. Все «сырые» события одинаковые.

Это, кстати, позволило реализовать адаптер-заглушку, который мы используем для отладки и тестирования. С его помощью можно управлять перемещением сотрудников клавишами на клавиатуре, так как бегать каждый раз по офису с реальной меткой – не очень прикольно, хотя с точки зрения физкультуры и полезно.

Внутри Транслятора несколько процессоров на Kafka Streams (см. processor-api), причём многие из них используют состояние.

Kafka Streams предоставляет API для работы с состоянием как с Key-Value таблицей. Тут важно понимать, что для каждого ключа входного события процессора существует свое состояние. Ключ у каждого типа события свой. Например, для события перемещения метки это будет серийный номер метки. Это позволяет выполнять обработку очередного события от какой-то конкретной метки с учетом истории обработки событий от этой же метки.

Транслятор решает следующие задачи:

  • Вычисление географических координат. Дело в том, что координаты, которые приходят в сырых событиях, могут содержать не широту и долготу, а, например, смещение по оси X и Y в метрах относительно внутренней системы координат, системы позиционирования. И помня о том, что Адаптеры должны работать максимально быстро, приведением координат к абсолютным или географическим занимается Транслятор.

  • Связывание метки и сотрудника. Здесь процессор Транслятора обращается к реляционной БД за справочной информацией, чтобы определить, какой именно сотрудник сейчас носит эту метку.

  • Определение остановки сотрудника. Здесь мы смотрим, если новые координаты метки не сильно отличаются от предыдущих в течение некоторого таймаута, то значит, сотрудник остановился.

  • Обнаружение потери сигнала от метки. Тут используем такую штуку, как Punctuator – это механизм Kafka Streams, который позволяет с заданной периодичностью просматривать состояния процессора по всем ключам. И логика простая: если нашли состояние, в котором время последних полученных координат позже, чем максимально разрешенное, то значит, сигнала от метки не было давно, и следует считать сигнал потерянным.

  • Ещё есть процессоры, которые работают с геозонами. Суть в том, что вся территория и здания размечаются на геозоны. Например, «Корпус Альфа», который, в свою очередь, разделяется на геозоны этажей, а каждый этаж – на коридоры и кабинеты. И есть процессор, который определяет, что координаты метки попали внутрь той или иной геозоны, а другой процессор выполняет подсчёт количества сотрудников внутри геозон.

События, которые создаются в результате работы Транслятора, мы называем бизнес-события, потому что:

  • эти события уже представляют интерес для пользователей системы – события перемещения сотрудников передаются в UI для отрисовки их на карте; также в UI отображается телеметрия метки, которая уже ассоциирована с определённым сотрудником;

  • почти все бизнес-события сохраняются в аналитическую БД;

  • многие бизнес-события используются для выявления внештатных ситуаций;

  • и ещё эти события мы храним в Kafka долго (1 месяц) для того, чтобы иметь возможность воспроизвести их и посмотреть, что происходило в определённый интервал времени в прошлом.

Спасительный кэш при потоковой обработке

Когда мы начали проводить нагрузочное тестирование, то оказалось, что транслятор сильно тормозит, и связано это было с тем, что многие его процессоры при обработке каждого события ходили в БД за справочной информацией. Естественным решением проблемы было кэширование доступа к БД. Но кэш нужен был не совсем уж простой, т.к. информация в справочниках хоть и меняется редко, но всё равно это может произойти в процессе работы. Например, сотрудник потерял метку, и ему выдали другую.

И, кроме того, некоторые процессоры в результате работы могут обновлять какие-то данные в БД. Поэтому нужен был кэш, который будет:

  • обновляться с некоторой периодичностью;

  • периодически «сливать» изменения в БД.

Также нужно было иметь в виду, что Трансляторы мы точно будет запускать в несколько экземпляров, чтобы масштабировать на нагрузку. И при этом всем нодам Транслятора должен быть доступен одинаковый кэш. Т.е. запаздывание при чтении справочников из БД не страшны, но если в ходе обработки сообщений какой-то процессор меняет значение в кэше, то это новое значение должно быть сразу же доступно всем нодам Транслятора.

Мы сделали выбор в пользу Hazelcast-а, потому что:

  • Его достаточно легко использовать – это просто библиотека, которую вы подключаете в проект.

  • Кэши Hazelcast-а автоматически синхронизируются на всех нодах. Но тут нужно быть осторожным – если не ограничить «область видимости» кэша через задание имени группы (в конфиге экземпляра Hazelcast-а), то он может незаметно для вас реплицироваться в каком-нибудь ещё модуле, где вы тоже решили использовать Hazelcast. Т.е. в нашем случае, все кэши, которые нужны Трансляторам и только им, объединены в группу «translator».

После добавления кэширования производительность Транслятора выросла на несколько порядков! Цифры получились такие: одна нода, которой выделены ресурсы, эквивалентные инстансу t4g.micro в облаке Amazon EC2 (2CPU + 2Gb), обрабатывала без задержки до 500 входящих «сырых» событий в секунду. 500 кажется не много, но метки разных производителей передают данные с разной частотой – от 3 событий в минуту до 5 событий в секунду. И высокопроизводительные метки, которые дают большую точность, могут использоваться не на всей территории объекта. Таким образом, в худшем случае одна нода Транслятора выдерживает нагрузку от 100 меток, а в лучшем – от 10 тысяч.

Высоко сижу – далеко гляжу: отображение объектов на карте

UI состоит из двух частей – серверная часть и клиент.

Серверная часть подписывается на определённые бизнес-события, в первую очередь – события перемещения сотрудников. Эти события через WebSocket передаются на клиента, но предварительно выполняется:

  • фильтрация – каждому клиенту отправляются только те события, которые относятся к текущей выбранной оператором геозоне, т.е. если оператор выбрал, например, 7 этаж, то клиент будет получать события только по этому этажу;

  • редукция событий – события накапливаются в буфере и отправляются на клиента 1 раз в секунду.

Это нужно для того, чтобы снизить нагрузку на клиента.

Клиентская часть – это web-приложение на React-е. Основную рабочую область UI занимает карта с 3D-моделями зданий, которые можно посмотреть «в разрезе» – провалиться на любой этаж и увидеть, что там происходит. Для отрисовки 3D-моделей мы используем библиотеку CesiumJS.

Complex Event Processing

Термин «Complex Event Processing» (CEP) придумал профессор Стенфордского университета David Luckham. Вкратце определение звучит так: «Complex Event Processing – это обработка потока различных событий в реальном времени с целью выявления паттернов значимых событий». Ещё часто CEP сравнивают с ESP (Event Stream Processing). И здесь David Luckham выделяет следующие отличия между ними:

ESP – это обработка упорядоченного потока события одного типа, которая выполняется, как правило, с целью фильтрации, трансформации событий и/или выполнения над потоком событий каких-либо математических операций, в том числе, агрегация и группировка.

CEP – это обработка нескольких потоков различных событий, причём не обязательно упорядоченных (т.е. допускается нарушение порядка поступления событий в сравнении с порядком их возникновения), которая выполняется внутри некоторого «окна» (временного, например) с учетом причинно-следственной связи между различными событиями.

Например, обработка событий от термометра с целью управления кондиционером (температура стала ниже порога – включили обогрев, и наоборот) – это ESP. А вот обработка событий от термометра и одновременно от датчика освещённости в течение суток позволяет сделать вывод, что на улице зима.

Но в тоже время, в одной из своих статей David Luckham с коллегой, видя, что современные инструменты Event Stream Processing-а всё больше и больше приобретают возможности CEP-инструментов, делают вывод, что со временем разница между ними будет стерта (см. The Future of Event Stream Analytics and CEP).

CEP-процессор

Давайте перейдем от теории к практике!

Как вы уже поняли, в «Цифровом рабочем» именно модуль CEP-процессор выполняет сложную обработку событий, которыми в контексте нашей системы являются внештатные ситуации, такие как:

  • Вход сотрудника в геозону, имеющую в данный момент статус «опасная».

  • Пульс выше или ниже нормы.

  • Срабатывание двух из трех датчиков: падение/удар/неподвижность (заставлять реагировать оператора на единичный сигнал падения неправильно, т.к. метка просто могла упасть, но, если в течение небольшого промежутка времени пришло сразу несколько таких сигналов, то регистрируется внештатная ситуация).

В качестве CEP-движка мы используем Open-source библеотеку Esper, которую с 2006 года разрабатывает компания EsperTech.

Esper мы выбрали по следующим соображениям:

  • Описание паттернов сложных событий (или «правил») выполняется на языке Event Processing Language (EPL), который является расширением стандарта SQL-92; соответственно, правила описываются в декларативном виде, а нам очень хотелось, чтобы эти правила могли понимать не только программисты, но и, например, аналитики (хотя, если правило действительно сложное, то без подготовки его будет трудно понять).

  • Esper – мощный инструмент, и достаточно сложные паттерны можно описать в несколько строк на EPL.

  • Есть интеграция с Kafka, которая позволяет описывать правила, оперируя событиями, потребляемыми из Kafka, а результат работы правила также оформлять в виде события и публиковать в Kafka.

Пример реализации правила на Esper-е

Давайте рассмотрим небольшую задачу и её решение на Esper-е. В КРОКе есть свой ЦОД и газотурбинные генераторы, которые используются как резервный источник питания. Эти штуки нужно периодически запускать, но оставлять их включёнными без присмотра надолго нельзя. Предположим, что сотрудник, который их обслуживает, не всегда соблюдает регламент и может отлучиться на более продолжительное время, чем это разрешено.

Задача: если оборудование включено, и в помещении, где оно расположено, нет ни одного сотрудника больше 10 минут, то необходимо сформировать событие-тревогу.

Для отладки решения будем использовать Esper Notebook, в котором можно описать правило на EPL-е и сценарий с входными данными.

Чтобы Esper Notebook понял, что текст – это правило, в самом начале нужно написать ключевое слово %esperepl.

Далее, нужно определить схемы или таблицы, строки которых будут представлять входные события, в потоке которых мы будем искать интересующий нас паттерн:

@Description('Кол-во людей в геозоне')
create schema PersonsInZone(zoneId string, number int);

@Description('Статус устройства (вкл / выкл)')
create schema DeviceStatus(deviceId string, zoneId string, turnedOn bool);

Нас интересует, что происходит, когда оборудование включено, поэтому определим контекст, который «откроется» при включении оборудования, и «закроется» при его выключении. При этом, для каждой единицы оборудования будет свой собственный контекст:

create context TurnerOnDeviceContext
    partition by deviceId from DeviceStatus
    initiated by DeviceStatus(turnedOn = true)
    terminated by DeviceStatus(turnedOn = false);

Контекст – это своего рода «окно», в рамках которого мы будет наблюдать входящие события.

// для выражения можно задать имя, чтобы видеть его в логах
@Name('Unattended device')
// указывая контекст, мы говорим, чтобы данное выражение
// выполнялось только для событий из контекста
context TurnerOnDeviceContext
select
    ds.zoneId,
    ds.deviceId
from
    // в качестве «источника» событий задаем шаблон,
    // который словами можно сформулировать так:
    // «Устройство включили» → потом «Все ушли» → потом («Прошло 10 минут» И «Никто не вернулся»)
    pattern [
        ds = DeviceStatus(turnedOn = true)
        -> every (
            PersonsInZone(zoneId = ds.zoneId and number = 0)
            -> (timer:interval(10 minutes)
                    and not PersonsInZone(zoneId = ds.zoneId and number > 0)
               )
        )
    ];

Здесь ещё используется ключевое слово «every», которое нужно для того, чтобы следующая за ним часть шаблона продолжала «находиться» даже после того, как один раз целый шаблон уже был найден.

Например, если сотрудник включил генератор, потом ушел, и прошло больше 10 минут, – сработает правило, и мы пошлем нотификацию оператору, тот позвонит сотруднику, отругает его, и тот вернется. Но генератор сотрудник так и не выключит, поработает с ним ещё какое то время и, опять всё забыв, снова уйдет больше, чем на 10 минут.

Таким образом, мы получим ситуацию, когда часть шаблона «Все ушли» → ("Прошло 10 минут» И «Никто не вернулся») будет найден снова, в то время, как первая часть «Устройство включили» уже была найдена ранее. И, чтобы целый шаблон найти ещё раз, нужно написать «every» перед второй его чатью.

Теперь нам нужно протестировать наше правило. Для этого используется сценарий (начинается с ключевого слова %esperscenario), в котором можно «публиковать» входные события, увеличивая текущее время сценария:

%esperscenario
// задаем начальное время
t = «2020-12-10 12:00:00.000»

// публикуем событие «в помещение A вошёл 1 сотрудник»
PersonsInZone = {zoneId=«room-A», number=1}

// через 1 минуту включаем генератор 1 в помещении А
t = t.plus(1 minute)
DeviceStatus = {deviceId=«generator-1», zoneId=«room-A», turnedOn=true}

// через 4 часа сотрудник вышел из помещения А, не выключив генератор
t = t.plus(4 hours)
PersonsInZone = {zoneId=«room-A», number=0}

// прошло ещё 10 минут – и должно сработать наше правило!
t = t.plus(10 minute)

Если в правиле сработает какое-то выражение, то в блоке сценария мы должны увидеть его вывод. В нашем случае выражение – это select, а вывод будет такой:

Unattended device-output={ds.zoneId='room-A', ds.deviceId='generator-1'}

Вот ещё один пример сценария, посложнее:

%esperscenario
t = «2020-12-10 12:00:00.000»

// сотрудник вошёл в помещение А
PersonsInZone = {zoneId=«room-A», number=1}

// через 1 минуту включил генератор 1
t = t.plus(1 minute)
DeviceStatus = {deviceId=«generator-1», zoneId=«room-A», turnedOn=true}

// через 30 минут вышел, не выключив генератор
t = t.plus(30 minutes)
PersonsInZone = {zoneId=«room-A», number=0}

// но через 5 минут вернулся
t = t.plus(5 minute)
PersonsInZone = {zoneId=«room-A», number=1}

// прошло ещё 5 минут – тревоги не должно быть, сотрудник ведь вернулся
t = t.plus(5 minute)

// ещё через 3 часа ушёл, а генератор всё также остался включённым
t = t.plus(3 hour)
PersonsInZone = {zoneId=«room-A», number=0}

// через 10 минут – тревога!
t = t.plus(10 minute)

Этот же пример, но с полноценной интеграцией с Kafka, который можно собрать и запустить, доступен по ссылке digital-worker-architecture.

В двух словах, его отличие в том, что нужно:

  1. настроить движок Esper-а – подключить к нему плагины для взаимодействия с Kafka;

  2. зарегистрировать типы классов, которые будут представлять события в Kafka, и десериализатор для них.

И тогда при описании правил можно уже не создавать схемы событий, а использовать имена классов событий. А результат работы выражений (select-ов) «оборачивать» в создание выходных событий, которые публиковать в соответствующие топики Kafka.

Всех посчитали: регистрация внештатных ситуаций

CEP-процессор при срабатывании правил делает следующее:

  • Регистрирует внештатную ситуацию в журнале событий. Мы предполагаем, что события в этот журнал не должны попадать часто, но если уж попали, то требуется их явная обработка оператором (написать резолюцию, закрыть событие).

  • Формирует событие-нотификацию, которое, как и все остальные события в «Цифровом рабочем», публикуются в свой топик в Kafka. На этот топик подписан модуль Нотификаций, который выполняет их маршрутизацию. В результате нотификация передается в UI (оператор видит уведомление) или конкретному сотруднику через тот канал, который для него определён (например, на почту, в Telegram или на метку, которую носит сотрудник, если она поддерживает «обратный» канал).

Летим по приборам: аналитика

Аналитика доступна оператору в виде панели инструментов, на которую он может вынести интересующие его виджеты, и в виде механизма формирования отчётов.

Виджеты – это те же отчёты, у которых есть какие-то свои входные параметры, но результат выводится не в Excel или PDF-файл, а в виде графика или диаграммы. Еще виджеты обновляются раз в 15 секунд, что позволяет оператору видеть актуальные на текущий момент времени данные.

Также существуют «отчёты» со специализированным отображением информации непосредственно на карте (тепловая карта и маршруты):

Данные, по которым строится аналитика, хранятся в БД ClickHouse. Но прежде, чем попасть в ClickHouse, они проходят дополнительную обработку в модуле «Подготовки данных». А, чтобы достать эти данные из БД и передать потребителю (в данном случае, в UI), используется модуль «Формирования отчётов».

ClickHouse

К аналитической БД у нас были следующие требования:

  • нужно уметь хранить очень много данных. Наши данные – это события, у которых есть ключ и временная метка;

  • нужно быстро выполнять запросы с условием «С – ПО», т.е. за определённый период времени.

Когда мы выбирали БД, то понимали, что здесь точно нужна не реляционка, а, скорее, NoSql, которая умеет линейно масштабироваться и обеспечивает отказоустойчивость. На момент проектирования «Цифрового рабочего» у нас был опыт применения NoSql БД Cassandra, но ClickHouse был на слуху и, изучив документацию и посмотрев несколько презентаций от ребят, которые уже успешно используют его в проде, мы тоже решили попробовать.

Из особенностей ClickHouse мы для себя выделили следующие:

  • Это колоночная БД – наши события имеют много атрибутов, которые удобно разложить по колонкам.

  • Хорошо подходит для Time-serias данных – ClickHouse может партиционировать таблицу по ключу, который является функцией от временной метки события, что позволяет хранить в общей партиции все события полученные, например, за один и тот же день. А это позволяет эффективно выполнять запросы, у которых задан период времени (сперва будут выбраны только нужные партиции, а уже потом только по ним выполняется остальная часть запроса).

  • Есть встроенная интеграция с Kafka – ClickHouse напрямую из Kafka умеет загружать данные в таблицы, при этом можно дополнительно выполнять обработку данных непосредственно в момент их загрузки.

  • Есть возможность подключить внешнюю реляционную БД – мы подключаем справочники, которые храним в PostgreSql, после чего их можно использовать как обычные таблицы в Sql-выражениях.

  • Богатый набор встроенных аналитических функций – например, для формирования «Тепловой карты» мы используем функцию Квантиль. А ещё мне очень нравится работа с массивами в ClickHouse – можно джойнить элементы колонки-массива одной таблицы со строками другой таблицы или трансформировать строки таблицы в колонку-массив.

  • Масштабируемая и отказоустойчивая – чем больше нод базы поднимете, тем больше данных можно будет хранить. Плюс есть встроенный механизм сжатия данных.

Что касается схемы БД, то она устроена примерно так:

  • Есть таблицы, которые содержат данные в более или менее исходном виде, т.е. в том, в котором поступают события из Kafka. Эти таблицы используются для формирования большинства отчётов.

  • Но также есть специализированные таблицы, которые хранят данные в ином виде, удобном для формирования каких-то конкретных отчётов.

  • И придерживаемся принципа: данные в БД готовим исходя из сценария их использования (какие именно запросы будем выполнять к БД и с какими условиями фильтрации).

Подготовка данных

Подготовка данных выполняется, по большому счёту, для того, чтобы сделать максимально больше работы заранее, а не в тот момент, когда придет запрос в ClickHouse для формирования отчёта.

Модуль подготовки данных – это Kafka Streams приложение, которое обрабатывает бизнес-события и выполняет следующие действия:

  • фильтрация – бизнес-событий может быть очень много (например, события перемещения сотрудника), а для некоторых отчётов все они не нужны, и их можно проредить;

  • денормализация и обогащение – чтобы при формировании отчёта лишний раз не выполнять join-ы, здесь мы заранее собираем все необходимые данные, например, по ИД сотрудника получаем его ФИО и название должностной позиции;

  • форматирование – приведение дат к нужному формату, форматирование дробных чисел, локализация текста и т.п.

Формирование отчётов

Для того, чтобы получить отчёт, «заказчик» отчёта отправляет в топик Kafka report-request событие-запрос. Здесь мы отходим от принципа «один тип события – один топик» – для каждого конкретного отчёта существует свой тип события, который содержит специфичные для данного отчёта параметры. Но при этом все они унаследованы от базового класса ReportRequest.

Эти ReportRequest-ы обрабатывает модуль формирования отчётов:

  • по типу запроса определяется генератор запроса;

  • генератор формирует специфичный для отчёта SQL-запрос в ClickHouse, и, при необходимости, выполняет дополнительную обработку результата SQL-запроса уже на стороне java-кода;

  • результат работы генератора, если нужно, преобразуется в Excel или Pdf файл, который сохраняется в файловое хранилище;

  • создается событие-ответ, унаследованное от базового ReportResponse-а, которое содержит данные отчёта или ссылку на файл в хранилище.

  • в событие-ответ добавляется информация о событии-запросе, чтобы заказчик отчёта мог связать ответ с ранее отправленных запросов, после чего оно публикуется в топик report-response.

Мы выбрали такую схему формирования отчётов, потому что:

  • хотели сделать запрос отчётов асинхронным, чтобы не блокировать заказчика отчёта, пока тот формируется (обычно отчёты формируются быстро, но если задать очень большой временной интервал, за который нужно получить данные, то придется подождать);

  • хотели предоставить возможность формировать отчёты любым клиентам, у которых будет доступ к соответствующим топикам Kafka; это могут быть как модули «Цифрового рабочего», так и внешние системы, с которыми мы интегрируемся;

  • использование Kafka автоматически позволяет масштабировать модули формирования – просто поднимаем несколько узлов и получаем балансировку нагрузки (каждый узел обрабатывает свои партиции топика report-request).

Файловое хранилище

Изначально мы не очень понимали, какой именно интерфейс файлового хранилища нам подойдет, но было понятно, что нужно уметь хранить в нем:

  • 3D-модели зданий, которые загружаются на клиента при отрисовке карты.

  • Сформированные отчёты.

  • Возможно, какой-то ещё статический контент.

Сначала выбрали Apache Sling. Это контент-система, которая хранит данные в иерархической структуре и позволяет обращаться к файлам по URL-у, в котором отражен путь до соответствующего узла в иерархии. С точки зрения работы клиента, Sling довольно удобное решение – просто загружаешь файл по URL-у напрямую. Со стороны backend-а тоже особо проблем не было – java-api довольно простой.

Но через некоторое время мы столкнулись с тем, что Sling начал тормозить – и тем сильнее, чем больше файлов в нем хранилось. Оказалось, для нашего случая с получением списка всех отчётов, которые сформировал пользователь, мы заставляли Sling обходить какой-то кусок его иерархии файлов и вычитывать каждый файл, чтобы посчитать его размер. Варианты, как это оптимизировать, виделись такие:

  • Научиться получать большой список файлов из Sling-а так, чтобы не вычитывать каждый файл всякий раз для получения его размера;

  • Попробовать другое хранилище.

Мы пошли сразу двумя путями, и второй путь оказался пройден быстрее, а также давал ещё другие преимущества.

Заменили Sling на S3. У него не было проблем с получением большого списка файлов, а как бонус – мы перестали поднимать свое файловое хранилище на стендах, а стали использовать S3, поднятое в облаке КРОК. Т.е. немного упростили процесс развертывания системы.

Как это было: потоки событий и воспроизведение истории

Если посмотреть на все модули системы вместе, то можно связать их потоками событий, которые протекают между ними:

Потоки событий, которые приходят в UI, используются для отображения перемещения сотрудников, отрисовки статуса геозон, вывода предупреждений и другой информации, которая отражает состояние дел в текущий момент времени. Но когда происходит внештатная ситуация, то бывает очень полезно посмотреть, а что происходило в тот момент, когда эта ситуация возникла, и за некоторое время до неё.

Для этого в «Цифровом рабочем» используется механизм воспроизведения истории, где оператор может задать период времени в прошлом, и система начнет воспроизводить перемещения сотрудников и прочую информацию, как если бы это происходило сейчас. При этом оператор может изменять скорость воспроизведения.

Этот механизм реализован следующим образом:

  • UI подписывается на нужные ему события с помощью нашего KafkaConsumer-а, который инкапсулирует взаимодействие с Kafka и имеет два режима работы: REAL_TIME и PAST_TIME.

  • Если включен режим REAL_TIME, то KafkaConsumer просто передает события, которые в данный момент поступают в топики Kafka.

  • А когда включен режим PAST_TIME (при его включении необходимо задать период времени в прошлом), то KafkaConsumer вычитывает события из топиков, начиная с offset-а, соответствующего началу периода, и заканчивая offset-ом конца периода. При этом KafkaConsumer делает задержки между событиями, чтобы передавать их потребителю с тем же темпом, с которым они поступали в реальном времени.

Плюсы этого решения такие:

  • для истории не нужно отдельного хранилища – вся история хранится в Kafka; при этом retention можно задавать большим только для тех топиков, которые нужны для воспроизведения истории;

  • KafkaConsumer с двумя режимами работы обеспечивает прозрачность для потребителя, основная логика которого никак не зависит от выбранного режима.

Но есть и минус: в начальный момент времени периода в прошлом мы не можем получить полное состояние системы. Предположим, в 12:00 в кабинет вошло 10 сотрудников, а к 13:00 у половины из них сел аккумулятор метки, и она перестала передавать события со своим местоположением. Тогда воспроизводя историю с 13:00, мы увидим в этом кабинете только 5 сотрудников, у которых метки продолжили работать, т.к. только от этих меток в топиках будут события, начиная с 13:00.

Я подозреваю, что решить эту проблему только на базе Kafka не получится. Но сейчас текущее решение удовлетворяет нашим потребностям, поэтому мы его не трогаем. А как вариант на будущее, можно будет перенести хранение истории в ClickHouse и сделать Consumer, который будет для REAL_TIME-режима ходить в Kafka, а для PAST_TIME – в ClickHouse.

А что у вас?

Несмотря на то, что «Цифровой рабочий» уже много чего умеет, продукт продолжает развиваться. Мы постоянно пилим какие-то новые фичи, подключаем новые системы позиционирования, что-то переделываем и оптимизируем, экспериментируем с разными технологиями потоковой обработки, прикручиваем ML (вот здесь, например, о том, что сделали интеграцию с системой видеоаналитики). В общем, куча идей и планов на будущее!

Было бы интересно услышать в комментариях, разрабатываете ли вы что-то подобное, например, в области IoT с потоковой обработкой данных, какие технологии применяете и как?

И, кстати, вот: «Оцифровка рабочего в режиме реального времени» – мой доклад на SmartData conf.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *