MCP Knowledge Ops

Support This Project

If you find this package useful, consider supporting ongoing development via PayPal.

MCP Knowledge Package Family

  • mcp_bundle: Bundle schema, loader, validator, and expression language for the MCP ecosystem.
  • mcp_fact_graph: Temporal knowledge graph with evidence-based fact management and summarization.
  • mcp_skill: Skill definitions and runtime execution for AI capabilities.
  • mcp_profile: Profile definitions for AI personas with template rendering and appraisal.
  • mcp_knowledge_ops: Knowledge operations including pipelines, workflows, and scheduling.
  • mcp_knowledge: Unified integration package for the complete knowledge system.

A powerful Dart package for knowledge operations including pipelines, workflows, scheduling, and automated knowledge processing. Part of the MakeMind MCP ecosystem.

Features

Core Features

  • Pipelines: Define and execute multi-stage processing pipelines
  • Workflows: Complex workflow orchestration with branching and conditions
  • Scheduling: Automated job scheduling with cron-like expressions
  • Operations: Built-in operations for curation, summarization, and pattern mining

Pipeline Model

  • Stages: Sequential processing stages
  • Checkpoints: Save and restore pipeline state
  • Metrics: Execution metrics and monitoring
  • Error Handling: Retry logic and error recovery
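To picture the retry behavior that a policy such as RetryPolicy(maxRetries: 3) configures, here is a minimal backoff loop in plain Dart. This is an illustrative sketch of the concept, not the package's internal implementation; the function name runWithRetry is hypothetical.

```dart
/// Runs [stage], retrying up to [maxRetries] times with exponential backoff.
/// Illustrative only; not the internals of mcp_knowledge_ops.
Future<T> runWithRetry<T>(
  Future<T> Function() stage, {
  int maxRetries = 3,
  Duration baseDelay = const Duration(milliseconds: 100),
}) async {
  for (var attempt = 0; ; attempt++) {
    try {
      return await stage();
    } catch (e) {
      if (attempt >= maxRetries) rethrow; // out of retries: surface the error
      // Exponential backoff: 100ms, 200ms, 400ms, ...
      await Future.delayed(baseDelay * (1 << attempt));
    }
  }
}
```

A real executor would additionally persist a checkpoint before each stage, so a failed run can resume from the last successful stage instead of starting over.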

Workflow Model

  • Actions: Discrete workflow actions
  • Conditions: Conditional branching
  • Variables: Workflow-scoped variables
  • Events: Event-driven execution

Built-in Operations

  • Curation: Candidate review and confirmation
  • Summarization: Entity summary generation
  • Pattern Mining: Pattern discovery in knowledge
  • Index Refresh: Knowledge index maintenance

Quick Start

Installation

Add the dependency to your pubspec.yaml and run dart pub get:

dependencies:
  mcp_knowledge_ops: ^0.1.0

Basic Usage

import 'package:mcp_knowledge_ops/mcp_knowledge_ops.dart';

void main() async {
  // Create pipeline executor
  final executor = PipelineExecutor();

  // Define a curation pipeline
  final pipeline = Pipeline(
    id: 'daily_curation',
    name: 'Daily Curation',
    stages: [
      PipelineStage(
        id: 'fetch_candidates',
        type: StageType.fetch,
        config: {'status': 'pending', 'limit': 100},
      ),
      PipelineStage(
        id: 'auto_confirm',
        type: StageType.process,
        config: {'threshold': 0.95},
      ),
      PipelineStage(
        id: 'notify',
        type: StageType.output,
        config: {'channel': 'slack'},
      ),
    ],
  );

  // Execute pipeline
  final result = await executor.execute(
    pipeline,
    CurationInput(entityIds: ['user_123']),
    PipelineContext(runId: 'run_001'),
  );

  print('Success: ${result.success}');
  print('Metrics: ${result.metrics}');
}

Core Concepts

Pipelines

Pipelines define sequential processing stages:

final pipeline = Pipeline(
  id: 'summarization_pipeline',
  name: 'Entity Summarization',
  stages: [
    PipelineStage(
      id: 'fetch_facts',
      type: StageType.fetch,
      config: {'domain': 'preferences'},
    ),
    PipelineStage(
      id: 'generate_summary',
      type: StageType.process,
      config: {'model': 'claude-3-5-sonnet'},
    ),
    PipelineStage(
      id: 'store_summary',
      type: StageType.output,
      config: {'collection': 'summaries'},
    ),
  ],
  checkpointEnabled: true,
  retryPolicy: RetryPolicy(maxRetries: 3),
);

Workflows

Workflows orchestrate complex multi-step processes:

final workflow = Workflow(
  id: 'onboarding_workflow',
  name: 'User Onboarding',
  actions: [
    WorkflowAction(
      id: 'extract_preferences',
      type: ActionType.skill,
      skillId: 'preference_extractor',
    ),
    WorkflowAction(
      id: 'check_premium',
      type: ActionType.condition,
      condition: '{{user.isPremium}}',
      onTrue: 'premium_setup',
      onFalse: 'basic_setup',
    ),
    WorkflowAction(
      id: 'premium_setup',
      type: ActionType.skill,
      skillId: 'premium_onboarding',
    ),
    WorkflowAction(
      id: 'basic_setup',
      type: ActionType.skill,
      skillId: 'basic_onboarding',
    ),
  ],
);

// Execute workflow (handlers maps each skillId to its implementation)
final workflowExecutor = WorkflowExecutor(handlers: handlers);
final execution = await workflowExecutor.execute(
  workflow,
  {'user': userContext},
);
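The condition '{{user.isPremium}}' above is a template expression resolved against the workflow context. As a rough illustration of how such an expression can be evaluated, here is a minimal resolver in plain Dart; this is a sketch of the general idea, not the package's actual expression language.

```dart
/// Resolves a '{{a.b.c}}' template expression against a nested context map
/// and interprets the result as a boolean condition. Illustrative only.
bool evalCondition(String expr, Map<String, dynamic> context) {
  final match = RegExp(r'^\{\{(.+)\}\}$').firstMatch(expr.trim());
  if (match == null) return false; // not a template expression
  dynamic value = context;
  for (final key in match.group(1)!.trim().split('.')) {
    if (value is Map<String, dynamic>) {
      value = value[key]; // walk one level down the context
    } else {
      return false; // path does not exist
    }
  }
  return value == true;
}
```

With a context of {'user': {'isPremium': true}}, the expression resolves to true and the workflow branches to premium_setup; otherwise it takes the basic_setup path.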

Scheduling

Schedule automated jobs:

final scheduler = Scheduler(
  config: SchedulerConfig(
    timezone: 'UTC',
    maxConcurrentJobs: 5,
  ),
);

// Schedule daily curation
await scheduler.schedule(
  ScheduledJob(
    id: 'daily_curation',
    name: 'Daily Candidate Curation',
    schedule: CronSchedule('0 6 * * *'), // 6 AM daily
    pipeline: curationPipeline,
    input: CurationInput(),
  ),
);

// Schedule hourly summarization
await scheduler.schedule(
  ScheduledJob(
    id: 'hourly_summary',
    name: 'Hourly Summary Refresh',
    schedule: CronSchedule('0 * * * *'), // Every hour
    pipeline: summarizationPipeline,
    input: SummarizationInput(staleOnly: true),
  ),
);

// Start scheduler
await scheduler.start();
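A cron expression such as '0 6 * * *' has five fields: minute, hour, day-of-month, month, and day-of-week. To make the syntax concrete, here is a minimal matcher for the plain numeric and wildcard forms used above, written in plain Dart. This is illustrative only, not the package's parser; real cron syntax also supports ranges, lists, and step values.

```dart
/// Returns true if [time] matches the five-field cron [expr].
/// Supports only '*' and plain numbers, as in '0 6 * * *'. Illustrative only.
bool cronMatches(String expr, DateTime time) {
  final fields = expr.trim().split(RegExp(r'\s+'));
  if (fields.length != 5) {
    throw FormatException('expected 5 cron fields, got ${fields.length}');
  }
  final values = [
    time.minute,
    time.hour,
    time.day,
    time.month,
    time.weekday % 7, // cron uses 0 = Sunday; Dart uses 7 = Sunday
  ];
  for (var i = 0; i < 5; i++) {
    if (fields[i] != '*' && int.parse(fields[i]) != values[i]) return false;
  }
  return true;
}
```

Under this reading, '0 6 * * *' matches any time whose minute is 0 and hour is 6, i.e. 06:00 daily, while '0 * * * *' matches the top of every hour.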

Built-in Operations

Curation

final curationPipeline = CurationPipeline(
  factGraph: factGraphPort,
);

final result = await executor.execute(
  curationPipeline,
  CurationInput(
    entityIds: ['user_123', 'user_456'],
    autoConfirmThreshold: 0.95,
  ),
  context,
);

print('Confirmed: ${result.output.confirmedCount}');
print('Rejected: ${result.output.rejectedCount}');

Summarization

final summarizationPipeline = SummarizationPipeline(
  factGraph: factGraphPort,
  llm: llmPort,
);

final result = await executor.execute(
  summarizationPipeline,
  SummarizationInput(
    entityIds: ['user_123'],
    summaryTypes: ['preferences', 'history'],
  ),
  context,
);

print('Updated: ${result.output.updatedSummaries}');

Pattern Mining

final patternPipeline = PatternMiningPipeline(
  factGraph: factGraphPort,
  llm: llmPort,
);

final result = await executor.execute(
  patternPipeline,
  PatternMiningInput(
    domain: 'preferences',
    minSupport: 0.1,
  ),
  context,
);

print('Patterns: ${result.output.patterns}');

Port-Based Architecture

The package uses a port-based dependency injection pattern:

// Define custom operation port
class MyOpsPort implements OpsPort {
  @override
  Future<void> executeOperation(Operation op) async {
    // Your implementation
  }
}

// Use custom ports
final executor = PipelineExecutor(
  opsPort: MyOpsPort(),
);
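One payoff of the port pattern is testability: a fake port can stand in for the real dependency and record calls in a unit test. The same pattern in a self-contained plain-Dart sketch (all names here are illustrative, not part of the package's API):

```dart
/// A minimal port interface, mirroring the OpsPort pattern above.
abstract class GreeterPort {
  String greet(String name);
}

/// A recording fake, useful in unit tests.
class RecordingGreeter implements GreeterPort {
  final calls = <String>[];
  @override
  String greet(String name) {
    calls.add(name); // record the call for later inspection
    return 'hello $name';
  }
}

/// A consumer that depends only on the port, not on a concrete class.
class Service {
  Service(this.port);
  final GreeterPort port;
  String welcome(String user) => port.greet(user);
}
```

Because Service depends only on the abstract port, production code can inject the real implementation while tests inject the recording fake.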

API Reference

PipelineExecutor

  • execute(pipeline, input, context): Execute a pipeline
  • pause(runId): Pause pipeline execution
  • resume(runId): Resume paused pipeline
  • cancel(runId): Cancel pipeline

WorkflowExecutor

  • execute(workflow, context): Execute a workflow
  • getExecution(id): Get execution status
  • listExecutions(filter): List executions

Scheduler

  • schedule(job): Schedule a job
  • unschedule(jobId): Remove a job
  • start(): Start scheduler
  • stop(): Stop scheduler

Examples

Complete Examples Available

  • example/basic_pipeline.dart - Basic pipeline execution
  • example/workflow.dart - Workflow orchestration
  • example/scheduling.dart - Job scheduling
  • example/curation.dart - Curation pipeline
  • example/summarization.dart - Summarization pipeline

Contributing

We welcome contributions! Please see our Contributing Guide for details.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Libraries

  • mcp_knowledge_ops: MCP Knowledge Operations - Orchestration layer for MakeMind.