processSessionFiles method

Future<ProcessedStats> processSessionFiles(
  List<String> sessionFiles, {
  ProcessOptions options = const ProcessOptions(),
})

Processes session files and extracts aggregate stats, optionally restricted to a date range.

Implementation

/// Reads [sessionFiles] and aggregates them into a [ProcessedStats].
///
/// Files are processed in batches of `_batchSize`; the files of one batch are
/// read concurrently. Each file is treated as newline-delimited JSON: lines
/// that are not valid JSON objects are skipped, and files that cannot be read
/// at all are ignored.
///
/// Date filtering uses `options.fromDate` and `options.toDate`:
///
/// * When `fromDate` is set, a file whose modification date is before it is
///   skipped without being read (if the file cannot be stat'ed it is read
///   anyway).
/// * A session is then dropped when the date of its first message is before
///   `fromDate` or after `toDate`, as decided by `isDateBefore`.
///
/// Files whose path contains `/subagents/` contribute tool-call and token
/// usage but are not counted as sessions or messages.
///
/// NOTE(review): speculation time saved and the shot distribution are
/// accumulated *before* the date-range check, so they are not restricted by
/// it. Confirm this is intended.
Future<ProcessedStats> processSessionFiles(
  List<String> sessionFiles, {
  ProcessOptions options = const ProcessOptions(),
}) async {
  final fromDate = options.fromDate;
  final toDate = options.toDate;

  // Lenient integer coercion for values decoded from untrusted JSON: anything
  // that is not a finite number counts as 0 instead of throwing and aborting
  // the whole aggregation.
  int asInt(Object? value) =>
      value is num && value.isFinite ? value.toInt() : 0;

  final dailyActivityMap = <String, DailyActivity>{};
  final dailyModelTokensMap = <String, Map<String, int>>{};
  final sessions = <SessionStats>[];
  final hourCounts = <int, int>{};
  var totalMessages = 0;
  var totalSpeculationTimeSavedMs = 0;
  final modelUsageAgg = <String, ModelUsage>{};
  final shotDistributionMap = shotStatsEnabled.value ? <int, int>{} : null;
  final sessionsWithShotCount = <String>{};

  // Process session files in batches.
  for (var i = 0; i < sessionFiles.length; i += _batchSize) {
    final batch = sessionFiles.sublist(
      i,
      min(i + _batchSize, sessionFiles.length),
    );

    final results = await Future.wait(
      batch.map((sessionFile) async {
        try {
          final file = File(sessionFile);

          if (fromDate != null) {
            try {
              // Cheap pre-filter: a file last modified before the range
              // start is skipped without reading its contents.
              final fileStat = await file.stat();
              final fileModifiedDate = toDateString(fileStat.modified);
              if (isDateBefore(fileModifiedDate, fromDate)) {
                return _SessionReadResult(
                  sessionFile: sessionFile,
                  skipped: true,
                );
              }
            } catch (_) {
              // If we can't stat the file, try to read it anyway.
            }
          }

          final content = await file.readAsString();
          final entries = <Map<String, dynamic>>[];
          for (final line in content.split('\n')) {
            if (line.isEmpty) continue;
            try {
              final decoded = jsonDecode(line);
              // Only JSON objects are transcript entries; ignore scalars and
              // arrays rather than relying on a failed cast.
              if (decoded is Map<String, dynamic>) entries.add(decoded);
            } on FormatException catch (_) {
              // Skip malformed lines.
            }
          }
          return _SessionReadResult(
            sessionFile: sessionFile,
            entries: entries,
          );
        } catch (e) {
          return _SessionReadResult(
            sessionFile: sessionFile,
            error: e.toString(),
          );
        }
      }),
    );

    for (final result in results) {
      if (result.skipped) continue;
      if (result.error != null || result.entries == null) continue;

      final sessionFile = result.sessionFile;
      final entries = result.entries!;
      final sessionId = _basenameWithoutExtension(sessionFile, '.jsonl');

      final messages = entries.where(_isTranscriptMessage).toList();
      if (messages.isEmpty) continue;

      // Check for speculation-accept entries.
      for (final entry in entries) {
        if (entry['type'] == 'speculation-accept') {
          totalSpeculationTimeSavedMs += asInt(entry['timeSavedMs']);
        }
      }

      final isSubagentFile = sessionFile.contains('/subagents/');

      // Extract shot count from PR attribution. A subagent file is attributed
      // to its parent session so each session is counted at most once.
      if (shotStatsEnabled.value && shotDistributionMap != null) {
        final parentSessionId = isSubagentFile
            ? _extractParentSessionId(sessionFile)
            : sessionId;

        if (!sessionsWithShotCount.contains(parentSessionId)) {
          final shotCount = _extractShotCountFromMessages(messages);
          if (shotCount != null) {
            sessionsWithShotCount.add(parentSessionId);
            shotDistributionMap[shotCount] =
                (shotDistributionMap[shotCount] ?? 0) + 1;
          }
        }
      }

      // Filter out sidechain messages for session metadata. Subagent files
      // keep every message.
      final mainMessages = isSubagentFile
          ? messages
          : messages.where((m) => m['isSidechain'] != true).toList();
      if (mainMessages.isEmpty) continue;

      final firstMessage = mainMessages.first;
      final lastMessage = mainMessages.last;

      final firstTimestamp = DateTime.tryParse(
        firstMessage['timestamp']?.toString() ?? '',
      );
      final lastTimestamp = DateTime.tryParse(
        lastMessage['timestamp']?.toString() ?? '',
      );

      if (firstTimestamp == null || lastTimestamp == null) continue;

      final dateKey = toDateString(firstTimestamp);

      if (fromDate != null && isDateBefore(dateKey, fromDate)) continue;
      if (toDate != null && isDateBefore(toDate, dateKey)) continue;

      if (!isSubagentFile) {
        final existing = dailyActivityMap.putIfAbsent(
          dateKey,
          () => DailyActivity(date: dateKey),
        );

        sessions.add(
          SessionStats(
            sessionId: sessionId,
            duration: lastTimestamp.difference(firstTimestamp).inMilliseconds,
            messageCount: mainMessages.length,
            // The timestamp was parsed via toString() above, so it is
            // non-null here but not necessarily a String; a hard cast could
            // throw and abort the whole run.
            timestamp: firstMessage['timestamp'].toString(),
          ),
        );

        totalMessages += mainMessages.length;
        existing.sessionCount++;
        existing.messageCount += mainMessages.length;

        final hour = firstTimestamp.hour;
        hourCounts[hour] = (hourCounts[hour] ?? 0) + 1;
      }

      // Subagent files never create a day entry, so their tool calls are only
      // counted when the day already exists in the map.
      final activity = dailyActivityMap[dateKey];

      // Process messages for tool usage and model stats.
      for (final message in mainMessages) {
        if (message['type'] != 'assistant') continue;

        // The payload comes from untrusted JSON; anything but an object
        // carries neither content blocks nor usage.
        final payload = message['message'];
        if (payload is! Map) continue;

        final content = payload['content'];
        if (activity != null && content is List) {
          for (final block in content) {
            if (block is Map && block['type'] == 'tool_use') {
              activity.toolCallCount++;
            }
          }
        }

        final usage = payload['usage'];
        if (usage is! Map) continue;

        final rawModel = payload['model'];
        final model = rawModel is String ? rawModel : 'unknown';
        if (model == syntheticModel) continue;

        final inputTokens = asInt(usage['input_tokens']);
        final outputTokens = asInt(usage['output_tokens']);

        final agg = modelUsageAgg.putIfAbsent(model, () => ModelUsage());
        agg.inputTokens += inputTokens;
        agg.outputTokens += outputTokens;
        agg.cacheReadInputTokens += asInt(usage['cache_read_input_tokens']);
        agg.cacheCreationInputTokens +=
            asInt(usage['cache_creation_input_tokens']);

        // Per-day totals only track input + output tokens (no cache tokens).
        final totalTokens = inputTokens + outputTokens;
        if (totalTokens > 0) {
          final dayTokens = dailyModelTokensMap.putIfAbsent(
            dateKey,
            () => <String, int>{},
          );
          dayTokens[model] = (dayTokens[model] ?? 0) + totalTokens;
        }
      }
    }
  }

  final sortedDailyActivity = dailyActivityMap.values.toList()
    ..sort((a, b) => a.date.compareTo(b.date));

  final sortedDailyModelTokens =
      dailyModelTokensMap.entries
          .map((e) => DailyModelTokens(date: e.key, tokensByModel: e.value))
          .toList()
        ..sort((a, b) => a.date.compareTo(b.date));

  return ProcessedStats(
    dailyActivity: sortedDailyActivity,
    dailyModelTokens: sortedDailyModelTokens,
    modelUsage: modelUsageAgg,
    sessionStats: sessions,
    hourCounts: hourCounts,
    totalMessages: totalMessages,
    totalSpeculationTimeSavedMs: totalSpeculationTimeSavedMs,
    shotDistribution: shotDistributionMap,
  );
}