Интеграция Google Pub/Sub с ClickHouse Cloud
Вы можете записаться в лист ожидания закрытой предварительной версии здесь.
Пайпы ClickPipes для Pub/Sub можно вручную развертывать и настраивать через интерфейс ClickPipes, а также создавать и управлять ими программно с помощью OpenAPI и Terraform.
Предварительные требования
Вы уже ознакомились с введением в ClickPipes, имеете доступ к проекту GCP, содержащему топик, из которого хотите получать данные, и создали сервисный аккаунт с соответствующими правами Pub/Sub. Точный набор прав, необходимых ClickPipes, см. в руководстве по разрешениям Pub/Sub IAM.
Создание первого ClickPipe
- Откройте SQL Console для вашего сервиса ClickHouse Cloud.

- В меню слева нажмите кнопку
Data Sources, затем — "Set up a ClickPipe"

- Выберите GCP Pub/Sub в качестве источника данных.

- Заполните форму: укажите имя ClickPipe, ваш GCP Project ID и JSON‑файл сервисного аккаунта для сервисного аккаунта, которому предоставлен доступ к Pub/Sub. Project ID должен содержать от 6 до 30 символов, может включать строчные буквы, цифры и дефисы, должен начинаться с буквы и не может заканчиваться дефисом.

-
Выберите топик Pub/Sub для приёма данных. Выпадающий список автоматически заполняется топиками из вашего проекта GCP (в алфавитном порядке), как только учётные данные будут успешно проверены.
- Формат данных. При выборе топика ClickPipes обращается к реестру схем Pub/Sub. Если к топику привязана встроенная схема Avro или Protobuf, формат данных и схема определяются автоматически, а селекторы фиксируются на последней схеме топика. Для топиков без встроенной схемы по умолчанию используется JSONEachRow.
- Начальное смещение. Выберите, откуда начинать чтение. Доступны варианты Latest (только новые сообщения), Earliest (самые старые сохранённые сообщения) и Seek to Timestamp (с выбором даты и времени в UTC).
- Выражение фильтрации (необязательно). Фильтр подписки Pub/Sub subscription filter по атрибутам сообщений — например,
attributes.type = "telemetry". Фильтры применяются только к атрибутам сообщений, а не к полезной нагрузке, и не могут быть изменены после создания пайпа (чтобы изменить фильтр, нужно пересоздать пайп). - В интерфейсе также будет показан пример сообщения из выбранного топика, а переключатель Flatten object позволит предварительно посмотреть, как вложенный JSON будет развёрнут на стороне назначения.

- На следующем шаге вы можете выбрать, хотите ли выполнять приём данных в новую таблицу ClickHouse или использовать существующую. Следуйте инструкциям на экране, чтобы изменить имя таблицы, схему и настройки. Вверху будет отображаться предварительный просмотр изменений в реальном времени на примере таблицы.

Вы также можете настроить дополнительные параметры с помощью доступных элементов управления

- Либо вы можете выбрать приём данных в существующую таблицу ClickHouse. В этом случае интерфейс позволит сопоставить поля источника с полями ClickHouse в выбранной целевой таблице.

- Наконец, вы можете настроить разрешения для внутреннего пользователя ClickPipes.
Разрешения: ClickPipes создаст отдельного пользователя для записи данных в целевую таблицу. Для этого внутреннего пользователя можно выбрать пользовательскую роль или одну из предопределённых ролей:
Full access: полный доступ к кластеру. Это может быть полезно, если вы используете materialized view или словарь с целевой таблицей.Only destination table: только разрешенияINSERTдля целевой таблицы.

- Нажмите "Complete Setup", чтобы система зарегистрировала ваш ClickPipe, после чего он появится в сводной таблице.


В сводной таблице доступны элементы управления для просмотра примеров данных из источника или из целевой таблицы в ClickHouse

А также элементы управления для удаления ClickPipe и просмотра сводки по задаче приёма данных.

