ribs_rill_io
ribs_rill_io provides purely functional file system and network I/O for Dart, built on top of ribs_rill and ribs_effect. All operations are expressed as IO, Resource, or Rill values, giving you composable, resource-safe I/O that integrates naturally with the rest of the ribs ecosystem.
Full documentation is available at the ribs website.
Key Features
- Streaming file I/O: Read and write files as
Rill<int>byte streams orRill<String>line streams. - Resource-safe handles: File handles and sockets are managed as
Resource, ensuring they are always closed. - Type-safe paths: The
Pathextension type wraps strings with path manipulation utilities. - File watching: React to file system events as a
Rill<WatcherEvent>stream. - TCP & UDP networking: Connect, bind, and accept connections with
Network, backed by streamingSocketandServerSockettypes. - Platform abstraction: Implementations target
dart:iowith stub fallbacks for unsupported platforms.
File Operations
Reading Files
import 'package:ribs_rill_io/ribs_rill_io.dart';
// Stream raw bytes
final bytes = Files.readAll(Path('data.bin'));
// Stream UTF-8 text lines
final lines = Files.readUtf8Lines(Path('log.txt'));
await lines
.evalMap((line) => IO.print(line))
.compile
.drain
.unsafeRunFuture();
Writing Files
// Write bytes via a Pipe
await Rill.emits(Chunk.fromList([104, 101, 108, 108, 111]))
.through(Files.writeAll(Path('out.bin')))
.compile
.drain
.unsafeRunFuture();
// Write UTF-8 lines
await Rill.emits(Chunk.fromList(['line one', 'line two']))
.through(Files.writeUtf8Lines(Path('out.txt')))
.compile
.drain
.unsafeRunFuture();
Rotating Log Files
writeRotate switches to a new file whenever the current one reaches the size limit.
int fileIndex = 0;
final rotatePipe = Files.writeRotate(
IO.delay(() => Path('log-${fileIndex++}.txt')),
limit: 1024 * 1024, // 1 MB per file
flags: Flags.Append,
);
Tailing a File
Stream the contents of a file and continue emitting as it grows, like tail -f.
final tail = Files.tail(
Path('app.log'),
pollDelay: const Duration(milliseconds: 250),
);
Low-Level File Handles
For precise control use Files.open directly.
final program = Files.open(Path('data.bin'), Flags.Read).use((handle) {
return handle.size.flatMap((size) {
return handle.read(size, 0).flatMap((chunk) {
return IO.print('Read ${chunk.map((c) => c.size).getOrElse(() => 0)} bytes');
});
});
});
Directory Operations
// List a directory recursively
await Files.list(Path('src'))
.evalMap((path) => IO.print(path.toString()))
.compile
.drain
.unsafeRunFuture();
// Common file system operations
await Files.createDirectory(Path('output')).unsafeRunFuture();
await Files.copy(Path('a.txt'), Path('b.txt')).unsafeRunFuture();
await Files.move(Path('old.txt'), Path('new.txt')).unsafeRunFuture();
await Files.delete(Path('temp.txt')).unsafeRunFuture();
Watching for Changes
await Files.watch(Path('config'), types: [WatcherEventType.modified])
.evalMap((event) => IO.print('Changed: ${event.path}'))
.compile
.drain
.unsafeRunFuture();
Temporary Files
final result = Files.tempFile.use((path) {
return Files.writeUtf8Lines(path)
.apply(Rill.emit('hello'))
.compile
.drain
.productR(Files.readUtf8Lines(path).compile.toIList());
});
Network Operations
TCP Client
import 'package:ribs_ip/ribs_ip.dart';
final address = SocketAddress.fromParts(ip4"127.0.0.1", port"8080");
final program = Network.connect(address).use((socket) {
final send = Rill.emits(Chunk.fromList('hello'.codeUnits))
.through(socket.writes)
.compile
.drain;
final receive = socket.reads
.through(utf8Decode)
.evalMap((msg) => IO.print('Received: $msg'))
.compile
.drain;
return send.productR(receive);
});
TCP Server
final address = SocketAddress.fromParts(ip4"0.0.0.0", port"9000");
final server = Network.bindAndAccept(address).evalMap((socket) {
// Echo server: pipe reads back to writes
return socket.reads
.through(socket.writes)
.compile
.drain
.start(); // handle each connection in its own fiber
});
await server.compile.drain.unsafeRunFuture();
UDP
final address = SocketAddress.fromParts(ip4"0.0.0.0", port"5000");
final program = Network.bindDatagramSocket(address).use((udp) {
return udp.reads
.evalMap((datagram) {
final msg = String.fromCharCodes(datagram.bytes.toList());
return IO.print('From ${datagram.remote}: $msg');
})
.compile
.drain;
});
Example
See a complete working example in example/example.dart.
Libraries
- ribs_rill_io
- Purely functional, streaming I/O for Dart.