Batch Operations
Advanced batch processing for multiple database operations
What is it?
Batch Operations allow you to group multiple database operations (inserts, updates, deletes) into a single transaction. This ensures atomicity, improves performance, and simplifies complex data manipulations. Drift provides a powerful batch API that supports mixed operations and returning results.
Think of Batch Operations like "mass editing" – instead of updating one document at a time, you can select multiple documents and apply changes to all of them simultaneously.
// 👇 Batch multiple operations
// `batch` is called on the database (or a DAO); the callback only queues
// statements, which are then sent together.
await batch((batch) {
  // Insert new users
  batch.insertAll(users, [
    UsersCompanion.insert(name: 'User 1', email: 'user1@example.com'),
    UsersCompanion.insert(name: 'User 2', email: 'user2@example.com'),
  ]);

  // Update existing users (the filter is the named `where` argument)
  batch.update(
    users,
    UsersCompanion(isActive: Value(false)),
    where: (u) => u.isActive.equals(true),
  );

  // Delete old users (`deleteWhere` takes a filter; `delete` takes a row)
  batch.deleteWhere(
    users,
    (u) => u.createdAt.isSmallerThanValue(DateTime(2023, 1, 1)),
  );
});
// All operations execute in a single transaction:
// BEGIN TRANSACTION;
// INSERT INTO users ...
// INSERT INTO users ...
// UPDATE users SET is_active = 0 WHERE is_active = 1;
// DELETE FROM users WHERE created_at < '2023-01-01';
// COMMIT;
What's happening here?
- Single Transaction – All operations run in one transaction
- Mixed Operations – Inserts, updates, and deletes together
- Atomic – All succeed or all fail
- Performance – One round-trip to the database
Why does it exist?
- Atomic Operations – All or nothing execution
- Performance – One database round-trip
- Data Integrity – Operations complete together
- Complex Updates – Multiple related changes
- Batch Processing – Process many records efficiently
- Error Handling – Rollback on any failure
Basic Batch Operations
Simple batch patterns
Batch Insert Only
// 👇 Batch insert multiple records
// `insertAll` queues one INSERT per companion against the given table.
await batch((batch) {
  batch.insertAll(users, [
    UsersCompanion.insert(
      name: 'User 1',
      email: 'user1@example.com',
    ),
    UsersCompanion.insert(
      name: 'User 2',
      email: 'user2@example.com',
    ),
    UsersCompanion.insert(
      name: 'User 3',
      email: 'user3@example.com',
    ),
  ]);
});
Batch Update Only
// 👇 Batch update multiple records
// One timestamp for the whole batch so every touched row agrees.
final now = DateTime.now();
await batch((batch) {
  // Refresh `updatedAt` on users that are already inactive
  // (the filter and the written value are both `isActive = false`).
  batch.update(
    users,
    UsersCompanion(
      isActive: Value(false),
      updatedAt: Value(now),
    ),
    where: (u) => u.isActive.equals(false),
  );

  // Update status for users older than 60
  batch.update(
    users,
    UsersCompanion(
      status: Value('senior'),
      updatedAt: Value(now),
    ),
    where: (u) => u.age.isBiggerThanValue(60),
  );
});
Batch Delete Only
// 👇 Batch delete multiple records
// Cut-off dates are runtime values, so they are computed up front and passed
// with `isSmallerThanValue` instead of a `const Variable(...)`.
final now = DateTime.now();
final oneYearAgo = now.subtract(const Duration(days: 365));
final thirtyDaysAgo = now.subtract(const Duration(days: 30));

