The Observers & Middleware module provides hooks for customizing and monitoring Datum operations through observers and middleware.
Overview#
Observers and middleware allow you to intercept, modify, and monitor data operations throughout the Datum system. They provide powerful extension points for logging, validation, transformation, and custom business logic.
Middleware#
DatumMiddleware #
Middleware intercepts and can modify data operations during create, read, update, and delete operations.
Key Methods:
- `transformBeforeSave(T entity)`: Modify entity before saving
- `transformAfterFetch(T entity)`: Modify entity after fetching
Creating Middleware#
/// Middleware that encrypts a [Task]'s description before it is saved and
/// decrypts it again after it is fetched.
///
/// A task without a description is passed through untouched, so a `null`
/// description is never replaced by the encrypted form of an empty string.
class EncryptionMiddleware extends DatumMiddleware<Task> {
  /// Returns a copy of [entity] whose description has been encrypted.
  ///
  /// Returns [entity] itself when it has no description.
  @override
  Future<Task> transformBeforeSave(Task entity) async {
    final description = entity.description;
    // Nothing to protect: keep `null` as `null` rather than storing the
    // ciphertext of ''.
    if (description == null) return entity;

    // Encrypt sensitive data before saving
    final encryptedDescription = await encrypt(description);
    return entity.copyWith(description: encryptedDescription);
  }

  /// Returns a copy of [entity] whose description has been decrypted.
  ///
  /// Returns [entity] itself when it has no description.
  @override
  Future<Task> transformAfterFetch(Task entity) async {
    final description = entity.description;
    // Avoid handing an empty string to `decrypt` for tasks that never had a
    // description.
    if (description == null) return entity;

    // Decrypt sensitive data after fetching
    final decryptedDescription = await decrypt(description);
    return entity.copyWith(description: decryptedDescription);
  }
}
/// Middleware that rejects a [User] with a missing or malformed email
/// before it is saved.
class ValidationMiddleware extends DatumMiddleware<User> {
  /// Returns [entity] unchanged when its email passes validation.
  ///
  /// Throws a [ValidationException] when the email is empty or is not
  /// accepted by `isValidEmail`.
  @override
  Future<User> transformBeforeSave(User entity) async {
    final email = entity.email;

    // The emptiness check comes first, so `isValidEmail` only ever sees a
    // non-empty value.
    final problem = email.isEmpty
        ? 'Email is required'
        : isValidEmail(email)
            ? null
            : 'Invalid email format';

    if (problem != null) throw ValidationException(problem);
    return entity;
  }
}
/// Middleware that attaches audit information to a [Task] before it is saved.
class AuditMiddleware extends DatumMiddleware<Task> {
  /// Returns a copy of [entity] carrying a fresh audit record.
  ///
  /// The record holds the current user id, the current local time, and a
  /// fixed change type of `'update'`.
  @override
  Future<Task> transformBeforeSave(Task entity) async {
    final stamp = {
      'lastModifiedBy': currentUserId,
      'lastModifiedAt': DateTime.now(),
      'changeType': 'update',
    };

    return entity.copyWith(auditData: stamp);
  }
}
Registering Middleware#
// Middleware is attached per entity type through the `middlewares` list of
// that type's registration.
final registrations = [
DatumRegistration<Task>(
localAdapter: HiveTaskAdapter(),
remoteAdapter: SupabaseTaskAdapter(),
middlewares: [
EncryptionMiddleware(),
// NOTE(review): ValidationMiddleware is declared earlier in this document as
// DatumMiddleware<User>, while this registration is for Task — confirm it
// belongs in this list or register it for User instead.
ValidationMiddleware(),
AuditMiddleware(),
],
),
];
Middleware Execution Order#
Middleware executes in registration order:
- Before Save: `transformBeforeSave` methods run in registration order
- Save Operation: Entity saved to adapters
- After Fetch: `transformAfterFetch` methods run in reverse order
// Execution flow for saving:
// 1. EncryptionMiddleware.transformBeforeSave
// 2. ValidationMiddleware.transformBeforeSave
// 3. AuditMiddleware.transformBeforeSave
// 4. Save to local adapter
// 5. Save to remote adapter
//
// Execution flow for fetching (reverse registration order):
// 1. AuditMiddleware.transformAfterFetch
// 2. ValidationMiddleware.transformAfterFetch
// 3. EncryptionMiddleware.transformAfterFetch
Observers#
DatumObserver #
Observers monitor data operations without modifying them. They receive notifications about operation lifecycle events.
Key Methods:
- `onCreate(T entity)`: Called before creating an entity
- `onUpdate(T oldEntity, T newEntity)`: Called before updating an entity
- `onDelete(T entity)`: Called before deleting an entity
- `onRead(T entity)`: Called after reading an entity
Creating Observers#
/// Observer that writes one log line for every [Task] lifecycle event.
///
/// Creates and updates are logged at info level, deletes at warn level and
/// reads at debug level.
class LoggingObserver extends DatumObserver<Task> {
  /// Logs the title of the task being created.
  @override
  Future<void> onCreate(Task entity) async {
    final message = 'Creating task: ${entity.title}';
    logger.info(message);
  }

