Перейти к основному содержимому
Перейти к основному содержимому

Шаблон Dataflow Pub/Sub to ClickHouse

Шаблон Pub/Sub to ClickHouse — это потоковый конвейер, который считывает сообщения в кодировке JSON из подписки Pub/Sub и записывает их в таблицу ClickHouse. Сообщения, которые не удаётся разобрать или сопоставить с целевой схемой, направляются в пункт назначения dead-letter: таблицу ClickHouse, topic Pub/Sub или в оба сразу.

Требования к конвейеру

  • Исходная подписка Pub/Sub должна существовать.
  • Сообщения, публикуемые в подписку, должны быть валидным JSON.
  • Целевая таблица ClickHouse должна существовать, а имена ее столбцов должны совпадать с именами полей в JSON-полезной нагрузке.
  • Хост ClickHouse должен быть доступен с машин-воркеров Dataflow.
  • Должно быть указано как минимум одно назначение для dead-letter сообщений (clickHouseDeadLetterTable или deadLetterTopic). Если указаны оба, сообщения с ошибками одновременно направляются в оба назначения.
  • Если задан clickHouseDeadLetterTable, таблица для dead-letter сообщений уже должна существовать в ClickHouse со схемой, показанной в разделе Обработка dead-letter сообщений.
  • Если задан deadLetterTopic, топик Pub/Sub уже должен существовать.

Параметры шаблона



Имя параметраОписание параметраОбязательноПримечания
inputSubscriptionПодписка Pub/Sub, из которой считываются сообщения. Пример: projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.Сообщения должны быть закодированы в JSON.
clickHouseUrlURL конечной точки ClickHouse. Используйте https:// для SSL-соединений (ClickHouse Cloud) или http:// для соединений без SSL. Пример: https://<HOST>:8443 или http://<HOST>:8123.Для ClickHouse Cloud используйте конечную точку HTTPS на порту 8443.
clickHouseDatabaseИмя базы данных ClickHouse, в которой находится целевая таблица. Пример: default.
clickHouseTableИмя таблицы ClickHouse, в которую нужно записывать данные.Таблица должна существовать до запуска конвейера.
clickHouseUsernameИмя пользователя для аутентификации в ClickHouse.
clickHousePasswordПароль для аутентификации в ClickHouse.
clickHouseDeadLetterTableТаблица ClickHouse, в которую записываются сообщения с ошибками. Пример: my_table_dead_letter.Необходимо указать как минимум один из параметров: clickHouseDeadLetterTable или deadLetterTopic. Таблица должна существовать и иметь схему dead-letter, показанную в разделе Обработка dead-letter.
deadLetterTopicТопик Pub/Sub, в который публикуются сообщения с ошибками. Пример: projects/<PROJECT_ID>/topics/<TOPIC_NAME>.Необходимо указать как минимум один из параметров: clickHouseDeadLetterTable или deadLetterTopic. Ошибочные полезные нагрузки публикуются в топик с атрибутами сообщения errorMessage и failedAt.
windowSecondsДлительность в секундах для временных окон батчинга.Сведения о взаимодействии с batchRowCount см. в разделе Батчинг и оконная обработка. Если не задан ни один из параметров, в комбинированном режиме используются значения по умолчанию 30s и 1000 строк.
batchRowCountКоличество строк, которое нужно накопить перед записью в ClickHouse.Сведения о взаимодействии с windowSeconds см. в разделе Батчинг и оконная обработка.
maxInsertBlockSizeМаксимальное количество строк в одном операторе INSERT, отправляемом в ClickHouse. Значение по умолчанию — 1,000,000.Параметр ClickHouseIO.
maxRetriesМаксимальное количество повторных попыток для неудачных вставок в ClickHouse. Значение по умолчанию — 5.Параметр ClickHouseIO.
insertDeduplicateВключать ли дедупликацию для запросов INSERT в реплицируемых таблицах ClickHouse. Значение по умолчанию — true.Параметр ClickHouseIO.
insertQuorumДля запросов INSERT в реплицируемых таблицах ждать, пока указанное число реплик не подтвердит запись и не линеаризует добавление данных. Значение 0 отключает запись по кворуму.Параметр ClickHouseIO. Отключено в настройках сервера по умолчанию.
insertDistributedSyncЕсли параметр включен, запросы INSERT в distributed таблицы ожидают, пока данные не будут отправлены на все узлы кластера. Значение по умолчанию — true.Параметр ClickHouseIO.
Примечание