await batch((batch) {
  // Delete inactive users older than 1 year
  batch.deleteWhere(
    users,
    (u) =>
        u.isActive.equals(false) &
        u.createdAt.isSmallerThanValue(oneYearAgo),
  );

  // Delete unverified users older than 30 days
  batch.deleteWhere(
    users,
    (u) =>
        u.isVerified.equals(false) &
        u.createdAt.isSmallerThanValue(thirtyDaysAgo),
  );
});
Advanced Batch Patterns
Complex batch operations
Pattern 1: Mixed Operations in Batch
// 👇 Insert, update, and delete in one batch
/// Makes the users table match [users]: removes rows that are not in the
/// list, inserts the entries flagged `isNew`, and updates the entries flagged
/// `isUpdated`.
///
/// Must live on the database (or a DAO) so that `batch` and the `users` table
/// getter are available.
Future<void> syncUsersWithBatch(List<UserData> users) async {
  // The parameter shadows the `users` table getter, so reach the table
  // through `this`.
  final table = this.users;

  // Only rows that already exist have a meaningful id to keep.
  final keptIds = [
    for (final user in users)
      if (!user.isNew) user.id,
  ];

  await batch((batch) {
    // 1️⃣ Delete users not in the list. This runs before the inserts:
    // freshly inserted rows get generated ids that are not in `keptIds`,
    // so deleting afterwards would remove them again.
    batch.deleteWhere(table, (u) => u.id.isNotIn(keptIds));

    // 2️⃣ Insert new users
    batch.insertAll(table, [
      for (final user in users)
        if (user.isNew)
          UsersCompanion.insert(
            name: user.name,
            email: user.email,
            age: Value(user.age),
          ),
    ]);

    // 3️⃣ Update existing users
    for (final user in users.where((u) => !u.isNew && u.isUpdated)) {
      batch.update(
        table,
        UsersCompanion(
          name: Value(user.name),
          age: Value(user.age),
          updatedAt: Value(DateTime.now()),
        ),
        where: (u) => u.id.equals(user.id),
      );
    }
  });
}
Pattern 2: Batch with Returning
// 👇 Batch with returning results
// A batch only queues statements and yields no rows. To get the affected
// rows back, run the writes in a transaction and use the `...Returning`
// variants of the individual statements.
final results = await transaction(() async {
  final first = await into(users).insertReturning(
    UsersCompanion.insert(name: 'User 1', email: 'user1@example.com'),
  );
  final second = await into(users).insertReturning(
    UsersCompanion.insert(name: 'User 2', email: 'user2@example.com'),
  );
  final activated =
      await (update(users)..where((u) => u.isActive.equals(true)))
          .writeReturning(UsersCompanion(status: Value('active')));

  // Inserted rows first, then every row the update touched.
  return [first, second, ...activated];
});
for (final result in results) {
  print('Affected: ID ${result.id}, Name ${result.name}');
}
Pattern 3: Conditional Batch Operations
// 👇 Batch with conditions
/// Sets `status` (and `updatedAt`) on every user matching the optional
/// filters.
///
/// [minAge] and [maxAge] are exclusive bounds. A filter left as `null` is not
/// applied; with no filters at all, every user is updated.
Future<void> conditionalBatchUpdate({
  required String status,
  int? minAge,
  int? maxAge,
  bool? isActive,
}) async {
  await batch((batch) {
    batch.update(
      users,
      UsersCompanion(
        status: Value(status),
        updatedAt: Value(DateTime.now()),
      ),
      where: (u) {
        // Start from TRUE and AND in only the filters that were supplied.
        Expression<bool> conditions = const Constant(true);
        if (minAge != null) {
          conditions = conditions & u.age.isBiggerThanValue(minAge);
        }
        if (maxAge != null) {
          conditions = conditions & u.age.isSmallerThanValue(maxAge);
        }
        if (isActive != null) {
          conditions = conditions & u.isActive.equals(isActive);
        }
        return conditions;
      },
    );
  });
}
Pattern 4: Bulk Import with Validation
// 👇 Batch import with validation
/// Imports users in one transaction, inserting unknown emails and updating
/// rows whose email already exists.
class BatchImporter {
  /// Database all reads and writes go through.
  final AppDatabase db;

  BatchImporter(this.db);

  /// Validates and upserts every entry of [users] inside a single
  /// transaction.
  ///
  /// A user that fails validation, or whose statement throws, is recorded in
  /// [ImportResult.errors] and skipped; the remaining users are still
  /// processed. If the transaction itself throws, the result reports zero
  /// inserted and zero updated rows, since none of that work is known to
  /// have been committed.
  Future<ImportResult> importUsers(List<UserData> users) async {
    var inserted = 0;
    var updated = 0;
    final errors = <String>[];

    try {
      await db.transaction(() async {
        for (final user in users) {
          try {
            // Validate user data
            _validateUser(user);

            // Check if user exists
            final existing = await (db.select(db.users)
                  ..where((u) => u.email.equals(user.email)))
                .getSingleOrNull();

            if (existing == null) {
              // Insert new user
              await db.into(db.users).insert(
                    UsersCompanion.insert(
                      name: user.name,
                      email: user.email,
                      age: Value(user.age),
                    ),
                  );
              inserted++;
            } else {
              // Update existing user
              await (db.update(db.users)
                    ..where((u) => u.id.equals(existing.id)))
                  .write(UsersCompanion(
                name: Value(user.name),
                age: Value(user.age),
                updatedAt: Value(DateTime.now()),
              ));
              updated++;
            }
          } catch (e) {
            errors.add('User ${user.email}: $e');
          }
        }
      });
    } catch (e) {
      // The counters describe work inside the failed transaction, so they
      // must not be reported as successful writes.
      return ImportResult(
        inserted: 0,
        updated: 0,
        errors: [...errors, 'Transaction failed: $e'],
      );
    }

    return ImportResult(
      inserted: inserted,
      updated: updated,
      errors: errors,
    );
  }