  /// Logs the task id together with the old and new titles.
  @override
  Future<void> onUpdate(Task oldEntity, Task newEntity) async {
    final message =
        'Updating task ${oldEntity.id}: ${oldEntity.title} -> ${newEntity.title}';
    logger.info(message);
  }

  /// Logs the title of the task being deleted.
  @override
  Future<void> onDelete(Task entity) async {
    final message = 'Deleting task: ${entity.title}';
    logger.warn(message);
  }

  /// Logs the title of the task that was read.
  @override
  Future<void> onRead(Task entity) async {
    final message = 'Reading task: ${entity.title}';
    logger.debug(message);
  }
}
/// Observer that notifies users about [Task] assignment and completion.
class NotificationObserver extends DatumObserver<Task> {
  /// Notifies the assignee of a new task, unless the assignee is the
  /// current user.
  @override
  Future<void> onCreate(Task entity) async {
    final assignee = entity.assignedTo;
    if (assignee == currentUserId) return;

    await sendNotification(
      userId: assignee,
      message: 'New task assigned: ${entity.title}',
    );
  }

  /// Notifies the task's creator when an update flips the task to completed.
  @override
  Future<void> onUpdate(Task oldEntity, Task newEntity) async {
    // Skip updates that leave the completion flag unchanged or that do not
    // end in the completed state.
    if (oldEntity.isCompleted == newEntity.isCompleted ||
        !newEntity.isCompleted) {
      return;
    }

    await sendNotification(
      userId: newEntity.createdBy,
      message: 'Task completed: ${newEntity.title}',
    );
  }
}
/// Observer that invalidates cached [Post] data whenever a post changes.
///
/// Three kinds of cache keys are touched: the single post (`post_<id>`),
/// the global list (`posts_list`) and the per-user list
/// (`user_<userId>_posts`).
class CacheInvalidationObserver extends DatumObserver<Post> {
  /// Invalidates the global list and the author's list for a new post.
  @override
  Future<void> onCreate(Post entity) async {
    await cache.invalidate('posts_list');
    await cache.invalidate('user_${entity.userId}_posts');
  }

  /// Invalidates the cached post and, when the post changed owner, the
  /// per-user lists of both the previous and the new owner.
  @override
  Future<void> onUpdate(Post oldEntity, Post newEntity) async {
    await cache.invalidate('post_${oldEntity.id}');
    if (oldEntity.userId != newEntity.userId) {
      await cache.invalidate('user_${oldEntity.userId}_posts');
      // The post now belongs in the new owner's list, so that cached list
      // is stale as well.
      await cache.invalidate('user_${newEntity.userId}_posts');
    }
  }

  /// Invalidates the cached post, the global list and the author's list.
  @override
  Future<void> onDelete(Post entity) async {
    await cache.invalidate('post_${entity.id}');
    await cache.invalidate('posts_list');
    await cache.invalidate('user_${entity.userId}_posts');
  }
}
Registering Observers#
// Observers are attached per entity type through the `observers` list of
// that type's registration.
final registrations = [
// Task operations are logged and may trigger user notifications.
DatumRegistration<Task>(
localAdapter: HiveTaskAdapter(),
remoteAdapter: SupabaseTaskAdapter(),
observers: [
LoggingObserver(),
NotificationObserver(),
],
),
// Post operations invalidate the related cache entries.
DatumRegistration<Post>(
localAdapter: HivePostAdapter(),
remoteAdapter: SupabasePostAdapter(),
observers: [
CacheInvalidationObserver(),
],
),
];
Global Observers#
GlobalDatumObserver#
Global observers monitor system-wide events across all entities.
Key Methods:
- `onSyncStart()`: Called when global sync starts
- `onSyncEnd(DatumSyncResult result)`: Called when global sync ends
Creating Global Observers#
/// Global observer that reports sync start and end to analytics.
class GlobalAnalyticsObserver extends GlobalDatumObserver {
  /// Tracks a `sync_started` event stamped with the current time in
  /// ISO-8601 form.
  @override
  Future<void> onSyncStart() async {
    final startedAt = DateTime.now().toIso8601String();
    analytics.track('sync_started', properties: {'timestamp': startedAt});
  }