Значения по умолчанию для всех параметров ClickHouseIO приведены в ClickHouseIO Apache Beam Connector.

Формат сообщений и сопоставление схем

Сообщения Pub/Sub должны представлять собой объекты JSON, имена полей верхнего уровня в которых должны в точности совпадать с именами столбцов целевой таблицы ClickHouse.

Чтобы сопоставить входящие сообщения с целевой таблицей, конвейер при запуске выполняет следующие действия:

  1. Получает схему целевой таблицы ClickHouse.
  2. Строит схему Beam Row на основе схемы ClickHouse.
  3. Для каждого входящего сообщения Pub/Sub разбирает полезную нагрузку JSON и формирует строку, считывая поля, указанные в схеме ClickHouse.

Ссылки

Имена полей JSON должны в точности совпадать с именами столбцов ClickHouse (с учетом регистра). Поля сообщения, которым не соответствует ни один столбец ClickHouse, игнорируются. Если для столбца ClickHouse нет соответствующего поля в полезной нагрузке JSON, конвейер пытается записать NULL в этот столбец — это возможно только в том случае, если столбец объявлен как Nullable. Сообщения, которые не удается разобрать, чьи значения невозможно привести к типу столбца или которые привели бы к записи NULL в столбец, не допускающий NULL, направляются в dead-letter destination.

Преобразование типов

Значения JSON приводятся к соответствующему типу столбца ClickHouse:

Тип ClickHouseПримечания
Float32Разбирается через Float.valueOf.
Float64Разбирается через Double.valueOf.
DateРазбирается как строка даты в формате ISO-8601.
DateTimeРазбирается как строка даты и времени в формате ISO-8601 (например, 2026-01-15T12:34:56Z).
Array(T)Массив JSON; каждый элемент преобразуется к типу элемента T. Пустые или отсутствующие массивы приводят к пустому массиву.
Integer types (Int8/Int16/Int32/Int64, UInt8/UInt16/UInt32/UInt64)Разбираются из числового значения JSON или его строкового представления.
StringИспользуется как есть для текстовых полей; нетекстовые узлы JSON сериализуются в строковое представление JSON.

Батчинг и оконная обработка

Поскольку конвейер работает в режиме стриминга, входящие строки накапливаются в окнах перед записью в ClickHouse. Стратегия формирования окон выбирается на основе указанных вами параметров:

windowSecondsbatchRowCountПоведение
заданне заданФиксированные окна по времени длительностью windowSeconds.
не заданзаданГлобальное окно с триггером по количеству; срабатывает каждые batchRowCount строк.
заданы обазаданы обаГлобальное окно с комбинированным триггером; срабатывает по первому выполненному условию (время или количество строк).
не задан ни одинне задан ни одинКомбинированный режим со значениями по умолчанию: 30 секунд или 1000 строк — в зависимости от того, что наступит раньше.

Подбирая эти значения, можно найти баланс между задержкой и эффективностью вставки. Меньшие окна снижают сквозную задержку; большие окна формируют меньшее число более крупных батчей INSERT.

Обработка dead-letter-сообщений

Сообщения, для которых не удалось выполнить разбор JSON, сопоставление со схемой или приведение типов, направляются в настроенные dead-letter-приёмники. Необходимо указать как минимум один из параметров clickHouseDeadLetterTable или deadLetterTopic; если заданы оба, сообщения с ошибками отправляются в оба.

Таблица ClickHouse dead-letter

Если задан параметр clickHouseDeadLetterTable, таблица dead-letter уже должна существовать и иметь следующую фиксированную схему:

СтолбецТипОписание
raw_messageStringИсходное содержимое сообщения Pub/Sub в виде текста UTF-8.
error_messageStringСообщение об исключении, объясняющее, почему не удалось обработать строку.
stack_traceStringПолная трассировка стека Java, сохранённая в момент сбоя.
failed_atDateTimeВременная метка момента обработки, когда произошёл сбой строки.

Минимальное определение для развертывания на одном узле:

CREATE TABLE my_table_dead_letter (
    raw_message   String,
    error_message String,
    stack_trace   String,
    failed_at     DateTime
) ENGINE = MergeTree()
ORDER BY failed_at;
Примечание

Адаптируйте движок и секцию ORDER BY под ваше развертывание: используйте ReplicatedMergeTree для реплицируемых таблиц, добавьте ON CLUSTER для распределенных развертываний и при необходимости скорректируйте партиционирование или TTL.

Pub/Sub dead-letter топик

