valkey_client 2.3.0 copy "valkey_client: ^2.3.0" to clipboard
valkey_client: ^2.3.0 copied to clipboard

A high-performance, cluster-aware Dart client for Redis and Valkey.

example/valkey_client_example.dart

/*
 * Copyright 2025-2026 Infradise Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import 'dart:async';

import 'package:valkey_client/valkey_client.dart';

// Here is a basic example of how to connect and close the client.
// For more examples, check the `/example` folder.

/// Runs the full tour of command examples against [client].
///
/// Accepts any [ValkeyClientBase] implementation. The function connects the
/// client itself, runs each example section in order (PING, strings, hashes,
/// lists, sets / sorted sets, key management, transactions) and always closes
/// the client before returning.
///
/// Errors raised inside the `try` block are printed, not rethrown.
Future<void> runCommandExamples(ValkeyClientBase client) async {
  try {
    // 1. Connect and authenticate.
    // The client will use whichever configuration it was given.
    await client.connect();
    print('✅ Connection successful!');

    // 2. Run each example section. Order matters: later sections operate on
    // keys written by earlier ones (`greeting`, `mylist`).
    await _runPingExample(client);
    await _runStringExamples(client);
    await _runHashExamples(client);
    await _runListExamples(client);
    await _runSetExamples(client);
    await _runKeyManagementExamples(client);
    await _runTransactionExamples(client);
  } catch (e) {
    // Connection/authentication errors and any command error that was not
    // handled by a section itself end the tour here.
    print('❌ Failed: $e');
  } finally {
    // 3. Always close the connection.
    print('\nClosing connection...');
    await client.close();
  }
}

/// PING (v0.2.0): sends a PING with a payload and prints the reply.
Future<void> _runPingExample(ValkeyClientBase client) async {
  print('\n--- PING ---');
  print("Sending: PING 'Hello'");
  final pingResponse = await client.ping('Hello');
  print('Received: $pingResponse');
}

/// SET/GET (v0.3.0) and MGET (v0.4.0): writes `greeting`, then reads it back
/// alone and together with a key that does not exist.
Future<void> _runStringExamples(ValkeyClientBase client) async {
  print('\n--- SET/GET ---');
  print("Sending: SET greeting 'Hello, Valkey!'");
  final setResponse = await client.set('greeting', 'Hello, Valkey!');
  print('Received: $setResponse');

  print('Sending: GET greeting');
  final getResponse = await client.get('greeting');
  print('Received: $getResponse');

  print('\n--- MGET (Array Parsing) ---');
  print('Sending: MGET greeting non_existent_key');
  final mgetResponse = await client.mget(['greeting', 'non_existent_key']);
  print('Received: $mgetResponse'); // Should be "[Hello, Valkey!, null]"
}

/// HASH (v0.5.0): writes two fields of `user:1`, then reads one field and
/// the whole hash.
Future<void> _runHashExamples(ValkeyClientBase client) async {
  print('\n--- HASH (Map/Object) ---');
  print("Sending: HSET user:1 name 'Valkyrie'");
  final hsetResponse = await client.hset('user:1', 'name', 'Valkyrie');
  print('Received (1=new, 0=update): $hsetResponse');

  print("Sending: HSET user:1 project 'valkey_client'");
  await client.hset('user:1', 'project', 'valkey_client');

  print('Sending: HGET user:1 name');
  final hgetResponse = await client.hget('user:1', 'name');
  print('Received: $hgetResponse'); // Should be "Valkyrie"

  print('Sending: HGETALL user:1');
  final hgetAllResponse = await client.hgetall('user:1');
  // Should be {name: Valkyrie, project: valkey_client}
  print('Received Map: $hgetAllResponse');
}

/// LIST (v0.6.0): pushes two items onto `mylist`, lists them, and pops one.
///
/// Leaves one item in `mylist`; the key-management section deletes the key.
Future<void> _runListExamples(ValkeyClientBase client) async {
  print('\n--- LIST (Queue/Stack) ---');
  print("Sending: LPUSH mylist 'item1'");
  await client.lpush('mylist', 'item1');
  print("Sending: LPUSH mylist 'item2'");
  final length = await client.lpush('mylist', 'item2');
  print('Received list length: $length'); // Should be 2

  print('Sending: LRANGE mylist 0 -1');
  final listResponse = await client.lrange('mylist', 0, -1);
  // Should be [item2, item1] (LPUSH prepends)
  print('Received list: $listResponse');

  print('Sending: RPOP mylist');
  final poppedItem = await client.rpop('mylist');
  // Should be "item1" (RPOP removes from the end)
  print('Received popped item: $poppedItem');
}

/// SET / SORTED SET (v0.7.0): adds two tags to a set and two scored players
/// to a sorted set, then reads both back.
Future<void> _runSetExamples(ValkeyClientBase client) async {
  print('\n--- SET (Unique Tags) / SORTED SET (Leaderboard) ---');
  print("Sending: SADD users:1:tags 'dart'");
  await client.sadd('users:1:tags', 'dart');
  print("Sending: SADD users:1:tags 'valkey'");
  await client.sadd('users:1:tags', 'valkey');

  print('Sending: SMEMBERS users:1:tags');
  final tags = await client.smembers('users:1:tags');
  print('Received tags (unordered): $tags'); // Should contain [dart, valkey]

  print("Sending: ZADD leaderboard 100 'PlayerOne'");
  await client.zadd('leaderboard', 100, 'PlayerOne');
  print("Sending: ZADD leaderboard 150 'PlayerTwo'");
  await client.zadd('leaderboard', 150, 'PlayerTwo');

  print('Sending: ZRANGE leaderboard 0 -1'); // Get all players by score
  final leaderboard = await client.zrange('leaderboard', 0, -1);
  print('Received leaderboard (score low to high): '
      '$leaderboard'); // Should be [PlayerOne, PlayerTwo]
}

/// KEY MANAGEMENT (v0.8.0): sets a 10-second expiry on `greeting`, reads its
/// TTL, deletes `mylist`, and checks that `mylist` is gone.
Future<void> _runKeyManagementExamples(ValkeyClientBase client) async {
  print('\n--- KEY MANAGEMENT (Expiration & Deletion) ---');
  print('Sending: EXPIRE greeting 10'); // Expire the 'greeting' key in 10s
  final expireResponse = await client.expire('greeting', 10);
  print('Received (1=set, 0=not set): $expireResponse');

  print('Sending: TTL greeting');
  final ttlResponse = await client.ttl('greeting');
  print('Received TTL (seconds, -1=no expire, -2=not exist): '
      '$ttlResponse'); // Should be <= 10

  print('Sending: DEL mylist'); // Delete the list key
  final delResponse = await client.del('mylist');
  print('Received (number of keys deleted): $delResponse'); // Should be 1

  print('Sending: EXISTS mylist');
  final existsResponse = await client.exists('mylist');
  print('Received (1=exists, 0=not exist): $existsResponse'); // Should be 0
}

/// TRANSACTIONS (v0.11.0): runs one MULTI/EXEC transaction and one
/// MULTI/DISCARD transaction.
///
/// Failures are caught and printed here, followed by a best-effort DISCARD,
/// so they do not propagate to the caller.
Future<void> _runTransactionExamples(ValkeyClientBase client) async {
  print('\n--- TRANSACTIONS (Atomic Operations) ---');
  try {
    print('Sending: MULTI');
    await client.multi(); // Start transaction

    print("Queueing: SET tx:1 'hello'");
    final setFuture = client.set('tx:1', 'hello'); // Queued
    print('Queueing: INCR tx:counter');
    final incrFuture = client.execute(['INCR', 'tx:counter']); // Queued

    // Await queued responses (optional)
    print('Awaited SET response: ${await setFuture}'); // Should be 'QUEUED'
    print('Awaited INCR response: ${await incrFuture}'); // Should be 'QUEUED'

    print('Sending: EXEC');
    final execResponse = await client.exec(); // Execute transaction

    // e.g. [OK, 1] on the first run. `tx:counter` is never deleted by this
    // example, so the counter keeps growing on later runs.
    print('Received EXEC results: $execResponse');

    // DISCARD: queue a SET inside MULTI, then cancel the transaction so the
    // SET is never applied.
    print('Sending: MULTI... SET... DISCARD');
    await client.multi();
    await client.set('tx:2', 'discarded');
    await client.discard(); // Cancel transaction
    print("Value of tx:2 (should be null): ${await client.get('tx:2')}");
  } catch (e) {
    print('❌ Transaction Failed: $e');
    // Ensure transaction state is reset if something went wrong.
    // Deliberately best-effort: DISCARD may itself fail (for example when no
    // transaction is open any more), and that failure is not interesting.
    try {
      await client.discard();
    } catch (_) {}
  }
}

/// Entry point: runs every example in this file against a local server.
///
/// In order: the command tour with a constructor-configured client, the same
/// tour with a client configured through `connect()`, a basic Pub/Sub round
/// trip, the pattern-subscription example, and the Pub/Sub introspection
/// example. Every client uses the `host`, `port`, `username` and `password`
/// constants declared at the top of this function.
Future<void> main() async {
  // ---
  // See README.md for Docker instructions on how to run Valkey
  // using the 3 different authentication options.
  // ---

  // Choose ONE of the following client configurations
  // to match your server setup from the README.

  const host = '127.0.0.1';
  const port = 6379;
  const username = 'default';
  const password = 'my-super-secret-password';

  // ====================================================================
  // Configuration for README Option 1: No Authentication
  // ====================================================================
  // final fixedClient = ValkeyClient(
  //   host: host,
  //   port: port,
  // );

  // ====================================================================
  // Configuration for README Option 2: Password Only
  // ====================================================================
  // final fixedClient = ValkeyClient(
  //   host: host,
  //   port: port,
  //   password: password,
  // );

  // ====================================================================
  // Configuration for README Option 3: Username + Password (ACL)
  // ====================================================================
  final fixedClient = ValkeyClient(
    host: host,
    port: port,
    username: username,
    password: password,
  );

  print('=' * 40);
  print('Running Example with Constructor Config (fixedClient)');
  print('=' * 40);
  // Using the 'fixedClient' configured above.
  // runCommandExamples() connects and closes the client itself.
  await runCommandExamples(fixedClient);

  // ====================================================================
  // Advanced: Using the flexibleClient (Method Config)
  // ====================================================================
  // This pattern is useful if you need to connect to different
  // servers using the same client instance.

  print('\n' * 2);
  print('=' * 40);
  print('Running Example with Method Config (flexibleClient)');
  print('=' * 40);

  final flexibleClient = ValkeyClient(); // No config in constructor

  // Create a reusable connection object (e.g., from a config file)
  const config = (
    host: host,
    port: port,
    username: username,
    password: password,
  );

  // The try/catch here covers the explicit connect() below;
  // runCommandExamples() prints and swallows its own errors.
  //
  // NOTE(review): runCommandExamples() calls connect() again (without
  // arguments) and closes the client in its own `finally`, so this client is
  // connected twice and closed twice — confirm ValkeyClient tolerates both.
  try {
    // Pass config directly to the connect() method
    await flexibleClient.connect(
      host: config.host,
      port: config.port,
      username: config.username,
      password: config.password,
    );
    // Once connected, run the same command logic
    await runCommandExamples(flexibleClient);
  } catch (e) {
    print('❌ (flexibleClient) Failed: $e');
  } finally {
    await flexibleClient.close(); // Close this specific client
  }

  // ====================================================================
  // Pub/Sub Example (v0.9.0 / v0.9.1)
  // ====================================================================
  print('\n' * 2);
  print('=' * 40);
  print('Running Pub/Sub Example');
  print('=' * 40);

  // Use two clients: one to subscribe, one to publish.
  // Both carry the same credentials as the clients above so that this
  // section targets the same server setup as the rest of the file.
  final subscriber = ValkeyClient(
    host: host,
    port: port,
    username: username,
    password: password,
  );
  final publisher = ValkeyClient(
    host: host,
    port: port,
    username: username,
    password: password,
  );
  StreamSubscription<ValkeyMessage>? listener; // Keep track of the listener
  const channel = 'news:updates';

  try {
    await Future.wait([subscriber.connect(), publisher.connect()]);
    print('✅ Subscriber and Publisher connected!');

    print('\nSubscribing to channel: $channel');

    // 1. Subscribe and get the Subscription object
    final sub = subscriber
        .subscribe([channel]); // Returns Subscription{messages, ready}

    // Wait for the subscription to be confirmed, but give up after two
    // seconds instead of hanging; a timeout lands in the catch block below.
    print('Waiting for subscription confirmation...');
    await sub.ready.timeout(const Duration(seconds: 2));
    print('Subscription confirmed!');

    // 2. Listen to the message stream AFTER subscription is ready
    listener = sub.messages.listen(
      (message) {
        print('📬 Received: ${message.message} '
            '(from channel: ${message.channel})');
      },
      onError: (Object? e) => print('❌ Stream Error: $e'),
      onDone: () => print('ℹ️ Subscription stream closed.'),
    );

    // 3. Publish messages AFTER awaiting sub.ready
    print("\nSending: PUBLISH $channel 'First update!'");
    await publisher.publish(channel, 'First update!');

    print("Sending: PUBLISH $channel 'Second update!'");
    await publisher.publish(channel, 'Second update!');

    // Wait a bit to receive messages
    await Future<void>.delayed(const Duration(seconds: 1));

    // 4. Clean up (Need UNSUBSCRIBE command in the future)
    print('\nClosing connections (will stop subscription)...');
    await listener.cancel(); // Cancel the stream listener
  } catch (e) {
    print('❌ Pub/Sub Example Failed: $e');
  } finally {
    // Ensure listener is cancelled even on error
    await listener?.cancel();
    // await subscriber.unsubscribe([channel]);
    await Future.wait([subscriber.close(), publisher.close()]);
    print('Pub/Sub clients closed.');
  }

  // --- ADVANCED PUB/SUB (v0.10.0) ---
  await runPatternSubscriptionExample(
    host: host,
    port: port,
    username: username,
    password: password,
  );

  // --- PUBSUB INTROSPECTION (v0.12.0) ---
  // (Note: These commands are usually run from a *different* client
  // than the one that is subscribed, as a subscribed client can't
  // run most normal commands.)
  print('\n--- PUBSUB INTROSPECTION (Admin/Info) ---');
  await runPubSubIntrospectionExample(
    host: host,
    port: port,
    username: username,
    password: password,
  );
} // End of main

// ====================================================================
// Advanced Pub/Sub Example (v0.10.0) - Pattern Subscription
// ====================================================================
/// Demonstrates pattern subscriptions (PSUBSCRIBE / PUNSUBSCRIBE).
///
/// Creates a subscriber and a publisher client for [host]:[port], both
/// configured with [username] and [password]. The subscriber pattern-
/// subscribes to `log:*`; the publisher then sends to two matching channels,
/// the subscriber unsubscribes, and one more message is published that is
/// expected not to be delivered.
///
/// Errors are printed, not rethrown; both clients are always closed.
Future<void> runPatternSubscriptionExample({
  // Pass in connection details from main
  required String host,
  required int port,
  String? username,
  String? password,
}) async {
  print('\n' * 2);
  print('=' * 40);
  print('Running Advanced Pub/Sub Example (Pattern Subscription)');
  print('=' * 40);

  // Forward the credentials to both clients, the same way the introspection
  // example builds its clients.
  final subscriber = ValkeyClient(
    host: host,
    port: port,
    username: username,
    password: password,
  );
  final publisher = ValkeyClient(
    host: host,
    port: port,
    username: username,
    password: password,
  );
  StreamSubscription<ValkeyMessage>? listener;

  try {
    await Future.wait([subscriber.connect(), publisher.connect()]);
    print('✅ Subscriber and Publisher connected!');

    const pattern = 'log:*'; // Subscribe to all channels starting with 'log:'
    const channelInfo = 'log:info';
    const channelError = 'log:error';

    print('\nPSubscribing to pattern: $pattern');

    // 1. PSubscribe and wait for ready (at most two seconds; a timeout is
    // reported by the catch block below).
    final sub = subscriber.psubscribe([pattern]);
    print('Waiting for psubscribe confirmation...');
    await sub.ready.timeout(const Duration(seconds: 2));
    print('PSubscription confirmed!');

    // 2. Listen to messages
    listener = sub.messages.listen(
      (message) {
        // Now message includes the pattern
        print('📬 Received: ${message.message} (Pattern: ${message.pattern}, '
            'Channel: ${message.channel})');
      },
      onError: (Object? e) => print('❌ Stream Error: $e'),
      onDone: () => print('ℹ️ Subscription stream closed.'),
    );

    // Give the subscription a moment
    await Future<void>.delayed(const Duration(milliseconds: 200));

    // 3. Publish to channels matching the pattern
    print("\nSending: PUBLISH $channelInfo 'Application started'");
    await publisher.publish(channelInfo, 'Application started');

    print("Sending: PUBLISH $channelError 'Critical error occurred!'");
    await publisher.publish(channelError, 'Critical error occurred!');

    // Wait a bit to receive messages
    await Future<void>.delayed(const Duration(seconds: 1));

    // 4. Unsubscribe from the pattern
    print('\nPUnsubscribing from pattern: $pattern');
    await subscriber.punsubscribe([pattern]);
    await Future<void>.delayed(
        const Duration(milliseconds: 200)); // Allow processing

    // 5. Publish again (should not be received)
    print(
        "Sending: PUBLISH $channelInfo 'This message should NOT be received'");
    await publisher.publish(channelInfo, 'This message should NOT be received');
    await Future<void>.delayed(const Duration(seconds: 1));
  } catch (e) {
    print('❌ Advanced Pub/Sub Example Failed: $e');
  } finally {
    // Ensure listener is cancelled even on error
    await listener?.cancel();
    await Future.wait([subscriber.close(), publisher.close()]);
    print('Advanced Pub/Sub clients closed.');
  }
}

// ====================================================================
// Pub/Sub Introspection Example (v0.12.0)
// ====================================================================
/// Demonstrates the PUBSUB introspection commands (CHANNELS, NUMSUB, NUMPAT).
///
/// Creates two clients for [host]:[port] with [username] and [password]:
/// `subClient` holds one channel subscription and one pattern subscription,
/// and `adminClient` queries the server about them.
///
/// Errors are printed, not rethrown; both clients are always closed.
Future<void> runPubSubIntrospectionExample({
  // Pass in connection details from main
  required String host,
  required int port,
  String? username,
  String? password,
}) async {
  print('\n' * 2);
  print('=' * 40);
  print('Running Pub/Sub Introspection Example');
  print('=' * 40);

  // Use the config from main() for both clients
  final adminClient = ValkeyClient(
    host: host,
    port: port,
    username: username,
    password: password,
  );
  final subClient = ValkeyClient(
    host: host,
    port: port,
    username: username,
    password: password,
  );
  StreamSubscription<ValkeyMessage>? listener;
  StreamSubscription<ValkeyMessage>? pListener;

  try {
    // Connect both clients
    await Future.wait([
      adminClient.connect(),
      subClient.connect(), // We need a subscriber client to check against
    ]);
    print('✅ Admin and Subscriber clients connected!');

    // Subscribe to a channel and a pattern on subClient so the introspection
    // commands below have something to report.
    const channelName = 'inspect';
    final sub = subClient.subscribe(['channel:$channelName']);
    final psub = subClient.psubscribe(['log:*']);
    // Bound the wait like the other Pub/Sub examples do, so a missing
    // confirmation is reported by the catch block instead of hanging.
    await Future.wait([sub.ready, psub.ready])
        .timeout(const Duration(seconds: 2));
    // Attach listeners with no handler; the messages are not needed here.
    listener = sub.messages.listen(null); // Keep subscription active
    pListener = psub.messages.listen(null);
    await Future<void>.delayed(
        const Duration(milliseconds: 50)); // Give server time

    // Run introspection commands on adminClient
    print("Sending: PUBSUB CHANNELS 'channel:*'");
    final channels = await adminClient.pubsubChannels('channel:*');
    print('Received active channels: $channels');
    // e.g., Should be [channel:inspect]

    print("Sending: PUBSUB NUMSUB 'channel:$channelName'");
    final numsub = await adminClient.pubsubNumSub(['channel:$channelName']);
    print('Received subscriber count: $numsub');
    // e.g., Should be {channel:inspect: 1}

    print('Sending: PUBSUB NUMPAT');
    final numpat = await adminClient.pubsubNumPat();
    // Should be 1 when no other client holds a pattern subscription.
    print('Received pattern subscription count: $numpat');
  } catch (e) {
    print('❌ Pub/Sub Introspection Example Failed: $e');
  } finally {
    // Clean up the subscriber client
    await listener?.cancel();
    await pListener?.cancel();
    await Future.wait([adminClient.close(), subClient.close()]);
    print('Introspection clients closed.');
  }
}
3
likes
160
points
665
downloads

Publisher

verified publisher: k8spodo.dev

Weekly Downloads

A high-performance, cluster-aware Dart client for Redis and Valkey.

Repository (GitHub)
View/report issues

Documentation

API reference

License

Apache-2.0 (license)

More

Packages that depend on valkey_client