createSubscription method

Future<Subscription> createSubscription({
  1. int? ackDeadlineSeconds,
  2. DeadLetterPolicy? deadLetterPolicy,
  3. bool? enableMessageOrdering,
  4. ExpirationPolicy? expirationPolicy,
  5. String? filter,
  6. Map<String, String>? labels,
  7. Duration? messageRetentionDuration,
  8. PushConfig? pushConfig,
  9. bool? retainAckedMessages,
  10. int retries = 5,
  11. RetryPolicy? retryPolicy,
  12. String? subscription,
  13. required String topic,
  14. Duration? topicMessageRetentionDuration,
})

Creates a subscription to a given topic. See the resource name rules. If the subscription already exists, returns ALREADY_EXISTS. If the corresponding topic doesn't exist, returns NOT_FOUND.

If the name is not provided in the request, the server will assign a random name for this subscription on the same project as the topic, conforming to the resource name format. The generated name is populated in the returned Subscription object. Note that for REST API requests, you must specify a name in the request.

If provided, the subscription name can be just the simple name or it can be the fully quantified name in the format: projects/{project}/subscriptions/{subscription}.

The topic can be just the simple name or it can be the fully quantified name in the format: projects/{project}/topics/{topic}.

If push delivery is used with this subscription, the pushConfig is used to configure it. An empty pushConfig signifies that the subscriber will pull and ack messages using API methods.

For more information on the options, see the official documentation here: https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.Subscription

Implementation

Future<Subscription> createSubscription({
  int? ackDeadlineSeconds,
  DeadLetterPolicy? deadLetterPolicy,
  bool? enableMessageOrdering,
  ExpirationPolicy? expirationPolicy,
  String? filter,
  Map<String, String>? labels,
  Duration? messageRetentionDuration,
  PushConfig? pushConfig,
  bool? retainAckedMessages,
  int retries = 5,
  RetryPolicy? retryPolicy,
  String? subscription,
  required String topic,
  Duration? topicMessageRetentionDuration,
}) async {
  assert(_initialized);
  _logger.fine('[createSubscription]: start -- [$subscription]');

  try {
    return await _execute(
      executor: () async {
        final result = await _subscriberClient.createSubscription(
          Subscription(
            ackDeadlineSeconds: ackDeadlineSeconds,
            deadLetterPolicy: deadLetterPolicy,
            enableMessageOrdering: enableMessageOrdering,
            expirationPolicy: expirationPolicy,
            filter: filter,
            labels: labels,
            messageRetentionDuration: messageRetentionDuration == null
                ? null
                : GrpcProtobufConvert.toDuration(messageRetentionDuration),
            name: subscription == null
                ? null
                : subscription.startsWith('projects/')
                    ? subscription
                    : 'projects/$_projectId/subscriptions/$subscription',
            pushConfig: pushConfig,
            retainAckedMessages: retainAckedMessages,
            retryPolicy: retryPolicy,
            topic: topic.startsWith('projects/')
                ? topic
                : 'projects/$_projectId/topics/$topic',
            topicMessageRetentionDuration:
                topicMessageRetentionDuration == null
                    ? null
                    : GrpcProtobufConvert.toDuration(
                        topicMessageRetentionDuration),
          ),
        );

        return result;
      },
      retries: retries,
    );
  } finally {
    _logger.fine('[createSubscription]: complete -- [$subscription]');
  }
}