Введение:#
Сегодня я расскажу вам о том, как можно объединить несколько потоков данных Stream в Dart, сохраняя при этом их идентичность. Это полезно, если вы создаете систему реального времени, где данные поступают из разных источников и вам важно знать, откуда они пришли. Давайте начнем!
Проблема#
Представьте, что вы разрабатываете приложение для мониторинга различных сенсоров в умном доме. (Люблю эту тему 😁)
У вас есть три потока данных:
- температура (Temperature)
- влажность (Humidity)
- статус датчика движения (Motion)
Эти потоки поступают от разных сенсоров:
Stream<int> temperatureStream = getTemperatureStream();
Stream<int> humidityStream = getHumidityStream();
Stream<int> motionStream = getMotionStream();
На первый взгляд, вы можете подумать, что объединить эти потоки довольно просто. Используем класс StreamGroup:
Stream<int> combinedStream = StreamGroup.merge([temperatureStream, humidityStream, motionStream]);
combinedStream.listen((event) {
  print(event);
});
И действительно, это работает. Но вот проблема: когда данные начинают поступать из этих объединенных потоков, вы видите что-то вроде этого:
> 25
> 60
> 1
Решение#
Чтобы решить эту проблему, предлагаю метод, который позволит не только объединить потоки, но и сохранить информацию об их источнике. Мы создадим два вспомогательных класса и метод для объединения потоков. Увидел в статье Punnyarthabanerjee подобный подход и решил адаптировать для себя
Шаг 1: Класс StreamIdentity
Этот класс будет хранить идентификатор потока и сам поток:
class StreamIdentity {
  final String identifier;
  final Stream stream;
  StreamIdentity(this.identifier, this.stream);
}
Шаг 2: Класс MergedStreamResponse
Этот класс будет содержать данные data и информацию о том, из какого потока они пришли StreamIdentity, а также временную метку timestamp:
class MergedStreamResponse {
  final StreamIdentity streamIdentity;
  final dynamic data;
  final DateTime timestamp;
  MergedStreamResponse(this.streamIdentity, this.data) : timestamp = DateTime.now();
}
Шаг 3: Метод mergeStream
Этот метод объединит все потоки и будет добавлять данные в контроллер вместе с их идентификатором:
Stream<MergedStreamResponse> mergeStream(List<StreamIdentity> streams) {
  final StreamController<MergedStreamResponse> controller = StreamController();
  for (var identity in streams) {
    identity.stream.listen((event) {
      controller.add(MergedStreamResponse(identity, event));
    });
  }
  return controller.stream;
}
Применение#
Теперь мы можем передать наши потоки в метод mergeStream и обрабатывать данные с учетом их источника:
void main() {
  Stream<int> temperatureStream = getTemperatureStream();
  Stream<int> humidityStream = getHumidityStream();
  Stream<int> motionStream = getMotionStream();
  List<StreamIdentity> streams = [
    StreamIdentity('temperature', temperatureStream),
    StreamIdentity('humidity', humidityStream),
    StreamIdentity('motion', motionStream),
  ];
  Stream<MergedStreamResponse> combinedStream = mergeStream(streams);
  combinedStream.listen((response) {
    print('Data from ${response.streamIdentity.identifier} at ${response.timestamp}: ${response.data}');
  });
}
Теперь данные выводятся с указанием их источника и времени получения:
Data from temperature at 2024-05-10 12:00:00: 25
Data from humidity at 2024-05-10 12:00:01: 60
Data from motion at 2024-05-10 12:00:02: 1
Пример реализации (полный код)#
class StreamIdentity {
  final String identifier;
  final Stream stream;
  StreamIdentity(this.identifier, this.stream);
}
class MergedStreamResponse {
  final StreamIdentity streamIdentity;
  final dynamic data;
  final DateTime timestamp;
  MergedStreamResponse(this.streamIdentity, this.data) : timestamp = DateTime.now();
}
Stream<MergedStreamResponse> mergeStream(List<StreamIdentity> streams) {
  final StreamController<MergedStreamResponse> controller = StreamController();
  for (var identity in streams) {
    identity.stream.listen((event) {
      controller.add(MergedStreamResponse(identity, event));
    });
  }
  return controller.stream;
}
void main() {
  Stream<double> temperatureSensor = getTemperatureStream();
  Stream<double> humiditySensor = getHumidityStream();
  Stream<int> motionSensor = getMotionStream();
  List<StreamIdentity> sensors = [
    StreamIdentity('temperature', temperatureSensor),
    StreamIdentity('humidity', humiditySensor),
    StreamIdentity('motion', motionSensor),
  ];
  Stream<MergedStreamResponse> combinedStream = mergeStream(sensors);
  combinedStream.listen((response) {
    print('Data from ${response.streamIdentity.identifier} at ${response.timestamp}: ${response.data}');
  });
}
Stream<double> getTemperatureStream() async* {
  yield* Stream.periodic(Duration(seconds: 1), (count) => 20.0 + count);
}
Stream<double> getHumidityStream() async* {
  yield* Stream.periodic(Duration(seconds: 2), (count) => 50.0 + count * 0.5);
}
Stream<int> getMotionStream() async* {
  yield* Stream.periodic(Duration(seconds: 3), (count) => count % 2);
}
В этом примере данные от датчика температуры, влажностности и датчика движения объединяются и выводятся с указанием их источника и времени получения. Это позволяет легко отслеживать и анализировать данные от каждого сенсора, что значительно упрощает принятие решений.
Заключение#
Надеюсь, эта статья была полезной и вы узнали что-то новое о работе с потоками в Dart. Если у вас есть вопросы или комментарии, обращайтесь. Стараюсь писать кратко и по делу. Спасибо за внимание и спасибо автору статьи на медиум за оригинальную идею!