# Streaming API
The streaming API processes data in chunks without loading the entire payload into memory. This is essential for large files or when integrating with network and file I/O.
## Streaming Interfaces

compressionz provides streaming wrappers that implement Zig’s standard `Reader` and `Writer` interfaces:

- `Compressor` — wraps a writer; data written to it is compressed
- `Decompressor` — wraps a reader; data read from it is decompressed
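Because both wrappers speak the standard interfaces, they compose with any stream, not just files. A minimal sketch, using the `compressor` call documented below to compress into an in-memory `std.ArrayList` instead of a file:

```zig
const std = @import("std");
const cz = @import("compressionz");

/// Sketch: compress a byte slice into an in-memory buffer.
/// Any type exposing a standard writer works as the destination.
pub fn compressToMemory(allocator: std.mem.Allocator, data: []const u8) !std.ArrayList(u8) {
    var out = std.ArrayList(u8).init(allocator);
    errdefer out.deinit();

    // The ArrayList's writer satisfies the Writer interface expected by compressor()
    var comp = try cz.compressor(.gzip, allocator, out.writer(), .{});
    defer comp.deinit();

    try comp.writer().writeAll(data);
    try comp.finish(); // flush remaining compressed bytes into `out`

    return out; // caller owns the buffer; free with out.deinit()
}
```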
## Supported Codecs

| Codec | Streaming Support |
|---|---|
| gzip | ✅ Full |
| zlib | ✅ Full |
| deflate | ✅ Full |
| zstd | ✅ Full |
| brotli | ✅ Full |
| lz4 | ✅ Full |
| lz4_raw | ❌ No |
| snappy | ❌ No |
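For codecs without streaming support, the compressed input has to be fully buffered first. A hedged sketch of a dispatch helper — it assumes a one-shot `cz.decompress(codec, allocator, compressed)` function exists alongside the streaming API (check the buffer-based API docs for the actual name and signature):

```zig
const std = @import("std");
const cz = @import("compressionz");

/// Sketch: decompress from any codec, falling back to full buffering
/// for the codecs that lack streaming support (lz4_raw, snappy).
pub fn decompressAny(
    allocator: std.mem.Allocator,
    codec: cz.Codec,
    source: anytype, // Must be a Reader
    max_size: usize,
) ![]u8 {
    switch (codec) {
        .lz4_raw, .snappy => {
            // No streaming support: read the whole compressed input first
            const compressed = try source.readAllAlloc(allocator, max_size);
            defer allocator.free(compressed);
            // Hypothetical one-shot API; substitute the library's real call
            return cz.decompress(codec, allocator, compressed);
        },
        else => {
            var decomp = try cz.decompressor(codec, allocator, source);
            defer decomp.deinit();
            return decomp.reader().readAllAlloc(allocator, max_size);
        },
    }
}
```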
## Streaming Decompression

### decompressor

Create a decompressor that wraps a reader.

```zig
pub fn decompressor(
    codec: Codec,
    allocator: std.mem.Allocator,
    source: anytype, // Must be a Reader
) !Decompressor(@TypeOf(source))
```

### Example: Decompress a File
```zig
const std = @import("std");
const cz = @import("compressionz");

pub fn decompressFile(allocator: std.mem.Allocator, path: []const u8) ![]u8 {
    // Open compressed file
    const file = try std.fs.cwd().openFile(path, .{});
    defer file.close();

    // Create decompressor
    var decomp = try cz.decompressor(.gzip, allocator, file.reader());
    defer decomp.deinit();

    // Read all decompressed data
    return decomp.reader().readAllAlloc(allocator, 100 * 1024 * 1024);
}
```

### Example: Process in Chunks
```zig
pub fn processCompressedStream(allocator: std.mem.Allocator, source: anytype) !void {
    var decomp = try cz.decompressor(.zstd, allocator, source);
    defer decomp.deinit();

    var buffer: [4096]u8 = undefined;
    const reader = decomp.reader();

    while (true) {
        const n = try reader.read(&buffer);
        if (n == 0) break;

        // Process chunk
        processChunk(buffer[0..n]);
    }
}
```

### Decompressor Type
```zig
pub fn Decompressor(comptime ReaderType: type) type {
    return struct {
        /// Get a reader interface
        pub fn reader(self: *@This()) Reader { ... }

        /// Clean up resources
        pub fn deinit(self: *@This()) void { ... }
    };
}
```

## Streaming Compression
### compressor

Create a compressor that wraps a writer.

```zig
pub fn compressor(
    codec: Codec,
    allocator: std.mem.Allocator,
    dest: anytype, // Must be a Writer
    options: CompressOptions,
) !Compressor(@TypeOf(dest))
```

### Example: Compress to File
```zig
const std = @import("std");
const cz = @import("compressionz");

pub fn compressToFile(
    allocator: std.mem.Allocator,
    data: []const u8,
    output_path: []const u8,
) !void {
    // Create output file
    const file = try std.fs.cwd().createFile(output_path, .{});
    defer file.close();

    // Create compressor
    var comp = try cz.compressor(.gzip, allocator, file.writer(), .{});
    defer comp.deinit();

    // Write data (automatically compressed)
    try comp.writer().writeAll(data);

    // IMPORTANT: Must call finish() to flush remaining data
    try comp.finish();
}
```

### Example: Stream Large File
```zig
pub fn compressLargeFile(
    allocator: std.mem.Allocator,
    input_path: []const u8,
    output_path: []const u8,
) !void {
    const input = try std.fs.cwd().openFile(input_path, .{});
    defer input.close();

    const output = try std.fs.cwd().createFile(output_path, .{});
    defer output.close();

    var comp = try cz.compressor(.zstd, allocator, output.writer(), .{
        .level = .default,
    });
    defer comp.deinit();

    // Stream in chunks
    var buffer: [65536]u8 = undefined;
    while (true) {
        const n = try input.read(&buffer);
        if (n == 0) break;
        try comp.writer().writeAll(buffer[0..n]);
    }

    try comp.finish();
}
```

### Compressor Type
```zig
pub fn Compressor(comptime WriterType: type) type {
    return struct {
        /// Get a writer interface
        pub fn writer(self: *@This()) Writer { ... }

        /// Flush remaining data and finalize the stream
        pub fn finish(self: *@This()) !void { ... }

        /// Clean up resources
        pub fn deinit(self: *@This()) void { ... }
    };
}
```

## Important: Always Call finish()
When using streaming compression, you must call `finish()` before closing the output:

```zig
var comp = try cz.compressor(.gzip, allocator, file.writer(), .{});
defer comp.deinit();

try comp.writer().writeAll(data);
try comp.finish(); // ← Required! Flushes internal buffers
```

Without `finish()`, the compressed output may be incomplete or corrupt.
## Streaming with Options

### Compression Level

```zig
var comp = try cz.compressor(.zstd, allocator, writer, .{
    .level = .best, // Maximum compression
});
```

### With Dictionary (Zstd only)

```zig
var comp = try cz.compressor(.zstd, allocator, writer, .{
    .dictionary = my_dictionary,
});
```

## Memory Considerations
Streaming uses internal buffers. Memory usage depends on the codec:
| Codec | Compression Buffer | Decompression Buffer |
|---|---|---|
| Gzip | ~256 KB | ~32 KB |
| Zstd | ~128 KB | ~128 KB |
| Brotli | ~1 MB | ~256 KB |
| LZ4 | ~64 KB | ~64 KB |
For memory-constrained environments, prefer Gzip or LZ4.
## Piping Streams

Decompress and recompress in one pass:
```zig
pub fn recompress(
    allocator: std.mem.Allocator,
    input: anytype,
    output: anytype,
    from_codec: cz.Codec,
    to_codec: cz.Codec,
) !void {
    var decomp = try cz.decompressor(from_codec, allocator, input);
    defer decomp.deinit();

    var comp = try cz.compressor(to_codec, allocator, output, .{});
    defer comp.deinit();

    var buffer: [65536]u8 = undefined;
    const reader = decomp.reader();

    while (true) {
        const n = try reader.read(&buffer);
        if (n == 0) break;
        try comp.writer().writeAll(buffer[0..n]);
    }

    try comp.finish();
}
```

## Error Handling
Streaming operations can fail at any point:
```zig
var decomp = try cz.decompressor(.gzip, allocator, reader);
defer decomp.deinit();

const data = decomp.reader().readAllAlloc(allocator, max_size) catch |err| switch (err) {
    error.InvalidData => {
        // Stream is corrupted
    },
    error.ChecksumMismatch => {
        // Data integrity check failed
    },
    error.StreamTooLong => {
        // Exceeded max_size
    },
    else => return err,
};
```

## Complete Example
A command-line tool that compresses or decompresses files:
```zig
const std = @import("std");
const cz = @import("compressionz");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    const args = try std.process.argsAlloc(allocator);
    defer std.process.argsFree(allocator, args);

    if (args.len < 4) {
        std.debug.print("Usage: {s} <compress|decompress> <input> <output>\n", .{args[0]});
        return;
    }

    const mode = args[1];
    const input_path = args[2];
    const output_path = args[3];

    const input = try std.fs.cwd().openFile(input_path, .{});
    defer input.close();

    const output = try std.fs.cwd().createFile(output_path, .{});
    defer output.close();

    if (std.mem.eql(u8, mode, "compress")) {
        var comp = try cz.compressor(.gzip, allocator, output.writer(), .{});
        defer comp.deinit();

        var buf: [65536]u8 = undefined;
        while (true) {
            const n = try input.read(&buf);
            if (n == 0) break;
            try comp.writer().writeAll(buf[0..n]);
        }
        try comp.finish();
    } else {
        var decomp = try cz.decompressor(.gzip, allocator, input.reader());
        defer decomp.deinit();

        var buf: [65536]u8 = undefined;
        while (true) {
            const n = try decomp.reader().read(&buf);
            if (n == 0) break;
            try output.writeAll(buf[0..n]);
        }
    }

    std.debug.print("Done!\n", .{});
}
```