This guide shows how to implement a complete Supabase remote adapter for Datum.
## Overview
Supabase provides real-time database capabilities with PostgreSQL and works seamlessly with Datum's synchronization features. This adapter uses Supabase's realtime subscriptions for live data sync and PostgREST for CRUD operations.
## Setup
Add Supabase dependencies to your pubspec.yaml:
dependencies:
  supabase_flutter: ^2.0.0
  recase: ^4.1.0
Initialize Supabase in your app:
import 'package:supabase_flutter/supabase_flutter.dart';
/// Application entry point.
///
/// Initializes the global Supabase client and only then builds the widget
/// tree, so `Supabase.instance` is available to code started by the app.
Future<void> main() async {
  const supabaseUrl = 'YOUR_SUPABASE_URL';
  const supabaseAnonKey = 'YOUR_SUPABASE_ANON_KEY';

  await Supabase.initialize(url: supabaseUrl, anonKey: supabaseAnonKey);

  runApp(MyApp());
}
## Implementation
import 'dart:async';
import 'dart:ui';
import 'package:datum/datum.dart';
import 'package:example/bootstrap.dart';
import 'package:supabase_flutter/supabase_flutter.dart';
import 'package:recase/recase.dart';
/// Schedules and tracks re-subscription attempts for a Supabase realtime
/// channel.
///
/// Attempts are spaced with exponential backoff (1s, 2s, 4s, ... clamped to
/// 1-30 seconds). Once three or more failures have each occurred less than 30
/// seconds after the previous one, the delay is multiplied by five and clamped
/// to 30-300 seconds. No further attempts are scheduled after [maxRetries]
/// until [reset] is called.
class _SubscriptionRetryManager {
  /// Maximum number of attempts scheduled between two [reset] calls.
  static const int maxRetries = 5;

  /// Delay of the first attempt; doubled for every further attempt.
  static const Duration baseRetryDelay = Duration(seconds: 1);

  Timer? _retryTimer;
  int _retryCount = 0;
  bool _isRetrying = false;
  DateTime? _lastRetryTime;
  int _consecutiveFailures = 0;

  /// Whether a retry is scheduled and its timer has not fired yet.
  bool get isRetrying => _isRetrying;

  /// Whether any failure was recorded since the last [reset].
  bool get hasFailures => _retryCount > 0 || _consecutiveFailures > 0;

  /// Schedules [retryCallback] after the current backoff delay.
  ///
  /// Does nothing when a retry is already pending, when [isAuthenticated] is
  /// false, or when [maxRetries] attempts have already been scheduled.
  /// [tableName] is only used in log messages.
  ///
  /// Note that [isAuthenticated] is a value captured at call time: the check
  /// performed when the timer fires sees that same value, not the caller's
  /// current authentication state.
  void scheduleRetry(
    String tableName,
    bool isAuthenticated,
    VoidCallback retryCallback,
  ) {
    final canSchedule = !_isRetrying && isAuthenticated;
    if (!canSchedule) {
      talker.debug(
          "Skipping retry: already retrying or not authenticated for table: $tableName");
      return;
    }

    _trackFailure();

    if (_retryCount >= maxRetries) {
      talker.error(
          "Max retry attempts reached for table: $tableName. Giving up.");
      return;
    }

    _isRetrying = true;
    _retryCount += 1;
    final delay = _calculateDelay();
    talker.warning(
        "Scheduling retry attempt $_retryCount for table: $tableName in ${delay.inSeconds} seconds");

    // Only one timer is ever pending; replace any leftover one.
    _retryTimer?.cancel();
    _retryTimer = Timer(delay, () {
      // The pending flag is cleared on every path once the timer fires.
      _isRetrying = false;
      if (!isAuthenticated) {
        talker.debug(
            "Skipping retry: user not authenticated for table: $tableName");
        return;
      }
      talker.info(
          "Retrying subscription for table: $tableName (attempt $_retryCount)");
      retryCallback();
    });
  }

  /// Records a failure and updates the streak of failures that happened less
  /// than 30 seconds apart.
  void _trackFailure() {
    final now = DateTime.now();
    final previous = _lastRetryTime;
    final withinBurstWindow =
        previous != null && now.difference(previous).inSeconds < 30;
    _consecutiveFailures = withinBurstWindow ? _consecutiveFailures + 1 : 1;
    _lastRetryTime = now;
  }

  /// Computes the delay for the attempt numbered [_retryCount].
  Duration _calculateDelay() {
    final exponential = baseRetryDelay.inSeconds * (1 << (_retryCount - 1));
    var seconds = exponential.clamp(1, 30);
    if (_consecutiveFailures >= 3) {
      // Repeated quick failures: back off much harder.
      seconds = (seconds * 5).clamp(30, 300);
    }
    return Duration(seconds: seconds);
  }

  /// Cancels any pending retry and clears all counters.
  void reset() {
    _retryTimer?.cancel();
    _retryTimer = null;
    _retryCount = 0;
    _consecutiveFailures = 0;
    _isRetrying = false;
    _lastRetryTime = null;
  }

  /// Cancels the pending retry timer, if any.
  void dispose() => _retryTimer?.cancel();
}
/// A Datum [RemoteAdapter] that stores entities of type [T] in a Supabase
/// table.
///
/// Rows are read and written through the Supabase client's query builder and
/// remote changes are received over realtime channels.
class SupabaseRemoteAdapter<T extends DatumEntityInterface>
extends RemoteAdapter<T> {
/// Name of the Supabase table this adapter reads from and writes to.
final String tableName;
/// Builds an entity of type [T] from a row map.
final T Function(Map<String, dynamic>) fromMap;
/// Client used instead of `Supabase.instance.client` when non-null.
final SupabaseClient? _clientOverride;
/// Creates an adapter for [tableName].
///
/// Pass [clientOverride] to use a specific [SupabaseClient] instead of the
/// global `Supabase.instance.client`.
SupabaseRemoteAdapter({
required this.tableName,
required this.fromMap,
SupabaseClient? clientOverride,
}) : _clientOverride = clientOverride;
// Core components
/// Channel backing [changeStream]; null while not subscribed.
RealtimeChannel? _channel;
/// Lazily created controller behind [changeStream].
StreamController<DatumChangeDetail<T>>? _streamController;
/// Additional realtime channels opened by the watch methods, keyed by
/// channel name.
final Map<String, RealtimeChannel> _relatedChannels = {};
/// Backoff bookkeeping for re-subscribing the main channel.
final _SubscriptionRetryManager _retryManager = _SubscriptionRetryManager();
// Authentication
/// Subscription to the client's auth state changes; null while not
/// monitoring.
StreamSubscription<AuthState>? _authSubscription;
/// Last authentication state recorded by this adapter.
bool _isAuthenticated = false;
/// Broadcasts every change of [_isAuthenticated].
final StreamController<bool> _authStateController =
StreamController<bool>.broadcast();
/// The injected client if one was provided, otherwise the global client.
SupabaseClient get _client => _clientOverride ?? Supabase.instance.client;
/// Table that holds per-user sync metadata.
String get _metadataTableName => 'sync_metadata';
/// Emits the new value whenever the recorded authentication state changes.
Stream<bool> get authStateStream => _authStateController.stream;
/// Deletes the row of [tableName] whose `id` column equals [id].
///
/// [userId] is accepted for interface compatibility and is not used.
///
/// Throws [EntityNotFoundException] when Supabase reports error code
/// `PGRST116`; every other [PostgrestException] is rethrown unchanged.
///
/// NOTE(review): the request does not ask for a single-object result, so it
/// is unclear whether `PGRST116` can be raised here - confirm whether a
/// missing row is actually detected.
@override
Future<void> delete(String id, {String? userId}) async {
  final deletion = _client.from(tableName).delete().eq('id', id);
  try {
    await deletion;
  } on PostgrestException catch (error) {
    // PGRST116: "Cannot coerce the result to a single JSON object".
    final reportsMissingRow = error.code == 'PGRST116';
    if (!reportsMissingRow) rethrow;
    throw EntityNotFoundException(
      message: 'Entity with id $id not found in table $tableName',
    );
  }
}
/// Reports whether the adapter currently has an authenticated session.
///
/// Returns [AdapterHealthStatus.healthy] when the client holds a session with
/// an access token and [AdapterHealthStatus.unhealthy] otherwise.
@override
Future<AdapterHealthStatus> checkHealth() async {
  // Use [_client] rather than `Supabase.instance.client` so that an injected
  // client override is honoured, like everywhere else in this adapter.
  final hasAccessToken = _client.auth.currentSession?.accessToken != null;
  return hasAccessToken
      ? AdapterHealthStatus.healthy
      : AdapterHealthStatus.unhealthy;
}
/// Fetches every row of [tableName], optionally narrowed by [scope].
///
/// Each filter in `scope.query.filters` is applied through [_applyFilter].
/// [userId] is only logged; it is not added to the query.
///
/// Returns the parsed entities, or an empty list when the response is not a
/// `List<Map<String, dynamic>>`. Any error is logged and rethrown.
@override
Future<List<T>> readAll({String? userId, DatumSyncScope? scope}) async {
  talker.info(
      "🔍 [Adapter] readAll called for table: $tableName, userId: $userId");
  talker.debug("🔍 [Adapter] scope: $scope");

  try {
    PostgrestFilterBuilder request = _client.from(tableName).select();
    talker.debug("🔍 [Adapter] Created query builder for table: $tableName");

    // Narrow the request with the sync scope's filters, when there is one.
    final scopeFilters = scope?.query.filters;
    if (scopeFilters != null) {
      talker.debug("🔍 [Adapter] Applying ${scopeFilters.length} filters");
      for (final condition in scopeFilters) {
        request = _applyFilter(request, condition);
        talker.debug("🔍 [Adapter] Applied filter: $condition");
      }
    }

    talker.debug("🔍 [Adapter] Executing query for table: $tableName");
    final response = await request;
    talker.info(
        "✅ [Adapter] Query successful for table: $tableName, response type: ${response.runtimeType}");

    if (response is! List<Map<String, dynamic>>) {
      talker.warning(
          "⚠️ [Adapter] Unexpected response type for table: $tableName - expected List<Map<String, dynamic>>, got ${response.runtimeType}");
      return [];
    }

    final items = <T>[
      for (final row in response) fromMap(_toCamelCase(row)),
    ];
    talker.info(
        "✅ [Adapter] Successfully parsed ${items.length} items from table: $tableName");
    return items;
  } catch (e, stackTrace) {
    talker.error(
        "❌ [Adapter] readAll failed for table: $tableName - $e", stackTrace);
    rethrow;
  }
}
/// Fetches the row of [tableName] whose `id` column equals [id].
///
/// Returns the parsed entity, or `null` when no row matches. [userId] is
/// accepted for interface compatibility and is not used.
@override
Future<T?> read(String id, {String? userId}) async {
  final rows = await _client.from(tableName).select().eq('id', id);
  // An empty result means "not found". The previous guard tested
  // `length > 1`, so an empty result fell through to `.first` and threw a
  // StateError instead of returning null.
  if (rows.isEmpty) {
    return null;
  }
  return fromMap(_toCamelCase(rows.first));
}
/// Loads the sync metadata row stored for [userId].
///
/// Queries [_metadataTableName] for the row whose `user_id` equals [userId]
/// and returns it parsed with [DatumSyncMetadata.fromMap], or `null` when no
/// row exists. Any error is logged and rethrown.
@override
Future<DatumSyncMetadata?> getSyncMetadata(String userId) async {
  talker.info(
      "🔍 [Adapter] getSyncMetadata called for userId: $userId, table: $_metadataTableName");

  try {
    talker.debug(
        "🔍 [Adapter] Executing query: SELECT from $_metadataTableName WHERE user_id = $userId");
    final lookup =
        _client.from(_metadataTableName).select().eq('user_id', userId);
    final row = await lookup.maybeSingle();
    talker.info(
        "✅ [Adapter] getSyncMetadata query successful, response: ${row != null ? 'found' : 'null'}");

    if (row == null) {
      talker.debug(
          "🔍 [Adapter] No sync metadata found for user $userId, returning null");
      return null;
    }

    talker.debug("🔍 [Adapter] Parsing sync metadata response: $row");
    final parsed = DatumSyncMetadata.fromMap(row);
    talker.info(
        "✅ [Adapter] Successfully parsed sync metadata for user $userId");
    return parsed;
  } catch (e, stackTrace) {
    talker.error("❌ [Adapter] getSyncMetadata failed for user $userId - $e",
        stackTrace);
    rethrow;
  }
}
/// Always reports `true`; no connectivity probe is performed.
@override
Future<bool> isConnected() async {
  return true;
}
/// Writes [entity] to [tableName] with an upsert keyed on `id`.
///
/// The payload is the entity's remote map converted to snake_case, with
/// `user_id` always set from `entity.userId`.
///
/// Throws an [Exception] when the upsert does not return the written row.
@override
Future<void> create(T entity) async {
  final payload = <String, dynamic>{
    ..._toSnakeCase(entity.toDatumMap(target: MapTarget.remote)),
    // Ensure userId is in the payload
    'user_id': entity.userId,
  };

  final writtenRow = await _client
      .from(tableName)
      .upsert(payload, onConflict: 'id')
      .select()
      .maybeSingle();

  if (writtenRow != null) return;
  throw Exception(
    'Failed to push item: upsert did not return the expected record. Check RLS policies.',
  );
}
/// Applies the partial update [delta] to the row whose `id` equals [id].
///
/// [delta] keys are converted to snake_case before being sent. [userId] is
/// accepted for interface compatibility and is not used.
///
/// Returns the updated entity parsed from the row Supabase sends back.
///
/// Throws [EntityNotFoundException] when no row is returned or when Supabase
/// reports `PGRST116`; other [PostgrestException]s are rethrown unchanged.
@override
Future<T> patch({
  required String id,
  required Map<String, dynamic> delta,
  String? userId,
}) async {
  try {
    final changes = _toSnakeCase(delta);
    final updatedRow = await _client
        .from(tableName)
        .update(changes)
        .eq('id', id)
        .select()
        .maybeSingle();

    if (updatedRow != null) {
      return fromMap(_toCamelCase(updatedRow));
    }
    throw EntityNotFoundException(
      message:
          'Failed to patch item: record not found or RLS policy prevented selection.',
    );
  } on PostgrestException catch (error) {
    // PGRST116: "Cannot coerce the result to a single JSON object".
    if (error.code != 'PGRST116') rethrow;
    throw EntityNotFoundException(
      message:
          'Entity with id $id not found in table $tableName during patch operation',
    );
  }
}
/// Streams the full contents of [tableName], re-fetched on every change.
///
/// Returns `null` when the adapter is not authenticated. Otherwise returns a
/// broadcast stream that, once listened to, emits the result of [readAll] and
/// then emits a fresh [readAll] result whenever the realtime channel reports
/// any change on the table. Fetch and subscription errors are forwarded as
/// stream errors. Cancelling the stream removes the realtime channel.
@override
Stream<List<T>>? watchAll({String? userId, DatumSyncScope? scope}) {
  talker.info("👀 [Adapter] watchAll called for table: $tableName, userId: $userId");
  if (!_isAuthenticated) {
    talker.warning("Cannot watch table '$tableName': user not authenticated");
    return null;
  }

  final channelName = 'watchAll:$tableName:${userId ?? 'all'}';
  late StreamController<List<T>> controller;
  RealtimeChannel? channel;

  // Reads the table again and pushes the result (or the error) downstream.
  Future<void> fetchAndEmit() async {
    try {
      talker.debug("🔄 [Adapter] Fetching current data for watchAll on table: $tableName");
      final items = await readAll(userId: userId, scope: scope);
      if (controller.isClosed) return;
      controller.add(items);
      talker.debug("✅ [Adapter] Emitted ${items.length} items for watchAll on table: $tableName");
    } catch (e, stackTrace) {
      talker.error("❌ [Adapter] Failed to fetch data for watchAll on table '$tableName': $e", stackTrace);
      if (!controller.isClosed) {
        controller.addError(e);
      }
    }
  }

  // Opens the realtime channel and registers it in [_relatedChannels].
  void setupRealtimeSubscription() {
    talker.debug("📡 [Adapter] Setting up realtime subscription for watchAll on table: $tableName");
    final created = _client.channel(channelName).onPostgresChanges(
          event: PostgresChangeEvent.all,
          schema: 'public',
          table: tableName,
          callback: (payload) {
            talker.info('📡 [Adapter] Change detected for watchAll on table: $tableName - ${payload.eventType}');
            // Any change invalidates the whole list, so simply re-fetch it.
            fetchAndEmit();
          },
        );
    channel = created;

    created.subscribe((status, error) {
      talker.debug("📡 [Adapter] watchAll subscription status for table '$tableName': $status");
      if (error != null) {
        talker.error("❌ [Adapter] watchAll subscription error for table '$tableName': $error");
        if (!controller.isClosed) {
          controller.addError(error);
        }
        return;
      }
      if (status == RealtimeSubscribeStatus.subscribed) {
        talker.info("✅ [Adapter] Successfully subscribed to watchAll for table: $tableName");
      }
    });

    _relatedChannels[channelName] = created;
  }

  controller = StreamController<List<T>>.broadcast(
    onListen: () {
      talker.debug("👂 [Adapter] watchAll stream listened for table: $tableName");
      // Initial snapshot first, then live updates.
      fetchAndEmit();
      setupRealtimeSubscription();
    },
    onCancel: () async {
      talker.debug("🚫 [Adapter] watchAll stream cancelled for table: $tableName");
      final active = channel;
      if (active == null) return;
      await _client.removeChannel(active);
      _relatedChannels.remove(channelName);
    },
  );

  return controller.stream;
}
/// Streams the entity whose `id` equals [id], following remote changes.
///
/// Returns `null` when the adapter is not authenticated. Otherwise returns a
/// broadcast stream that, once listened to, emits the result of [read]. On a
/// realtime delete event for that row it emits `null`; on any other event it
/// emits a fresh [read] result. Fetch and subscription errors are forwarded as
/// stream errors. Cancelling the stream removes the realtime channel.
@override
Stream<T?>? watchById(String id, {String? userId}) {
  talker.info("👀 [Adapter] watchById called for table: $tableName, id: $id, userId: $userId");
  if (!_isAuthenticated) {
    talker.warning("Cannot watch table '$tableName': user not authenticated");
    return null;
  }

  final channelName = 'watchById:$tableName:$id';
  late StreamController<T?> controller;
  RealtimeChannel? channel;

  // Reads the row again and pushes the result (or the error) downstream.
  Future<void> fetchAndEmit() async {
    try {
      talker.debug("🔄 [Adapter] Fetching current data for watchById on table: $tableName, id: $id");
      final item = await read(id, userId: userId);
      if (controller.isClosed) return;
      controller.add(item);
      talker.debug("✅ [Adapter] Emitted item for watchById on table: $tableName, id: $id (found: ${item != null})");
    } catch (e, stackTrace) {
      talker.error("❌ [Adapter] Failed to fetch data for watchById on table '$tableName', id '$id': $e", stackTrace);
      if (!controller.isClosed) {
        controller.addError(e);
      }
    }
  }

  // Opens a realtime channel filtered to this row and registers it.
  void setupRealtimeSubscription() {
    talker.debug("📡 [Adapter] Setting up realtime subscription for watchById on table: $tableName, id: $id");
    final created = _client.channel(channelName).onPostgresChanges(
          event: PostgresChangeEvent.all,
          schema: 'public',
          table: tableName,
          filter: PostgresChangeFilter(
            type: PostgresChangeFilterType.eq,
            column: 'id',
            value: id,
          ),
          callback: (payload) {
            talker.info('📡 [Adapter] Change detected for watchById on table: $tableName, id: $id - ${payload.eventType}');
            final isDelete = payload.eventType == PostgresChangeEvent.delete;
            if (!isDelete) {
              // Insert/update: emit the row as it is now.
              fetchAndEmit();
              return;
            }
            // Delete: the row is gone, so emit null without another read.
            if (!controller.isClosed) {
              controller.add(null);
              talker.debug("✅ [Adapter] Emitted null for deleted item in watchById on table: $tableName, id: $id");
            }
          },
        );
    channel = created;

    created.subscribe((status, error) {
      talker.debug("📡 [Adapter] watchById subscription status for table '$tableName', id '$id': $status");
      if (error != null) {
        talker.error("❌ [Adapter] watchById subscription error for table '$tableName', id '$id': $error");
        if (!controller.isClosed) {
          controller.addError(error);
        }
        return;
      }
      if (status == RealtimeSubscribeStatus.subscribed) {
        talker.info("✅ [Adapter] Successfully subscribed to watchById for table: $tableName, id: $id");
      }
    });

    _relatedChannels[channelName] = created;
  }

  controller = StreamController<T?>.broadcast(
    onListen: () {
      talker.debug("👂 [Adapter] watchById stream listened for table: $tableName, id: $id");
      // Initial snapshot first, then live updates.
      fetchAndEmit();
      setupRealtimeSubscription();
    },
    onCancel: () async {
      talker.debug("🚫 [Adapter] watchById stream cancelled for table: $tableName, id: $id");
      final active = channel;
      if (active == null) return;
      await _client.removeChannel(active);
      _relatedChannels.remove(channelName);
    },
  );

  return controller.stream;
}
/// Streams the result of [query], re-evaluated on every table change.
///
/// Returns `null` when the adapter is not authenticated. Otherwise returns a
/// broadcast stream that, once listened to, emits the result of running
/// [query] and then emits a fresh result whenever the realtime channel reports
/// any change on [tableName]. Fetch and subscription errors are forwarded as
/// stream errors. Cancelling the stream removes the realtime channel.
///
/// NOTE(review): the channel name is built from the table and [userId] only,
/// so two watched queries for the same table and user use the same name and
/// the same [_relatedChannels] slot - confirm this is intended.
@override
Stream<List<T>>? watchQuery(DatumQuery query, {String? userId}) {
  talker.info("👀 [Adapter] watchQuery called for table: $tableName, userId: $userId");
  if (!_isAuthenticated) {
    talker.warning("Cannot watch table '$tableName': user not authenticated");
    return null;
  }

  final channelName = 'watchQuery:$tableName:${userId ?? 'all'}';
  late StreamController<List<T>> controller;
  RealtimeChannel? channel;

  // Runs the query again and pushes the result (or the error) downstream.
  Future<void> fetchAndEmit() async {
    try {
      talker.debug("🔄 [Adapter] Fetching current data for watchQuery on table: $tableName");
      final items = await this.query(query, userId: userId);
      if (controller.isClosed) return;
      controller.add(items);
      talker.debug("✅ [Adapter] Emitted ${items.length} items for watchQuery on table: $tableName");
    } catch (e, stackTrace) {
      talker.error("❌ [Adapter] Failed to fetch data for watchQuery on table '$tableName': $e", stackTrace);
      if (!controller.isClosed) {
        controller.addError(e);
      }
    }
  }

  // Opens the realtime channel and registers it in [_relatedChannels].
  void setupRealtimeSubscription() {
    talker.debug("📡 [Adapter] Setting up realtime subscription for watchQuery on table: $tableName");
    final created = _client.channel(channelName).onPostgresChanges(
          event: PostgresChangeEvent.all,
          schema: 'public',
          table: tableName,
          callback: (payload) {
            talker.info('📡 [Adapter] Change detected for watchQuery on table: $tableName - ${payload.eventType}');
            // The changed row is not matched against the query filters; every
            // change simply triggers a re-run of the query.
            fetchAndEmit();
          },
        );
    channel = created;

    created.subscribe((status, error) {
      talker.debug("📡 [Adapter] watchQuery subscription status for table '$tableName': $status");
      if (error != null) {
        talker.error("❌ [Adapter] watchQuery subscription error for table '$tableName': $error");
        if (!controller.isClosed) {
          controller.addError(error);
        }
        return;
      }
      if (status == RealtimeSubscribeStatus.subscribed) {
        talker.info("✅ [Adapter] Successfully subscribed to watchQuery for table: $tableName");
      }
    });

    _relatedChannels[channelName] = created;
  }

  controller = StreamController<List<T>>.broadcast(
    onListen: () {
      talker.debug("👂 [Adapter] watchQuery stream listened for table: $tableName");
      // Initial snapshot first, then live updates.
      fetchAndEmit();
      setupRealtimeSubscription();
    },
    onCancel: () async {
      talker.debug("🚫 [Adapter] watchQuery stream cancelled for table: $tableName");
      final active = channel;
      if (active == null) return;
      await _client.removeChannel(active);
      _relatedChannels.remove(channelName);
    },
  );

  return controller.stream;
}
/// Upserts [metadata] for [userId] into [_metadataTableName].
///
/// Does nothing (beyond logging) while the adapter is not authenticated. A
/// [PostgrestException] with code `42501` is logged and marks the adapter as
/// unauthenticated instead of propagating; every other error propagates to
/// the caller.
@override
Future<void> updateSyncMetadata(
  DatumSyncMetadata metadata,
  String userId,
) async {
  if (!_isAuthenticated) {
    talker.warning(
        "Skipping sync metadata update for user $userId: User is not authenticated");
    return;
  }

  talker.debug("Updating sync metadata for user: $userId with data: $metadata");
  final payload = _toSnakeCase(metadata.toMap());
  payload['user_id'] = userId;

  try {
    await _client.from(_metadataTableName).upsert(payload);
  } on PostgrestException catch (error) {
    // Only the 42501 rejection is tolerated here.
    if (error.code != '42501') rethrow;
    talker.warning(
        "Failed to update sync metadata due to RLS policy violation. User may have logged out.");
    // Flip the flag so further attempts are skipped by the guard above.
    _updateAuthenticationState(false);
  }
}
/// Broadcast stream of remote changes for [tableName].
///
/// The backing controller is created on first access. The realtime
/// subscription is started when the stream gets its first listener and torn
/// down through [unsubscribeFromChanges] when the last listener cancels.
@override
Stream<DatumChangeDetail<T>>? get changeStream {
  final controller =
      _streamController ??= StreamController<DatumChangeDetail<T>>.broadcast(
    onListen: _subscribeToChanges,
    onCancel: unsubscribeFromChanges,
  );
  return controller.stream;
}
/// Opens the main realtime channel for [tableName] and forwards its events to
/// [changeStream].
///
/// Does nothing while the adapter is not authenticated. Any channel left over
/// from an earlier attempt is removed first, so a retry or a repeated call
/// never leaves two channels for the same table alive. Subscription failures
/// (error, closed, timed out, channel error) are passed to
/// [_handleSubscriptionError]; a successful subscription is reported to
/// [_onSubscriptionRestored].
void _subscribeToChanges() {
  talker.info("Subscribing to Supabase changes for table: $tableName");

  final currentUser = _client.auth.currentUser;
  final hasSession = _client.auth.currentSession != null;
  talker.debug(
      "Subscription attempt - Authenticated: $_isAuthenticated, Current user: ${currentUser?.id}, Has session: $hasSession");

  if (!_isAuthenticated) {
    talker.warning(
        "Attempting to subscribe to table '$tableName' while not authenticated. Delaying subscription until authenticated.");
    // Auth monitoring subscribes once the user is signed in.
    return;
  }

  // Release the channel of a previous attempt instead of overwriting the
  // reference. `_channel` is cleared first so that the status callback of the
  // old channel (which fires when it is removed) is recognised as stale.
  final stale = _channel;
  if (stale != null) {
    _channel = null;
    talker.debug(
        "Removing previous channel before subscribing for table: $tableName");
    unawaited(
      _client.removeChannel(stale).then<void>(
        (_) {},
        onError: (Object e, StackTrace stackTrace) {
          talker.warning(
              "Failed to remove previous channel for table '$tableName': $e");
        },
      ),
    );
  }

  try {
    talker.debug("Creating realtime channel for table: $tableName");
    final channel = _client
        .channel(
          'public:$tableName',
        )
        .onPostgresChanges(
          event: PostgresChangeEvent.all,
          schema: 'public',
          table: tableName,
          callback: (payload) {
            talker.info(
                'Received Supabase change: ${payload.eventType} for table: $tableName');
            talker.debug('Payload: $payload');

            DatumOperationType? type;
            Map<String, dynamic>? record;
            switch (payload.eventType) {
              case PostgresChangeEvent.insert:
                type = DatumOperationType.create;
                record = payload.newRecord;
                talker.debug('Insert event detected for table: $tableName');
                break;
              case PostgresChangeEvent.update:
                type = DatumOperationType.update;
                record = payload.newRecord;
                talker.debug('Update event detected for table: $tableName');
                break;
              case PostgresChangeEvent.delete:
                type = DatumOperationType.delete;
                record = payload.oldRecord;
                talker.debug('Delete event detected for table: $tableName');
                break;
              case PostgresChangeEvent.all:
                talker.debug(
                    'Received "all" event type for table: $tableName, ignoring.');
                break;
            }

            if (type == null || record == null) {
              talker.warning(
                  'Change event received for table: $tableName but not processed (type or record was null).');
              return;
            }

            // Adding to a closed controller throws, so drop late events.
            final controller = _streamController;
            if (controller == null || controller.isClosed) {
              talker.debug(
                  'Change stream is not open for table: $tableName, dropping event.');
              return;
            }

            talker.debug(
                'Processing change of type $type for record: $record in table: $tableName');
            try {
              // A delete payload may carry only part of the row, in which case
              // `fromMap` can fail; the error must not escape into the
              // realtime client's dispatcher.
              final item = fromMap(_toCamelCase(record));

              // When the record has no userId (e.g. a delete that only carries
              // the ID), attribute the change to the signed-in user.
              final userId = item.userId.isNotEmpty
                  ? item.userId
                  : _client.auth.currentUser?.id;
              if (userId == null) {
                talker.warning(
                    'Could not determine userId for change in table: $tableName, dropping event.');
                return;
              }

              controller.add(
                DatumChangeDetail<T>(
                  type: type,
                  entityId: item.id,
                  userId: userId,
                  timestamp: item.modifiedAt,
                  data: item,
                ),
              );
              talker.info(
                  'Successfully processed and streamed change for ${item.id} in table: $tableName');
            } catch (e, stackTrace) {
              talker.error(
                  "Failed to process change event for table '$tableName': $e",
                  stackTrace);
            }
          },
        );
    _channel = channel;

    talker.debug("Subscribing to channel for table: $tableName");
    channel.subscribe(
      (status, error) {
        // Status updates of a channel that has since been removed or replaced
        // must not trigger retries or a "restored" sync.
        if (!identical(_channel, channel)) {
          talker.debug(
              "Ignoring status $status from inactive channel for table: $tableName");
          return;
        }

        talker.info(
            "Channel subscription status for table '$tableName': $status");
        if (error != null) {
          talker.error(
              "Channel subscription error for table '$tableName': $error");
          _handleSubscriptionError();
        } else if (status == RealtimeSubscribeStatus.subscribed) {
          talker.info(
              "Successfully subscribed to changes for table: $tableName");
          _onSubscriptionRestored();
        } else if (status == RealtimeSubscribeStatus.closed) {
          talker.warning("Channel closed for table: $tableName");
          _handleSubscriptionError();
        } else if (status == RealtimeSubscribeStatus.timedOut) {
          talker.error("Channel subscription timed out for table: $tableName");
          _handleSubscriptionError();
        } else if (status == RealtimeSubscribeStatus.channelError) {
          talker.error("Channel error occurred for table: $tableName");
          _handleSubscriptionError();
        }
      },
    );
    talker.debug("Channel subscription initiated for table: $tableName");
  } catch (e, stackTrace) {
    talker.error("Failed to subscribe to changes for table '$tableName': $e",
        stackTrace);
    _channel = null;
  }
}
/// Removes the main realtime channel and every channel registered in
/// [_relatedChannels].
///
/// Safe to call when nothing is subscribed.
@override
Future<void> unsubscribeFromChanges() async {
  talker.debug("Unsubscribing from changes for table: $tableName");

  // Detach the reference before awaiting so that code running while the
  // removal is in flight already sees the adapter as unsubscribed.
  final mainChannel = _channel;
  if (mainChannel != null) {
    talker.debug("Removing main channel for table: $tableName");
    _channel = null;
    await _client.removeChannel(mainChannel);
    talker
        .info("Successfully unsubscribed from changes for table: $tableName");
  } else {
    talker.debug("No active channel to unsubscribe for table: $tableName");
  }

  // Unsubscribe from all related entity channels.
  if (_relatedChannels.isNotEmpty) {
    talker.debug(
        "Cleaning up ${_relatedChannels.length} related channels for table: $tableName");
    // Iterate over a snapshot: watch streams remove their own entries from
    // the map when cancelled, which can happen while a removal is awaited and
    // would otherwise raise a ConcurrentModificationError.
    final snapshot = _relatedChannels.entries.toList();
    for (final entry in snapshot) {
      await _client.removeChannel(entry.value);
      // Drop the entry only if it still refers to the channel just removed.
      if (identical(_relatedChannels[entry.key], entry.value)) {
        _relatedChannels.remove(entry.key);
      }
      talker.debug("Removed related channel: ${entry.key}");
    }
  }
}
/// Tears down the current realtime subscriptions and subscribes again.
///
/// Failures are logged and not propagated to the caller.
@override
Future<void> resubscribeToChanges() async {
  talker.info("Resubscribing to changes for table: $tableName");
  final channelState = _channel != null ? 'active' : 'null';
  talker.debug("Current channel state: $channelState");

  try {
    await unsubscribeFromChanges();
    talker.debug(
        "Successfully unsubscribed, now subscribing again for table: $tableName");

    _subscribeToChanges();
    talker.info("Resubscription process completed for table: $tableName");
  } catch (e, stackTrace) {
    talker.error(
      "Failed to resubscribe to changes for table '$tableName': $e",
      stackTrace,
    );
  }
}
/// Deletes the sync metadata rows whose `user_id` equals [userId].
Future<void> clearSyncMetadata(String userId) async {
  final deletion =
      _client.from(_metadataTableName).delete().eq('user_id', userId);
  await deletion;
}
// Authentication monitoring methods
/// Starts listening to the client's auth state changes.
///
/// Calling this again while already monitoring has no effect. On sign-out the
/// retry state is reset and all realtime channels are removed; on sign-in the
/// main channel is subscribed if it does not exist yet. The current session is
/// used to record the initial authentication state.
void startAuthMonitoring() {
  if (_authSubscription != null) return; // Already monitoring

  talker.info(
      "Starting authentication state monitoring for $tableName adapter");

  void onAuthStateChanged(AuthState authState) {
    final signedIn = authState.session != null;
    _updateAuthenticationState(signedIn);

    if (signedIn) {
      talker.info("User logged in, resuming sync for $tableName adapter");
      // Resume syncing unless a channel is already in place.
      if (_channel == null) {
        _subscribeToChanges();
      }
      return;
    }

    talker.info("User logged out, stopping sync for $tableName adapter");
    // Stop syncing: drop pending retries and every channel.
    _retryManager.reset();
    unsubscribeFromChanges();
  }

  _authSubscription = _client.auth.onAuthStateChange.listen(
    onAuthStateChanged,
    onError: (error) {
      talker.error("Auth state monitoring error: $error");
    },
  );

  // Seed the state from the session that exists right now.
  _updateAuthenticationState(_client.auth.currentSession != null);
}
/// Cancels the auth state subscription, if one is active.
Future<void> _stopAuthMonitoring() async {
  final subscription = _authSubscription;
  if (subscription == null) return;

  await subscription.cancel();
  _authSubscription = null;
  talker.info(
      "Stopped authentication state monitoring for $tableName adapter");
}
/// Records [isAuthenticated] and publishes it on [authStateStream].
///
/// Nothing happens when the value equals the one already recorded.
void _updateAuthenticationState(bool isAuthenticated) {
  final unchanged = _isAuthenticated == isAuthenticated;
  if (unchanged) return;

  _isAuthenticated = isAuthenticated;
  _authStateController.add(isAuthenticated);
  talker.debug(
      "Authentication state changed: $isAuthenticated for $tableName adapter");
}
// Subscription management
/// Asks the retry manager to schedule another [_subscribeToChanges] attempt,
/// passing the authentication state as it is at this moment.
void _handleSubscriptionError() => _retryManager.scheduleRetry(
      tableName,
      _isAuthenticated,
      _subscribeToChanges,
    );
/// Handles a successful subscription of the main channel.
///
/// Resets the retry state. If failures had been recorded before the reset, a
/// pull-only full sync is started to catch up on changes missed meanwhile.
void _onSubscriptionRestored() {
  // Read the flag before resetting, since reset clears it.
  final recoveredFromFailure = _retryManager.hasFailures;
  _retryManager.reset();

  if (!recoveredFromFailure) {
    talker.debug(
        "Subscription restored for table: $tableName (no failures detected)");
    return;
  }

  talker.info(
      "Subscription restored for table: $tableName after failures. Triggering full sync to catch missed updates.");
  _triggerFullSyncAfterRestoration();
}
/// Runs a forced, pull-only synchronization for the signed-in user.
///
/// Skipped when no user is signed in or when Datum is not initialized.
/// Failures are logged and never propagated; callers do not await this.
void _triggerFullSyncAfterRestoration() async {
  try {
    final user = _client.auth.currentUser;
    if (user == null) {
      talker.debug(
          "No authenticated user found, skipping restoration sync for table: $tableName");
      return;
    }

    // Datum must be up before its manager can be used.
    if (!Datum.isInitialized) {
      talker.debug(
          "Datum not initialized yet, skipping restoration sync for table: $tableName");
      return;
    }

    final options = DatumSyncOptions<T>(
      forceFullSync: true,
      direction: SyncDirection.pullOnly,
    );
    await Datum.manager<T>().synchronize(user.id, options: options);

    talker.info(
        "Successfully completed restoration sync for table: $tableName");
  } catch (e, stackTrace) {
    talker.error(
      "Failed to perform restoration sync for table: $tableName: $e",
      stackTrace,
    );
  }
}
/// Releases everything the adapter holds.
///
/// In order: cancels the pending retry timer, removes all realtime channels,
/// closes the change stream controller, stops auth monitoring, closes the
/// auth state controller, and finally disposes the superclass.
@override
Future<void> dispose() async {
  // Stop retries first so nothing re-subscribes during teardown.
  _retryManager.dispose();

  await unsubscribeFromChanges();
  await _streamController?.close();

  await _stopAuthMonitoring();
  await _authStateController.close();

  await super.dispose();
}
/// Prepares the adapter: starts auth monitoring and, if a user is signed in
/// and no channel exists yet, subscribes to realtime changes.
///
/// Waits 100 ms after starting auth monitoring before deciding whether to
/// subscribe. Any error is logged and rethrown.
@override
Future<void> initialize() async {
  talker.info("Initializing SupabaseRemoteAdapter for table: $tableName");
  final channelState = _channel != null ? 'exists' : 'null';
  talker.debug("Current channel state: $channelState");

  try {
    // Auth monitoring comes first; it decides whether subscribing is allowed.
    talker.debug("Starting authentication monitoring for table: $tableName");
    startAuthMonitoring();

    // Give the auth state a brief moment to settle.
    await Future.delayed(const Duration(milliseconds: 100));

    if (!_isAuthenticated) {
      talker.debug(
          "User not authenticated during initialization, subscription will be handled by auth monitoring for table: $tableName");
    } else if (_channel != null) {
      talker.debug(
          "Channel already exists for table: $tableName, skipping subscription during initialization");
    } else {
      talker.debug(
          "User is authenticated, ensuring clean state and subscribing for table: $tableName");
      await unsubscribeFromChanges();
      _subscribeToChanges();
    }

    talker.info(
        "Successfully initialized SupabaseRemoteAdapter for table: $tableName");
  } catch (e, stackTrace) {
    talker.error(
      "Failed to initialize SupabaseRemoteAdapter for table '$tableName': $e",
      stackTrace,
    );
    rethrow;
  }
}
/// Runs [query] against [tableName] and returns the matching entities.
///
/// Every condition in `query.filters` is applied through [_applyFilter].
/// [userId] is accepted for interface compatibility and is not used.
@override
Future<List<T>> query(DatumQuery query, {String? userId}) async {
  PostgrestFilterBuilder request = _client.from(tableName).select();
  for (final filter in query.filters) {
    request = _applyFilter(request, filter);
  }

  final rows = await request;
  return rows.map<T>((row) => fromMap(_toCamelCase(row))).toList();
}
/// Replaces the stored row for [entity] with its full current data.
///
/// Implemented as an upsert keyed on `id`, so the row is created if it does
/// not exist yet. The payload is the entity's remote map in snake_case with
/// `user_id` always set from `entity.userId`.
@override
Future<void> update(T entity) async {
  // Full-data updates are sent as an upsert rather than a computed diff.
  final payload = <String, dynamic>{
    ..._toSnakeCase(entity.toDatumMap(target: MapTarget.remote)),
    'user_id': entity.userId,
  };
  await _client.from(tableName).upsert(payload, onConflict: 'id');
}
/// Adds [condition] to [builder] and returns the resulting builder.
///
/// A [Filter] is translated to the matching builder call (`eq`, `neq`, `lt`,
/// `lte`, `gt`, `gte`, `contains`, `inFilter`); field names are converted to
/// snake_case. Unsupported operators are logged and skipped.
///
/// A [CompositeFilter] whose operator is named `or` is sent as one PostgREST
/// `or=(...)` expression. Any other composite is treated as a conjunction and
/// its children are applied one after the other, since chained filters are
/// AND-ed by PostgREST. Nested composites are supported.
PostgrestFilterBuilder _applyFilter(
  PostgrestFilterBuilder builder,
  FilterCondition condition,
) {
  if (condition is Filter) {
    final field = condition.field.snakeCase;
    final value = condition.value;
    switch (condition.operator) {
      case FilterOperator.equals:
        return builder.eq(field, value);
      case FilterOperator.notEquals:
        return builder.neq(field, value);
      case FilterOperator.lessThan:
        return builder.lt(field, value);
      case FilterOperator.lessThanOrEqual:
        return builder.lte(field, value);
      case FilterOperator.greaterThan:
        return builder.gt(field, value);
      case FilterOperator.greaterThanOrEqual:
        return builder.gte(field, value);
      case FilterOperator.arrayContains:
        return builder.contains(field, value);
      case FilterOperator.isIn:
        return builder.inFilter(field, value as List);
      default:
        talker.warning('Unsupported query operator: ${condition.operator}');
    }
  } else if (condition is CompositeFilter) {
    // NOTE(review): assumes the composite operator's enum names are `and` and
    // `or` - confirm against the Datum API.
    if (condition.operator.name == 'or') {
      final alternatives = <String>[
        for (final child in condition.conditions)
          if (_postgrestExpression(child) case final expression?) expression,
      ];
      // Nothing translatable: leave the query untouched.
      if (alternatives.isEmpty) return builder;
      return builder.or(alternatives.join(','));
    }

    var narrowed = builder;
    for (final child in condition.conditions) {
      narrowed = _applyFilter(narrowed, child);
    }
    return narrowed;
  }
  return builder;
}

/// Renders [condition] in PostgREST's logical-operator syntax, e.g.
/// `age.gte.18` or `and(age.gte.18,name.eq.Bob)`.
///
/// Returns `null` (after logging) for conditions that cannot be expressed.
static String? _postgrestExpression(FilterCondition condition) {
  if (condition is CompositeFilter) {
    final parts = <String>[
      for (final child in condition.conditions)
        if (_postgrestExpression(child) case final expression?) expression,
    ];
    if (parts.isEmpty) return null;
    return '${condition.operator.name}(${parts.join(',')})';
  }
  if (condition is! Filter) return null;

  final field = condition.field.snakeCase;
  final value = condition.value;
  switch (condition.operator) {
    case FilterOperator.equals:
      return value == null
          ? '$field.is.null'
          : '$field.eq.${_postgrestLiteral(value)}';
    case FilterOperator.notEquals:
      return value == null
          ? '$field.not.is.null'
          : '$field.neq.${_postgrestLiteral(value)}';
    case FilterOperator.lessThan:
      return '$field.lt.${_postgrestLiteral(value)}';
    case FilterOperator.lessThanOrEqual:
      return '$field.lte.${_postgrestLiteral(value)}';
    case FilterOperator.greaterThan:
      return '$field.gt.${_postgrestLiteral(value)}';
    case FilterOperator.greaterThanOrEqual:
      return '$field.gte.${_postgrestLiteral(value)}';
    case FilterOperator.arrayContains:
      return '$field.cs.{${_postgrestList(value)}}';
    case FilterOperator.isIn:
      return '$field.in.(${_postgrestList(value)})';
    default:
      talker.warning('Unsupported query operator: ${condition.operator}');
      return null;
  }
}

/// Renders the elements of [value] as a comma-separated list of literals; a
/// non-iterable value is rendered as a single literal.
static String _postgrestList(Object? value) => value is Iterable
    ? value.map<String>(_postgrestLiteral).join(',')
    : _postgrestLiteral(value);

/// Renders [value] as a literal, double-quoting it when it contains characters
/// that are syntax inside a PostgREST logical expression.
static String _postgrestLiteral(Object? value) {
  final text = value is DateTime ? value.toIso8601String() : '$value';
  if (!RegExp(r'[,.:()"\\\s]').hasMatch(text)) return text;
  final escaped = text.replaceAll(r'\', r'\\').replaceAll('"', r'\"');
  return '"$escaped"';
}
/// Fetches the entities related to [parent] through the relation named
/// [relationName].
///
/// [relatedAdapter] must be a [SupabaseRemoteAdapter]; its table is queried
/// and its `fromMap` builds the results. Returns an empty list when the key
/// value read from [parent] is null or, for many-to-many relations, when the
/// junction table yields no string IDs.
///
/// Throws [ArgumentError] when [parent] has no relation named [relationName]
/// or when the pivot adapter of a many-to-many relation is not a
/// [SupabaseRemoteAdapter].
///
/// NOTE(review): the BelongsTo branch reads `localKey` from the parent and
/// filters the related table on `foreignKey`, exactly like HasMany/HasOne -
/// confirm that this key direction is intended for BelongsTo.
/// NOTE(review): the pivot table name is taken from `Datum.manager()` without
/// a type argument rather than from the relation's pivot entity - confirm.
@override
Future<List<R>> fetchRelated<R extends DatumEntityInterface>(
  RelationalDatumEntity parent,
  String relationName,
  RemoteAdapter<R> relatedAdapter,
) async {
  final target = relatedAdapter as SupabaseRemoteAdapter<R>;
  final targetTable = target.tableName;

  PostgrestFilterBuilder request = _client.from(targetTable).select();

  final relation = parent.relations[relationName];
  switch (relation) {
    case BelongsTo(:final foreignKey, :final localKey):
      final keyValue = parent.toDatumMap()[localKey];
      if (keyValue == null) {
        return [];
      }
      request = request.eq(foreignKey.snakeCase, keyValue);

    case HasMany(:final foreignKey, :final localKey):
    case HasOne(:final foreignKey, :final localKey):
      // The related table holds the foreign key that points at the parent.
      final keyValue = parent.toDatumMap()[localKey];
      if (keyValue == null) {
        return [];
      }
      request = request.eq(foreignKey.snakeCase, keyValue);

    case ManyToMany(
        :final otherForeignKey,
        :final otherLocalKey,
        :final thisForeignKey,
        :final thisLocalKey,
      ):
      final pivotAdapter = Datum.manager().remoteAdapter;
      if (pivotAdapter is! SupabaseRemoteAdapter) {
        throw ArgumentError('Pivot adapter must be a SupabaseRemoteAdapter');
      }
      final pivotTable = pivotAdapter.tableName;

      final parentKeyValue = parent.toDatumMap()[thisLocalKey];
      if (parentKeyValue == null) {
        return [];
      }

      // Step 1: read the junction rows that belong to the parent.
      final junctionRows = await _client
          .from(pivotTable)
          .select(otherForeignKey.snakeCase)
          .eq(thisForeignKey.snakeCase, parentKeyValue);
      if (junctionRows.isEmpty) {
        return [];
      }

      // Step 2: collect the related IDs (only string values are kept).
      final relatedIds = junctionRows
          .map((row) => row[otherForeignKey.snakeCase])
          .whereType<String>()
          .toList();
      if (relatedIds.isEmpty) {
        return [];
      }

      // Step 3: select the related rows by those IDs.
      request = request.inFilter(otherLocalKey.snakeCase, relatedIds);

    case null:
      throw ArgumentError(
          'Relation "$relationName" not found for parent entity.');
  }

  final rows = await request;
  return rows.map<R>((row) => target.fromMap(_toCamelCase(row))).toList();
}
/// Streams the entities related to [parent] through [relationName],
/// re-fetched whenever the related table changes.
///
/// Once listened to, the stream emits the result of [fetchRelated] and emits
/// a fresh result on every realtime change in the related table. For
/// many-to-many relations the pivot table is watched as well. Fetch errors
/// are forwarded as stream errors. Cancelling the stream removes the channels
/// it opened.
///
/// Throws [ArgumentError] when [parent] has no relation named [relationName].
///
/// NOTE(review): the pivot table name is taken from `Datum.manager()` without
/// a type argument - confirm that this resolves the pivot entity's adapter.
Stream<List<R>> watchRelated<R extends DatumEntityInterface>(
  RelationalDatumEntity parent,
  String relationName,
  RemoteAdapter<R> relatedAdapter,
) {
  final target = relatedAdapter as SupabaseRemoteAdapter<R>;
  final relatedTableName = target.tableName;

  final relation = parent.relations[relationName];
  if (relation == null) {
    throw ArgumentError(
        'Relation "$relationName" not found for parent entity.');
  }

  final channelName = 'related:$relatedTableName:${parent.id}:$relationName';
  late StreamController<List<R>> controller;
  RealtimeChannel? channel;

  // Re-runs the relation query and pushes the result (or error) downstream.
  Future<void> fetchAndEmit() async {
    try {
      final items = await fetchRelated<R>(parent, relationName, relatedAdapter);
      if (!controller.isClosed) {
        controller.add(items);
      }
    } catch (e) {
      if (!controller.isClosed) {
        controller.addError(e);
      }
    }
  }

  // Resolves the pivot table and its channel name for many-to-many
  // relations; null when the relation has no usable pivot.
  ({String table, String channel})? resolvePivot() {
    if (relation is! ManyToMany) return null;
    final pivotAdapter = Datum.manager().remoteAdapter;
    if (pivotAdapter is! SupabaseRemoteAdapter) return null;
    final pivotTableName = pivotAdapter.tableName;
    return (
      table: pivotTableName,
      channel: 'pivot:$pivotTableName:${parent.id}:$relationName',
    );
  }

  void setupRealtimeSubscription() {
    final created = _client.channel(channelName).onPostgresChanges(
          event: PostgresChangeEvent.all,
          schema: 'public',
          table: relatedTableName,
          callback: (payload) {
            talker.info('Related entity change detected in $relatedTableName');
            // Any change in the related table invalidates the list.
            fetchAndEmit();
          },
        );
    channel = created;

    // Many-to-many: membership lives in the pivot table, so watch it too.
    final pivot = resolvePivot();
    if (pivot != null) {
      final pivotTableName = pivot.table;
      final pivotChannel = _client.channel(pivot.channel).onPostgresChanges(
            event: PostgresChangeEvent.all,
            schema: 'public',
            table: pivotTableName,
            callback: (payload) {
              talker.info('Pivot table change detected in $pivotTableName');
              fetchAndEmit();
            },
          );
      pivotChannel.subscribe();
      _relatedChannels[pivot.channel] = pivotChannel;
    }

    created.subscribe();
    _relatedChannels[channelName] = created;
  }

  controller = StreamController<List<R>>.broadcast(
    onListen: () {
      // Initial snapshot first, then live updates.
      fetchAndEmit();
      setupRealtimeSubscription();
    },
    onCancel: () async {
      final active = channel;
      if (active != null) {
        await _client.removeChannel(active);
        _relatedChannels.remove(channelName);
      }

      // Remove the pivot channel as well, if one was registered.
      final pivot = resolvePivot();
      if (pivot == null) return;
      final pivotChannel = _relatedChannels[pivot.channel];
      if (pivotChannel != null) {
        await _client.removeChannel(pivotChannel);
        _relatedChannels.remove(pivot.channel);
      }
    },
  );

  return controller.stream;
}
}
/// Returns a new map holding the entries of [map] with every key passed
/// through `snakeCase`.
///
/// Only the top-level keys are renamed; values are copied unchanged. If two
/// keys convert to the same name, the later entry wins.
Map<String, dynamic> _toSnakeCase(Map<String, dynamic> map) {
  return <String, dynamic>{
    for (final entry in map.entries) entry.key.snakeCase: entry.value,
  };
}
/// Returns a new map holding the entries of [map] with every key passed
/// through `camelCase`.
///
/// Only the top-level keys are renamed; values are copied unchanged. If two
/// keys convert to the same name, the later entry wins.
Map<String, dynamic> _toCamelCase(Map<String, dynamic> map) {
  return <String, dynamic>{
    for (final entry in map.entries) entry.key.camelCase: entry.value,
  };
}
Usage Example#
// Remote side: a Supabase adapter bound to the `tasks` table.
final taskAdapter = SupabaseRemoteAdapter<Task>(
  tableName: 'tasks',
  fromMap: (map) => Task.fromMap(map),
);

// Local side: a Hive adapter storing the same entity in the `tasks` box.
final taskLocalAdapter = HiveLocalAdapter<Task>(
  boxName: 'tasks',
  fromMap: (map) => Task.fromMap(map),
  // Pass the user change stream for reactive updates.
  userChangeStream: Datum.instance.userChangeStream,
);

// Pair both adapters in a registration to hand over to Datum.
final registrations = [
  DatumRegistration<Task>(
    localAdapter: taskLocalAdapter,
    remoteAdapter: taskAdapter,
  ),
];
Supabase RLS Policies#
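Set up Row Level Security (RLS) policies for your Supabase tables. The tables must already exist, so run the statements from the Database Schema section below first: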
Tasks Table#
-- Row Level Security for `tasks`.
-- Every policy compares the caller's auth.uid() with the row's user_id,
-- so a user can only see and modify rows they own.
ALTER TABLE tasks ENABLE ROW LEVEL SECURITY;

-- Read: only rows owned by the caller are visible.
CREATE POLICY "Users can view own tasks"
  ON tasks
  FOR SELECT
  USING (auth.uid() = user_id);

-- Create: new rows must carry the caller's user_id.
CREATE POLICY "Users can insert own tasks"
  ON tasks
  FOR INSERT
  WITH CHECK (auth.uid() = user_id);

-- Modify: only owned rows can be updated. WITH CHECK repeats the USING
-- expression, which is what PostgreSQL applies by default when it is omitted.
CREATE POLICY "Users can update own tasks"
  ON tasks
  FOR UPDATE
  USING (auth.uid() = user_id)
  WITH CHECK (auth.uid() = user_id);

-- Remove: only owned rows can be deleted.
CREATE POLICY "Users can delete own tasks"
  ON tasks
  FOR DELETE
  USING (auth.uid() = user_id);
Sync Metadata Table#
-- Row Level Security for `sync_metadata`.
-- Every policy compares the caller's auth.uid() with the row's user_id.
-- No DELETE policy is defined here, so with RLS enabled deletes are rejected
-- for roles that are subject to row level security.
ALTER TABLE sync_metadata ENABLE ROW LEVEL SECURITY;

-- Read: only the caller's own sync metadata is visible.
CREATE POLICY "Users can view own sync metadata"
  ON sync_metadata
  FOR SELECT
  USING (auth.uid() = user_id);

-- Create: new rows must carry the caller's user_id.
CREATE POLICY "Users can insert own sync metadata"
  ON sync_metadata
  FOR INSERT
  WITH CHECK (auth.uid() = user_id);

-- Modify: only owned rows can be updated. WITH CHECK repeats the USING
-- expression, which is what PostgreSQL applies by default when it is omitted.
CREATE POLICY "Users can update own sync metadata"
  ON sync_metadata
  FOR UPDATE
  USING (auth.uid() = user_id)
  WITH CHECK (auth.uid() = user_id);
Database Schema#
Create the necessary tables in your Supabase database:
-- Tasks table: one row per task, owned by a Supabase auth user.
-- Rows are removed automatically when the owning user is deleted.
CREATE TABLE tasks (
  id TEXT PRIMARY KEY,
  title TEXT NOT NULL,
  description TEXT,
  completed BOOLEAN DEFAULT FALSE,
  user_id UUID NOT NULL REFERENCES auth.users(id) ON DELETE CASCADE,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
  modified_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Sync metadata table: at most one row per user (user_id is UNIQUE).
CREATE TABLE sync_metadata (
  id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
  user_id UUID NOT NULL REFERENCES auth.users(id) ON DELETE CASCADE UNIQUE,
  last_sync_time TIMESTAMP WITH TIME ZONE,
  schema_version INTEGER DEFAULT 1,
  device_id TEXT,
  total_pending_changes INTEGER DEFAULT 0,
  conflict_count INTEGER DEFAULT 0,
  sync_status TEXT DEFAULT 'never_synced',
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
  updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Create indexes for better performance.
CREATE INDEX idx_tasks_user_id ON tasks(user_id);
CREATE INDEX idx_tasks_modified_at ON tasks(modified_at);
-- No separate index is created on sync_metadata(user_id): the UNIQUE
-- constraint on that column already makes PostgreSQL build a unique index,
-- so an additional one would only add write overhead.

-- Enable Row Level Security.
ALTER TABLE tasks ENABLE ROW LEVEL SECURITY;
ALTER TABLE sync_metadata ENABLE ROW LEVEL SECURITY;