Skip to main content

异步 Future Stream

程序在等待异步操作完成的过程中,非常有必要给予用户适当的反馈,这个时候就可以使用进度条。

进度条

CircularProgressIndicator

CircularProgressIndicator()
CircularProgressIndicator(
valueColor: AlwaysStoppedAnimation(Colors.red),
backgroundColor: Colors.white,
)

LinearProgressIndicator

LinearProgressIndicator()
LinearProgressIndicator(
valueColor: AlwaysStoppedAnimation(Colors.red),
backgroundColor: Colors.grey,
)
LinearProgressIndicator(
value: 0.1,
valueColor: AlwaysStoppedAnimation(Colors.blue),
backgroundColor: Colors.grey,
)

CupertinoActivityIndicator

CupertinoActivityIndicator()
CupertinoActivityIndicator(
radius: 24
)

Future 异步

在其他语言,比如 java、c++ 中, 同时要执行多个任务可以使用多线程来实现。而在 Dart 语言中没有线程 和进程的概念 ,它是单线程+事件循环的设计模式,DartFlutter 中要同时执行多个任务可以使用异步 来实现,Flutter 中主要使用 Future 来实现异步操作 。

Future 返回值

  • Future 是一个泛型,其中 T 代表的是我们耗时操作返回的具体值,如 Future 表示一个未来的字符串,Future 表示一个未来的布尔值,如果不需要返回值,可以使用 Future
    • Future.value(): 返回一个指定值的 Future
    • Future.delayed(): 返回一个延时执行的 Future
    • Future(() { ... return ... }) 返回异步的 function

Flutter 自带的 Demo 中有个计数器的功能,我们想的是执行计数器方法的时候让程序并行的去执行另一个统计的任务,这个时候就可以使用 Future 来完成。

Future<String> getNetworkData() {
return Future(() {
//执行其他耗时操作
int result = 0;
for (int i = 0; i < 100000; i++) {
result += i;
}
return "result:$result";
});
}
...
void _incrementCounter() {
getNetworkData().then((value) => print(value));
print("执行");
setState(() {
_counter++;
}
);
}

floatingActionButton: FloatingActionButton(
onPressed: _incrementCounter,
tooltip: 'Increment',
child: const Icon(Icons.add),
)

I/flutter ( 1799): 执行
I/flutter ( 1799): result:4999950000

Future 处理异常

异常处理使我们在开发中特别需要注意的,正确的处理程序运行中的异常,能给用户带来更好的体验。

future 中可以使用 catchError() 或在 then() 方法中传入可选参数 onError 来处理异常,可以使用 whenComplete 监听完成事件。

Future<String> getNetworkData() {
Future.delayed(const Duration(seconds: 3));
return Future.error(Exception("this is error"));
}

void _incrementCounter() {
getNetworkData()
.then((value) => print(value))
.onError((error, stackTrace) => {print(error)})
.whenComplete(() => print("完成"));

print("执行");
setState(() {
_counter++;
});
}

Future .then连缀来处理多个事务

Future<int> getNetworkData() {
return Future.value(12);
}

int _counter = 0;
void _incrementCounter() {
getNetworkData()
.then((value){
return value*2;
})
.then((value) => print(value))
.onError((error, stackTrace) => {print(error)})
.whenComplete(() => print("完成"));

print("执行");

setState(() {
_counter++;
});
}

async 和 await 关键字来处理 future

  • 作用:
    1. async:在方法体前面是使用,定义该方法为一个异步方法。
    2. await:等待并获得异步表达式的执行结果,并且给关键字只能在async修饰的方法中。
Future<int> getNetworkData(){
return Future.value(12);
}

void _incrementCounter() async{
var retult =await getNetworkData();
print(retult);
setState(() {
_counter++;
});
}

处理 async 方法中的异常

对于async中的方法的异常,我们按以下方式进行处理:

Future<int> getNetworkData() {
return Future.error(Exception("this is errot"));
}

void _incrementCounter() async {
try {
var retult = await getNetworkData();
print(retult);
} catch (e) {
print(e);
}
setState(() {
_counter++;
});
}

