Imagine the following scenario: Monday, 9 AM. You grab your coffee, join the stand-up call, and suddenly realise: you have absolutely no idea what you did last week. The weekend wiped your mental cache clean. Was it the React component refactor? Or was that two weeks ago? You vaguely remember fighting with TypeScript errors on Thursday, but what was the actual solution? Your Git commits say 'fix: update logic' — thanks, past you, very helpful.
Here's the thing: your brain isn't built to be a perfect activity log. But your computer? It remembers everything. That's where PiecesOS comes in.
The goal
Create an app that tracks what work is done every day.

For Part 1, we will work on:
Connecting to PiecesOS
Getting our workstream summaries
Grouping the summaries by day
Keeping everything updated in real-time
We’ll hit a UI later on. Let’s take this one step at a time!
Prerequisites
Dart SDK version 3.8.1 or higher (flutter.dev)
Gemini API key (will mention that in part 2)
Setting up
Let’s begin by creating a new flutter app.
flutter create daily_recap_app
Before you can start using the Pieces SDK, you’ll need to add it to your Dart or Flutter project. Open your project’s pubspec.yaml file and add the following under ‘dependencies:’:
dependencies:
# The Pieces SDK
pieces_os_client:
git:
url: https:
web_socket_channel: ^3.0.1The SDK is automatically generated from our API setup, so it’s mostly made up of code that helps your app talk to Pieces’ api.
We also added the web_socket_channel dependency to connect to the Pieces web-socket.
Step 1: Connecting to PiecesOS
Alright, first challenge – how do we even talk to PiecesOS?
Let's first create a new file lib/services/pieces_os_service.dart
Nothing fancy – just setting up our API clients.
Now the fun part – actually connecting:
Future<Application> connectApplication() async {
final seededApp = SeededTrackedApplication(
name: ApplicationNameEnum.OPEN_SOURCE,
platform: Platform.operatingSystem == "macos"
? PlatformEnum.MACOS
: PlatformEnum.WINDOWS,
version: "0.0.1",
);
final connection = SeededConnectorConnection(
application: seededApp,
);
_context = await _connectorApi.connect(
seededConnectorConnection: connection,
);
print('Successfully connected to Pieces OS!');
return _context!.application;
}Basically, we're saying, "Hey Pieces, I'm a new app, let me in!" and it gives us back a context with our application info.
Step 2: The WebSocket
This websocket specifically sends us all of the IDs for the users' workstream summaries in Pieces upon first connection. After, it sends only the IDs of either newly created summaries, summaries that have updated data, or summaries that were deleted.
Here's the setup:
void _startWebSocketListener() {
final wsUrl = '$websocketUrl/workstream_summaries/stream/identifiers';
_wsChannel = WebSocketChannel.connect(Uri.parse(wsUrl));
print('WebSocket connected to $wsUrl');
_wsSubscription = _wsChannel!.stream.listen(
(message) async {
try {
final streamedIdentifiers = StreamedIdentifiers.fromJson(
jsonDecode(message),
);
for (final id in streamedIdentifiers!.iterable) {
final summaryId = id.workstreamSummary?.id;
if (summaryId != null) {
await _fetchAndCacheSummary(summaryId);
}
}
} catch (e) {
print('Error processing message: $e');
}
},
onDone: () {
print('WebSocket closed, reconnecting...');
_reconnectWebSocket();
},
onError: (error) {
print('WebSocket error: $error');
_reconnectWebSocket();
},
);
}Here's the beauty of this approach: The WebSocket just keeps until their app shuts down or they manually close it, sending us identifiers. When we first connect, it dumps ALL existing identifiers on us (could be from a year ago!) and stays open to send us new ones as they're created in real-time. One connection handles everything!
Step 3: Fetching and caching summaries
Okay, so we have identifiers. Now what? We need to actually fetch the summary data and organize it.
Here's what a WorkstreamSummary looks like (the important parts anyway):
class WorkstreamSummary {
String id;
String? name;
GroupedTimestamp created;
GroupedTimestamp? updated;
}And GroupedTimestamp is basically:
class GroupedTimestamp {
DateTime value;
}So when we fetch a summary, we need to:
Get the created date
Strip the time from the date
Add the summary to that day's list
Sort everything by time
Here's the code:
Future<void> _fetchAndCacheSummary(String identifier) async {
final summary = await _workstreamSummaryApi
.workstreamSummariesSpecificWorkstreamSummarySnapshot(identifier);
if (summary == null) {
return;
}
final createdDate = summary.created.value;
final dayKey = DateTime(
createdDate.year,
createdDate.month,
createdDate.day,
);
_summariesByDay.putIfAbsent(dayKey, () => []);
final existingIndex = _summariesByDay[dayKey]!
.indexWhere((s) => s.id == summary.id);
if (existingIndex != -1) {
_summariesByDay[dayKey]![existingIndex] = summary;
} else {
_summariesByDay[dayKey]!.add(summary);
}
_summariesByDay[dayKey]!.sort((a, b) {
return b.created.value.compareTo(a.created.value);
});
_summariesStreamController.add(Map.unmodifiable(_summariesByDay));
print('Cached summary ${summary.id} for day $dayKey');
}The _summariesByDay is just a Map<DateTime, List<WorkstreamSummary>> where:
Key: A DateTime set to midnight (e.g., Nov 4, 2025 at 00:00:00)
Value: List of all summaries for that day, sorted by time
Let's also add some useful methods to retrieve a summary
List<WorkstreamSummary> getSummariesForDay(DateTime date) {
final dayKey = DateTime(date.year, date.month, date.day);
return _summariesByDay[dayKey] ?? [];
}
List<DateTime> getDaysWithSummaries() {
final days = _summariesByDay.keys.toList();
days.sort((a, b) => b.compareTo(a));
return days;
}Step 4: Keeping everything in Sync
Speaking of streams – let's add a broadcast stream that others can listen to:
final StreamController<Map<DateTime, List<WorkstreamSummary>>>
_summariesStreamController = StreamController.broadcast();
Stream<Map<DateTime, List<WorkstreamSummary>>> get summariesStream =>
_summariesStreamController.stream;So in the UI (when we build it), we can just do:
service.summariesStream.listen((summaries) {
print('Got new data!');
});
Step 4.5: Waiting for Initial WebSocket Sync
Here's a problem: when the app first starts, the WebSocket dumps all existing summaries on us. This could be hundreds of identifiers! If we try to show the UI or generate a recap before they're all loaded, we'll have incomplete data.
But we only need to wait once – on the first message. After that, the WebSocket just sends new updates in real-time and we don't want to block.
Here's the solution using a Completer:
Step 1: Add Fields and Wait Method
Place this code block right after the PiecesOSService() constructor and before the connectApplication() method:
Completer<void>? _initialSyncCompleter;
bool get _isInitialSyncComplete =>
_initialSyncCompleter?.isCompleted ?? false;
Future<void> waitForInitialSync() async {
if (_isInitialSyncComplete) {
return;
}
if (_initialSyncCompleter != null) {
return _initialSyncCompleter!.future;
}
_initialSyncCompleter = Completer<void>();
return _initialSyncCompleter!.future;
}Step 2: Update WebSocket Listener
Replace _wsSubscription with this new section:
_wsSubscription = _wsChannel!.stream.listen(
(message) async {
try {
final streamedIdentifiers = StreamedIdentifiers.fromJson(
jsonDecode(message),
);
for (final id in streamedIdentifiers!.iterable) {
final summaryId = id.workstreamSummary?.id;
if (summaryId != null) {
await _fetchAndCacheSummary(summaryId);
}
}
if (!_isInitialSyncComplete) {
_isInitialSyncComplete = true;
_initialSyncCompleter?.complete();
print('Initial WebSocket sync complete!');
}
} catch (e) {
print('Error processing message: $e');
}
},
);Step 3: Use in App Initialization
Place this code in your app's entry point (typically lib/main.dart, create the file if you don't have it):
import 'services/pieces_os_service.dart';
Future<void> main() async {
final service = PiecesOSService();
await service.connectApplication();
service.startWebSocketListener();
await service.waitForInitialSync();
print('Ready to go! All summaries loaded.');
}Step 5: Accessing the Actual Summary Content
WorkstreamSummary objects from the SDK don't directly expose the summary text. The actual content is stored in annotations.
Each summary has annotations, and we need to:
Loop through the summary's annotations
Find the one with type SUMMARY
Fetch that annotation using AnnotationApi
Extract the text content
Let's add a method to fetch annotation content:
Future<String?> getSummaryContent(WorkstreamSummary summary) async {
try {
for (final annotationRef in summary.annotations?.indices.keys.toList() ?? []) {
final annotation = await _annotationApi
.annotationSpecificAnnotationSnapshot(annotationRef);
if (annotation == null) {
continue;
}
if (annotation.type == AnnotationTypeEnum.SUMMARY) {
return annotation.text;
}
}
return null;
} catch (e) {
print('Error fetching annotation content for ${summary.id}: $e');
return null;
}
}The SDK separates metadata (ID, timestamp) from content (stored in annotations).
Step 6: Creating a Proper Data Structure
Let's create a proper class for summaries with their content. Add this to lib/models/daily_recap_models.dart:
class SummaryWithContent {
final String id;
final String title;
final String content;
final DateTime timestamp;
SummaryWithContent({
required this.id,
required this.title,
required this.content,
required this.timestamp,
});
}Now, let's add a convenient method to get summaries with their content:
import 'package:daily_recap_app/models/daily_recap_models.dart';
Future<List<SummaryWithContent>> getSummariesWithContentForDay(
DateTime date,
) async {
final summaries = getSummariesForDay(date);
final List<SummaryWithContent> summariesWithContent = [];
for (final summary in summaries) {
final content = await getSummaryContent(summary);
if (content != null && content.isNotEmpty) {
summariesWithContent.add(
SummaryWithContent(
id: summary.id,
title: summary.name,
content: content,
timestamp: summary.created.value,
),
);
}
}
return summariesWithContent;
}Perfect! Now we have rich, typed data ready for AI processing and UI display.
Part 1 Summary
Connect to PiecesOS
Open WebSocket for real-time updates
Fetch and cache summaries
Group by day
Stream updates to UI
Extract summary content from annotations
What is created so far lib/services/pieces_os_service.dart
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:pieces_os_client/api.dart';
import 'package:pieces_os_client/api_client.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:daily_recap_app/models/daily_recap_models.dart';
class PiecesOSService {
static const String baseUrl = 'http://localhost:39300';
static const String websocketUrl = 'ws://localhost:39300';
late final ConnectorApi _connectorApi;
late final WorkstreamSummaryApi _workstreamSummaryApi;
late final AnnotationApi _annotationApi;
final StreamController<Map<DateTime, List<WorkstreamSummary>>>
_summariesStreamController = StreamController.broadcast();
Stream<Map<DateTime, List<WorkstreamSummary>>> get summariesStream =>
_summariesStreamController.stream;
bool _isInitialSyncComplete = false;
Completer<void>? _initialSyncCompleter;
PiecesOSService() {
final client = ApiClient(basePath: baseUrl);
_connectorApi = ConnectorApi(client);
_workstreamSummaryApi = WorkstreamSummaryApi(client);
_annotationApi = AnnotationApi(client);
}
Future<Application> connectApplication() async {
final seededApp = SeededTrackedApplication(
name: ApplicationNameEnum.OPEN_SOURCE,
platform: Platform.operatingSystem == "macos"
? PlatformEnum.MACOS
: PlatformEnum.WINDOWS,
version: "0.0.1",
);
final connection = SeededConnectorConnection(
application: seededApp,
);
_context = await _connectorApi.connect(
seededConnectorConnection: connection,
);
print('Successfully connected to Pieces OS!');
return _context!.application;
}
Future<void> waitForInitialSync() async {
if (_isInitialSyncComplete) {
return;
}
if (_initialSyncCompleter != null) {
return _initialSyncCompleter!.future;
}
_initialSyncCompleter = Completer<void>();
return _initialSyncCompleter!.future;
}
void _startWebSocketListener() {
final wsUrl = '$websocketUrl/workstream_summaries/stream/identifiers';
_wsChannel = WebSocketChannel.connect(Uri.parse(wsUrl));
print('WebSocket connected to $wsUrl');
_wsSubscription = _wsChannel!.stream.listen(
(message) async {
try {
final streamedIdentifiers = StreamedIdentifiers.fromJson(
jsonDecode(message),
);
for (final id in streamedIdentifiers!.iterable) {
final summaryId = id.workstreamSummary?.id;
if (summaryId != null) {
await _fetchAndCacheSummary(summaryId);
}
}
if (!_isInitialSyncComplete) {
_isInitialSyncComplete = true;
_initialSyncCompleter?.complete();
print('Initial WebSocket sync complete!');
}
} catch (e) {
print('Error processing message: $e');
}
},
onDone: () {
print('WebSocket closed, reconnecting...');
_reconnectWebSocket();
},
onError: (error) {
print('WebSocket error: $error');
_reconnectWebSocket();
},
);
}
Future<void> _fetchAndCacheSummary(String identifier) async {
final summary = await _workstreamSummaryApi
.workstreamSummariesSpecificWorkstreamSummarySnapshot(identifier);
if (summary == null) {
return;
}
final createdDate = summary.created.value;
final dayKey = DateTime(
createdDate.year,
createdDate.month,
createdDate.day,
);
if (!_summariesByDay.containsKey(dayKey)) {
_summariesByDay[dayKey] = [];
}
final existingIndex = _summariesByDay[dayKey]!
.indexWhere((s) => s.id == summary.id);
if (existingIndex != -1) {
_summariesByDay[dayKey]![existingIndex] = summary;
} else {
_summariesByDay[dayKey]!.add(summary);
}
_summariesByDay[dayKey]!.sort((a, b) {
return b.created.value.compareTo(a.created.value);
});
_summariesStreamController.add(Map.unmodifiable(_summariesByDay));
print('Cached summary ${summary.id} for day $dayKey');
}
List<WorkstreamSummary> getSummariesForDay(DateTime date) {
final dayKey = DateTime(date.year, date.month, date.day);
return _summariesByDay[dayKey] ?? [];
}
List<DateTime> getDaysWithSummaries() {
final days = _summariesByDay.keys.toList();
days.sort((a, b) => b.compareTo(a));
return days;
}
Future<List<SummaryWithContent>> getSummariesWithContentForDay(
DateTime date,
) async {
final summaries = getSummariesForDay(date);
final List<SummaryWithContent> summariesWithContent = [];
for (final summary in summaries) {
final content = await getSummaryContent(summary);
if (content != null && content.isNotEmpty) {
summariesWithContent.add(
SummaryWithContent(
id: summary.id,
title: summary.name,
content: content,
timestamp: summary.created.value,
),
);
}
}
return summariesWithContent;
}
}