Zero Infrastructure
Define dependencies, get automatic parallelization. No queues, workers, or orchestration logic required.
Production-ready AI pipelines with intelligent dependency management and zero complexity
import { DagEngine, Plugin, type PromptContext, type ProviderSelection } from '@dagengine/core';
class ReviewAnalyzer extends Plugin {
constructor() {
super('analyzer', 'Review Analyzer', 'Analyzes reviews');
this.dimensions = ['sentiment', 'topics', 'summary'];
}
defineDependencies(): Record<string, string[]> {
return { summary: ['sentiment', 'topics'] };
}
createPrompt(context: PromptContext): string {
const content = context.sections[0]?.content || '';
if (context.dimension === 'sentiment') {
return `Analyze sentiment: "${content}"
Return JSON: {"sentiment": "positive|negative|neutral", "score": 0-1}`;
}
if (context.dimension === 'topics') {
return `Extract topics: "${content}"
Return JSON: {"topics": ["topic1", "topic2"]}`;
}
if (context.dimension === 'summary') {
const sentiment = context.dependencies.sentiment?.data as { sentiment: string };
const topics = context.dependencies.topics?.data as { topics: string[] };
return `Create ${sentiment.sentiment} summary covering: ${topics.topics.join(', ')}`;
}
throw new Error(`Unknown dimension: ${context.dimension}`);
}
selectProvider(): ProviderSelection {
return {
provider: 'anthropic',
options: { model: 'claude-3-5-haiku-20241022' }
};
}
}
const engine = new DagEngine({
plugin: new ReviewAnalyzer(),
providers: { anthropic: { apiKey: process.env.ANTHROPIC_API_KEY } }
});
engine.process([
{ content: 'Amazing product! Highly recommended.', metadata: {} }
]).then((result) => {
console.log(JSON.stringify(result.sections[0]?.results, null, 2));
});

Automatic execution: sentiment and topics run in parallel → summary waits for both → all reviews are processed simultaneously.
Define task dependencies once. The engine automatically calculates optimal execution order and runs independent tasks in parallel.
defineDependencies(): Record<string, string[]> {
return {
sentiment: [], // No dependencies
topics: [], // No dependencies
summary: ['sentiment', 'topics'] // Waits for both
};
}

Skip expensive analysis on low-value content. Route different tasks to different models based on complexity.
shouldSkipSectionDimension(ctx: SkipContext): boolean {
if (ctx.dimension === 'deep_analysis') {
const quality = ctx.dependencies.quality_check?.data as { score: number };
return quality.score < 0.7; // Skip low-quality items
}
return false;
}
selectProvider(dimension: string): ProviderSelection {
if (dimension === 'quality_check') {
return {
provider: 'anthropic',
options: { model: 'claude-3-5-haiku-20241022' } // Cheap model
};
}
return {
provider: 'anthropic',
options: { model: 'claude-3-7-sonnet-20250219' } // Expensive model
};
}

Result: 100 items → only 40 high-quality items reach deep analysis → 60% cost reduction.
Reshape sections between processing stages. Group 100 reviews into 5 categories, then analyze categories.
transformSections(ctx: TransformContext): Section[] | undefined {
if (ctx.dimension === 'analyze_categories') {
const categories = ctx.result.data as { categories: Array<{ name: string; items: string[] }> };
return categories.categories.map(cat => ({
content: cat.items.join('\n\n'),
metadata: { category: cat.name, count: cat.items.length }
}));
}
}

Result: 100 analyses → 5 analyses (95% fewer API calls).
Automatic retry with exponential backoff. Provider fallback when failures occur. Graceful degradation with partial results.
selectProvider(dimension: string): ProviderSelection {
return {
provider: 'anthropic',
options: { model: 'claude-3-7-sonnet-20250219' },
fallbacks: [
{ provider: 'openai', options: { model: 'gpt-4o' } },
{ provider: 'gemini', options: { model: 'gemini-2.5-pro' } }
]
};
}

Route different tasks to different AI providers. Use Claude for filtering, GPT-4 for analysis, Gemini for synthesis.
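For example, a single selectProvider hook can branch on the dimension name. A minimal sketch using the providers and models shown above; the 'filter', 'analysis', and 'synthesis' dimension names are illustrative:

selectProvider(dimension: string): ProviderSelection {
  if (dimension === 'filter') {
    // Claude handles fast, cheap filtering
    return { provider: 'anthropic', options: { model: 'claude-3-5-haiku-20241022' } };
  }
  if (dimension === 'analysis') {
    // GPT-4 handles the detailed analysis
    return { provider: 'openai', options: { model: 'gpt-4o' } };
  }
  // Gemini handles the final synthesis
  return { provider: 'gemini', options: { model: 'gemini-2.5-pro' } };
}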
Track token usage and costs per dimension and provider. Export detailed breakdowns with results.
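A rough sketch of reading that breakdown after a run; the exact field name on the result object depends on your dagengine version, so result.usage below is a placeholder:

engine.process([
  { content: 'Amazing product! Highly recommended.', metadata: {} }
]).then((result) => {
  // Hypothetical field name: a per-dimension, per-provider breakdown of tokens and cost.
  console.log(JSON.stringify(result.usage, null, 2));
});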
All hooks support async/await. Integrate Redis caching, PostgreSQL logging, external APIs seamlessly.
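Because hooks can return promises, a createPrompt override can await an external cache before building a prompt. A minimal sketch using ioredis; the cache-key scheme is an assumption, not part of dagengine:

import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL);

async createPrompt(context: PromptContext): Promise<string> {
  const content = context.sections[0]?.content || '';

  // Assumption: identical content + dimension can reuse a previously built prompt.
  const cacheKey = `prompt:${context.dimension}:${content}`;
  const cached = await redis.get(cacheKey);
  if (cached) return cached;

  const prompt = `Analyze (${context.dimension}): "${content}"`;
  await redis.set(cacheKey, prompt, 'EX', 3600); // expire after one hour
  return prompt;
}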
Built-in Portkey integration for advanced retry policies, load balancing, and semantic caching.
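Wiring this up is a configuration concern; the exact option names depend on your dagengine version, so the portkey block below is purely illustrative of the idea:

const engine = new DagEngine({
  plugin: new ReviewAnalyzer(),
  providers: { anthropic: { apiKey: process.env.ANTHROPIC_API_KEY } },
  // Hypothetical keys: consult the dagengine docs for the actual Portkey configuration.
  portkey: {
    apiKey: process.env.PORTKEY_API_KEY,
    retry: { attempts: 3 },       // retry policy applied at the gateway
    cache: { mode: 'semantic' }   // semantic caching of similar prompts
  }
});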