CREATE SOURCES

CREATE SOURCES connects Materialize to an external data source and lets you interact with its data as if it were in a SQL table.

Conceptual framework 

To provide data to Materialize, you must create “sources”, which is a catchall term for any resource Materialize can read data from. For more detail about how sources work within the rest of Materialize, check out our architecture overview.

Materialize supports the following types of sources:

Streaming sources 

Materialize can ingest data from Kafka topics that publish a change feed from an underlying relational database, e.g. MySQL. In Materialize’s current iteration, this only works with databases set up to publish a change feed to Kafka through Debezium, a change data capture (CDC) tool for relational databases.

Materialize also needs to understand the structure of the source, so it relies on receiving the topic’s schema from either a Confluent Schema Registry or a user-supplied Avro schema. Debezium handles this automatically by using Confluent Schema Registry, so most users do not need to do anything for this step.

After you create the source, Materialize automatically collects all of the data that streams in and uses it to supply your queries and views.
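For instance, here’s a minimal sketch of that flow. We’ll assume hypothetical broker and registry addresses, and a source name given as a plain identifier; the full syntax is covered below.

-- Create a source from a Debezium-fed Kafka topic, using a Confluent
-- Schema Registry to determine its structure (placeholder addresses).
CREATE SOURCE user_data
FROM 'kafka://kafka:9092'
USING SCHEMA REGISTRY 'http://schema-registry:8081';

-- The source can then be queried as if it were a SQL table.
SELECT * FROM user_data;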

File sources 

Materialize can ingest data from sources like .csv files or unstructured log files, in one of two ways: as a static file that is read once, or as a dynamically updated file that Materialize continually tails for new content.

Syntax 

Create multiple sources 

CREATE SOURCES can only be used to create streaming sources.

CREATE SOURCES [ LIKE schema_expr ]
FROM kafka_src
USING SCHEMA { REGISTRY registry_src | avro_schema }
Field                 | Use
----------------------+------------------------------------------------------------------
LIKE schema_expr      | If using a Confluent schema registry, create sources from all Kafka topics that match schema_expr. If not used, Materialize creates sources for all topics published at kafka_src. Each source is named after the table it originated from within your database.
FROM kafka_src        | The Kafka source you want to use (begins with kafka://).
REGISTRY registry_src | Use the Confluent schema registry at registry_src to define the structure of the Kafka source.
avro_schema           | The Avro schema for the topic.

Create single source 

CREATE SOURCE can be used to create streaming or file sources.

CREATE SOURCE src_name
FROM { kafka_src USING SCHEMA { REGISTRY registry_src | avro_schema }
     | local_file WITH ( field = val [, ...] ) }
Field                 | Use
----------------------+------------------------------------------------------------------
src_name              | The name for the source, which is used as its table name within SQL.
FROM kafka_src        | The Kafka source you want to use (begins with kafka://).
FROM local_file       | The absolute path to the local file you want to use as a source (begins with file://).
REGISTRY registry_src | Use the Confluent schema registry at registry_src to define the structure of the Kafka source.
avro_schema           | The Avro schema for the topic.
WITH ( option_list )  | Instructions for parsing your file sources. For more detail, see WITH options.

WITH options 

The following options are valid within the WITH clause.

Field   | Value
--------+------------------------------------------------------------------
tail    | Continually check the file for new content; as new content arrives, process it using the other WITH options.
regex   | Use the provided regex to populate data (i.e. columns) in the source. For more info, see Regex on file sources.
format  | The file source’s format. Currently, csv is the only supported format. Unstructured files should not use this option and should rely on the regex option instead. For more info, see CSV sources.
columns | The number of columns to read from the source; Materialize discards rows with a different number of fields. When used with csv, this must equal the number of fields in each of the CSV’s rows.

All field names are case-sensitive.
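As a sketch of how these options combine, we’ll assume a hypothetical /var/log/requests.csv that has three fields per row and is continually appended to, and that tail can be combined with format='csv'.

-- Continually read a 3-column CSV file as new rows are appended.
CREATE SOURCE requests
FROM 'file:///var/log/requests.csv'
WITH (
    format='csv',
    columns=3,
    tail=true
);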

Streaming source details 

Overview 

Materialize receives data from Kafka sources, which are set up to publish a change feed from a relational database (also known as “change data capture” or CDC).

For Materialize to meaningfully process arbitrary changes from the upstream database, it requires a “diff envelope”, which describes records’ old and new values. In the current landscape, the only CDC tool that provides a diff envelope from relational databases is Debezium.

This means that, to create sources within Materialize, your database’s change feed must be published to Kafka through Debezium (which produces the required diff envelope), and the topic’s schema must be available through a Confluent Schema Registry or a user-supplied Avro schema. Note that Debezium handles this automatically when used with Confluent Schema Registry.

After creating the source and connecting to Kafka, Materialize receives all of the data published to the Kafka topic.

Source requirements 

Materialize requires that the data it receives from Kafka have a structure that contains both the old and new values for any fields within a record (a “diff envelope”), which lets Differential dataflow process arbitrary changes to the underlying data, e.g. expressing a record’s deletion.

The only tool we’re aware of that provides data to Kafka in this form is Debezium, through its envelope structure that contains before and after fields (although CockroachDB’s change data capture feature is close to operational, as well).

Ultimately, this means that the only sources you can create are those that originate from Debezium and are published through Kafka.

Source schema 

When creating a source, Materialize looks up the stream’s structure using a Confluent Schema Registry or the Avro schema you define. The stream’s schema is then used to create the source within Materialize, which is essentially equivalent to a table in the language of RDBMSes.

Once Materialize has determined the structure of the stream, it connects the underlying Kafka stream directly to its Differential dataflow engine.

Data storage 

As data streams in from Kafka, Materialize’s internal Differential instance builds an arrangement (which is roughly equivalent to an index in the language of RDBMSes). When the source is initially created, Materialize receives all of the data that the Kafka stream still contains for the topic, e.g. the last 24 hours of data, and constructs its initial arrangement from that. As new data streams in, Materialize collects that, as well.

However, Materialize has a separate garbage collection period for arrangements, which works independently from the Kafka stream’s retention period.

When you create views, they are populated with all of the data from their sources that is available from the arrangements within Differential.
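For example, assuming a requests source already exists and that views are defined with the standard CREATE VIEW ... AS SELECT form, the following view would be populated from whatever data for requests is still available in Differential’s arrangements.

-- Hypothetical view over an existing source; its contents reflect the
-- data currently retained in the source's arrangement.
CREATE VIEW request_counts AS
    SELECT count(*) FROM requests;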

File source details 

File sources have the following caveats, in addition to any details of the specific file type you’re using:

CSV sources 

CSV sources use the format='csv' option; when the columns option is used with csv, it must equal the number of fields in each of the CSV’s rows. For a complete example, see Creating a source from a static CSV below.

Regex on file sources 

By using the regex option in the WITH option list, you can apply structure to each string of text that gets read from a file. This is particularly useful when processing unstructured log files; for a complete example, see Creating a source from a dynamic, unstructured file below.

Examples 

Using a Confluent schema registry 

CREATE SOURCES
LIKE 'mysql.simple.%'
FROM 'kafka://kafka:9092'
USING SCHEMA REGISTRY 'http://schema-registry:8081';

Manually defining Avro schema 

CREATE SOURCE 'mysql.simple.user'
FROM 'kafka://kafka:9092'
USING SCHEMA '{
  "type": "record",
  "name": "envelope",
  "fields": [
    {
      "name": "before",
      "type": [
        {
          "name": "row",
          "type": "record",
          "fields": [
            {"name": "a", "type": "long"},
            {"name": "b", "type": "long"}
          ]
        },
        "null"
      ]
    },
    { "name": "after", "type": ["row", "null"] }
  ]
}';

Creating a source from a static CSV 

CREATE SOURCE test
FROM 'file:///test.csv'
WITH (
    format='csv',
    columns=5
);

Creating a source from a dynamic, unstructured file 

In this example, we’ll assume we have xxd creating hex dumps for some incoming files. Its output might look like this:

00000000: 7f45 4c46 0201 0100 0000 0000 0000 0000  .ELF............
00000010: 0300 3e00 0100 0000 105b 0000 0000 0000  ..>......[......
00000020: 4000 0000 0000 0000 7013 0200 0000 0000  @.......p.......

We’ll create a source that takes in these entire lines and extracts the file offset, as well as the decoded value.

CREATE SOURCE hex
FROM  'file:///xxd.log'
WITH (
    regex='(?P<offset>[0-9a-f]{8}): (?:[0-9a-f]{4} ){8} (?P<decoded>.*)$',
    tail=true
);

This creates a source that continually tails xxd.log and, for each line, uses the regex’s named capture groups (offset and decoded) as the source’s columns.

Using the above example, this would generate:

 offset  |     decoded
---------+------------------
00000000 | .ELF............
00000010 | ..>......[......
00000020 | @.......p.......
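
Because the source behaves like a SQL table, you could then filter it with ordinary SQL, e.g. (assuming string comparison on the offset column):

SELECT * FROM hex WHERE offset = '00000010';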