FutureBuilder

FutureBuilder是一个可以自动追踪Future的状态并在其状态改变的时候自动重绘的组件。

FutureBuilder 追踪 Future 的状态

  1. 定义一个模拟请求数据的异步方法
  2. 数据加载的时候加载一个 Indicator ,数据加载完毕后显示数据
import 'package:flutter/material.dart';
void main() {
runApp(const MyApp());
}

class MyApp extends StatelessWidget {
const MyApp({Key? key}) : super(key: key);
// This widget is the root of your application.
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
theme: ThemeData(
primarySwatch: Colors.blue,
),
home: const MyHomePage(),
);
}
}

class MyHomePage extends StatefulWidget {
const MyHomePage({super.key});@override
State<MyHomePage> createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
Future<String> loadData() async {
await Future.delayed(const Duration(seconds: 2)); // 等待2秒
// throw "404 data not found"; // 若需测试异常情况,可把注释去掉
return "this is server data"; // 正常返回数据
}

@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text('Title'),
),
body: FutureBuilder(
future: loadData(), // 读取网络数据,异步函数,返回一个Future类型
builder: (BuildContext context, AsyncSnapshot<String> snapshot) {
// 检查ConnectionState是否为done,以此判断Future是否结束
if (snapshot.connectionState == ConnectionState.done) {
// 当Future结束时,data和error必有一个不是空
if (snapshot.hasError) {
// 判断是否有错误,有则显示错误
return Center(
child: Text("ERROR: ${snapshot.error}"),
);

} else {
// 没有错误,则显示数据
return Center(
child: Text("DATA: ${snapshot.data}")
);
}
} else {
// Future还没结束,因此渲染一个圆形进度条
return const Center(
child: CircularProgressIndicator(),
);
}
},
),
);
}
}

FutureBuilder initialData

import 'package:flutter/material.dart';
void main() {
runApp(const MyApp());
}

class MyApp extends StatelessWidget {
const MyApp({Key? key}) : super(key: key);
// This widget is the root of your application.
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
theme: ThemeData(
primarySwatch: Colors.blue,
),
home: const MyHomePage(),
);
}
}

class MyHomePage extends StatefulWidget {
const MyHomePage({super.key});
@override
State<MyHomePage> createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
Future<String> loadData() async {
await Future.delayed(const Duration(seconds: 2)); // 等待2秒
// throw "404 data not found"; // 若需测试异常情况,可把注释去掉
return "this is server data"; // 正常返回数据
}

@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text('Title'),
), // AppBar
body: FutureBuilder(
future: loadData(),
initialData: "我是一个初始值",
builder: (context, snapshot) {
if (snapshot.hasError) {
return Center(
child: Text("ERROR: ${snapshot.error}"),
);
} else {
return Center(
child: Text("${snapshot.data}"),
);
}
}
), // FutureBuilder
);
}
}

Stream 、StreamBuilder

Stream

前面章节我们给大家介绍了 FutureFuture 在未来只会获取一个值。Stream 的字面意思是水流, Stream 不像 Future 那样只会在未来获取一个值,它可以异步获取0个或者多个值。如果说 Future 是一个 异步版本的 int 或者 StringStream 则更像是异步版本的列表,List ,List,列表里面可能会有0个或者多个 元素。

final future=Future.delayed(const Duration(seconds: 1),()=>43);
final stream=Stream.periodic(const Duration(seconds: 1),(value)=>value);
@override
void initState() {
super.initState();
future.then((value) => print(value));
stream.listen((event) {
print("stream:$event");
});
}

StreamBuilder

Flutter 中,StreamBuilder 是一个将 Stream 流与 Widget 结合到一起的组件,可实现组件的局部数 据更新 , StreamBuilder 组件和 FutureBuilder 组件比较相似,不同点在于它是一个可以自动跟踪 Stream(数据流或事件流)的状态,并在 Stream 有变化时自动重绘的组件。Stream 不同于 Future ,可 能会在生命周期内释放出任意数量的数据值(正常)或者错误信息(异常),通常被用于读取文件或者 下载网络资源等操作,也有时候用于状态管理。

  • StreamBuilder主要功能:

    1. 实现局部刷新
    2. 读取流实现读取文件或者下载网络资源等操作
    3. 父子组件之间的数据广播
  • StreamBuilder 实现一个计数器

import 'dart:math';
import 'package:flutter/material.dart';

void main() {
runApp(const MyApp());
}

class MyApp extends StatelessWidget {
const MyApp({Key? key}) : super(key: key);
// This widget is the root of your application.
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
theme: ThemeData(
primarySwatch: Colors.blue,
),
home: const MyHomePage(),
);
}
}

class MyHomePage extends StatefulWidget {
const MyHomePage({super.key});
@override
State<MyHomePage> createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
Stream<int> counter() {
return Stream.periodic(const Duration(seconds: 1), (count) {
return count;
});
}

@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text('Title'),
),
body: StreamBuilder(
stream: counter(),
builder: (context, snapshot) {
// 观察ConnectionState的状态
switch (snapshot.connectionState) {
case ConnectionState.none:
return const Center(
child: Text("NONE: 没有数据流"),
);
case ConnectionState.waiting:
return const Center(child: Text("WAITING: 等待数据流"));
case ConnectionState.active:
if (snapshot.hasError) {
return Center(
child: Text("ACTIVE: 数据流活跃,异常: ${snapshot.error}")
);
} else {
return Center(
child: Text("ACTIVE: 数据流活跃,数据: ${snapshot.data}")
);
}
case ConnectionState.done:
return const Center(
child: Text("DONE: 数据流关闭")
);
default:
throw "ConnectionState没有别的状态";
}
}
),
);
}
}

StreamBuilder 实时显当前的时间

import 'dart:math';
import 'package:flutter/material.dart';

void main() {
runApp(const MyApp());
}

class MyApp extends StatelessWidget {
const MyApp({Key? key}) : super(key: key);
// This widget is the root of your application.
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
theme: ThemeData(
primarySwatch: Colors.blue,
),
home: const MyHomePage(),
);
}
}

class MyHomePage extends StatefulWidget {
const MyHomePage({super.key});
@override
State<MyHomePage> createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
Stream<String> counter() {
return Stream.periodic(const Duration(seconds: 1), (count) {
return
"${DateTime.now().hour}-${DateTime.now().minute}-${DateTime.now().second}";
});
}

@override
void initState() {
// TODO: implement initState
super.initState();
}

@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text('Title'),
),
body: StreamBuilder(
stream: counter(),
builder: (context, snapshot) {// 观察ConnectionState的状态
switch (snapshot.connectionState) {
case ConnectionState.none:
return const Center(
child: Text("NONE: 没有数据流"),
);
case ConnectionState.waiting:
return const Center(child: Text("WAITING: 等待数据流"));
case ConnectionState.active:
if (snapshot.hasError) {
return Center(
child: Text("ACTIVE: 数据流活跃,异常: ${snapshot.error}")
);
} else {
return Center(
child: Text("ACTIVE: 数据流活跃,数据: ${snapshot.data}")
);
}
case ConnectionState.done:
return const Center(child: Text("DONE: 数据流关闭"));
default:
throw "ConnectionState没有别的状态";
}
}
),
);
}
}

StreamController

一般情况我们都是监听别人给我们的数据流,比如 File("").openRead() 读取文件, 刚才给大家讲了 Stream.periodic 可以创建数据流,如果我们想创建更精确的数据流的话也可以使用 StreamController

注意: import 'dart:async'; // 需要导入异步包

import 'package:flutter/material.dart';

import 'dart:async'; // 需要导入异步包
void main() {
runApp(MyApp());
}

class MyApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
home: DemoPage(),
);
}
}

class DemoPage extends StatefulWidget {
@override
State<DemoPage> createState() => _DemoPageState();
}

