Skip to content

Streaming API

The streaming API allows processing data in chunks without loading everything into memory. This is essential for large files or when integrating with I/O systems.

compressionz provides streaming wrappers that implement Zig’s standard Reader and Writer interfaces:

  • Compressor — Wraps a writer, compresses data written to it
  • Decompressor — Wraps a reader, decompresses data read from it
| Codec   | Streaming Support |
| ------- | ----------------- |
| gzip    | ✅ Full           |
| zlib    | ✅ Full           |
| deflate | ✅ Full           |
| zstd    | ✅ Full           |
| brotli  | ✅ Full           |
| lz4     | ✅ Full           |
| lz4_raw | ❌ No             |
| snappy  | ❌ No             |

Create a decompressor that wraps a reader.

/// Create a decompressor that wraps a reader (signature excerpt; body elided
/// in this documentation).
pub fn decompressor(
codec: Codec, // which compression format to decode
allocator: std.mem.Allocator, // used for internal streaming buffers
source: anytype, // Must be a Reader
) !Decompressor(@TypeOf(source))
const std = @import("std");
const cz = @import("compressionz");
/// Decompress a gzip-compressed file at `path` into a newly allocated buffer.
/// Caller owns the returned slice.
pub fn decompressFile(allocator: std.mem.Allocator, path: []const u8) ![]u8 {
    // Same 100 MiB output cap as before, just named.
    const max_output_bytes = 100 * 1024 * 1024;

    const file = try std.fs.cwd().openFile(path, .{});
    defer file.close();

    var stream = try cz.decompressor(.gzip, allocator, file.reader());
    defer stream.deinit();

    // Drain the whole decompressed stream into one allocation.
    return stream.reader().readAllAlloc(allocator, max_output_bytes);
}
/// Read a zstd-compressed stream in fixed-size chunks, handing each
/// decompressed chunk to `processChunk` without buffering the whole payload.
pub fn processCompressedStream(allocator: std.mem.Allocator, source: anytype) !void {
    var stream = try cz.decompressor(.zstd, allocator, source);
    defer stream.deinit();

    const src = stream.reader();
    var chunk: [4096]u8 = undefined;
    while (true) {
        const read_len = try src.read(&chunk);
        if (read_len == 0) break; // end of stream
        processChunk(chunk[0..read_len]);
    }
}
/// Generic streaming decompressor parameterized over the wrapped reader type.
/// (Method bodies are elided in this documentation excerpt.)
pub fn Decompressor(comptime ReaderType: type) type {
return struct {
/// Get a reader interface
pub fn reader(self: *@This()) Reader { ... }
/// Clean up resources
pub fn deinit(self: *@This()) void { ... }
};
}

Create a compressor that wraps a writer.

/// Create a compressor that wraps a writer (signature excerpt; body elided
/// in this documentation).
pub fn compressor(
codec: Codec, // which compression format to produce
allocator: std.mem.Allocator, // used for internal streaming buffers
dest: anytype, // Must be a Writer
options: CompressOptions, // level, dictionary, etc.
) !Compressor(@TypeOf(dest))
const std = @import("std");
const cz = @import("compressionz");
/// Gzip-compress `data` and write the result to a new file at `output_path`.
pub fn compressToFile(
    allocator: std.mem.Allocator,
    data: []const u8,
    output_path: []const u8,
) !void {
    const out = try std.fs.cwd().createFile(output_path, .{});
    defer out.close();

    var gzip_stream = try cz.compressor(.gzip, allocator, out.writer(), .{});
    defer gzip_stream.deinit();

    // Everything written through the wrapper is compressed on the fly.
    try gzip_stream.writer().writeAll(data);

    // IMPORTANT: finish() flushes internal buffers and writes the stream
    // trailer; without it the output may be incomplete or corrupt.
    try gzip_stream.finish();
}
/// Stream-compress the file at `input_path` into `output_path` using zstd,
/// copying in 64 KiB chunks so the whole file is never held in memory.
pub fn compressLargeFile(
    allocator: std.mem.Allocator,
    input_path: []const u8,
    output_path: []const u8,
) !void {
    const src = try std.fs.cwd().openFile(input_path, .{});
    defer src.close();
    const dst = try std.fs.cwd().createFile(output_path, .{});
    defer dst.close();

    var zstd_stream = try cz.compressor(.zstd, allocator, dst.writer(), .{
        .level = .default,
    });
    defer zstd_stream.deinit();

    const sink = zstd_stream.writer();
    var chunk: [65536]u8 = undefined;
    while (true) {
        const read_len = try src.read(&chunk);
        if (read_len == 0) break; // reached end of input
        try sink.writeAll(chunk[0..read_len]);
    }
    // Required: flushes remaining buffered data and finalizes the stream.
    try zstd_stream.finish();
}
/// Generic streaming compressor parameterized over the wrapped writer type.
/// (Method bodies are elided in this documentation excerpt.)
pub fn Compressor(comptime WriterType: type) type {
return struct {
/// Get a writer interface
pub fn writer(self: *@This()) Writer { ... }
/// Flush remaining data and finalize stream
pub fn finish(self: *@This()) !void { ... }
/// Clean up resources
pub fn deinit(self: *@This()) void { ... }
};
}

