Deployment

This page is a work in progress and will have more detail in the coming months. If you have specific questions, feel free to file a GitHub issue.

Memory

Materialize stores the majority of its state in memory, and works best when the streamed data can be reduced in some way. For example, if you know that only a subset of your rows and columns are relevant for your queries, it helps to avoid materializing sources or views until you’ve expressed this to the system. Materialize can then avoid stashing the full set of rows and columns, which can in some cases dramatically reduce Materialize’s memory footprint.
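
As a sketch of this pattern, assuming an Avro-formatted Kafka source whose schema carries item, quantity, and warehouse fields (the broker address, topic, schema registry URL, and column names below are purely illustrative): create the source without materializing it, then materialize a view that keeps only the rows and columns your queries need.

psql -h localhost -p 6875 materialize <<'SQL'
-- Ingest the topic without materializing the full record set.
CREATE SOURCE inventory
  FROM KAFKA BROKER 'kafka:9092' TOPIC 'inventory'
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081';

-- Materialize only the columns and rows the queries actually need.
CREATE MATERIALIZED VIEW inventory_by_item AS
  SELECT item, quantity FROM inventory
  WHERE warehouse = 'us-east-1';
SQL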

Compaction

To prevent memory from growing without bound, Materialize periodically “compacts” data in arrangements. For example, if you have a source that tracks product inventory, you might receive periodic inventory updates throughout the day:

(T-shirts, 9:07am, +500)
(T-shirts, 11:32am, -1)
(T-shirts, 3:14pm, -2)

Logical compaction will fold historical updates that fall outside the compaction window into the state at the start of the window.

(T-shirts, 3:14pm, +497)

Materialize will only perform this compaction on data that falls outside the logical compaction window. The default compaction window is 60 seconds behind the current time, but the window can be adjusted via the --logical-compaction-window option.
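
For example, a minimal sketch of adjusting the window at startup (the 10s value is purely illustrative; see materialized --help for the accepted duration syntax):

# Fold updates older than roughly 10 seconds into the current state
# (a smaller window uses less memory but keeps less historical detail).
materialized --logical-compaction-window=10s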

Adjusting the compaction window involves making a tradeoff between historical detail and resource usage. A larger compaction window retains more historical detail, but requires more memory. A smaller compaction window uses less memory but also retains less historical detail. Larger compaction windows also increase CPU usage, as more detailed histories require more compute time to maintain.

WARNING! Setting the compaction window too small can prevent Materialize from finding a suitable timestamp at which to execute a SELECT query if that query depends upon multiple sources.

Note that compaction is triggered in response to updates arriving. As a result, if updates stop arriving for a source, Materialize may never compact the source fully. This can result in higher-than-expected memory usage.

This phenomenon is particularly evident when ingesting a source with a large amount of historical data (e.g., a several-gigabyte Kafka topic that is no longer changing). With the default compaction window of 60 seconds, the source is likely to be fully ingested within the compaction window. By the time the data is eligible for compaction, the source is fully ingested, no new updates are arriving, and therefore no compaction is ever triggered.

If the increased memory usage is problematic, consider one of the following solutions:

Swap

To minimize the chances that Materialize runs out of memory in a production environment, we recommend you make additional memory available to Materialize via an SSD-backed swap file or swap partition.

This is particularly important in Linux and in Docker, where swap may not be automatically set up for you.

Docker

By default, a container has no resource constraints and can use as much memory and swap as the host allows, unless you have overridden this with the --memory or the --memory-swap flags.
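
As a sketch, you could cap the container explicitly (the limits below are illustrative, not recommendations):

# Allow 8GB of RAM plus 8GB of swap for the container;
# --memory-swap is the combined memory + swap ceiling.
docker run -d -p 6875:6875 \
  --memory=8g \
  --memory-swap=16g \
  materialize/materialized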

Linux

Most cloud Linux distributions do not enable swap by default. However, you can enable it quite easily with the following steps.

  1. Create a swapfile using the fallocate command.

    The general syntax is fallocate -l <swap size> <filename>. For a 1GB swap file, for example:

    sudo fallocate -l 1G /swapfile
    
  2. Make the swap file only accessible to root:

    sudo chmod 600 /swapfile
    
  3. Mark the file as swap space:

    sudo mkswap /swapfile
    
  4. Enable the swap file:

    sudo swapon /swapfile
    
  5. Verify the swap is available:

    swapon --show
    
    NAME      TYPE SIZE  USED PRIO
    /swapfile file   1G  0M   -2
    
  6. Optional. To make the swap file permanent, add an entry for it to /etc/fstab:

    echo '/swapfile none swap sw 0 0' | sudo tee -a /etc/fstab
    

Persistence

EXPERIMENTAL! This feature is under construction and requires experimental mode. Available since v0.4.2.

To avoid re-reading data from Kafka on restart, Materialize lets you create persistent sources, which persist input messages from Kafka topics to files on the Materialize instance’s local hard drive. The current version of persistence is not intended to speed up Materialize’s restart time, as there are other factors beyond Kafka broker read performance that contribute to high restart times.

We recommend enabling persistence if you are using Kafka sources, need to relieve load on upstream Kafka brokers, and are comfortable using experimental features.

WARNING! Materialize currently does not delete persisted records when the source is dropped. Additionally, Materialize does not currently compact persisted data. If you enable persistence on sources from compacted Kafka topics, Materialize will store and re-read all records that have been persisted, even if some of them were compacted by the upstream source.

Details

Any Kafka source can be declared as a persistent source, irrespective of its format (Avro, text/bytes, CSV, Protobuf, or JSON).

Materialize stores one copy of all input data for each persistent Kafka source. Materialize stores these files in:

{data-directory}/persistence/{source-id}

Within this directory, Materialize writes to files named:

materialize-{source-id}-{partition-id}-{start-offset}-{end-offset}

Each file stores data for a range of offsets within a single partition, from start-offset (inclusive) to end-offset (exclusive). Materialize buffers input records in memory and flushes them every 10 minutes, or immediately if the number of buffered records for a source exceeds the --persistence-max-pending-records parameter. Setting this flag to a higher value helps Materialize achieve higher ingest and disk-write throughput; however, it also increases the average latency before records are persisted.
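
As a rough sketch, starting materialized with a larger flush threshold might look like the following (the data directory path and the threshold value are illustrative; persistence also requires experimental mode, as noted above):

# Run in experimental mode and allow up to ~1 million buffered records
# per source before forcing an early flush to disk.
materialized --experimental \
  --data-directory=/var/lib/materialize \
  --persistence-max-pending-records=1000000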

On restart, Materialize reads back, in offset order, all records that were previously persisted, and then continues reading from the upstream source starting after the last persisted record in each partition.