WorkflowForge Operations Guide
Complete guide to creating and using operations in WorkflowForge.
Table of Contents
- Overview
- Built-in Operations
- Creating Custom Operations
- Operation Patterns
- Data Flow Between Operations
- Compensation and Rollback
- Best Practices
Overview
Operations are the fundamental building blocks of WorkflowForge workflows. Each operation represents a discrete task that transforms data, performs side effects, or makes decisions.
IWorkflowOperation Interface
public interface IWorkflowOperation : IDisposable
{
Guid Id { get; }
string Name { get; }
bool SupportsRestore { get; }
Task<object?> ForgeAsync(object? inputData, IWorkflowFoundry foundry, CancellationToken cancellationToken);
Task RestoreAsync(object? outputData, IWorkflowFoundry foundry, CancellationToken cancellationToken);
}
Key Concepts
- ForgeAsync: Main execution method
- RestoreAsync: Compensation/rollback logic (optional)
- SupportsRestore: Indicates if operation can be rolled back
- Foundry: Provides execution context, logging, and services
Built-in Operations
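This section covers seven operation types: six ready-made operations and the WorkflowOperationBase class for custom ones. Section 4b describes a builder helper, AddParallelOperations.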
1. DelegateWorkflowOperation
Lambda-based operations for quick, inline logic.
var workflow = WorkflowForge.CreateWorkflow()
.WithName("ProcessOrder")
.AddOperation(new DelegateWorkflowOperation(
"ValidateOrder",
async (input, foundry, ct) => {
var order = (Order)input;
foundry.Logger.LogInformation("Validating order {OrderId}", order.Id);
if (order.Amount <= 0)
throw new InvalidOperationException("Invalid order amount");
return order;
}
))
.Build();
When to Use: Simple operations, prototyping, one-off logic
Features:
- Inline lambda syntax
- Quick to write
- Good for simple transformations
2. ActionWorkflowOperation
Side-effect operations that don’t return values.
var workflow = WorkflowForge.CreateWorkflow()
.WithName("Notifications")
.AddOperation(new ActionWorkflowOperation(
"SendEmail",
async (input, foundry, ct) => {
var email = foundry.Properties["CustomerEmail"] as string;
await _emailService.SendAsync(email, "Order Confirmed");
foundry.Logger.LogInformation("Email sent to {Email}", email);
}
))
.Build();
When to Use: Logging, notifications, audit trails, cleanup
Features:
- No return value (returns input unchanged)
- Focus on side effects
- Clean separation of concerns
3. ConditionalWorkflowOperation
If-then-else decision logic.
var workflow = WorkflowForge.CreateWorkflow()
.WithName("OrderProcessing")
.AddOperation(new ConditionalWorkflowOperation(
name: "CheckOrderValue",
condition: (input, foundry, ct) => {
var amount = (decimal)foundry.Properties["OrderAmount"];
return Task.FromResult(amount > 1000);
},
trueOperation: new DelegateWorkflowOperation(
"HighValueProcessing",
async (input, foundry, ct) => {
foundry.Logger.LogInformation("High-value order processing");
foundry.Properties["RequiresApproval"] = true;
return input;
}
),
falseOperation: new DelegateWorkflowOperation(
"StandardProcessing",
async (input, foundry, ct) => {
foundry.Logger.LogInformation("Standard order processing");
return input;
}
)
))
.Build();
When to Use: Branching logic, routing, decision points
Features:
- Clean if-then-else semantics
- Nested operations
- Condition evaluation with foundry access
4. ForEachWorkflowOperation
Execute multiple operations concurrently with configurable data distribution.
var workflow = WorkflowForge.CreateWorkflow()
.WithName("ProcessOrderItems")
.AddOperation(ForEachWorkflowOperation.CreateSharedInput(
new IWorkflowOperation[] {
new ValidateInventoryOperation(),
new ReserveInventoryOperation(),
new NotifyWarehouseOperation()
},
maxConcurrency: 2, // Throttle to 2 concurrent operations
name: "ProcessItems"
))
.Build();
// Or split input collection among operations
var splitWorkflow = WorkflowForge.CreateWorkflow()
.WithName("DistributedProcessing")
.AddOperation(ForEachWorkflowOperation.CreateSplitInput(
itemOperations,
maxConcurrency: 4
))
.Build();
Factory Methods:
- CreateSharedInput: All operations receive the same input data
- CreateSplitInput: Input collection is split and distributed among operations
- CreateNoInput: Operations receive null input
When to Use: Parallel processing, batch operations, concurrent tasks
Features:
- Configurable concurrency (maxConcurrency)
- Timeout support
- Data distribution strategies
- Result aggregation
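To make the maxConcurrency setting concrete, here is a minimal sketch of throttled parallel execution in plain C#. It is not WorkflowForge's implementation: ThrottleSketch and RunThrottledAsync are hypothetical names, and a SemaphoreSlim is only one common way to cap the number of tasks in flight.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of WorkflowForge.
public static class ThrottleSketch
{
    // Starts every work item but lets at most maxConcurrency run at the same time.
    // Results come back in the same order as the input sequence.
    public static async Task<T[]> RunThrottledAsync<T>(
        IEnumerable<Func<CancellationToken, Task<T>>> work,
        int maxConcurrency,
        CancellationToken ct = default)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);

        var tasks = work.Select(async item =>
        {
            await gate.WaitAsync(ct);      // wait for a free slot
            try { return await item(ct); } // run the work item
            finally { gate.Release(); }    // free the slot even on failure
        }).ToList();

        return await Task.WhenAll(tasks);
    }
}
```

With maxConcurrency set to 2 and five work items, the third item only starts once one of the first two has finished.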
4b. AddParallelOperations (WorkflowBuilder Helper)
A convenient fluent API for adding parallel operations directly on the workflow builder.
// Simple parallel execution (all operations get the same input)
var workflow = WorkflowForge.CreateWorkflow()
.WithName("ParallelValidation")
.AddParallelOperations(
new ValidateInventoryOperation(),
new CheckFraudOperation(),
new VerifyCustomerOperation()
)
.AddOperation(new ProcessOrderOperation())
.Build();
// With concurrency control, timeout, and naming
var controlledWorkflow = WorkflowForge.CreateWorkflow()
.WithName("ControlledParallel")
.AddParallelOperations(
operations: new IWorkflowOperation[] { op1, op2, op3, op4 },
maxConcurrency: 2, // Max 2 concurrent
timeout: TimeSpan.FromSeconds(30), // 30s timeout
name: "ParallelValidations" // Named group
)
.Build();
Method Signatures:
// Simple params overload
WorkflowBuilder AddParallelOperations(params IWorkflowOperation[] operations)
// Full control overload
WorkflowBuilder AddParallelOperations(
IEnumerable<IWorkflowOperation> operations,
int? maxConcurrency = null,
TimeSpan? timeout = null,
string? name = null)
When to Use: Quick parallel operation setup without manually creating ForEachWorkflowOperation
Features:
- Fluent API integration
- Uses ForEachWorkflowOperation.CreateSharedInput internally
- Concurrency and timeout control
- Named operation groups for debugging
5. DelayOperation
Introduce async delays into workflows.
var workflow = WorkflowForge.CreateWorkflow()
.WithName("PollingWorkflow")
.AddOperation(new DelegateWorkflowOperation(
"CheckStatus",
async (input, foundry, ct) => {
var status = await _service.GetStatusAsync();
foundry.Properties["Status"] = status;
return status;
}
))
.AddOperation(new DelayOperation(TimeSpan.FromSeconds(5), "WaitBeforeRetry"))
.AddOperation(new DelegateWorkflowOperation(
"RetryCheck",
async (input, foundry, ct) => {
// Retry logic
return input;
}
))
.Build();
When to Use: Polling, rate limiting, scheduled delays
Features:
- Configurable delay duration
- Async/await compatible
- Cancellation token support
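The check, wait, retry sequence in the polling workflow above follows a general pattern that can be sketched without any workflow engine. The sketch below is plain C# under stated assumptions: PollingSketch and PollUntilAsync are hypothetical names, and the delay uses Task.Delay with a cancellation token so a cancelled caller does not sit out the full interval.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of WorkflowForge.
public static class PollingSketch
{
    // Calls check up to maxAttempts times, waiting interval between attempts.
    // Returns true as soon as check succeeds, false if every attempt fails.
    public static async Task<bool> PollUntilAsync(
        Func<CancellationToken, Task<bool>> check,
        TimeSpan interval,
        int maxAttempts,
        CancellationToken ct = default)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (await check(ct))
                return true;

            // No need to wait after the final attempt.
            if (attempt < maxAttempts)
                await Task.Delay(interval, ct); // throws if ct is cancelled
        }
        return false;
    }
}
```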
6. LoggingOperation
Structured logging at specific workflow points.
var workflow = WorkflowForge.CreateWorkflow()
.WithName("AuditedWorkflow")
.AddOperation(new LoggingOperation(
"Workflow started for order processing",
WorkflowForgeLogLevel.Information,
"LogStart" // optional name
))
.AddOperation(new DelegateWorkflowOperation(
"ProcessOrder",
async (input, foundry, ct) => {
// Processing logic
return input;
}
))
.AddOperation(new LoggingOperation("Workflow completed successfully"))
.Build();
// Alternative: Use static factory methods for cleaner code
var workflow2 = WorkflowForge.CreateWorkflow()
.WithName("AuditedWorkflow")
.AddOperation(LoggingOperation.Info("Starting order processing"))
.AddOperation(new DelegateWorkflowOperation("ProcessOrder", async (input, foundry, ct) => input))
.AddOperation(LoggingOperation.Info("Workflow completed"))
.Build();
Constructor:
LoggingOperation(string message, WorkflowForgeLogLevel logLevel = Information, string? name = null)
Static Factory Methods:
- LoggingOperation.Trace(message)
- LoggingOperation.Debug(message)
- LoggingOperation.Info(message)
- LoggingOperation.Warning(message)
- LoggingOperation.Error(message)
- LoggingOperation.Critical(message)
When to Use: Audit points, debugging, progress tracking
Features:
- Structured logging
- Log level control
- Property access for dynamic messages
7. Custom Operations (WorkflowOperationBase)
For complex business logic, create custom operation classes.
public class ValidateOrderOperation : WorkflowOperationBase<Order, ValidationResult>
{
private readonly IOrderValidator _validator;
public ValidateOrderOperation(IOrderValidator validator)
{
_validator = validator;
}
public override string Name => "ValidateOrder";
public override bool SupportsRestore => false;
protected override async Task<ValidationResult> ForgeAsyncCore(
Order input,
IWorkflowFoundry foundry,
CancellationToken cancellationToken)
{
foundry.Logger.LogInformation("Validating order {OrderId}", input.Id);
var result = await _validator.ValidateAsync(input, cancellationToken);
foundry.Properties["ValidationResult"] = result;
return result;
}
}
// Usage
var workflow = WorkflowForge.CreateWorkflow()
.WithName("TypeSafeWorkflow")
.AddOperation(new ValidateOrderOperation(orderValidator))
.Build();
When to Use: Complex business logic, testable operations, reusable components
Features:
- Type safety
- Dependency injection
- Unit testable
- Clean separation of concerns
Creating Custom Operations
Method 1: Inherit from WorkflowOperationBase
For untyped operations, implement ForgeAsyncCore:
public class CustomOperation : WorkflowOperationBase
{
public override string Name => "CustomOperation";
public override bool SupportsRestore => true;
protected override async Task<object?> ForgeAsyncCore(
object? inputData,
IWorkflowFoundry foundry,
CancellationToken cancellationToken)
{
// Your logic here
foundry.Logger.LogInformation("Executing custom operation");
// Access foundry properties
foundry.Properties["Result"] = "Success";
// Return result
return inputData;
}
public override async Task RestoreAsync(
object? outputData,
IWorkflowFoundry foundry,
CancellationToken cancellationToken)
{
// Compensation logic
foundry.Logger.LogInformation("Rolling back custom operation");
foundry.Properties.TryRemove("Result", out _);
}
}
Method 1b: Using Lifecycle Hooks
Add setup/teardown logic without polluting your core business logic:
public class AuditedOperation : WorkflowOperationBase
{
public override string Name => "AuditedOperation";
protected override Task OnBeforeExecuteAsync(
object? inputData,
IWorkflowFoundry foundry,
CancellationToken ct)
{
foundry.Logger.LogInformation("Starting {Operation}", Name);
foundry.Properties["StartTime"] = DateTime.UtcNow;
return Task.CompletedTask;
}
protected override async Task<object?> ForgeAsyncCore(
object? inputData,
IWorkflowFoundry foundry,
CancellationToken ct)
{
// Pure business logic
return await ProcessDataAsync(inputData, ct);
}
protected override Task OnAfterExecuteAsync(
object? inputData,
object? outputData,
IWorkflowFoundry foundry,
CancellationToken ct)
{
var duration = DateTime.UtcNow - (DateTime)foundry.Properties["StartTime"]!;
foundry.Logger.LogInformation("Completed {Operation} in {Duration}ms", Name, duration.TotalMilliseconds);
return Task.CompletedTask;
}
}
Method 2: Inherit from WorkflowOperationBase<TInput, TOutput>
For typed operations, implement ForgeAsyncCore with typed parameters:
public class ProcessOrderOperation : WorkflowOperationBase<Order, ProcessResult>
{
private readonly IOrderService _orderService;
public ProcessOrderOperation(IOrderService orderService)
{
_orderService = orderService;
}
public override string Name => "ProcessOrder";
public override bool SupportsRestore => true;
protected override async Task<ProcessResult> ForgeAsyncCore(
Order input,
IWorkflowFoundry foundry,
CancellationToken cancellationToken)
{
var result = await _orderService.ProcessAsync(input, cancellationToken);
// Store for restoration
foundry.Properties["ProcessedOrderId"] = result.OrderId;
return result;
}
public override async Task RestoreAsync(
ProcessResult output,
IWorkflowFoundry foundry,
CancellationToken cancellationToken)
{
var orderId = (string)foundry.Properties["ProcessedOrderId"];
await _orderService.CancelAsync(orderId, cancellationToken);
}
}
Operation Patterns
Pattern 1: Chain of Transformations
Each operation transforms data and passes it to the next.
var workflow = WorkflowForge.CreateWorkflow()
.WithName("DataPipeline")
.AddOperation(new DelegateWorkflowOperation(
"LoadData",
async (input, foundry, ct) => {
var data = await _repository.LoadAsync();
foundry.Properties["RawData"] = data;
return data;
}
))
.AddOperation(new DelegateWorkflowOperation(
"TransformData",
async (input, foundry, ct) => {
var raw = foundry.Properties["RawData"] as RawData;
var transformed = Transform(raw);
foundry.Properties["TransformedData"] = transformed;
return transformed;
}
))
.AddOperation(new DelegateWorkflowOperation(
"SaveData",
async (input, foundry, ct) => {
var data = foundry.Properties["TransformedData"] as TransformedData;
await _repository.SaveAsync(data);
return data;
}
))
.Build();
Pattern 2: Aggregation
Collect results from multiple operations.
var workflow = WorkflowForge.CreateWorkflow()
.WithName("Aggregation")
.AddOperation(new DelegateWorkflowOperation(
"FetchUserData",
async (input, foundry, ct) => {
var user = await _userService.GetAsync(userId);
foundry.Properties["User"] = user;
return input;
}
))
.AddOperation(new DelegateWorkflowOperation(
"FetchOrderData",
async (input, foundry, ct) => {
var orders = await _orderService.GetForUserAsync(userId);
foundry.Properties["Orders"] = orders;
return input;
}
))
.AddOperation(new DelegateWorkflowOperation(
"AggregateResults",
async (input, foundry, ct) => {
var user = foundry.Properties["User"] as User;
var orders = foundry.Properties["Orders"] as List<Order>;
var result = new AggregatedData {
User = user,
Orders = orders,
TotalSpent = orders.Sum(o => o.Amount)
};
return result;
}
))
.Build();
Pattern 3: Conditional Routing
Route workflow based on runtime conditions.
var workflow = WorkflowForge.CreateWorkflow()
.WithName("ConditionalRouting")
.AddOperation(new DelegateWorkflowOperation(
"ClassifyRequest",
async (input, foundry, ct) => {
var request = input as Request;
foundry.Properties["RequestType"] = request.Type;
return input;
}
))
.AddOperation(new ConditionalWorkflowOperation(
"RouteByType",
(input, foundry, ct) => {
var type = (RequestType)foundry.Properties["RequestType"];
return Task.FromResult(type == RequestType.Premium);
},
trueOperation: new DelegateWorkflowOperation(
"PremiumProcessing",
async (input, foundry, ct) => { /* Premium logic */ return input; }
),
falseOperation: new DelegateWorkflowOperation(
"StandardProcessing",
async (input, foundry, ct) => { /* Standard logic */ return input; }
)
))
.Build();
Pattern 4: Fork-Join
Process items in parallel then join results.
// Create multiple processing operations
var itemOperations = items.Select(item =>
new DelegateWorkflowOperation(
$"Process_{item.Id}",
async (input, foundry, ct) => {
var result = await ProcessAsync(item);
foundry.Properties[$"Result_{item.Id}"] = result;
return result;
}
)).Cast<IWorkflowOperation>().ToArray();
var workflow = WorkflowForge.CreateWorkflow()
.WithName("ParallelProcessing")
.AddOperation(ForEachWorkflowOperation.CreateNoInput(
itemOperations,
maxConcurrency: Environment.ProcessorCount,
name: "ProcessInParallel"
))
.AddOperation(new DelegateWorkflowOperation(
"JoinResults",
async (input, foundry, ct) => {
var results = items.Select(item =>
foundry.GetPropertyOrDefault<Result>($"Result_{item.Id}")).ToList();
var aggregated = Aggregate(results);
return aggregated;
}
))
.Build();
Data Flow Between Operations
Primary: Dictionary-Based (Recommended)
Store all workflow data in foundry.Properties:
// Operation 1: Store data
foundry.Properties["CustomerId"] = customerId;
foundry.Properties["OrderDate"] = DateTime.UtcNow;
foundry.Properties["Items"] = orderItems;
// Operation 2: Retrieve data
var customerId = (string)foundry.Properties["CustomerId"];
var orderDate = (DateTime)foundry.Properties["OrderDate"];
var items = foundry.Properties["Items"] as List<OrderItem>;
Advantages:
- Flexible - add/remove properties dynamically
- No type constraints
- Easy debugging
Best Practices:
- Use consistent key names
- Store primitive types or serializable objects
- Consider using constants for key names
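One way to follow the "constants for key names" advice is sketched below. A plain ConcurrentDictionary stands in for foundry.Properties (its real type is not shown in this guide), and PropertyKeys, PropertyBagSketch, and GetOrDefault are hypothetical names introduced only for illustration.

```csharp
#nullable enable
using System.Collections.Concurrent;

// Hypothetical key constants: one place to rename or search for a key.
public static class PropertyKeys
{
    public const string CustomerId = "CustomerId";
    public const string OrderDate = "OrderDate";
}

// Hypothetical helper operating on a stand-in for foundry.Properties.
public static class PropertyBagSketch
{
    // Returns the stored value when the key exists and holds a T;
    // otherwise returns the fallback instead of throwing on a bad cast.
    public static T GetOrDefault<T>(
        ConcurrentDictionary<string, object?> properties,
        string key,
        T fallback)
        => properties.TryGetValue(key, out var value) && value is T typed
            ? typed
            : fallback;
}
```

A misspelled constant fails at compile time, whereas a misspelled string key only fails at run time.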
Secondary: Type-Safe Input/Output
Pass data directly between typed operations:
// This pattern chains operations with type safety
var result1 = await operation1.ForgeAsync(input, foundry, ct); // Returns Order
var result2 = await operation2.ForgeAsync(result1, foundry, ct); // Takes Order, returns ValidationResult
Advantages:
- Compile-time type safety
- IntelliSense support
- Clear contracts
Best Practices:
- Use for operations with stable contracts
- Document expected input/output types
- Consider immutable types for safety
Output Chaining Behavior
By default, the foundry passes each operation’s output as the next operation’s input. This enables explicit chaining without additional plumbing.
You can disable chaining when operations should always receive null input:
var options = new WorkflowForgeOptions
{
EnableOutputChaining = false
};
var foundry = WorkflowForge.CreateFoundry("NoChaining", options: options);
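The difference between the two settings can be illustrated with a small stand-alone simulation. This is a sketch of the behavior described above, not the foundry's code: ChainingSketch and RunAsync are hypothetical names, and it assumes the first step receives the initial input when chaining is enabled.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical simulation, not part of WorkflowForge.
public static class ChainingSketch
{
    // Runs the steps in order and returns the last step's output.
    public static async Task<object?> RunAsync(
        IReadOnlyList<Func<object?, Task<object?>>> steps,
        object? initialInput,
        bool enableOutputChaining)
    {
        object? previousOutput = initialInput;
        foreach (var step in steps)
        {
            // Chaining on: the step sees the previous step's output.
            // Chaining off: every step sees null.
            var input = enableOutputChaining ? previousOutput : null;
            previousOutput = await step(input);
        }
        return previousOutput;
    }
}
```

Running three "add one" steps from an initial value of 0 yields 3 with chaining enabled and 1 with chaining disabled, because each step then starts from null.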
Compensation and Rollback
Implementing Compensation
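public class CreateOrderOperation : WorkflowOperationBase
{
private readonly IOrderService _orderService;
public CreateOrderOperation(IOrderService orderService)
{
_orderService = orderService;
}
public override string Name => "CreateOrder";
public override bool SupportsRestore => true;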
protected override async Task<object?> ForgeAsyncCore(
object? inputData,
IWorkflowFoundry foundry,
CancellationToken cancellationToken)
{
var orderId = await _orderService.CreateAsync();
// Store for potential rollback
foundry.Properties["CreatedOrderId"] = orderId;
foundry.Logger.LogInformation("Order {OrderId} created", orderId);
return orderId;
}
public override async Task RestoreAsync(
object? outputData,
IWorkflowFoundry foundry,
CancellationToken cancellationToken)
{
var orderId = (string)foundry.Properties["CreatedOrderId"];
await _orderService.DeleteAsync(orderId);
foundry.Logger.LogInformation("Order {OrderId} rolled back", orderId);
}
}
Compensation Flow
- Workflow executes operations sequentially
- Operation fails
- WorkflowSmith triggers compensation
- Executes RestoreAsync in reverse order on completed operations
- Only operations with SupportsRestore = true are compensated
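The flow above can be sketched as a small stand-alone simulation. It is an illustration of reverse-order compensation in general, not WorkflowSmith's actual code: StepSketch and CompensationSketch are hypothetical names, and the steps are synchronous to keep the example short.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for an operation: a name, a restore flag, and a failure switch.
public sealed record StepSketch(string Name, bool SupportsRestore, bool Fails = false);

// Hypothetical simulation, not part of WorkflowForge.
public static class CompensationSketch
{
    // Runs steps in order; on the first failure, restores completed steps
    // in reverse order, skipping those that do not support restore.
    public static List<string> Run(IEnumerable<StepSketch> steps)
    {
        var log = new List<string>();
        var completed = new Stack<StepSketch>(); // last completed step is popped first

        foreach (var step in steps)
        {
            if (step.Fails)
            {
                log.Add($"fail:{step.Name}");
                while (completed.Count > 0)
                {
                    var done = completed.Pop();
                    if (done.SupportsRestore)
                        log.Add($"restore:{done.Name}");
                }
                return log;
            }

            log.Add($"forge:{step.Name}");
            completed.Push(step);
        }
        return log;
    }
}
```

For steps A (restorable), B (not restorable), C (restorable), and a failing D, the log shows C restored before A, and B skipped.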
Execution and Compensation Modes
- Default: stop on first error, best-effort compensation.
- ContinueOnError: run all operations and throw AggregateException at the end.
- FailFastCompensation: stop compensation on first restore failure.
- ThrowOnCompensationError: surface compensation failures as AggregateException.
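As an illustration of the ContinueOnError behavior described above, the sketch below runs every step, collects failures, and throws a single AggregateException at the end. It is a plain C# approximation under stated assumptions, not the library's implementation; ContinueOnErrorSketch and RunAll are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simulation, not part of WorkflowForge.
public static class ContinueOnErrorSketch
{
    // Runs every operation even if earlier ones fail,
    // then reports all failures together.
    public static void RunAll(IEnumerable<Action> operations)
    {
        var errors = new List<Exception>();

        foreach (var operation in operations)
        {
            try
            {
                operation();
            }
            catch (Exception ex)
            {
                errors.Add(ex); // remember the failure and keep going
            }
        }

        if (errors.Count > 0)
            throw new AggregateException(errors);
    }
}
```

Callers can inspect InnerExceptions on the thrown AggregateException to see each individual failure.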
Best Practices
1. Keep Operations Focused
Each operation should do one thing well:
// Good: Focused operations
.AddOperation("ValidateOrder", ValidateAsync)
.AddOperation("ReserveInventory", ReserveAsync)
.AddOperation("ProcessPayment", ProcessPaymentAsync)
// Bad: God operation
.AddOperation("ProcessEverything", async (foundry, ct) => {
// Validation, inventory, payment all mixed together
})
2. Use Foundry Properties for Shared State
// Good: Store in foundry (typed helpers)
foundry.SetProperty("OrderId", orderId);
foundry.SetProperty("ProcessedAt", DateTime.UtcNow);
// Bad: Hidden state
private static string _orderId; // Don't do this
3. Log Important Events
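protected override async Task<object?> ForgeAsyncCore(
object? inputData,
IWorkflowFoundry foundry,
CancellationToken cancellationToken)
{
// Assumes an earlier operation stored the order ID under the "OrderId" key
var orderId = (string)foundry.Properties["OrderId"]!;
foundry.Logger.LogInformation("Processing order {OrderId}", orderId);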
try
{
var result = await ProcessAsync(orderId);
foundry.Logger.LogInformation("Order {OrderId} processed successfully", orderId);
return result;
}
catch (Exception ex)
{
foundry.Logger.LogError(ex, "Failed to process order {OrderId}", orderId);
throw;
}
}
4. Implement Compensation for Critical Operations
// In operations that modify state (Create, Update, Delete), support restoration:
public override bool SupportsRestore => true;
// In read-only operations (Read, Query, Log), there is nothing to undo:
public override bool SupportsRestore => false;
5. Use Type-Safe Operations for Complex Business Logic
// Good: Testable, maintainable
public class ComplexBusinessLogic : WorkflowOperationBase<Input, Output>
{
// Can be unit tested
// Dependencies injected
// Clear contracts
}
// Okay: Simple delegate operation
.AddOperation(new DelegateWorkflowOperation(
"SimpleTransform",
async (input, foundry, ct) => Transform(input)
))
6. Handle Cancellation
protected override async Task<object?> ForgeAsyncCore(
object? inputData,
IWorkflowFoundry foundry,
CancellationToken cancellationToken)
{
// Pass cancellation token to async operations
var result = await _service.ProcessAsync(inputData, cancellationToken);
// Check cancellation periodically in loops (items is whatever collection the operation iterates over)
foreach (var item in items)
{
cancellationToken.ThrowIfCancellationRequested();
// Process item
}
return result;
}
7. Dispose Resources Properly
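public class ResourceOperation : WorkflowOperationBase
{
// This operation owns the resource and is responsible for disposing it
private readonly IDisposable _resource;
public ResourceOperation(IDisposable resource)
{
_resource = resource;
}
// Name and ForgeAsyncCore omitted for brevity
public override void Dispose()
{
_resource?.Dispose();
base.Dispose();
}
}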
Related Documentation
- Architecture - Understanding WorkflowForge design
- Event System - Monitoring operation execution
- Samples Guide - See operations in action
- API Reference - Complete API documentation