When using streaming compression, you must call finish() before closing the output:

// Minimal compression example illustrating the mandatory finish() call.
var comp = try cz.compressor(.gzip, allocator, file.writer(), .{});
defer comp.deinit();
try comp.writer().writeAll(data);
try comp.finish(); // ← Required! Flushes internal buffers

Without finish(), the compressed output may be incomplete or corrupt.


// Example: request the highest compression ratio.
var comp = try cz.compressor(.zstd, allocator, writer, .{
.level = .best, // Maximum compression
});
// Example: compress with a pre-trained dictionary.
// NOTE(review): these are two independent snippets shown back-to-back; the
// second `var comp` would shadow the first if pasted into one scope.
var comp = try cz.compressor(.zstd, allocator, writer, .{
.dictionary = my_dictionary,
});

Streaming uses internal buffers. Memory usage depends on the codec:

| Codec  | Compression Buffer | Decompression Buffer |
| ------ | ------------------ | -------------------- |
| Gzip   | ~256 KB            | ~32 KB               |
| Zstd   | ~128 KB            | ~128 KB              |
| Brotli | ~1 MB              | ~256 KB              |
| LZ4    | ~64 KB             | ~64 KB               |

For memory-constrained environments, prefer Gzip or LZ4.


Decompress and recompress in one pass:

/// Transcode a compressed stream in one pass: decode `from_codec` from
/// `input` and re-encode it as `to_codec` into `output`, 64 KiB at a time.
pub fn recompress(
    allocator: std.mem.Allocator,
    input: anytype,
    output: anytype,
    from_codec: cz.Codec,
    to_codec: cz.Codec,
) !void {
    var decode_stream = try cz.decompressor(from_codec, allocator, input);
    defer decode_stream.deinit();
    var encode_stream = try cz.compressor(to_codec, allocator, output, .{});
    defer encode_stream.deinit();

    const src = decode_stream.reader();
    const dst = encode_stream.writer();
    var chunk: [65536]u8 = undefined;
    while (true) {
        const read_len = try src.read(&chunk);
        if (read_len == 0) break; // source exhausted
        try dst.writeAll(chunk[0..read_len]);
    }
    // Finalize the re-encoded stream (flush buffers + trailer).
    try encode_stream.finish();
}

Streaming operations can fail at any point:

// Decompress with explicit handling of the streaming error set.
var decomp = try cz.decompressor(.gzip, allocator, reader);
defer decomp.deinit();
// NOTE(review): the `catch |err| switch` expression supplies the value of
// `data`, so every handled arm must yield a `[]u8` (or return/break) — the
// empty blocks below would not compile as written; confirm the intended
// recovery value for each case.
const data = decomp.reader().readAllAlloc(allocator, max_size) catch |err| switch (err) {
error.InvalidData => {
// Stream is corrupted
},
error.ChecksumMismatch => {
// Data integrity check failed
},
error.StreamTooLong => {
// Exceeded max_size
},
else => return err,
};

A command-line tool that compresses or decompresses files:

const std = @import("std");
const cz = @import("compressionz");
/// CLI entry point: `<exe> <compress|decompress> <input> <output>`.
/// Gzip-compresses or -decompresses `input` into `output` in 64 KiB chunks.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    const args = try std.process.argsAlloc(allocator);
    defer std.process.argsFree(allocator, args);

    if (args.len < 4) {
        std.debug.print("Usage: {s} <compress|decompress> <input> <output>\n", .{args[0]});
        return;
    }
    const mode = args[1];
    const input_path = args[2];
    const output_path = args[3];

    // Bug fix: validate the mode up front. Previously any unrecognized mode
    // string silently fell through to the decompress branch (and the output
    // file was created before the mode was ever inspected).
    const is_compress = std.mem.eql(u8, mode, "compress");
    if (!is_compress and !std.mem.eql(u8, mode, "decompress")) {
        std.debug.print("Unknown mode '{s}'; expected 'compress' or 'decompress'\n", .{mode});
        return;
    }

    const input = try std.fs.cwd().openFile(input_path, .{});
    defer input.close();
    const output = try std.fs.cwd().createFile(output_path, .{});
    defer output.close();

    var buf: [65536]u8 = undefined;
    if (is_compress) {
        var comp = try cz.compressor(.gzip, allocator, output.writer(), .{});
        defer comp.deinit();
        const sink = comp.writer();
        while (true) {
            const n = try input.read(&buf);
            if (n == 0) break;
            try sink.writeAll(buf[0..n]);
        }
        // Required: flushes internal buffers and writes the gzip trailer.
        try comp.finish();
    } else {
        var decomp = try cz.decompressor(.gzip, allocator, input.reader());
        defer decomp.deinit();
        const src = decomp.reader();
        while (true) {
            const n = try src.read(&buf);
            if (n == 0) break;
            try output.writeAll(buf[0..n]);
        }
    }
    std.debug.print("Done!\n", .{});
}