New in v2.1: The CREATE CHANGEFEED statement creates a new changefeed, which provides row-level change subscriptions.
Changefeeds target an allowlist of tables, called the "watched rows." Every change to a watched row is emitted as a record in a configurable format (JSON) to a configurable sink (Kafka).
For more information, see Change Data Capture.
This feature is under active development and only works for a targeted use case. Please file a GitHub issue if you have feedback on the interface.
CREATE CHANGEFEED is an enterprise-only feature. A core version will be available in a future release.
Required privileges
Changefeeds can only be created by superusers, i.e., members of the admin role. The admin role exists by default with root as the member.
Synopsis
Parameters
| Parameter | Description |
|---|---|
| table_name | The name of the table (or tables in a comma-separated list) to create a changefeed for. |
| sink | The location of the configurable sink. The scheme of the URI indicates the type; currently, only kafka. There are query parameters that vary per type. Currently, the kafka scheme only has topic_prefix, which adds a prefix to all of the topic names. Sink URI scheme: '[scheme]://[host]:[port][?topic_prefix=[foo]]'. For example, CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_' would emit rows under the topic bar_foo instead of foo. |
| option / value | For a list of available options and their values, see Options below. |
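As a sketch of how these parameters combine, the statement below watches two tables and sets a topic prefix. The table names foo and baz are hypothetical, and host:port is a placeholder for your Kafka broker address. Based on the topic_prefix behavior described above, rows would be expected under the topics bar_foo and bar_baz.

```sql
-- Hypothetical tables foo and baz; replace host:port with your Kafka broker.
-- topic_prefix=bar_ prefixes every topic name, so foo is emitted as bar_foo.
CREATE CHANGEFEED FOR TABLE foo, baz
  INTO 'kafka://host:port?topic_prefix=bar_';
```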
Options
| Option | Value | Description |
|---|---|---|
| updated | N/A | Include updated timestamps with each row. |
| resolved | INTERVAL | Periodically emit resolved timestamps to the changefeed. Optionally, set a minimum duration between emitting resolved timestamps. If unspecified, all resolved timestamps are emitted. Example: resolved='10s' |
| envelope | key_only / row | Use key_only to emit only the key and no value, which is faster if you only want to know when the key changes. Default: envelope=row |
| cursor | Timestamp | Emits any changes after the given timestamp, but does not output the current state of the table first. If cursor is not specified, the changefeed starts by doing a consistent scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan. cursor can be used to start a new changefeed where a previous changefeed ended. Example: CURSOR='1536242855577149065.0000000000' |
| format | json / 'experimental-avro' | Format of the emitted record. Currently, support for Avro is limited and experimental. Default: format=json. |
| confluent_schema_registry | Schema Registry address | The Schema Registry address is required to use 'experimental-avro'. |
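The options can be combined in a single WITH clause. The following is a minimal sketch, assuming a table called name and a Kafka broker at host:port (both placeholders): the first statement asks for updated timestamps and limits resolved timestamps to at most one every 10 seconds, and the second emits keys only.

```sql
-- Include updated timestamps and emit resolved timestamps
-- no more often than every 10 seconds.
CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka://host:port'
  WITH updated, resolved = '10s';

-- Emit only the key of each changed row, with no value.
CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka://host:port'
  WITH envelope = key_only;
```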
Examples
Create a changefeed
> CREATE CHANGEFEED FOR TABLE name
INTO 'kafka://host:port'
WITH updated, resolved;
+--------------------+
| job_id |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed connected to Kafka, see Change Data Capture.
Create a changefeed with Avro
> CREATE CHANGEFEED FOR TABLE name
INTO 'kafka://host:port'
WITH format = 'experimental-avro', confluent_schema_registry = '<schema_registry_address>';
+--------------------+
| job_id |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed that emits an Avro record, see Change Data Capture.
Manage a changefeed
Use the following SQL statements to pause, resume, and cancel a changefeed.
Changefeed-specific SQL statements (e.g., CANCEL CHANGEFEED) will be added in the future.
Pause a changefeed
> PAUSE JOB job_id;
For more information, see PAUSE JOB.
Resume a paused changefeed
> RESUME JOB job_id;
For more information, see RESUME JOB.
Cancel a changefeed
> CANCEL JOB job_id;
For more information, see CANCEL JOB.
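Put together, the sequence might look like the following sketch. It assumes the job ID from the example output earlier on this page (360645287206223873); substitute the job_id returned when you created your own changefeed. SHOW JOBS is included here only as one way to look up the job ID and its status.

```sql
-- List jobs to find the changefeed's job_id and current status.
SHOW JOBS;

-- Pause the changefeed, then resume it later.
PAUSE JOB 360645287206223873;
RESUME JOB 360645287206223873;

-- Cancel the changefeed once it is no longer needed.
CANCEL JOB 360645287206223873;
```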
Start a new changefeed where another ended
Find the high-water timestamp for the ended changefeed:
> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
job_id | job_type | ... | high_water_timestamp | error | coordinator_id
+--------------------+------------+ ... +--------------------------------+-------+----------------+
383870400694353921 | CHANGEFEED | ... | 1537279405671006870.0000000000 | | 1
(1 row)
Use the high_water_timestamp to start the new changefeed:
> CREATE CHANGEFEED FOR TABLE name
INTO 'kafka://host:port'
WITH cursor = '<high_water_timestamp>';
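For illustration, here is the same statement with the placeholder filled in, assuming the high_water_timestamp value shown in the sample query output above. The table name and host:port remain placeholders.

```sql
-- cursor is set to the high_water_timestamp of the ended changefeed,
-- so the new changefeed emits only changes after that timestamp
-- and skips the initial scan of the table.
CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka://host:port'
  WITH cursor = '1537279405671006870.0000000000';
```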