StreamController
Understand how to create and manage streams manually using StreamController in Dart.
What is it?
StreamController is a class that provides manual control over a stream. It allows you to create a stream and add events (data, errors, and completion) to it programmatically, giving you full control over the stream's lifecycle.
Why does it exist?
StreamController exists to:
- Create streams manually
- Add events programmatically
- Control stream lifecycle
- Implement custom stream sources
- Handle complex stream patterns
- Create broadcast streams
Basic StreamController
Creating a StreamController
A StreamController provides a stream that you can control. You add events to the controller, and they are delivered to listeners.
import 'dart:async';
void main() {
// Create a StreamController
StreamController<String> controller = StreamController<String>();
// Listen to the stream
controller.stream.listen(
(data) => print('Received: $data'),
onError: (error) => print('Error: $error'),
onDone: () => print('Stream closed'),
);
// Add events
controller.add('First message');
controller.add('Second message');
controller.add('Third message');
// Add an error
controller.addError('Something went wrong');
// Close the stream
controller.close();
}
// Output:
// Received: First message
// Received: Second message
// Received: Third message
// Error: Something went wrong
// Stream closed
What's happening here? -
StreamControllercreates a new stream -add()pushes data events into the stream -addError()pushes an error into the stream -close()ends the stream and callsonDone- Listeners receive events as they're added
Single Subscription Controller
Single subscription controllers are the default type. They support only one listener and are suitable for event sequences that are consumed once.
StreamController<int> controller = StreamController<int>();
// Add events before listening
controller.add(1);
controller.add(2);
// Listen (only one listener allowed)
controller.stream.listen(
(data) => print('Data: $data'),
onDone: () => print('Done'),
);
controller.add(3);
controller.add(4);
controller.close();
// Output:
// Data: 1
// Data: 2
// Data: 3
// Data: 4
// Done
Key insights: - Default controller type (single subscription) - Only one listener is allowed - Events are buffered before listening - Perfect for one-time event sequences
Broadcast Controllers
Creating Broadcast Streams
Broadcast controllers allow multiple listeners to receive events simultaneously. They're useful for sharing events across multiple parts of your application.
import 'dart:async';
void main() {
// Broadcast StreamController
StreamController<int> controller = StreamController<int>.broadcast();
// Add initial events
controller.add(1);
controller.add(2);
// First listener
controller.stream.listen(
(data) => print('Listener 1: $data'),
onDone: () => print('Listener 1 done'),
);
// Second listener
controller.stream.listen(
(data) => print('Listener 2: $data'),
onDone: () => print('Listener 2 done'),
);
// Add more events
controller.add(3);
controller.add(4);
controller.close();
}
// Output:
// Listener 1: 3
// Listener 2: 3
// Listener 1: 4
// Listener 2: 4
// Listener 1 done
// Listener 2 done
What's happening here? -
broadcast()creates a broadcast controller - Multiple listeners can subscribe - Each listener receives all events - Events after subscription are received - Events before subscription are not received
Broadcast vs Single Subscription
// Single subscription (default)
void singleSubscriptionExample() {
var controller = StreamController<int>();
controller.stream.listen((data) => print('1: $data'));
// This would cause an error
// controller.stream.listen((data) => print('2: $data')); // Error!
controller.add(42);
controller.close();
}
// Broadcast (multiple listeners)
void broadcastExample() {
var controller = StreamController<int>.broadcast();
controller.stream.listen((data) => print('Listener 1: $data'));
controller.stream.listen((data) => print('Listener 2: $data'));
controller.add(42); // Both listeners receive this
controller.close();
}
Key insights: - Single subscription = one listener only - Broadcast = multiple listeners allowed - Choose based on your use case - Broadcast is more common in UI applications
Advanced Features
StreamController with Transformer
StreamController can be used with transformers to modify events before they reach listeners.
import 'dart:async';
void main() {
// Create controller
var controller = StreamController<int>();
// Transform the stream
var transformed = controller.stream.map((value) => value * 2);
// Listen to transformed stream
transformed.listen((data) => print('Transformed: $data'));
// Add events
controller.add(1); // -> Transformed: 2
controller.add(2); // -> Transformed: 4
controller.add(3); // -> Transformed: 6
controller.close();
}
StreamController with Backpressure
StreamController provides backpressure handling through the
onListen,onPause,onResumecallbacks.
void backpressureExample() {
var controller = StreamController<int>(
onListen: () => print('Listener attached'),
onPause: () => print('Listener paused'),
onResume: () => print('Listener resumed'),
onCancel: () => print('Listener cancelled'),
);
var subscription = controller.stream.listen(
(data) => print('Data: $data'),
onDone: () => print('Done'),
);
controller.add(1);
controller.add(2);
// Pause the subscription
subscription.pause();
print('Paused');
controller.add(3); // Buffered, not delivered yet
controller.add(4); // Buffered, not delivered yet
// Resume the subscription
subscription.resume();
print('Resumed');
controller.add(5);
controller.close();
}
// Output:
// Listener attached
// Data: 1
// Data: 2
// Paused
// Resumed
// Data: 3
// Data: 4
// Data: 5
// Done
// Listener cancelled
What's happening here? -
onListenfires when listener attaches -onPausefires when subscription pauses -onResumefires when subscription resumes - Events are buffered during pause -onCancelfires when listener cancels
Real-World Examples
UI State Management
StreamController is commonly used for state management in UI applications.
class CounterState {
final int count;
CounterState(this.count);
}
class CounterBloc {
final StreamController<CounterState> _stateController =
StreamController<CounterState>();
final StreamController<void> _incrementController =
StreamController<void>();
int _count = 0;
CounterBloc() {
// Listen to increment events
_incrementController.stream.listen((_) {
_count++;
_stateController.add(CounterState(_count));
});
}
// Stream of states
Stream<CounterState> get state => _stateController.stream;
// Sink for increment events
Sink<void> get increment => _incrementController.sink;
void dispose() {
_stateController.close();
_incrementController.close();
}
}
void main() async {
var bloc = CounterBloc();
// Listen to state changes
bloc.state.listen((state) {
print('Count: ${state.count}');
});
// Trigger increments
bloc.increment.add(null); // Count: 1
bloc.increment.add(null); // Count: 2
bloc.increment.add(null); // Count: 3
bloc.dispose();
}
Key insights: - BLoC pattern uses StreamController - Separate streams for events and states - Sink for adding events - Stream for receiving states - Clean disposal of controllers
Event Bus
StreamController is great for implementing event bus patterns.
class EventBus {
static final EventBus _instance = EventBus._internal();
factory EventBus() => _instance;
EventBus._internal();
final Map<Type, StreamController> _controllers = {};
// Register event type
StreamController<T> _getController<T>() {
return _controllers.putIfAbsent(
T,
() => StreamController<T>.broadcast(),
) as StreamController<T>;
}
// Emit an event
void emit<T>(T event) {
_getController<T>().add(event);
}
// Listen to events
Stream<T> on<T>() {
return _getController<T>().stream as Stream<T>;
}
// Dispose all controllers
void dispose() {
for (var controller in _controllers.values) {
controller.close();
}
_controllers.clear();
}
}
// Event classes
class UserLoggedInEvent {
final String userId;
UserLoggedInEvent(this.userId);
}
class UserLoggedOutEvent {
final String userId;
UserLoggedOutEvent(this.userId);
}
void main() {
var eventBus = EventBus();
// Listen for events
eventBus.on<UserLoggedInEvent>().listen((event) {
print('User logged in: ${event.userId}');
});
eventBus.on<UserLoggedOutEvent>().listen((event) {
print('User logged out: ${event.userId}');
});
// Emit events
eventBus.emit(UserLoggedInEvent('user123'));
eventBus.emit(UserLoggedOutEvent('user123'));
eventBus.dispose();
}
// Output:
// User logged in: user123
// User logged out: user123
Best Practices
Always Close Controllers
// Good: Close controller when done
class DataService {
final StreamController<String> _controller = StreamController();
Stream<String> get data => _controller.stream;
void dispose() {
_controller.close(); // Always close!
}
}
// Bad: Not closing
class BadDataService {
final StreamController<String> _controller = StreamController();
Stream<String> get data => _controller.stream;
// Memory leak!
}
Use Sink for Input
// Good: Use Sink for adding events
class CounterBloc {
final StreamController<int> _controller = StreamController();
// Sink for adding events
Sink<int> get add => _controller.sink;
// Stream for listening
Stream<int> get stream => _controller.stream;
}
// Bad: Exposing controller directly
class BadBloc {
final StreamController<int> controller = StreamController();
// Exposed too much
}
Common Mistakes
Closing Controller While Adding
Wrong:
var controller = StreamController();
controller.close();
controller.add('data'); // Error! Controller closed
Correct:
var controller = StreamController();
controller.add('data');
controller.close(); // Close after adding
Memory Leaks
Wrong:
class MyWidget {
final StreamController controller = StreamController();
// Controller never closed!
}
Correct:
class MyWidget {
StreamController controller = StreamController();
void dispose() {
controller.close();
}
}
Summary
StreamController provides manual control over streams, allowing you to add events, handle errors, manage listeners, and control the stream lifecycle.
Next Steps
Now that you understand StreamController, continue to:
Did You Know?
- StreamController can be paused and resumed
- Broadcast controllers support multiple listeners
- Controllers should always be closed
sinkis used for adding eventsstreamis used for listening- Controllers support backpressure handling
- StreamController is used in BLoC pattern
Next up, bro! async* (Async Generators)! 🚀