Если задан deadLetterTopic, каждое сообщение, обработка которого завершилась ошибкой, повторно публикуется в этот топик со следующими данными:

  • Полезная нагрузка: исходные байты сообщения.
  • Атрибут errorMessage: сообщение исключения, зафиксированное в момент ошибки.
  • Атрибут failedAt: временная метка обработки, в которую произошла ошибка строки.

Это упрощает повторную обработку сообщений с ошибками после устранения проблемы со схемой или на стороне продюсера.

Запуск шаблона

Шаблон Pub/Sub to ClickHouse доступен в консоли Google Cloud.

Примечание

Обязательно ознакомьтесь с этим документом, особенно с разделами выше, чтобы полностью понять требования к конфигурации шаблона и необходимые предварительные условия.

Войдите в консоль Google Cloud и найдите Dataflow.

  1. Нажмите кнопку CREATE JOB FROM TEMPLATE.

    Консоль Dataflow
  2. Когда откроется форма шаблона, введите имя задачи и выберите нужный регион.

  3. В поле Dataflow Template введите ClickHouse или Pub/Sub и выберите шаблон Pub/Sub to ClickHouse.

  4. После выбора форма развернётся. Заполните:

    • Входную подписку Pub/Sub в формате projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
    • URL конечной точки ClickHouse — для ClickHouse Cloud используйте https://<HOST>:8443.
    • Базу данных ClickHouse, целевую таблицу, имя пользователя и пароль.
    • Как минимум один dead-letter адрес назначения: таблицу ClickHouse или topic Pub/Sub (либо оба варианта).
  5. При необходимости настройте параметры батчинга (windowSeconds, batchRowCount) и параметры тонкой настройки ClickHouseIO, как описано в разделе Параметры шаблона.

Мониторинг задачи

Перейдите на вкладку Dataflow Jobs в консоли Google Cloud, чтобы отслеживать состояние задачи. Там вы найдете сведения о задаче, включая прогресс и все ошибки:

Консоль Dataflow с выполняющейся задачей Pub/Sub to ClickHouse

Шаблон также отправляет следующие пользовательские метрики в пространстве имен PubSubToClickHouse; их можно просматривать на странице задачи Dataflow:

МетрикаТипОписание
messages-receivedCounterОбщее количество сообщений Pub/Sub, полученных на этапе разбора.
rows-parsed-okCounterСообщения, успешно преобразованные в строку и направленные в основной выход.
rows-parse-failedCounterСообщения, для которых не удалось выполнить разбор или сопоставление со схемой и которые были направлены в dead-letter-очередь.
message-payload-bytesDistributionРаспределение размеров полезной нагрузки входящих сообщений Pub/Sub в байтах.

Устранение неполадок

Ошибка превышения общего лимита памяти (код 241)

Эта ошибка возникает, когда ClickHouse не хватает памяти при обработке больших батчей данных. Чтобы устранить проблему:

  • Увеличьте ресурсы экземпляра: переведите сервер ClickHouse на более крупный экземпляр с большим объёмом памяти, чтобы он мог справляться с нагрузкой при обработке данных.
  • Уменьшите размер батча: уменьшите batchRowCount (и/или maxInsertBlockSize) в конфигурации вашей задачи Dataflow, чтобы отправлять в ClickHouse меньшие фрагменты данных и снизить потребление памяти на один батч.

Все сообщения отправляются в dead-letter-приемник

Наиболее распространённые причины:

  • Имена полей в JSON не совпадают в точности с именами столбцов ClickHouse (сопоставление чувствительно к регистру).
  • Тип столбца не удаётся привести из значения JSON (например, если в столбце DateTime находится строка не в формате ISO-8601).
  • Схема целевой таблицы изменилась после запуска конвейера — схема считывается один раз при запуске. После внесения изменений в схему перезапустите задачу.

Проверьте столбцы error_message и stack_trace в dead-letter-таблице ClickHouse (или атрибут errorMessage в dead-letter-сообщениях Pub/Sub), чтобы определить первопричину.

Конвейер запускается, но строки не поступают в ClickHouse

  • Убедитесь, что подписка получает сообщения — проверьте метрику messages-received на странице задачи Dataflow.
  • В режиме на основе времени (только windowSeconds) строки сбрасываются только на границах окна. Уменьшите windowSeconds, чтобы убедиться, что сброс действительно происходит.
  • Проверьте сетевую связность между воркерами Dataflow и конечной точкой ClickHouse (firewall, VPC peering или Private Service Connect).

Исходный код шаблона

Исходный код шаблона доступен в следующих репозиториях: