Введение:#
Сегодня я расскажу вам о том, как можно объединить несколько потоков данных Stream в Dart, сохраняя при этом их идентичность. Это полезно, если вы создаете систему реального времени, где данные поступают из разных источников и вам важно знать, откуда они пришли. Давайте начнем!
Проблема#
Представьте, что вы разрабатываете приложение для мониторинга различных сенсоров в умном доме. (Люблю эту тему 😁)
У вас есть три потока данных:
- температура (Temperature)
- влажность (Humidity)
- статус датчика движения (Motion)
Эти потоки поступают от разных сенсоров:
Stream<int> temperatureStream = getTemperatureStream();
Stream<int> humidityStream = getHumidityStream();
Stream<int> motionStream = getMotionStream();
На первый взгляд, вы можете подумать, что объединить эти потоки довольно просто. Используем класс StreamGroup:
Stream<int> combinedStream = StreamGroup.merge([temperatureStream, humidityStream, motionStream]);
combinedStream.listen((event) {
print(event);
});
И действительно, это работает. Но вот проблема: когда данные начинают поступать из этих объединенных потоков, вы видите что-то вроде этого:
> 25
> 60
> 1
Решение#
Чтобы решить эту проблему, предлагаю метод, который позволит не только объединить потоки, но и сохранить информацию об их источнике. Мы создадим два вспомогательных класса и метод для объединения потоков. Увидел в статье Punnyarthabanerjee подобный подход и решил адаптировать для себя
Шаг 1: Класс StreamIdentity
Этот класс будет хранить идентификатор потока и сам поток:
class StreamIdentity {
final String identifier;
final Stream stream;
StreamIdentity(this.identifier, this.stream);
}
Шаг 2: Класс MergedStreamResponse
Этот класс будет содержать данные data
и информацию о том, из какого потока они пришли StreamIdentity
, а также временную метку timestamp
:
class MergedStreamResponse {
final StreamIdentity streamIdentity;
final dynamic data;
final DateTime timestamp;
MergedStreamResponse(this.streamIdentity, this.data) : timestamp = DateTime.now();
}
Шаг 3: Метод mergeStream
Этот метод объединит все потоки и будет добавлять данные в контроллер вместе с их идентификатором:
Stream<MergedStreamResponse> mergeStream(List<StreamIdentity> streams) {
final StreamController<MergedStreamResponse> controller = StreamController();
for (var identity in streams) {
identity.stream.listen((event) {
controller.add(MergedStreamResponse(identity, event));
});
}
return controller.stream;
}
Применение#
Теперь мы можем передать наши потоки в метод mergeStream
и обрабатывать данные с учетом их источника:
void main() {
Stream<int> temperatureStream = getTemperatureStream();
Stream<int> humidityStream = getHumidityStream();
Stream<int> motionStream = getMotionStream();
List<StreamIdentity> streams = [
StreamIdentity('temperature', temperatureStream),
StreamIdentity('humidity', humidityStream),
StreamIdentity('motion', motionStream),
];
Stream<MergedStreamResponse> combinedStream = mergeStream(streams);
combinedStream.listen((response) {
print('Data from ${response.streamIdentity.identifier} at ${response.timestamp}: ${response.data}');
});
}
Теперь данные выводятся с указанием их источника и времени получения:
Data from temperature at 2024-05-10 12:00:00: 25
Data from humidity at 2024-05-10 12:00:01: 60
Data from motion at 2024-05-10 12:00:02: 1
Пример реализации (полный код)#
class StreamIdentity {
final String identifier;
final Stream stream;
StreamIdentity(this.identifier, this.stream);
}
class MergedStreamResponse {
final StreamIdentity streamIdentity;
final dynamic data;
final DateTime timestamp;
MergedStreamResponse(this.streamIdentity, this.data) : timestamp = DateTime.now();
}
Stream<MergedStreamResponse> mergeStream(List<StreamIdentity> streams) {
final StreamController<MergedStreamResponse> controller = StreamController();
for (var identity in streams) {
identity.stream.listen((event) {
controller.add(MergedStreamResponse(identity, event));
});
}
return controller.stream;
}
void main() {
Stream<double> temperatureSensor = getTemperatureStream();
Stream<double> humiditySensor = getHumidityStream();
Stream<int> motionSensor = getMotionStream();
List<StreamIdentity> sensors = [
StreamIdentity('temperature', temperatureSensor),
StreamIdentity('humidity', humiditySensor),
StreamIdentity('motion', motionSensor),
];
Stream<MergedStreamResponse> combinedStream = mergeStream(sensors);
combinedStream.listen((response) {
print('Data from ${response.streamIdentity.identifier} at ${response.timestamp}: ${response.data}');
});
}
Stream<double> getTemperatureStream() async* {
yield* Stream.periodic(Duration(seconds: 1), (count) => 20.0 + count);
}
Stream<double> getHumidityStream() async* {
yield* Stream.periodic(Duration(seconds: 2), (count) => 50.0 + count * 0.5);
}
Stream<int> getMotionStream() async* {
yield* Stream.periodic(Duration(seconds: 3), (count) => count % 2);
}
В этом примере данные от датчика температуры, влажностности и датчика движения объединяются и выводятся с указанием их источника и времени получения. Это позволяет легко отслеживать и анализировать данные от каждого сенсора, что значительно упрощает принятие решений.
Заключение#
Надеюсь, эта статья была полезной и вы узнали что-то новое о работе с потоками в Dart. Если у вас есть вопросы или комментарии, обращайтесь. Стараюсь писать кратко и по делу. Спасибо за внимание и спасибо автору статьи на медиум за оригинальную идею!