  /// Throws an [Exception] when [user] has an empty name, an email without
  /// `@`, or an age outside 0–150.
  void _validateUser(UserData user) {
    if (user.name.isEmpty) {
      throw Exception('Name is required');
    }
    if (!user.email.contains('@')) {
      throw Exception('Invalid email format');
    }
    // Copy to a local so the null check promotes without `!`.
    final age = user.age;
    if (age != null && (age < 0 || age > 150)) {
      throw Exception('Invalid age');
    }
  }
}
/// Outcome of a user import: how many rows were written and what went wrong.
class ImportResult {
  ImportResult({
    required this.inserted,
    required this.updated,
    this.errors = const [],
  });

  /// Number of rows inserted.
  final int inserted;

  /// Number of rows updated.
  final int updated;

  /// One message per failed user or failed transaction.
  final List<String> errors;

  /// Rows written in total (inserted plus updated).
  int get total {
    return inserted + updated;
  }

  /// Whether at least one error was recorded.
  bool get hasErrors {
    return errors.isNotEmpty;
  }

  /// One-line, human-readable recap of the counts.
  String get summary {
    final errorCount = errors.length;
    return 'Inserted: $inserted, Updated: $updated, Errors: $errorCount';
  }
}
Pattern 5: Chunked Batch Processing
// 👇 Process large datasets in chunks
/// Upserts users in fixed-size chunks, one transaction per chunk, so a
/// failing chunk does not affect the chunks before or after it.
class ChunkedBatchProcessor {
  /// Database all reads and writes go through.
  final AppDatabase db;

  /// Maximum number of users handled per transaction. Must be positive.
  final int chunkSize;

  ChunkedBatchProcessor({
    required this.db,
    this.chunkSize = 1000,
  });

  /// Processes [users] chunk by chunk and calls [onProgress] with
  /// `(processed, total)` after every chunk.
  ///
  /// Users are matched by email: unknown emails are inserted, known ones are
  /// updated. A failing user is recorded in the result's errors and skipped.
  /// When a chunk's transaction throws, the chunk is recorded as failed and
  /// its insert/update counts are discarded rather than reported.
  ///
  /// Throws an [ArgumentError] when [chunkSize] is not positive, because the
  /// loop could never advance.
  Future<ChunkedProcessResult> processUsers(
    List<UserData> users,
    Function(int, int) onProgress,
  ) async {
    if (chunkSize <= 0) {
      throw ArgumentError.value(chunkSize, 'chunkSize', 'must be positive');
    }

    final total = users.length;
    var processed = 0;
    var inserted = 0;
    var updated = 0;
    final errors = <String>[];

    for (var start = 0; start < total; start += chunkSize) {
      final end = start + chunkSize > total ? total : start + chunkSize;
      final chunk = users.sublist(start, end);

      // Counted per chunk and only added to the totals once the chunk's
      // transaction has completed.
      var chunkInserted = 0;
      var chunkUpdated = 0;

      try {
        await db.transaction(() async {
          for (final user in chunk) {
            try {
              // Process each user in chunk
              final existing = await (db.select(db.users)
                    ..where((u) => u.email.equals(user.email)))
                  .getSingleOrNull();

              if (existing == null) {
                await db.into(db.users).insert(
                      UsersCompanion.insert(
                        name: user.name,
                        email: user.email,
                        age: Value(user.age),
                      ),
                    );
                chunkInserted++;
              } else {
                await (db.update(db.users)
                      ..where((u) => u.id.equals(existing.id)))
                    .write(UsersCompanion(
                  name: Value(user.name),
                  age: Value(user.age),
                  updatedAt: Value(DateTime.now()),
                ));
                chunkUpdated++;
              }
            } catch (e) {
              errors.add('User ${user.email}: $e');
            }
          }
        });
        inserted += chunkInserted;
        updated += chunkUpdated;
      } catch (e) {
        errors.add('Chunk ${start ~/ chunkSize + 1}: Transaction failed - $e');
      }

      // "Processed" means attempted, whether or not the chunk succeeded.
      processed += chunk.length;
      onProgress(processed, total);
    }

    return ChunkedProcessResult(
      inserted: inserted,
      updated: updated,
      errors: errors,
      totalProcessed: processed,
      totalUsers: total,
    );
  }
}
/// Outcome of a chunked run: write counts, errors, and progress totals.
class ChunkedProcessResult {
  ChunkedProcessResult({
    required this.inserted,
    required this.updated,
    this.errors = const [],
    required this.totalProcessed,
    required this.totalUsers,
  });

