CREATE SOURCES

CREATE SOURCES connects Materialize to a data source and lets you interact with its data as if it were in a SQL table.

Conceptual framework 

To provide data to Materialize, you must create “sources”, which is a catch-all term for any resource Materialize can read data from.

There are two types of sources within Materialize:

Streaming sources 

Materialize can ingest data from Kafka topics that publish a change feed from an underlying relational database, e.g. MySQL. In Materialize’s current iteration, this only works with databases set up to publish a change feed to Kafka through Debezium, a change data capture (CDC) tool for relational databases.

Materialize also needs to understand the structure of the source, so it relies on receiving the topic’s schema from either a Confluent Schema Registry or a user-supplied Avro schema. Debezium handles this automatically by using Confluent Schema Registry, so most users do not need to do anything for this step.

After you create the source, Materialize automatically collects all of the data that streams in, which is then used to supply your queries and views with data.
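
For example, here is a minimal sketch of that workflow, using the hypothetical Kafka broker, schema registry, and topic names from the examples below; once the sources exist, you can query them like tables:

-- Create sources for every topic matching the pattern (broker, registry,
-- and topic names are hypothetical, modeled on the examples below).
CREATE SOURCES
LIKE 'mysql.simple.%'
FROM 'kafka://kafka:9092'
USING SCHEMA REGISTRY 'http://schema-registry:8081';

-- Query one of the resulting sources like a table (the dotted name is
-- quoted so it is treated as a single identifier; this is an assumption).
SELECT * FROM "mysql.simple.user";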

Static sources 

Materialize can ingest a static data set from a source like a .csv file. In doing this, Materialize simply reads the file and writes the contents to its underlying Differential dataflow engine. Once the data’s been ingested, you can query it as you would any other relational data.
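
For example, here is a minimal sketch of ingesting and then querying a CSV file; the file path and column count are hypothetical:

-- Create a source from a local CSV file (path and column count are
-- hypothetical; see the CREATE SOURCE syntax and WITH options below).
CREATE SOURCE users FROM 'file:///data/users.csv' WITH (format = 'csv', columns = 3);

-- Query the ingested data like any other relational data.
SELECT * FROM users;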

Syntax 

Create multiple sources 

CREATE SOURCES can only be used to create streaming sources.

CREATE SOURCES [ LIKE schema_expr ] FROM kafka_src USING SCHEMA { REGISTRY registry_src | avro_schema }
Field Use
LIKE schema_expr If using a Confluent schema registry, create sources from all Kafka topics that match schema_expr. If not used, Materialize creates sources for all topics published at kafka_src.

Each source is named after the table in your database from which it originated.
FROM kafka_src The Kafka source you want to use (begins with kafka://).
REGISTRY registry_src Use the Confluent schema registry at registry_src to define the structure of the Kafka source.
avro_schema The Avro schema for the topic.

Create single source 

CREATE SOURCE can be used to create streaming or static sources.

CREATE SOURCE src_name FROM { kafka_src | local_file } [ USING SCHEMA { REGISTRY registry_src | avro_schema } ] [ WITH ( field = val, ... ) ]
Field Use
src_name The name for the source, which is used as its table name within SQL.
FROM kafka_src The Kafka source you want to use (begins with kafka://).
FROM local_file The absolute path to the local file you want to use as a source (begins with file://).
REGISTRY registry_src Use the Confluent schema registry at registry_src to define the structure of the Kafka source.
avro_schema The Avro schema for the topic.
WITH ( option_list ) Instructions for parsing your static source file. For more detail, see WITH options.
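
For instance, a single streaming source can use a Confluent schema registry instead of an inline Avro schema; this sketch reuses the broker, registry, and topic names from the examples below:

-- Create one streaming source, resolving its structure via the registry
-- (broker, registry, and topic names are taken from the examples below).
CREATE SOURCE 'mysql.simple.user'
FROM 'kafka://kafka:9092'
USING SCHEMA REGISTRY 'http://schema-registry:8081';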

WITH options 

The following options are valid within the WITH clause.

Field Value
format (Required) The static source’s format. Currently, csv is the only supported format.
columns (Required) The number of columns to read from the source.

All field names are case-sensitive.
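
For example, a WITH clause that supplies both required options for a CSV source (the file path and column count are hypothetical):

-- Both options are required; note the lowercase field names.
CREATE SOURCE prices FROM 'file:///prices.csv' WITH (format = 'csv', columns = 2);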

Streaming source details 

Overview 

Materialize receives data from Kafka sources, which are set up to publish a change feed from a relational database (also known as “change data capture” or CDC).

For Materialize to meaningfully process arbitrary changes from the upstream database, it requires a “diff envelope”, which describes records’ old and new values. In the current landscape, the only CDC tool that provides a diff envelope from relational databases is Debezium.

This means that to create streaming sources within Materialize, you must publish a change feed from your database to Kafka through Debezium, and make the topic’s schema available through either a Confluent Schema Registry or a user-supplied Avro schema. Note that Debezium handles this automatically.

After creating the source and connecting to Kafka, Materialize receives all of the data published to the Kafka topic.

Source requirements 

Materialize requires that the data it receives from Kafka have a structure that contains both the old and new values for any fields within a record (a “diff envelope”), which lets Differential dataflow process arbitrary changes to the underlying data, e.g. expressing a record’s deletion.

The only tool we’re aware of that provides data to Kafka in this form is Debezium, through its envelope structure that contains a before and an after field (although CockroachDB’s change data capture feature is close to operational, as well).

Ultimately, this means that the only streaming sources you can create must originate from Debezium and be published through Kafka.

Source schema 

When creating a source, Materialize looks up the stream’s structure using a Confluent Schema Registry or the Avro schema you define. The stream’s schema is then used to create the source within Materialize, which is essentially equivalent to a table in the language of RDBMSes.

Once Materialize has determined the structure of the stream, it connects the underlying Kafka stream directly to its Differential dataflow engine.

Data storage 

As data streams in from Kafka, Materialize’s internal Differential instance builds an arrangement (which is roughly equivalent to an index in the language of RDBMSes). When the source is initially created, this arrangement receives all of the data that the Kafka stream retains for the topic, e.g. the last 24 hours of data, and constructs its initial state from that. As new data streams in, Materialize collects that, as well.

However, Materialize has a separate garbage collection period for arrangements, which works independently from the Kafka stream’s retention period.

When you create views, they are populated with all of the data from their sources that is available in the arrangements within Differential.
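
As an illustration, a view over a streaming source is kept up to date from this arrangement rather than by re-reading Kafka. The sketch below assumes a source named "mysql.simple.user" with columns a and b, as in the Avro schema example further down; the view-creation syntax shown here is an assumption and may differ in your version of Materialize:

-- A rough sketch: the view is populated and maintained from the source's
-- arrangement (source and column names come from the Avro schema example).
CREATE VIEW simple_totals AS
    SELECT a, sum(b) AS total
    FROM "mysql.simple.user"
    GROUP BY a;

SELECT * FROM simple_totals;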

Static source details 

Creating static sources is more straightforward than streaming ones, though there are still a number of caveats.

CSV sources 

Materialize reads the file at the path you provide and parses each line into a row with the number of columns you specify in the WITH clause (see WITH options above).

Examples 

Using a Confluent schema registry 

CREATE SOURCES
LIKE 'mysql.simple.%'
FROM 'kafka://kafka:9092'
USING SCHEMA REGISTRY 'http://schema-registry:8081';

Manually defining Avro schema 

CREATE SOURCE 'mysql.simple.user'
FROM 'kafka://kafka:9092'
USING SCHEMA '{
  "type": "record",
  "name": "envelope",
  "fields": [
    {
      "name": "before",
      "type": [
        {
          "name": "row",
          "type": "record",
          "fields": [
            {"name": "a", "type": "long"},
            {"name": "b", "type": "long"}
          ]
        },
        "null"
      ]
    },
    { "name": "after", "type": ["row", "null"] }
  ]
}';

Creating CSV source 

CREATE SOURCE test FROM 'file:///test.csv' WITH (format = 'csv', columns = 5);
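
Once created, you can query the file’s contents like any other source; a minimal follow-up, assuming the source above:

SELECT * FROM test;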