valkey_client 2.3.0
valkey_client: ^2.3.0 copied to clipboard
A high-performance, cluster-aware Dart client for Redis and Valkey.
example/valkey_client_example.dart
/*
* Copyright 2025-2026 Infradise Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import 'dart:async';
import 'package:valkey_client/valkey_client.dart';
// Here is a basic example of how to connect and close the client.
// For more examples, check the `/example` folder.
/// This function contains all the command examples.
/// It accepts any client that implements [ValkeyClientBase],
/// demonstrating how robust the interface is.
Future<void> runCommandExamples(ValkeyClientBase client) async {
try {
// 1. Connect and authenticate
// The client will use whichever configuration it was given.
await client.connect();
print('✅ Connection successful!');
// --- PING (v0.2.0) ---
print('\n--- PING ---');
print("Sending: PING 'Hello'");
final pingResponse = await client.ping('Hello');
print('Received: $pingResponse');
// --- SET/GET (v0.3.0) ---
print('\n--- SET/GET ---');
print("Sending: SET greeting 'Hello, Valkey!'");
final setResponse = await client.set('greeting', 'Hello, Valkey!');
print('Received: $setResponse');
print('Sending: GET greeting');
final getResponse = await client.get('greeting');
print('Received: $getResponse');
// --- MGET (v0.4.0) ---
print('\n--- MGET (Array Parsing) ---');
print('Sending: MGET greeting non_existent_key');
final mgetResponse = await client.mget(['greeting', 'non_existent_key']);
print('Received: $mgetResponse'); // Should be "[Hello, Valkey!, null]"
// --- HASH (v0.5.0) ---
print('\n--- HASH (Map/Object) ---');
print("Sending: HSET user:1 name 'Valkyrie'");
final hsetResponse = await client.hset('user:1', 'name', 'Valkyrie');
print('Received (1=new, 0=update): $hsetResponse');
print("Sending: HSET user:1 project 'valkey_client'");
await client.hset('user:1', 'project', 'valkey_client');
print('Sending: HGET user:1 name');
final hgetResponse = await client.hget('user:1', 'name');
print('Received: $hgetResponse'); // Should be "Valkyrie"
print('Sending: HGETALL user:1');
final hgetAllResponse = await client.hgetall('user:1');
// Should be {name: Valkyrie, project: valkey_client}
print('Received Map: $hgetAllResponse');
// --- LIST (v0.6.0) ---
print('\n--- LIST (Queue/Stack) ---');
print("Sending: LPUSH mylist 'item1'");
await client.lpush('mylist', 'item1');
print("Sending: LPUSH mylist 'item2'");
final length = await client.lpush('mylist', 'item2');
print('Received list length: $length'); // Should be 2
print('Sending: LRANGE mylist 0 -1');
final listResponse = await client.lrange('mylist', 0, -1);
// Should be [item2, item1] (LPUSH prepends)
print('Received list: $listResponse');
print('Sending: RPOP mylist');
final poppedItem = await client.rpop('mylist');
// Should be "item1" (RPOP removes from the end)
print('Received popped item: $poppedItem');
// --- SET / SORTED SET (v0.7.0) ---
print('\n--- SET (Unique Tags) / SORTED SET (Leaderboard) ---');
print("Sending: SADD users:1:tags 'dart'");
await client.sadd('users:1:tags', 'dart');
print("Sending: SADD users:1:tags 'valkey'");
await client.sadd('users:1:tags', 'valkey');
print('Sending: SMEMBERS users:1:tags');
final tags = await client.smembers('users:1:tags');
print('Received tags (unordered): $tags'); // Should contain [dart, valkey]
print("Sending: ZADD leaderboard 100 'PlayerOne'");
await client.zadd('leaderboard', 100, 'PlayerOne');
print("Sending: ZADD leaderboard 150 'PlayerTwo'");
await client.zadd('leaderboard', 150, 'PlayerTwo');
print('Sending: ZRANGE leaderboard 0 -1'); // Get all players by score
final leaderboard = await client.zrange('leaderboard', 0, -1);
print('Received leaderboard (score low to high): '
'$leaderboard'); // Should be [PlayerOne, PlayerTwo]
// --- KEY MANAGEMENT (v0.8.0) ---
print('\n--- KEY MANAGEMENT (Expiration & Deletion) ---');
print('Sending: EXPIRE greeting 10'); // Expire the 'greeting' key in 10s
final expireResponse = await client.expire('greeting', 10);
print('Received (1=set, 0=not set): $expireResponse');
print('Sending: TTL greeting');
final ttlResponse = await client.ttl('greeting');
print('Received TTL (seconds, -1=no expire, -2=not exist): '
'$ttlResponse'); // Should be <= 10
print('Sending: DEL mylist'); // Delete the list key
final delResponse = await client.del('mylist');
print('Received (number of keys deleted): $delResponse'); // Should be 1
print('Sending: EXISTS mylist');
final existsResponse = await client.exists('mylist');
print('Received (1=exists, 0=not exist): $existsResponse'); // Should be 0
// --- TRANSACTIONS (v0.11.0) ---
print('\n--- TRANSACTIONS (Atomic Operations) ---');
try {
print('Sending: MULTI');
await client.multi(); // Start transaction
print("Queueing: SET tx:1 'hello'");
final setFuture = client.set('tx:1', 'hello'); // Queued
print('Queueing: INCR tx:counter');
final incrFuture = client.execute(['INCR', 'tx:counter']); // Queued
// Await queued responses (optional)
print('Awaited SET response: ${await setFuture}'); // Should be 'QUEUED'
print('Awaited INCR response: ${await incrFuture}'); // Should be 'QUEUED'
print('Sending: EXEC');
final execResponse = await client.exec(); // Execute transaction
print('Received EXEC results: $execResponse'); // Should be [OK, 1]
// Example of DISCARD (uncomment to test)
print('Sending: MULTI... SET... DISCARD');
await client.multi();
await client.set('tx:2', 'discarded');
await client.discard(); // Cancel transaction
print("Value of tx:2 (should be null): ${await client.get('tx:2')}");
} catch (e) {
print('❌ Transaction Failed: $e');
// Ensure transaction state is reset if something went wrong
try {
await client.discard();
} catch (_) {}
}
} catch (e) {
// Handle connection or authentication errors
print('❌ Failed: $e');
} finally {
// 3. Always close the connection
print('\nClosing connection...');
await client.close();
}
}
/// Main entry point to demonstrate connection patterns.
Future<void> main() async {
// ---
// See README.md for Docker instructions on how to run Valkey
// using the 3 different authentication options.
// ---
// Choose ONE of the following client configurations
// to match your server setup from the README.
const host = '127.0.0.1';
const port = 6379;
const username = 'default';
const password = 'my-super-secret-password';
// ====================================================================
// Configuration for README Option 1: No Authentication
// ====================================================================
// final fixedClient = ValkeyClient(
// host: host,
// port: port,
// );
// ====================================================================
// Configuration for README Option 2: Password Only
// ====================================================================
// final fixedClient = ValkeyClient(
// host: host,
// port: port,
// password: password,
// );
// ====================================================================
// Configuration for README Option 3: Username + Password (ACL)
// ====================================================================
final fixedClient = ValkeyClient(
host: host,
port: port,
username: username,
password: password,
);
print('=' * 40);
print('Running Example with Constructor Config (fixedClient)');
print('=' * 40);
// Using the 'fixedClient' configured above
await runCommandExamples(fixedClient);
// ====================================================================
// Advanced: Using the flexibleClient (Method Config)
// ====================================================================
// This pattern is useful if you need to connect to different
// servers using the same client instance.
print('\n' * 2);
print('=' * 40);
print('Running Example with Method Config (flexibleClient)');
print('=' * 40);
final flexibleClient = ValkeyClient(); // No config in constructor
// Create a reusable connection object (e.g., from a config file)
const config = (
host: host,
port: port,
username: username,
password: password,
);
// We must re-wrap the logic in a try/catch
// because runCommandExamples handles *command* errors,
// but this client *instance* needs to be closed.
try {
// Pass config directly to the connect() method
await flexibleClient.connect(
host: config.host,
port: config.port,
username: config.username,
password: config.password,
);
// Once connected, run the same command logic
await runCommandExamples(flexibleClient);
} catch (e) {
print('❌ (flexibleClient) Failed: $e');
} finally {
await flexibleClient.close(); // Close this specific client
}
// ====================================================================
// Pub/Sub Example (v0.9.0 / v0.9.1)
// ====================================================================
print('\n' * 2);
print('=' * 40);
print('Running Pub/Sub Example');
print('=' * 40);
// Use two clients: one to subscribe, one to publish
final subscriber = ValkeyClient(host: host, port: port);
final publisher = ValkeyClient(host: host, port: port);
StreamSubscription<ValkeyMessage>? listener; // Keep track of the listener
const channel = 'news:updates';
try {
await Future.wait([subscriber.connect(), publisher.connect()]);
print('✅ Subscriber and Publisher connected!');
print('\nSubscribing to channel: $channel');
// 1. Subscribe and get the Subscription object
final sub = subscriber
.subscribe([channel]); // Returns Subscription{messages, ready}
// --- NEW: Wait for subscription ready ---
print('Waiting for subscription confirmation...');
await sub.ready.timeout(const Duration(seconds: 2));
print('Subscription confirmed!');
// ------------------------------------
// 2. Listen to the message stream AFTER subscription is ready
listener = sub.messages.listen(
// Listen to sub.messages
(message) {
print('📬 Received: ${message.message} '
'(from channel: ${message.channel})');
},
onError: (Object? e) => print('❌ Stream Error: $e'),
onDone: () => print('ℹ️ Subscription stream closed.'),
);
// 3. Publish messages AFTER awaiting sub.ready
print("\nSending: PUBLISH $channel 'First update!'");
await publisher.publish(channel, 'First update!');
print("Sending: PUBLISH $channel 'Second update!'");
await publisher.publish(channel, 'Second update!');
// Wait a bit to receive messages
await Future<void>.delayed(const Duration(seconds: 1));
// 4. Clean up (Need UNSUBSCRIBE command in the future)
print('\nClosing connections (will stop subscription)...');
await listener.cancel(); // Cancel the stream listener
} catch (e) {
print('❌ Pub/Sub Example Failed: $e');
} finally {
// Ensure listener is cancelled even on error
await listener?.cancel();
// await subscriber.unsubscribe([channel]);
await Future.wait([subscriber.close(), publisher.close()]);
print('Pub/Sub clients closed.');
}
// --- ADVANCED PUB/SUB (v0.10.0) ---
await runPatternSubscriptionExample(
host: host,
port: port,
username: username,
password: password,
);
// --- PUBSUB INTROSPECTION (v0.12.0) ---
// (Note: These commands are usually run from a *different* client
// than the one that is subscribed, as a subscribed client can't
// run most normal commands.)
print('\n--- PUBSUB INTROSPECTION (Admin/Info) ---');
await runPubSubIntrospectionExample(
host: host,
port: port,
username: username,
password: password,
);
} // End of main
// ====================================================================
// Advanced Pub/Sub Example (v0.10.0) - Pattern Subscription
// ====================================================================
Future<void> runPatternSubscriptionExample({
// Pass in connection details from main
required String host,
required int port,
String? username,
String? password,
}) async {
print('\n' * 2);
print('=' * 40);
print('Running Advanced Pub/Sub Example (Pattern Subscription)');
print('=' * 40);
final subscriber = ValkeyClient(host: host, port: port);
final publisher = ValkeyClient(host: host, port: port);
StreamSubscription<ValkeyMessage>? listener;
try {
await Future.wait([subscriber.connect(), publisher.connect()]);
print('✅ Subscriber and Publisher connected!');
const pattern = 'log:*'; // Subscribe to all channels starting with 'log:'
const channelInfo = 'log:info';
const channelError = 'log:error';
print('\nPSubscribing to pattern: $pattern');
// 1. PSubscribe and wait for ready
final sub = subscriber.psubscribe([pattern]);
print('Waiting for psubscribe confirmation...');
await sub.ready.timeout(const Duration(seconds: 2));
print('PSubscription confirmed!');
// 2. Listen to messages
listener = sub.messages.listen(
(message) {
// Now message includes the pattern
print('📬 Received: ${message.message} (Pattern: ${message.pattern}, '
'Channel: ${message.channel})');
},
onError: (Object? e) => print('❌ Stream Error: $e'),
onDone: () => print('ℹ️ Subscription stream closed.'),
);
// Give the subscription a moment
await Future<void>.delayed(const Duration(milliseconds: 200));
// 3. Publish to channels matching the pattern
print("\nSending: PUBLISH $channelInfo 'Application started'");
await publisher.publish(channelInfo, 'Application started');
print("Sending: PUBLISH $channelError 'Critical error occurred!'");
await publisher.publish(channelError, 'Critical error occurred!');
// Wait a bit to receive messages
await Future<void>.delayed(const Duration(seconds: 1));
// 4. Unsubscribe from the pattern
print('\nPUnsubscribing from pattern: $pattern');
await subscriber.punsubscribe([pattern]);
await Future<void>.delayed(
const Duration(milliseconds: 200)); // Allow processing
// 5. Publish again (should not be received)
print(
"Sending: PUBLISH $channelInfo 'This message should NOT be received'");
await publisher.publish(channelInfo, 'This message should NOT be received');
await Future<void>.delayed(const Duration(seconds: 1));
} catch (e) {
print('❌ Advanced Pub/Sub Example Failed: $e');
} finally {
// Ensure listener is cancelled even on error
await listener?.cancel();
await Future.wait([subscriber.close(), publisher.close()]);
print('Advanced Pub/Sub clients closed.');
}
}
// ====================================================================
// Pub/Sub Introspection Example (v0.12.0)
// ====================================================================
Future<void> runPubSubIntrospectionExample({
// Pass in connection details from main
required String host,
required int port,
String? username,
String? password,
}) async {
print('\n' * 2);
print('=' * 40);
print('Running Pub/Sub Introspection Example');
print('=' * 40);
// Use the config from main() for both clients
final adminClient = ValkeyClient(
host: host,
port: port,
username: username,
password: password,
);
final subClient = ValkeyClient(
host: host,
port: port,
username: username,
password: password,
);
StreamSubscription? listener;
StreamSubscription? pListener;
try {
// Connect both clients
await Future.wait([
adminClient.connect(),
subClient.connect(), // We need a subscriber client to check against
]);
print('✅ Admin and Subscriber clients connected!');
// Subscribe to a channel and a pattern
// Create a subscription on subClient
const channelName = 'inspect'; // or admin
final sub = subClient.subscribe(['channel:$channelName']);
final psub = subClient.psubscribe(['log:*']);
await Future.wait([sub.ready, psub.ready]);
listener = sub.messages.listen(null); // Keep subscription active
pListener = psub.messages.listen(null);
await Future<void>.delayed(
const Duration(milliseconds: 50)); // Give server time
// Run introspection commands on adminClient
print("Sending: PUBSUB CHANNELS 'channel:*'");
final channels = await adminClient.pubsubChannels('channel:*');
print('Received active channels: $channels');
// e.g., Should be [channel:inspect]
print("Sending: PUBSUB NUMSUB 'channel:$channelName'");
final numsub = await adminClient.pubsubNumSub(['channel:$channelName']);
print('Received subscriber count: $numsub');
// e.g., Should be {channel:inspect: 1}
print('Sending: PUBSUB NUMPAT');
final numpat = await adminClient.pubsubNumPat();
print('Received pattern subscription count: $numpat'); // Should be 1
} catch (e) {
print('❌ Pub/Sub Introspection Example Failed: $e');
} finally {
// Clean up the subscriber client
await listener?.cancel();
await pListener?.cancel();
await Future.wait([adminClient.close(), subClient.close()]);
print('Introspection clients closed.');
}
}