  /// Number of rows inserted.
  final int inserted;

  /// Number of rows updated.
  final int updated;

  /// One message per failed user or failed chunk.
  final List<String> errors;

  /// Number of users that were attempted.
  final int totalProcessed;

  /// Number of users that were handed in.
  final int totalUsers;

  /// Whether every user handed in was attempted.
  bool get isComplete {
    return totalProcessed == totalUsers;
  }

  /// Whether at least one error was recorded.
  bool get hasErrors {
    return errors.isNotEmpty;
  }

  /// One-line, human-readable recap of the counts.
  String get summary {
    final errorCount = errors.length;
    return '✅ Inserted: $inserted, Updated: $updated, Errors: $errorCount';
  }
}
Real-World Example
Complete e-commerce batch system
// lib/database/batch_service.dart
import 'package:drift/drift.dart';
/// Bulk operations for products, orders, and users, plus archiving of old
/// data. Each public method runs its work inside a `db.transaction` and
/// reports the outcome as a `BatchResult`.
class BatchService {
/// Database every operation in this service reads from and writes to.
final AppDatabase db;
/// Creates a service bound to [db].
BatchService(this.db);
// ==================== PRODUCT BATCH OPERATIONS ====================
// 👇 Bulk product import
/// Inserts or updates every entry of [products] inside one transaction,
/// matching existing rows by SKU.
///
/// A product with an empty SKU or a negative price is rejected. Rejected or
/// failing products are recorded in the result's errors and skipped; both
/// inserts and updates count as processed.
Future<BatchResult> bulkImportProducts(List<ProductData> products) async {
  final failures = <String>[];
  var written = 0;

  await db.transaction(() async {
    for (final item in products) {
      try {
        // Validate
        if (item.sku.isEmpty) {
          throw Exception('SKU required for ${item.name}');
        }
        if (item.price < 0) {
          throw Exception('Invalid price for ${item.name}');
        }

        // Look the product up by SKU
        final lookup = db.select(db.products)
          ..where((p) => p.sku.equals(item.sku));
        final match = await lookup.getSingleOrNull();

        if (match != null) {
          // Known SKU: overwrite the mutable columns.
          final statement = db.update(db.products)
            ..where((p) => p.id.equals(match.id));
          await statement.write(
            ProductsCompanion(
              name: Value(item.name),
              price: Value(item.price),
              description: Value(item.description),
              stock: Value(item.stock),
              isActive: Value(item.isActive),
              updatedAt: Value(DateTime.now()),
            ),
          );
        } else {
          // Unknown SKU: create the row.
          await db.into(db.products).insert(
                ProductsCompanion.insert(
                  sku: item.sku,
                  name: item.name,
                  price: item.price,
                  description: Value(item.description),
                  stock: Value(item.stock),
                  isActive: Value(item.isActive),
                ),
              );
        }
        written++;
      } catch (e) {
        failures.add('${item.sku}: $e');
      }
    }
  });

  return BatchResult(
    processed: written,
    errors: failures,
    total: products.length,
  );
}
// 👇 Bulk price update
/// Applies each price change in [updates] to its product inside one
/// transaction.
///
/// A change counts as processed only when the update touched a row; an id
/// that matches no product is skipped without an error entry. Failing
/// statements are recorded per product and do not stop the loop.
Future<BatchResult> bulkUpdatePrices(
  List<PriceUpdateData> updates,
) async {
  final failures = <String>[];
  var changed = 0;

  await db.transaction(() async {
    for (final change in updates) {
      try {
        final statement = db.update(db.products)
          ..where((p) => p.id.equals(change.productId));
        final rows = await statement.write(
          ProductsCompanion(
            price: Value(change.newPrice),
            updatedAt: Value(DateTime.now()),
          ),
        );
        if (rows > 0) changed++;
      } catch (e) {
        failures.add('Product ${change.productId}: $e');
      }
    }
  });

  return BatchResult(
    processed: changed,
    errors: failures,
    total: updates.length,
  );
}
// 👇 Bulk stock adjustment
/// Adds each adjustment in [adjustments] to its product's stock inside one
/// transaction.
///
/// An adjustment that would make the stock negative, or that fails for any
/// other reason (including a missing product), is recorded in the result's
/// errors and skipped.
Future<BatchResult> bulkAdjustStock(List<StockAdjustmentData> adjustments) async {
  final failures = <String>[];
  var applied = 0;

  await db.transaction(() async {
    for (final adj in adjustments) {
      try {
        // Read the current stock level
        final lookup = db.select(db.products)
          ..where((p) => p.id.equals(adj.productId));
        final current = await lookup.getSingle();

        final target = current.stock + adj.adjustment;
        if (target < 0) {
          throw Exception('Insufficient stock');
        }

        final statement = db.update(db.products)
          ..where((p) => p.id.equals(current.id));
        await statement.write(
          ProductsCompanion(
            stock: Value(target),
            updatedAt: Value(DateTime.now()),
          ),
        );
        applied++;
      } catch (e) {
        failures.add('Product ${adj.productId}: $e');
      }
    }
  });

  return BatchResult(
    processed: applied,
    errors: failures,
    total: adjustments.length,
  );
}
// ==================== ORDER BATCH OPERATIONS ====================
// 👇 Bulk order status update
/// Sets the new status on every order in [updates] inside one transaction.
///
/// `shippedDate` is stamped only for the status `'shipped'` and
/// `deliveredDate` only for `'delivered'`; otherwise those columns are left
/// untouched. A change counts as processed only when the update touched a
/// row.
Future<BatchResult> bulkUpdateOrderStatus(
  List<OrderStatusUpdateData> updates,
) async {
  final failures = <String>[];
  var changed = 0;

  await db.transaction(() async {
    for (final change in updates) {
      try {
        final status = change.newStatus;
        final statement = db.update(db.orders)
          ..where((o) => o.id.equals(change.orderId));
        final rows = await statement.write(
          OrdersCompanion(
            status: Value(status),
            shippedDate: status == 'shipped'
                ? Value(DateTime.now())
                : const Value.absent(),
            deliveredDate: status == 'delivered'
                ? Value(DateTime.now())
                : const Value.absent(),
            updatedAt: Value(DateTime.now()),
          ),
        );
        if (rows > 0) changed++;
      } catch (e) {
        failures.add('Order ${change.orderId}: $e');
      }
    }
  });

  return BatchResult(
    processed: changed,
    errors: failures,
    total: updates.length,
  );
}
// 👇 Bulk order cancellation with stock restoration
/// Cancels every order in [orderIds] and puts its items back into stock.
///
/// Each order is handled in its own nested transaction, so an order that
/// fails part-way leaves neither its stock changes nor its status change
/// behind, while the other orders are still processed. Orders that do not
/// exist or are already `'cancelled'` are reported as errors instead of
/// restoring stock a second time.
Future<BatchResult> bulkCancelOrders(List<int> orderIds) async {
  var cancelled = 0;
  final errors = <String>[];

  await db.transaction(() async {
    for (final orderId in orderIds) {
      try {
        // Nested transaction: a failure for this order undoes only this
        // order's writes.
        await db.transaction(() async {
          final order = await (db.select(db.orders)
                ..where((o) => o.id.equals(orderId)))
              .getSingleOrNull();
          if (order == null) {
            throw Exception('Order not found');
          }
          if (order.status == 'cancelled') {
            throw Exception('Order already cancelled');
          }

          // Get order items
          final items = await (db.select(db.orderItems)
                ..where((i) => i.orderId.equals(orderId)))
              .get();

          // Restore stock
          for (final item in items) {
            final product = await (db.select(db.products)
                  ..where((p) => p.id.equals(item.productId)))
                .getSingle();
            await (db.update(db.products)
                  ..where((p) => p.id.equals(product.id)))
                .write(ProductsCompanion(
              stock: Value(product.stock + item.quantity),
              updatedAt: Value(DateTime.now()),
            ));
          }

          // Cancel order
          await (db.update(db.orders)..where((o) => o.id.equals(orderId)))
              .write(OrdersCompanion(
            status: Value('cancelled'),
            isPaid: const Value(false),
            paymentStatus: Value('refunded'),
            updatedAt: Value(DateTime.now()),
          ));
        });
        cancelled++;
      } catch (e) {
        errors.add('Order $orderId: $e');
      }
    }
  });

  return BatchResult(
    processed: cancelled,
    errors: errors,
    total: orderIds.length,
  );
}
// ==================== USER BATCH OPERATIONS ====================
// 👇 Bulk user status update
/// Sets `isActive` on every user in [updates] inside one transaction.
///
/// A change counts as processed only when the update touched a row; failing
/// statements are recorded per user and do not stop the loop.
Future<BatchResult> bulkUpdateUserStatus(
  List<UserStatusUpdateData> updates,
) async {
  final failures = <String>[];
  var changed = 0;

  await db.transaction(() async {
    for (final change in updates) {
      try {
        final statement = db.update(db.users)
          ..where((u) => u.id.equals(change.userId));
        final rows = await statement.write(
          UsersCompanion(
            isActive: Value(change.isActive),
            updatedAt: Value(DateTime.now()),
          ),
        );
        if (rows > 0) changed++;
      } catch (e) {
        failures.add('User ${change.userId}: $e');
      }
    }
  });

  return BatchResult(
    processed: changed,
    errors: failures,
    total: updates.length,
  );
}
// 👇 Bulk user verification
/// Marks every user in [userIds] as verified inside one transaction.
///
/// An id counts as processed only when the update touched a row; failing
/// statements are recorded per user and do not stop the loop.
Future<BatchResult> bulkVerifyUsers(List<int> userIds) async {
  final failures = <String>[];
  var confirmed = 0;

  await db.transaction(() async {
    for (final id in userIds) {
      try {
        final statement = db.update(db.users)
          ..where((u) => u.id.equals(id));
        final rows = await statement.write(
          UsersCompanion(
            isVerified: const Value(true),
            updatedAt: Value(DateTime.now()),
          ),
        );
        if (rows > 0) confirmed++;
      } catch (e) {
        failures.add('User $id: $e');
      }
    }
  });

  return BatchResult(
    processed: confirmed,
    errors: failures,
    total: userIds.length,
  );
}
// ==================== DATA CLEANUP BATCHES ====================
// 👇 Archive old data
/// Copies old records into the archive tables and deletes the originals.
///
/// With [includeOrders], orders whose status is `'delivered'` or
/// `'cancelled'` and whose order date is before the cut-off are archived
/// together with their items. With [includeUsers], users flagged as deleted
/// before the cut-off are archived. The cut-off is now minus [olderThan]
/// (365 days when omitted).
///
/// Orders and users are each archived in their own nested transaction: when
/// one of them fails, its partial work is undone, the failure is recorded in
/// the result's errors, and only fully archived records are counted.
Future<BatchResult> archiveOldData({
  Duration? olderThan,
  bool includeUsers = false,
  bool includeOrders = false,
}) async {
  var archived = 0;
  final errors = <String>[];
  final cutoffDate =
      DateTime.now().subtract(olderThan ?? const Duration(days: 365));

  await db.transaction(() async {
    // Archive old orders
    if (includeOrders) {
      try {
        archived += await db.transaction(() => _archiveOrders(cutoffDate));
      } catch (e) {
        errors.add('Order archiving: $e');
      }
    }

    // Archive old users
    if (includeUsers) {
      try {
        archived += await db.transaction(() => _archiveUsers(cutoffDate));
      } catch (e) {
        errors.add('User archiving: $e');
      }
    }
  });

  return BatchResult(
    processed: archived,
    errors: errors,
    total: 0, // Not tracked for cleanup operations
  );
}

/// Archives finished orders dated before [cutoffDate], items first, and
/// returns how many orders were moved.
Future<int> _archiveOrders(DateTime cutoffDate) async {
  final orders = await (db.select(db.orders)
        ..where((o) =>
            o.status.equals('delivered') | o.status.equals('cancelled'))
        ..where((o) => o.orderDate.isSmallerThanValue(cutoffDate)))
      .get();

  for (final order in orders) {
    // Archive order items first
    final items = await (db.select(db.orderItems)
          ..where((i) => i.orderId.equals(order.id)))
        .get();
    for (final item in items) {
      await db.into(db.archivedOrderItems).insert(
            ArchivedOrderItemsCompanion.insert(
              orderId: item.orderId,
              productId: item.productId,
              quantity: item.quantity,
              unitPrice: item.unitPrice,
              subtotal: item.subtotal,
              total: item.total,
              archivedAt: DateTime.now(),
            ),
          );
    }
    await (db.delete(db.orderItems)
          ..where((i) => i.orderId.equals(order.id)))
        .go();

    // Archive order
    await db.into(db.archivedOrders).insert(
          ArchivedOrdersCompanion.insert(
            orderNumber: order.orderNumber,
            userId: order.userId,
            total: order.total,
            status: order.status,
            orderDate: order.orderDate,
            archivedAt: DateTime.now(),
          ),
        );
    await (db.delete(db.orders)..where((o) => o.id.equals(order.id))).go();
  }
  return orders.length;
}

/// Archives users flagged as deleted before [cutoffDate] and returns how
/// many were moved.
Future<int> _archiveUsers(DateTime cutoffDate) async {
  final users = await (db.select(db.users)
        ..where((u) => u.isDeleted.equals(true))
        ..where((u) => u.deletedAt.isSmallerThanValue(cutoffDate)))
      .get();

  for (final user in users) {
    await db.into(db.archivedUsers).insert(
          ArchivedUsersCompanion.insert(
            username: user.username,
            email: user.email,
            fullName: Value(user.fullName),
            archivedAt: DateTime.now(),
          ),
        );
    await (db.delete(db.users)..where((u) => u.id.equals(user.id))).go();
  }
  return users.length;
}
// ==================== PERFORMANCE OPTIMIZATION ====================
// 👇 Use batch for large operations
/// Runs a mixed list of insert/update/delete [operations] against the users
/// table inside one transaction.
///
/// A failing operation is recorded as `Operation <index>` (zero-based
/// position in [operations]) and skipped; the rest still run. An operation
/// missing the companion or filter its type requires is reported the same
/// way.
Future<BatchResult> optimizeBatchOperation(
  List<Operation> operations,
) async {
  var processed = 0;
  final errors = <String>[];

  await db.transaction(() async {
    for (var index = 0; index < operations.length; index++) {
      final op = operations[index];
      // Locals so the null checks below promote to non-nullable.
      // NOTE(review): `Operation.companion` is a raw `Insertable`; confirm it
      // type-checks against the users table's row type.
      final companion = op.companion;
      final where = op.where;
      try {
        switch (op.type) {
          case OperationType.insert:
            if (companion == null) {
              throw StateError('insert requires a companion');
            }
            await db.into(db.users).insert(companion);
            break;
          case OperationType.update:
            if (companion == null || where == null) {
              throw StateError('update requires a filter and a companion');
            }
            await (db.update(db.users)..where(where)).write(companion);
            break;
          case OperationType.delete:
            if (where == null) {
              throw StateError('delete requires a filter');
            }
            await (db.delete(db.users)..where(where)).go();
            break;
        }
        processed++;
      } catch (e) {
        // Label by position, not by the success counter, so every failure
        // points at the operation that caused it.
        errors.add('Operation $index: $e');
      }
    }
  });

  return BatchResult(
    processed: processed,
    errors: errors,
    total: operations.length,
  );
}
}
// ==================== DATA CLASSES ====================
/// Outcome of a bulk operation: how many items succeeded, out of how many,
/// and which errors occurred.
class BatchResult {
  BatchResult({
    required this.processed,
    this.errors = const [],
    required this.total,
  });