class _DemoPageState extends State<DemoPage> {
// 定义一个类型为int的Stream
final _controller = StreamController<int>();

@override
void dispose() {
super.dispose();
_controller.close();
}

@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text("Stream Demo"),
),
body: Wrap(
spacing: 20,
children: [
ElevatedButton(
// 按钮点击后Stream会释放出数字1
child: const Text("Emit 1"),
onPressed: () => _controller.add(1),
),
ElevatedButton(
// 按钮点击后Stream会释放出数字2
child: const Text("Emit 2"),
onPressed: () => _controller.add(2),
),
ElevatedButton(
// 按钮点击后Stream会释放出一个错误
child: const Text("Emit Error"),
onPressed: () => _controller.addError("oops"),
),
ElevatedButton(
// 按钮点击后Stream会关闭
child: const Text("Close"),
onPressed: () => _controller.close(),
),
StreamBuilder(
stream: _controller.stream,
builder: (context, snapshot) {
print("正在重新绘制StreamBuilder组件…");
if (snapshot.connectionState == ConnectionState.done) {
return const Text("数据流已关闭");
}
if (snapshot.hasError) return Text("${snapshot.error}");
if (snapshot.hasData) return Text("${snapshot.data}");
return const Center(
child: CircularProgressIndicator(),
);
},
)
],
),
);
}
}

StreamController.broadcast()

StreamController 默认只能有一个监听这,如果有多个监听者的话就需要用 StreamController.broadcast()

import 'package:flutter/material.dart';
import 'dart:async'; // 需要导入异步包

void main() {
runApp(MyApp());
}

class MyApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
home: DemoPage(),
);
}
}

class DemoPage extends StatefulWidget {
@override
State<DemoPage> createState() => _DemoPageState();
}

class _DemoPageState extends State<DemoPage> {
final _controller = StreamController.broadcast();
@override
void initState() {
// TODO: implement initState
super.initState();
_controller.stream.listen((event) {
print(event);
});
}

@override
void dispose() {
super.dispose();
_controller.close();
}

@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text("Stream Demo"),
),
body: Wrap(
spacing: 20,
children: [
ElevatedButton(
// 按钮点击后Stream会释放出数字1
child: const Text("Emit 1"),
onPressed: () => _controller.add(1),
),
ElevatedButton(
// 按钮点击后Stream会释放出数字2
child: const Text("Emit 2"),
onPressed: () => _controller.add(2),
),
ElevatedButton(
// 按钮点击后Stream会释放出一个错误
child: const Text("Emit Error"),
onPressed: () => _controller.addError("oops"),
),
ElevatedButton(
// 按钮点击后Stream会关闭
child: const Text("Close"),
onPressed: () => _controller.close(),
),
StreamBuilder(
stream: _controller.stream , // 去除重复的数据
builder: (context, snapshot) {
print("正在重新绘制StreamBuilder组件…");
if (snapshot.connectionState == ConnectionState.done) {
return const Text("数据流已关闭");
}

if (snapshot.hasError) return Text("${snapshot.error}");
if (snapshot.hasData) return Text("${snapshot.data}");

return const Center(
child: CircularProgressIndicator(),
);
},
)
],
),
);
}
}

Stream 过滤

import 'package:flutter/material.dart';
import 'dart:async'; // 需要导入异步包

void main() {
runApp(MyApp());
}

class MyApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
home: DemoPage(),
);
}
}

class DemoPage extends StatefulWidget {
@override
State<DemoPage> createState() => _DemoPageState();
}