- Поздравляем! Вы успешно настроили свой первый ClickPipe для Pub/Sub. Он будет непрерывно работать, выполняя приём данных в реальном времени из вашего топика Pub/Sub в сервис ClickHouse Cloud.
Управляемые подписки
Сообщения Pub/Sub потребляются через подписки, а не напрямую из топиков. ClickPipes создает и управляет отдельной подпиской для каждого пайпа — вам нужно выбрать только топик.
- Управляемая подписка называется
clickpipes-{pipeID}и создается в топике при запуске пайпа. - Для нее задаются срок подтверждения 60 секунд, хранение сообщений в течение 7 дней и включенное упорядочивание сообщений.
- Создание подписки идемпотентно — при перезапуске пайпа и повторном размещении реплики используется существующая подписка, если она уже указывает на настроенный топик.
- Во время обнаружения топиков и выборки сообщений ClickPipes также создает недолговечные эфемерные подписки (
clickpipes-discovery-{uuid}), которые удаляются сразу после завершения выборки. - При удалении пайпа ClickPipes удаляет и управляемую подписку в рамках этой процедуры.
Поэтому предоставленный вами сервисный аккаунт должен иметь права на создание и удаление подписок в проекте, а также на получение из них сообщений. Полный список см. в руководстве по разрешениям Pub/Sub IAM.
Поддерживаемые форматы данных
Поддерживаются следующие форматы:
- JSON
- Avro — через встроенные схемы Pub/Sub (двоичное кодирование)
- Protobuf — через встроенные схемы Pub/Sub (двоичное кодирование)
Для Avro и Protobuf схема берётся из реестра схем Pub/Sub для топика. Пайп всегда использует последнюю версию схемы топика; селектор схемы в UI намеренно доступен только для чтения.
Сжатие
ClickPipes for Pub/Sub автоматически определяет и распаковывает сжатые сообщения. Клиент Pub/Sub передаёт необработанные байты — ClickPipes выполняет распаковку без дополнительной настройки.
Поддерживаются следующие кодеки сжатия:
- gzip
- zstd
- lz4
- snappy (формат с кадрированием)
Сжатие определяется автоматически по магическим байтам в каждом сообщении. Если известная сигнатура сжатия не обнаружена, сообщение обрабатывается как несжатое. Определённый тип сжатия также отображается при определении схемы, поэтому в интерфейсе при предварительном просмотре образца данных будет корректно показана распакованная полезная нагрузка.
Автоопределение безопасно для текстовых форматов, таких как JSON, поскольку печатаемые символы ASCII не могут совпасть с магическими байтами сжатия. Размер распакованной полезной нагрузки ограничен 64 МБ.
Поддерживаемые типы данных
Поддержка стандартных типов
В настоящее время ClickPipes поддерживает следующие типы данных ClickHouse:
- Базовые числовые типы — [U]Int8/16/32/64, Float32/64 и BFloat16
- Большие целочисленные типы — [U]Int128/256
- Типы Decimal
- Boolean
- String
- FixedString
- Date, Date32
- DateTime, DateTime64 (только часовой пояс UTC)
- Enum8/Enum16
- UUID
- IPv4
- IPv6
- все типы ClickHouse LowCardinality
- Map с ключами и значениями любого из перечисленных выше типов (включая Nullable)
- Tuple и Array с элементами любого из перечисленных выше типов (включая Nullable, только один уровень вложенности)
- типы SimpleAggregateFunction (для целевых таблиц AggregatingMergeTree или SummingMergeTree)
Поддержка типа Variant
Вы можете вручную указать тип Variant (например, Variant(String, Int64, DateTime)) для любого JSON-поля
в исходном потоке данных. Поскольку ClickPipes определяет, какой подтип Variant использовать, в определении Variant можно указать только один целочисленный тип или тип DateTime —
например, Variant(Int64, UInt32) не поддерживается.
Поддержка типа JSON
Поля JSON, которые всегда содержат объект JSON, можно сопоставить с целевым столбцом типа JSON. Вам потребуется вручную изменить целевой столбец на нужный тип JSON, включая все фиксированные и пропускаемые пути.
Виртуальные столбцы Pub/Sub
Для топиков Pub/Sub поддерживаются следующие виртуальные столбцы. При создании новой целевой таблицы их можно добавить с помощью кнопки Add Column.
| Name | Description | Recommended Data Type |
|---|---|---|
| _message_id | ID сообщения Pub/Sub, назначенный брокером | String |
| _publish_time | временная метка публикации Pub/Sub (с точностью до миллисекунд, UTC) | DateTime64(3) |
| _ordering_key | ключ упорядочивания Pub/Sub (пустая строка, если для сообщения ключ не задан) | String |
| _attributes | пользовательские атрибуты сообщения Pub/Sub | Map(String, String) |
| _raw_message | полная полезная нагрузка сообщения Pub/Sub (отключено по умолчанию) | String |
Поле _raw_message можно использовать в случаях, когда требуется только полная полезная нагрузка сообщения Pub/Sub (например, при использовании функций ClickHouse JsonExtract* для заполнения materialized view на следующем этапе). Для таких пайпов удаление всех «невиртуальных» столбцов может повысить производительность ClickPipes.
Ограничения
- DEFAULT не поддерживается.
- По умолчанию размер отдельных сообщений ограничен 8 МБ (без сжатия) при использовании реплики минимального размера (XS) и 16 МБ (без сжатия) для более крупных реплик. Сообщения, превышающие этот лимит, будут отклонены с ошибкой. Если вам нужны сообщения большего размера, пожалуйста, обратитесь в службу поддержки.
- Фильтры подписки Pub/Sub неизменяемы — чтобы изменить выражение фильтра, необходимо заново создать pipe.
- Фильтры применяются только к атрибутам сообщения, а не к его полезной нагрузке.
Производительность
Формирование батчей
ClickPipes вставляет данные в ClickHouse батчами. Это позволяет избежать создания слишком большого количества частей в базе данных, что может привести к проблемам с производительностью кластера.
Батчи вставляются при выполнении одного из следующих условий:
- Размер батча достиг максимального значения (100 000 строк или 32 МБ на 1 ГБ памяти реплики)
- Батч оставался открытым в течение максимально допустимого времени (5 секунд)
Задержка
Задержка (то есть время между публикацией сообщения в Pub/Sub и моментом, когда оно становится доступным в ClickHouse) зависит от ряда факторов (задержки публикации, сетевой задержки, размера/формата сообщения). Формирование батчей, описанная в разделе выше, также влияет на задержку. Мы всегда рекомендуем протестировать ваш конкретный сценарий использования, чтобы понять, какой задержки ожидать.
Если у вас есть особые требования к низкой задержке, свяжитесь с нами.
Ключи упорядочивания
Pub/Sub гарантирует, что сообщения с одинаковым ключом упорядочивания доставляются одному подписчику в порядке публикации. В ClickPipes упорядочивание для управляемых подписок включено по умолчанию: если сообщения содержат ключи упорядочивания, подписчики получают их по порядку; если такие ключи отсутствуют, поведение не меняется.
Если ваш продьюсер публикует все сообщения с небольшим числом ключей упорядочивания (или с одним ключом), Pub/Sub будет направлять эти сообщения небольшому числу подписчиков, что может ограничить горизонтальную пропускную способность. Рекомендуется либо не использовать ключи упорядочивания, если упорядочивание не требуется, либо использовать ключ упорядочивания с высокой кардинальностью.
Масштабирование
ClickPipes for Pub/Sub поддерживает как горизонтальное, так и вертикальное масштабирование. Каждый пайп использует одну управляемую подписку Pub/Sub — это нельзя настроить. По умолчанию из этой подписки читает один потребитель; число потребителей можно увеличить при создании ClickPipe или в любой момент позже: Settings -> Advanced Settings -> Scaling. ClickPipes автоматически распределяет сообщения из подписки между запущенными потребителями — дополнительная координация не требуется.
ClickPipes обеспечивает высокую доступность за счет архитектуры, распределенной по зонам доступности; для этого нужно масштабирование как минимум до двух потребителей.
Независимо от числа запущенных потребителей отказоустойчивость обеспечивается на уровне архитектуры. Если потребитель или базовая инфраструктура, на которой он работает, выйдет из строя, ClickPipes автоматически перезапустит потребителя и продолжит обработку сообщений.
Семантика доставки
ClickPipes for Pub/Sub обеспечивает доставку как минимум один раз. Сообщение Pub/Sub подтверждается только после вставки соответствующей строки в ClickHouse (или записи в таблицу ошибок для некорректных записей); все сообщения подтверждаются после обработки, включая некорректные записи, направленные в таблицу ошибок, чтобы предотвратить бесконечную повторную доставку. Если реплика аварийно завершится после вставки, но до того, как подтверждение будет получено Pub/Sub, сообщение будет доставлено повторно после истечения срока подтверждения и снова вставлено, поэтому последующие потребители должны допускать наличие дубликатов. Если вам нужна семантика «ровно один раз», выполняйте дедупликацию далее по конвейеру с помощью виртуального столбца _message_id (каждый идентификатор сообщения Pub/Sub уникален в пределах топика).
Аутентификация
ClickPipes for Pub/Sub аутентифицируется в GCP с помощью JSON-ключа сервисного аккаунта. При создании пайпа вы загружаете файл ключа; ClickPipes шифрует его при хранении и использует во время работы для получения сообщений и управления жизненным циклом управляемой подписки.
Точный список необходимых разрешений IAM и рекомендуемое определение настраиваемой роли см. в руководстве по разрешениям IAM для Pub/Sub.