  /// Number of items handled successfully.
  final int processed;

  /// One message per failure.
  final List<String> errors;

  /// Number of items requested; `0` when the caller does not track it.
  final int total;

  /// Whether at least one error was recorded.
  bool get hasErrors => errors.isNotEmpty;

  /// Fraction of [total] that was processed, or `0` when [total] is not
  /// positive.
  double get successRate {
    if (total > 0) {
      return processed / total;
    }
    return 0;
  }

  /// Multi-line, human-readable recap: counts first, then the errors if any.
  String get summary {
    final text = StringBuffer('✅ $processed processed');
    if (total > 0) {
      final percent = (successRate * 100).toStringAsFixed(0);
      text.write(' of $total ($percent%)');
    }
    if (hasErrors) {
      text.write('\n❌ ${errors.length} errors: ${errors.join(', ')}');
    }
    return text.toString();
  }
}
/// Input row for a bulk product import.
class ProductData {
  ProductData({
    required this.sku,
    required this.name,
    required this.price,
    this.description,
    this.stock = 0,
    this.isActive = true,
  });

  /// Stock-keeping unit.
  final String sku;

  /// Display name.
  final String name;

  /// Unit price.
  final double price;

  /// Optional free-text description.
  final String? description;

  /// Units in stock; defaults to `0`.
  final int stock;