  /// Tracks a `sync_completed` event with the duration in milliseconds and
  /// the counters taken from [result].
  @override
  Future<void> onSyncEnd(DatumSyncResult result) async {
    final elapsedMs = result.duration.inMilliseconds;
    analytics.track(
      'sync_completed',
      properties: {
        'duration': elapsedMs,
        'syncedCount': result.syncedCount,
        'failedCount': result.failedCount,
        'conflictsResolved': result.conflictsResolved,
      },
    );
  }
}
/// Global observer that feeds sync activity into health monitoring and
/// raises an alert when a sync finishes with failures.
class GlobalHealthObserver extends GlobalDatumObserver {
  /// Records the start of a sync with the health monitor.
  @override
  Future<void> onSyncStart() async {
    await healthMonitor.recordSyncStart();
  }

  /// Records the end of a sync and alerts when [result] reports at least
  /// one failed operation.
  @override
  Future<void> onSyncEnd(DatumSyncResult result) async {
    await healthMonitor.recordSyncEnd(result);

    // Read the counter only after the metrics call, as the alert is the
    // last step.
    final failures = result.failedCount;
    if (failures > 0) {
      await alertSystem.sendAlert(
        'Sync completed with failures',
        'Failed operations: $failures',
      );
    }
  }
}
Registering Global Observers#
// Global observers are handed to Datum.initialize through `observers`,
// alongside the per-entity `registrations`.
await Datum.initialize(
config: config,
connectivityChecker: connectivityChecker,
registrations: registrations,
observers: [
GlobalAnalyticsObserver(),
GlobalHealthObserver(),
],
);
Advanced Patterns#
Conditional Middleware#
/// Middleware that encrypts and decrypts the description of sensitive
/// [Task]s only.
///
/// Tasks that are not sensitive, or that have no description, pass through
/// unchanged in both directions.
class ConditionalEncryptionMiddleware extends DatumMiddleware<Task> {
  /// Returns a copy of [entity] with an encrypted description when the task
  /// is sensitive and has a description; otherwise returns [entity] itself.
  @override
  Future<Task> transformBeforeSave(Task entity) async {
    final description = entity.description;
    // Only encrypt sensitive tasks, and keep a `null` description as `null`
    // instead of storing the ciphertext of ''.
    if (!entity.isSensitive || description == null) return entity;

    final encryptedDescription = await encrypt(description);
    return entity.copyWith(description: encryptedDescription);
  }

  /// Returns a copy of [entity] with a decrypted description when the task
  /// is sensitive and has a description; otherwise returns [entity] itself.
  @override
  Future<Task> transformAfterFetch(Task entity) async {
    final description = entity.description;
    // Avoid handing an empty string to `decrypt` for tasks that never had a
    // description.
    if (!entity.isSensitive || description == null) return entity;

    final decryptedDescription = await decrypt(description);
    return entity.copyWith(description: decryptedDescription);
  }
}
Composite Observers#
/// Observer that forwards every [Task] event to a list of child observers.
///
/// Children are notified one after another, in list order; each child's
/// callback is awaited before the next one starts.
class CompositeObserver extends DatumObserver<Task> {
  /// Creates a composite that delegates to the given observers.
  CompositeObserver(this._observers);

  final List<DatumObserver<Task>> _observers;

  @override
  Future<void> onCreate(Task entity) =>
      _forEachObserver((child) => child.onCreate(entity));

  @override
  Future<void> onUpdate(Task oldEntity, Task newEntity) =>
      _forEachObserver((child) => child.onUpdate(oldEntity, newEntity));

  @override
  Future<void> onDelete(Task entity) =>
      _forEachObserver((child) => child.onDelete(entity));

