Skip to content

dagengineAI Workflow Orchestration

Production-ready AI pipelines with intelligent dependency management and zero complexity

Build AI Workflows in Minutes

typescript
import { DagEngine, Plugin, type PromptContext, type ProviderSelection } from '@dagengine/core';

class ReviewAnalyzer extends Plugin {
	constructor() {
		super('analyzer', 'Review Analyzer', 'Analyzes reviews');
		this.dimensions = ['sentiment', 'topics', 'summary'];
	}

	defineDependencies(): Record<string, string[]> {
		return { summary: ['sentiment', 'topics'] };
	}

	createPrompt(context: PromptContext): string {
		const content = context.sections[0]?.content || '';

		if (context.dimension === 'sentiment') {
			return `Analyze sentiment: "${content}"
Return JSON: {"sentiment": "positive|negative|neutral", "score": 0-1}`;
		}

		if (context.dimension === 'topics') {
			return `Extract topics: "${content}"
Return JSON: {"topics": ["topic1", "topic2"]}`;
		}

		if (context.dimension === 'summary') {
			const sentiment = context.dependencies.sentiment?.data as { sentiment: string };
			const topics = context.dependencies.topics?.data as { topics: string[] };
			return `Create ${sentiment.sentiment} summary covering: ${topics.topics.join(', ')}`;
		}

		throw new Error(`Unknown dimension: ${ctx.dimension}`);
	}

	selectProvider(): ProviderSelection {
		return {
			provider: 'anthropic',
			options: { model: 'claude-3-5-haiku-20241022' }
		};
	}
}

const engine = new DagEngine({
	plugin: new ReviewAnalyzer(),
	providers: { anthropic: { apiKey: process.env.ANTHROPIC_API_KEY } }
});

engine.process([
	{ content: 'Amazing product! Highly recommended.', metadata: {} }
]).then((result) => {
	console.log(JSON.stringify(result.sections[0]?.results, null, 2));
});

Automatic Execution: sentiment + topics run in parallel → summary waits for both → all reviews processed simultaneously

Key Capabilities

Intelligent Parallelization

Define task dependencies once. The engine automatically calculates optimal execution order and runs independent tasks in parallel.

typescript
defineDependencies(): Record<string, string[]> {
  return {
    sentiment: [],           // No dependencies
    topics: [],              // No dependencies  
    summary: ['sentiment', 'topics']  // Waits for both
  };
}

Cost Optimization

Skip expensive analysis on low-value content. Route different tasks to different models based on complexity.

typescript
shouldSkipSectionDimension(ctx: SkipContext): boolean {
  if (ctx.dimension === 'deep_analysis') {
    const quality = ctx.dependencies.quality_check?.data as { score: number };
    return quality.score < 0.7;  // Skip low-quality items
  }
  return false;
}

selectProvider(dimension: string): ProviderSelection {
  if (dimension === 'quality_check') {
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-5-haiku-20241022' }  // Cheap model
    };
  }
  return {
    provider: 'anthropic',
    options: { model: 'claude-3-7-sonnet-20250219' }  // Expensive model
  };
}

Result: 100 items → 40 high-quality → 60% cost reduction

Data Transformations

Reshape sections between processing stages. Group 100 reviews into 5 categories, then analyze categories.

typescript
transformSections(ctx: TransformContext): Section[] | undefined {
  if (ctx.dimension === 'analyze_categories') {
    const categories = ctx.result.data as { categories: Array<{ name: string; items: string[] }> };
    
    return categories.categories.map(cat => ({
      content: cat.items.join('\n\n'),
      metadata: { category: cat.name, count: cat.items.length }
    }));
  }
}

Result: 100 analyses → 5 analyses (95% fewer API calls)

Error Recovery

Automatic retry with exponential backoff. Provider fallback when failures occur. Graceful degradation with partial results.

typescript
selectProvider(dimension: string): ProviderSelection {
  return {
    provider: 'anthropic',
    options: { model: 'claude-3-7-sonnet-20250219' },
    fallbacks: [
      { provider: 'openai', options: { model: 'gpt-4o' } },
      { provider: 'gemini', options: { model: 'gemini-2.5-pro' } }
    ]
  };
}

Production Features

Multi-Provider Support

Route different tasks to different AI providers. Use Claude for filtering, GPT-4 for analysis, Gemini for synthesis.

Real-Time Cost Tracking

Track token usage and costs per dimension and provider. Export detailed breakdowns with results.

External Integration

All hooks support async/await. Integrate Redis caching, PostgreSQL logging, external APIs seamlessly.

Gateway Support

Built-in Portkey integration for advanced retry policies, load balancing, and semantic caching.

Get Started

Released under the MIT License.