  /// Whether the product is active; defaults to `true`.
  final bool isActive;
}
/// A new price for one product.
class PriceUpdateData {
  PriceUpdateData({
    required this.productId,
    required this.newPrice,
  });

  /// Id of the product to reprice.
  final int productId;

  /// Price to store.
  final double newPrice;
}
/// A stock change for one product.
class StockAdjustmentData {
  StockAdjustmentData({
    required this.productId,
    required this.adjustment,
  });

  /// Id of the product whose stock changes.
  final int productId;

  /// Amount added to the current stock.
  final int adjustment;
}
/// A new status for one order.
class OrderStatusUpdateData {
  OrderStatusUpdateData({
    required this.orderId,
    required this.newStatus,
  });

  /// Id of the order to update.
  final int orderId;

  /// Status to store.
  final String newStatus;
}
/// A new active flag for one user.
class UserStatusUpdateData {
  UserStatusUpdateData({
    required this.userId,
    required this.isActive,
  });

  /// Id of the user to update.
  final int userId;

  /// Whether the user should be active.
  final bool isActive;
}
/// The kind of write an [Operation] performs.
enum OperationType {
  /// Adds a row.
  insert,

  /// Changes the rows matching a filter.
  update,

  /// Removes the rows matching a filter.
  delete,
}
/// One queued write: its kind plus the companion and/or filter that kind
/// uses.
class Operation {
  /// An insert of [companion]; no filter is stored.
  Operation.insert(this.companion)
      : where = null,
        type = OperationType.insert;