  @override
  Future<void> onRead(Task entity) =>
      _forEachObserver((child) => child.onRead(entity));

  /// Runs [notify] on each child in order, awaiting each call.
  Future<void> _forEachObserver(
    Future<void> Function(DatumObserver<Task> child) notify,
  ) async {
    for (final child in _observers) {
      await notify(child);
    }
  }
}
Async Middleware#
/// Middleware that validates a [User] against asynchronous sources before
/// it is saved.
class AsyncValidationMiddleware extends DatumMiddleware<User> {
  /// Returns [entity] unchanged when both checks pass.
  ///
  /// Throws a [ValidationException] when another user already owns the
  /// email, or when the external API does not accept the user.
  @override
  Future<User> transformBeforeSave(User entity) async {
    // Uniqueness: a match with the same id is the entity itself, not a
    // conflict.
    final owner = await userService.findByEmail(entity.email);
    final emailTaken = owner != null && owner.id != entity.id;
    if (emailTaken) throw ValidationException('Email already exists');

    // The external check runs only after the uniqueness check has passed.
    final accepted = await externalApi.validateUser(entity);
    if (!accepted) throw ValidationException('User validation failed');

    return entity;
  }
}
Error Handling in Middleware/Observers#
/// Observer that sends a welcome notification on create without ever
/// failing the create operation.
///
/// Any error from the notification service is logged and reported; a
/// failure of the error report is logged too and then dropped.
class ResilientObserver extends DatumObserver<Task> {
  /// Sends the welcome notification for [entity], swallowing any failure.
  @override
  Future<void> onCreate(Task entity) async {
    try {
      await notificationService.sendWelcomeNotification(entity);
    } catch (e) {
      // Log error but don't fail the operation
      logger.error('Failed to send welcome notification: $e');
      // Consider sending to error tracking service. The report is guarded
      // separately: an exception thrown here would otherwise escape the
      // handler and fail the operation after all.
      try {
        await errorTracker.report(
          e,
          context: {'operation': 'create', 'entityId': entity.id},
        );
      } catch (reportError) {
        logger.error('Failed to report notification error: $reportError');
      }
    }
  }
}
/// Middleware that never blocks a save: when its transformation fails, the
/// error is logged and the untouched entity is saved instead.
class SafeMiddleware extends DatumMiddleware<Task> {
  /// Applies the actual transformation to [entity].
  ///
  /// This placeholder returns [entity] as is.
  Future<Task> performTransformation(Task entity) async {
    // Actual transformation logic here
    return entity;
  }

