Шаблон Dataflow Pub/Sub to ClickHouse
Шаблон Pub/Sub to ClickHouse — это потоковый конвейер, который считывает сообщения в кодировке JSON из подписки Pub/Sub и записывает их в таблицу ClickHouse. Сообщения, которые не удаётся разобрать или сопоставить с целевой схемой, направляются в пункт назначения dead-letter: таблицу ClickHouse, topic Pub/Sub или в оба сразу.
Требования к конвейеру
- Исходная подписка Pub/Sub должна существовать.
- Сообщения, публикуемые в подписку, должны быть валидным JSON.
- Целевая таблица ClickHouse должна существовать, а имена ее столбцов должны совпадать с именами полей в JSON-полезной нагрузке.
- Хост ClickHouse должен быть доступен с машин-воркеров Dataflow.
- Должно быть указано как минимум одно назначение для dead-letter сообщений (
clickHouseDeadLetterTableилиdeadLetterTopic). Если указаны оба, сообщения с ошибками одновременно направляются в оба назначения. - Если задан
clickHouseDeadLetterTable, таблица для dead-letter сообщений уже должна существовать в ClickHouse со схемой, показанной в разделе Обработка dead-letter сообщений. - Если задан
deadLetterTopic, топик Pub/Sub уже должен существовать.
Параметры шаблона
| Имя параметра | Описание параметра | Обязательно | Примечания |
|---|---|---|---|
inputSubscription | Подписка Pub/Sub, из которой считываются сообщения. Пример: projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. | ✅ | Сообщения должны быть закодированы в JSON. |
clickHouseUrl | URL конечной точки ClickHouse. Используйте https:// для SSL-соединений (ClickHouse Cloud) или http:// для соединений без SSL. Пример: https://<HOST>:8443 или http://<HOST>:8123. | ✅ | Для ClickHouse Cloud используйте конечную точку HTTPS на порту 8443. |
clickHouseDatabase | Имя базы данных ClickHouse, в которой находится целевая таблица. Пример: default. | ✅ | |
clickHouseTable | Имя таблицы ClickHouse, в которую нужно записывать данные. | ✅ | Таблица должна существовать до запуска конвейера. |
clickHouseUsername | Имя пользователя для аутентификации в ClickHouse. | ✅ | |
clickHousePassword | Пароль для аутентификации в ClickHouse. | ✅ | |
clickHouseDeadLetterTable | Таблица ClickHouse, в которую записываются сообщения с ошибками. Пример: my_table_dead_letter. | Необходимо указать как минимум один из параметров: clickHouseDeadLetterTable или deadLetterTopic. Таблица должна существовать и иметь схему dead-letter, показанную в разделе Обработка dead-letter. | |
deadLetterTopic | Топик Pub/Sub, в который публикуются сообщения с ошибками. Пример: projects/<PROJECT_ID>/topics/<TOPIC_NAME>. | Необходимо указать как минимум один из параметров: clickHouseDeadLetterTable или deadLetterTopic. Ошибочные полезные нагрузки публикуются в топик с атрибутами сообщения errorMessage и failedAt. | |
windowSeconds | Длительность в секундах для временных окон батчинга. | Сведения о взаимодействии с batchRowCount см. в разделе Батчинг и оконная обработка. Если не задан ни один из параметров, в комбинированном режиме используются значения по умолчанию 30s и 1000 строк. | |
batchRowCount | Количество строк, которое нужно накопить перед записью в ClickHouse. | Сведения о взаимодействии с windowSeconds см. в разделе Батчинг и оконная обработка. | |
maxInsertBlockSize | Максимальное количество строк в одном операторе INSERT, отправляемом в ClickHouse. Значение по умолчанию — 1,000,000. | Параметр ClickHouseIO. | |
maxRetries | Максимальное количество повторных попыток для неудачных вставок в ClickHouse. Значение по умолчанию — 5. | Параметр ClickHouseIO. | |
insertDeduplicate | Включать ли дедупликацию для запросов INSERT в реплицируемых таблицах ClickHouse. Значение по умолчанию — true. | Параметр ClickHouseIO. | |
insertQuorum | Для запросов INSERT в реплицируемых таблицах ждать, пока указанное число реплик не подтвердит запись и не линеаризует добавление данных. Значение 0 отключает запись по кворуму. | Параметр ClickHouseIO. Отключено в настройках сервера по умолчанию. | |
insertDistributedSync | Если параметр включен, запросы INSERT в distributed таблицы ожидают, пока данные не будут отправлены на все узлы кластера. Значение по умолчанию — true. | Параметр ClickHouseIO. |
Значения по умолчанию для всех параметров ClickHouseIO приведены в ClickHouseIO Apache Beam Connector.
Формат сообщений и сопоставление схем
Сообщения Pub/Sub должны представлять собой объекты JSON, имена полей верхнего уровня в которых должны в точности совпадать с именами столбцов целевой таблицы ClickHouse.
Чтобы сопоставить входящие сообщения с целевой таблицей, конвейер при запуске выполняет следующие действия:
- Получает схему целевой таблицы ClickHouse.
- Строит схему Beam
Rowна основе схемы ClickHouse. - Для каждого входящего сообщения Pub/Sub разбирает полезную нагрузку JSON и формирует строку, считывая поля, указанные в схеме ClickHouse.
Имена полей JSON должны в точности совпадать с именами столбцов ClickHouse (с учетом регистра). Поля сообщения, которым не соответствует ни один столбец ClickHouse, игнорируются. Если для столбца ClickHouse нет соответствующего поля в полезной нагрузке JSON, конвейер пытается записать NULL в этот столбец — это возможно только в том случае, если столбец объявлен как Nullable. Сообщения, которые не удается разобрать, чьи значения невозможно привести к типу столбца или которые привели бы к записи NULL в столбец, не допускающий NULL, направляются в dead-letter destination.
Преобразование типов
Значения JSON приводятся к соответствующему типу столбца ClickHouse:
| Тип ClickHouse | Примечания |
|---|---|
Float32 | Разбирается через Float.valueOf. |
Float64 | Разбирается через Double.valueOf. |
Date | Разбирается как строка даты в формате ISO-8601. |
DateTime | Разбирается как строка даты и времени в формате ISO-8601 (например, 2026-01-15T12:34:56Z). |
Array(T) | Массив JSON; каждый элемент преобразуется к типу элемента T. Пустые или отсутствующие массивы приводят к пустому массиву. |
Integer types (Int8/Int16/Int32/Int64, UInt8/UInt16/UInt32/UInt64) | Разбираются из числового значения JSON или его строкового представления. |
String | Используется как есть для текстовых полей; нетекстовые узлы JSON сериализуются в строковое представление JSON. |
Батчинг и оконная обработка
Поскольку конвейер работает в режиме стриминга, входящие строки накапливаются в окнах перед записью в ClickHouse. Стратегия формирования окон выбирается на основе указанных вами параметров:
windowSeconds | batchRowCount | Поведение |
|---|---|---|
| задан | не задан | Фиксированные окна по времени длительностью windowSeconds. |
| не задан | задан | Глобальное окно с триггером по количеству; срабатывает каждые batchRowCount строк. |
| заданы оба | заданы оба | Глобальное окно с комбинированным триггером; срабатывает по первому выполненному условию (время или количество строк). |
| не задан ни один | не задан ни один | Комбинированный режим со значениями по умолчанию: 30 секунд или 1000 строк — в зависимости от того, что наступит раньше. |
Подбирая эти значения, можно найти баланс между задержкой и эффективностью вставки. Меньшие окна снижают сквозную задержку; большие окна формируют меньшее число более крупных батчей INSERT.
Обработка dead-letter-сообщений
Сообщения, для которых не удалось выполнить разбор JSON, сопоставление со схемой или приведение типов, направляются в настроенные dead-letter-приёмники. Необходимо указать как минимум один из параметров clickHouseDeadLetterTable или deadLetterTopic; если заданы оба, сообщения с ошибками отправляются в оба.
Таблица ClickHouse dead-letter
Если задан параметр clickHouseDeadLetterTable, таблица dead-letter уже должна существовать и иметь следующую фиксированную схему:
| Столбец | Тип | Описание |
|---|---|---|
raw_message | String | Исходное содержимое сообщения Pub/Sub в виде текста UTF-8. |
error_message | String | Сообщение об исключении, объясняющее, почему не удалось обработать строку. |
stack_trace | String | Полная трассировка стека Java, сохранённая в момент сбоя. |
failed_at | DateTime | Временная метка момента обработки, когда произошёл сбой строки. |
Минимальное определение для развертывания на одном узле:
Адаптируйте движок и секцию ORDER BY под ваше развертывание: используйте ReplicatedMergeTree для реплицируемых таблиц, добавьте ON CLUSTER для распределенных развертываний и при необходимости скорректируйте партиционирование или TTL.
Pub/Sub dead-letter топик
Если задан deadLetterTopic, каждое сообщение, обработка которого завершилась ошибкой, повторно публикуется в этот топик со следующими данными:
- Полезная нагрузка: исходные байты сообщения.
- Атрибут
errorMessage: сообщение исключения, зафиксированное в момент ошибки. - Атрибут
failedAt: временная метка обработки, в которую произошла ошибка строки.
Это упрощает повторную обработку сообщений с ошибками после устранения проблемы со схемой или на стороне продюсера.
Запуск шаблона
Шаблон Pub/Sub to ClickHouse доступен в консоли Google Cloud.
Обязательно ознакомьтесь с этим документом, особенно с разделами выше, чтобы полностью понять требования к конфигурации шаблона и необходимые предварительные условия.
Войдите в консоль Google Cloud и найдите Dataflow.
-
Нажмите кнопку
CREATE JOB FROM TEMPLATE.
-
Когда откроется форма шаблона, введите имя задачи и выберите нужный регион.
-
В поле
Dataflow TemplateвведитеClickHouseилиPub/Subи выберите шаблонPub/Sub to ClickHouse. -
После выбора форма развернётся. Заполните:
- Входную подписку Pub/Sub в формате
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. - URL конечной точки ClickHouse — для ClickHouse Cloud используйте
https://<HOST>:8443. - Базу данных ClickHouse, целевую таблицу, имя пользователя и пароль.
- Как минимум один dead-letter адрес назначения: таблицу ClickHouse или topic Pub/Sub (либо оба варианта).
- Входную подписку Pub/Sub в формате
-
При необходимости настройте параметры батчинга (
windowSeconds,batchRowCount) и параметры тонкой настройкиClickHouseIO, как описано в разделе Параметры шаблона.
Мониторинг задачи
Перейдите на вкладку Dataflow Jobs в консоли Google Cloud, чтобы отслеживать состояние задачи. Там вы найдете сведения о задаче, включая прогресс и все ошибки:

Шаблон также отправляет следующие пользовательские метрики в пространстве имен PubSubToClickHouse; их можно просматривать на странице задачи Dataflow:
| Метрика | Тип | Описание |
|---|---|---|
messages-received | Counter | Общее количество сообщений Pub/Sub, полученных на этапе разбора. |
rows-parsed-ok | Counter | Сообщения, успешно преобразованные в строку и направленные в основной выход. |
rows-parse-failed | Counter | Сообщения, для которых не удалось выполнить разбор или сопоставление со схемой и которые были направлены в dead-letter-очередь. |
message-payload-bytes | Distribution | Распределение размеров полезной нагрузки входящих сообщений Pub/Sub в байтах. |
Устранение неполадок
Ошибка превышения общего лимита памяти (код 241)
Эта ошибка возникает, когда ClickHouse не хватает памяти при обработке больших батчей данных. Чтобы устранить проблему:
- Увеличьте ресурсы экземпляра: переведите сервер ClickHouse на более крупный экземпляр с большим объёмом памяти, чтобы он мог справляться с нагрузкой при обработке данных.
- Уменьшите размер батча: уменьшите
batchRowCount(и/илиmaxInsertBlockSize) в конфигурации вашей задачи Dataflow, чтобы отправлять в ClickHouse меньшие фрагменты данных и снизить потребление памяти на один батч.
Все сообщения отправляются в dead-letter-приемник
Наиболее распространённые причины:
- Имена полей в JSON не совпадают в точности с именами столбцов ClickHouse (сопоставление чувствительно к регистру).
- Тип столбца не удаётся привести из значения JSON (например, если в столбце
DateTimeнаходится строка не в формате ISO-8601). - Схема целевой таблицы изменилась после запуска конвейера — схема считывается один раз при запуске. После внесения изменений в схему перезапустите задачу.
Проверьте столбцы error_message и stack_trace в dead-letter-таблице ClickHouse (или атрибут errorMessage в dead-letter-сообщениях Pub/Sub), чтобы определить первопричину.
Конвейер запускается, но строки не поступают в ClickHouse
- Убедитесь, что подписка получает сообщения — проверьте метрику
messages-receivedна странице задачи Dataflow. - В режиме на основе времени (только
windowSeconds) строки сбрасываются только на границах окна. УменьшитеwindowSeconds, чтобы убедиться, что сброс действительно происходит. - Проверьте сетевую связность между воркерами Dataflow и конечной точкой ClickHouse (firewall, VPC peering или Private Service Connect).
Исходный код шаблона
Исходный код шаблона доступен в следующих репозиториях:
GoogleCloudPlatform/DataflowTemplates— основной репозиторий Google Cloud Platform.ClickHouse/DataflowTemplates— форк ClickHouse.