class _DemoPageState extends State<DemoPage> {
// 定义一个类型为int的Stream
final _controller = StreamController.broadcast();
@override
void initState() {
// TODO: implement initState
super.initState();
_controller.stream.listen((event) {
print(event);
});
}

@override
void dispose() {
super.dispose();
_controller.close();
}

@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text("Stream Demo"),
),
body: Wrap(
spacing: 20,
children: [
ElevatedButton(
// 按钮点击后Stream会释放出数字1
child: const Text("Emit 1"),
onPressed: () => _controller.add(1),
),
ElevatedButton(
// 按钮点击后Stream会释放出数字2
child: const Text("Emit 2"),
onPressed: () => _controller.add(2),
),
ElevatedButton(
// 按钮点击后Stream会释放出数字1
child: const Text("Emit 3"),
onPressed: () => _controller.add(3),
),
ElevatedButton(
// 按钮点击后Stream会释放出数字1
child: const Text("Emit 4"),
onPressed: () => _controller.add(4),
),
ElevatedButton(
// 按钮点击后Stream会释放出数字1
child: const Text("Emit 5"),
onPressed: () => _controller.add(5),
),
ElevatedButton(
// 按钮点击后Stream会释放出数字1
child: const Text("Emit 6"),
onPressed: () => _controller.add(6),
),
ElevatedButton(
// 按钮点击后Stream会释放出一个错误
child: const Text("Emit Error"),
onPressed: () => _controller.addError("oops"),
),
ElevatedButton(
// 按钮点击后Stream会关闭
child: const Text("Close"),
onPressed: () => _controller.close(),
),
StreamBuilder(
stream: _controller.stream
.where((event) => event > 3)
.map((event) => event*2)
.distinct(), // 去除重复的数据
builder: (context, snapshot) {
print("正在重新绘制StreamBuilder组件…");
if (snapshot.connectionState == ConnectionState.done) {
return const Text("数据流已关闭");
}
if (snapshot.hasError) return Text("${snapshot.error}");
if (snapshot.hasData) return Text("${snapshot.data}");
return const Center(
child: CircularProgressIndicator(),
);
},
)
],
),
);
}
}

async 和 async*

async 返回 Future

async* 返回 Stream

async 不必多言,有了解的都知道这是异步调用。当一个函数被标记成 async 的时候,意味这个方法可能要从事耗时工作,比如说网络请求、处理图片等等。被 async 标记的方法会把返回值用 Future 包裹一下。

async
Future<int> loadData() async {
await Future.delayed(const Duration(seconds: 1));
return 42;
}

我们可以通过 await 来获取 Future 里的返回值:

main() async {
int result = await loadData();
print(result); // 等待一分钟后打印 '42'
}

async*

async 比多了一个,加上其实是函数生成器的意思。被 async 标记的函数会在返回一组返回值,这些返回值会被包裹在 Stream 中。async* 其实是为 yield 关键字发出的值提供了一个语法糖。

Stream<int> countForOneMinute() async* {
for (int i = 1; i <= 60; i++) {
await Future.delayed(const Duration(seconds: 1));
yield i;
}
}
tip

上面的其实就是 异步生成器 了。我们可以使用 yield 替代 return 返回数据, 因为这个是时候我们的函数还在执行中。此时,我们就可以使用 await for 去等待 Stream 发出的每一个值了。

StreamBuilder 调用

import 'package:flutter/material.dart';
import 'dart:async'; // 需要导入异步包

void main() {
runApp(MyApp());
}

class MyApp extends StatelessWidget {
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
home: DemoPage(),
);
}
}

class DemoPage extends StatefulWidget {
@override
State<DemoPage> createState() => _DemoPageState();
}

class _DemoPageState extends State<DemoPage> {
// 定义一个类型为int的Stream
Stream<int> countForOneMinute() async* {
for (int i = 1; i <= 60; i++) {
await Future.delayed(const Duration(seconds: 1));
yield i;
}
}

@override
void initState() {
// TODO: implement initState
super.initState();
}

@override
void dispose() {
super.dispose();
}

@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text("Stream Demo"),
), // AppBar
body: StreamBuilder(
stream: countForOneMinute(),
builder: (context, snapshot) {
// 观察ConnectionState的状态
switch (snapshot.connectionState) {
case ConnectionState.none:
return const Center(
child: Text("NONE: 没有数据流"),
);
case ConnectionState.waiting:
return const Center(
child: Text("WAITING: 等待数据流")
);

case ConnectionState.active:
if (snapshot.hasError) {
return Center(
child: Text("ACTIVE: 数据流活跃,异常: ${snapshot.error}")
);
} else {
return Center(
child: Text("ACTIVE: 数据流活跃,数据: ${snapshot.data}")
);
}
case ConnectionState.done:
return const Center(
child: Text("DONE: 数据流关闭")
);
default:
throw "ConnectionState没有别的状态";
}
}
), // StreamBuilder
); // Scaffold
}
}