  /// Returns the result of [performTransformation], or [entity] itself when
  /// the transformation throws.
  @override
  Future<Task> transformBeforeSave(Task entity) async {
    try {
      final transformed = await performTransformation(entity);
      return transformed;
    } catch (error) {
      logger.error('Middleware transformation failed: $error');
      // Fall back to the original entity so the operation can continue.
      return entity;
    }
  }
}
Performance Considerations#
Middleware Performance#
- Keep transformations fast: Avoid heavy computations in middleware
- Use async carefully: Async operations can impact performance
- Cache results: Cache expensive operations when possible
- Batch operations: Process multiple entities together when possible
Observer Performance#
- Make observers lightweight: Avoid blocking operations
- Use async observers: Don't block main operations
- Batch notifications: Send batched notifications when possible
- Conditional execution: Only execute when necessary
Memory Management#
- Clean up resources: Dispose of resources in observers/middleware
- Avoid memory leaks: Be careful with stream subscriptions
- Limit concurrent operations: Control concurrency in middleware
Testing#
Testing Middleware#
/// Unit tests for the middleware examples.
void main() {
  test('EncryptionMiddleware encrypts data', () async {
    const plaintext = 'secret data';
    final encryptor = EncryptionMiddleware();

    final saved = await encryptor.transformBeforeSave(
      Task(description: plaintext),
    );

    // The saved description must differ from the input and decrypt back
    // to it.
    expect(saved.description, isNot(equals(plaintext)));
    expect(await decrypt(saved.description!), equals(plaintext));
  });

  test('ValidationMiddleware rejects invalid data', () async {
    final validator = ValidationMiddleware();
    final userWithoutEmail = User(email: '');

    Future<User> save() => validator.transformBeforeSave(userWithoutEmail);

    expect(save, throwsA(isA<ValidationException>()));
  });
}
Testing Observers#
// Unit tests for the observer examples, using mocks for the collaborators.
void main() {
// NOTE(review): LoggingObserver as declared earlier in this document has no
// constructor parameter; this test assumes a variant that receives its
// logger through the constructor — confirm.
test('LoggingObserver logs operations', () async {
final logger = MockLogger();
final observer = LoggingObserver(logger);
// `testTask` is not defined in this snippet; presumably a shared fixture.
await observer.onCreate(testTask);
verify(logger.info('Creating task: ${testTask.title}')).called(1);
});
// NOTE(review): NotificationObserver as declared earlier in this document
// has no constructor parameter and calls a bare `sendNotification`; this
// test assumes a variant that receives a notification service — confirm.
test('NotificationObserver sends notifications', () async {
final notificationService = MockNotificationService();
final observer = NotificationObserver(notificationService);
await observer.onCreate(testTask);
verify(notificationService.sendNotification(
userId: testTask.assignedTo,
message: 'New task assigned: ${testTask.title}',
)).called(1);
});
}
Best Practices#
Middleware Best Practices#
- Keep it focused: Each middleware should have a single responsibility
- Make it idempotent: Running multiple times should be safe
- Handle errors gracefully: Don't break operations due to middleware failures
- Document transformations: Clearly document what each middleware does
- Test thoroughly: Test edge cases and error conditions
Observer Best Practices#
- Don't modify data: Observers should only observe, not modify
- Handle failures: Don't let observer failures break operations
- Be efficient: Keep observers lightweight and fast
- Use appropriate scope: Choose between entity-specific and global observers
General Best Practices#
- Order matters: Consider the order of middleware and observers
- Avoid dependencies: Minimize dependencies between middleware/observers
- Monitor performance: Track the impact of middleware on performance
- Version carefully: Consider versioning when changing middleware behavior
- Document behavior: Clearly document what each component does
Common Use Cases#
Authentication & Authorization#
/// Middleware that blocks saving a [Document] the current user may not edit.
class AuthorizationMiddleware extends DatumMiddleware<Document> {
  /// Returns [entity] unchanged when the permission service allows the
  /// current user to edit it.
  ///
  /// Throws an [AuthorizationException] otherwise.
  @override
  Future<Document> transformBeforeSave(Document entity) async {
    final mayEdit = await permissionService.canEdit(entity, currentUser);
    if (mayEdit) return entity;

    throw AuthorizationException('Not authorized to edit document');
  }
}
Data Enrichment#
/// Middleware that adds computed fields to a [Post] after it is fetched.
class EnrichmentMiddleware extends DatumMiddleware<Post> {
  /// Returns a copy of [entity] carrying the author's name and the number
  /// of comments on the post.
  ///
  /// The author is looked up first; the comment count is requested only
  /// after that lookup has completed.
  @override
  Future<Post> transformAfterFetch(Post entity) async {
    final postAuthor = await userService.getById(entity.userId);
    final comments = await commentService.countByPostId(entity.id);

    return entity.copyWith(
      authorName: postAuthor.name,
      commentCount: comments,
    );
  }
}
Audit Trail#
/// Observer that writes an audit trail entry for every [Task] update.
class AuditObserver extends DatumObserver<Task> {
  /// Logs the difference between [oldEntity] and [newEntity], together with
  /// the task id, the current user id and the current local time.
  @override
  Future<void> onUpdate(Task oldEntity, Task newEntity) async {
    final changes = oldEntity.diff(newEntity);

    await auditService.logChange(
      entityType: 'Task',
      entityId: oldEntity.id,
      userId: currentUserId,
      changes: changes,
      timestamp: DateTime.now(),
    );
  }
}
Caching#
/// Observer that keeps the [Product] cache in step with updates.
class CacheObserver extends DatumObserver<Product> {
  /// Drops the cached product and the cached product list, then caches the
  /// updated product for one hour.
  @override
  Future<void> onUpdate(Product oldEntity, Product newEntity) async {
    final staleKey = 'product_${oldEntity.id}';
    await cache.invalidate(staleKey);
    await cache.invalidate('products_list');

    // Update cache with new data
    final freshKey = 'product_${newEntity.id}';
    await cache.set(freshKey, newEntity, ttl: const Duration(hours: 1));
  }
}
```</content>