  /// An update that writes [companion] to the rows matching [where].
  Operation.update(this.where, this.companion) : type = OperationType.update;

  /// A delete of the rows matching [where]; no companion is stored.
  Operation.delete(this.where)
      : companion = null,
        type = OperationType.delete;

  /// Which kind of write this is.
  final OperationType type;

  /// Values to write; `null` for deletes.
  final Insertable? companion;

  /// Row filter; `null` for inserts.
  final Expression<bool> Function(dynamic)? where;
}
Best Practices
- Use batch for 10+ operations – Better performance
- Keep batch sizes manageable – 100-1000 operations
- Use transactions – All or nothing
- Validate before batch – Catch errors early
- Handle errors gracefully – Log and continue
- Chunk large datasets – Process in smaller batches
- Use returning when needed – Get results back
- Test batch operations – Verify atomicity
Common Mistakes
Mistake 1: Huge batch sizes
Wrong:
// 🚫 100,000 operations in one batch (memory issues)
// `db.items` stands for whichever table the rows belong to.
await db.batch((batch) {
  for (final item in hugeList) {
    batch.insert(db.items, item);
  }
});
Correct:
// ✅ Chunk into manageable sizes
// `db.items` stands for whichever table the rows belong to.
const chunkSize = 1000;
for (var start = 0; start < hugeList.length; start += chunkSize) {
  // Clamp the last chunk to the end of the list (no `dart:math` needed).
  final end = start + chunkSize < hugeList.length
      ? start + chunkSize
      : hugeList.length;
  final chunk = hugeList.sublist(start, end);
  await db.batch((batch) {
    batch.insertAll(db.items, chunk);
  });
}
Mistake 2: Not handling errors in batch
Wrong:
// 🚫 One error fails entire batch
await db.batch((batch) {
  for (final user in users) {
    batch.insert(db.users, user); // One invalid user fails everything
  }
});
Correct:
// ✅ Validate before batch
// Reject bad input up front, before any statement is queued.
for (final user in users) {
  if (!isValid(user)) {
    throw Exception('Invalid user: ${user.email}');
  }
}
await db.batch((batch) {
  batch.insertAll(db.users, users);
});
Summary
| Feature | Purpose | Best For |
|---|---|---|
| Batch | Multiple operations | Bulk updates |
| Transaction | Atomic operations | Related changes |
| Returning | Get results | Audit trails |
| Chunking | Large datasets | Performance |
Next Steps
Now that you understand batch operations, let's dive deeper:
- Transactions – Transaction management
- Streams – Reactive queries
- Performance – Performance optimization
Did You Know?
- Batch operations are 10-100x faster – For bulk operations
- Batch operations are atomic – All or nothing
- Batch uses one transaction – Under the hood
- Batch supports mixed operations – Insert, update, delete
- Batch can return results – With batchReturning
- Chunking prevents memory issues – For large datasets
- Batch operations are transaction-safe – Rollback on failure
- Batch is the most efficient way – To process bulk data