前言
我们在此前已经介绍了APM模型和EAP模型,以及它们的优缺点。在EAP模型中,可以实时得知异步操作的进度,以及支持取消操作。但是组合多个异步操作仍需大量工作,编写大量代码方可完成。
因此,在.Net Framework 4.0中,引入了一个新的关于异步操作的模型,叫做任务并行库,简称为TPL。
第三个异步编程模型:TPL
概述
TPL,全称为Task Parallel Library,它可以被认为是线程池之上的又一个抽象层,隐藏了部分底层细节,核心概念为任务。
一个任务代表了一个异步操作,该操作可以通过多种方式运行,可以使用或者不使用独立线程(如Thread)运行,还可以通过多种方式和其他任务组合起来。
在本文中,我们将探究TPL的使用方式,以及如何正确处理异常,取消任务,如何使多个任务同时执行等。
创建TPL
我们首先需要创建一个控制台程序,用来执行Task的创建和运行,并在Task内部使用委托调用一个方法,用来打印当前任务以及当前任务所在的线程信息,如图所示:
我们分别使用了三种方式来创建任务并执行:
在第一种方式中,使用new Task类的方式,把需要执行的内容放入Action委托并传入参数,最后使用Start方法开启任务执行,若不调用Start方法,则不会启动任务,切记。
在第二种方式和第三种方式中,被创建的任务会立即开始工作,所以无需显式调用Start方法。Task.Run与Task.Factory.StartNew的区别为,前者是后者的一个快捷方式,但后者拥有附加选项,如没有特殊需求,通常使用前者来创建任务。
相关代码如下:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace TPLDemo
{
class Program
{
static void Main(string[] args)
{
var t1 = new Task(() => TaskMethod("任务1"));
var t2 = new Task(() => TaskMethod("任务2"));
t1.Start();
t2.Start();
Task.Run(() => TaskMethod("任务3"));
Task.Factory.StartNew(() => TaskMethod("任务4"));
Task.Factory.StartNew(() => TaskMethod("任务5"), TaskCreationOptions.LongRunning);
Thread.Sleep(TimeSpan.FromSeconds(1000));
}
/// <summary>
/// 任务运行的方法
/// </summary>
/// <param name="name">The name.</param>
static void TaskMethod(string name)
{
Console.WriteLine($@"Task {name} 是一个正在线程id为 {Thread.CurrentThread.ManagedThreadId} 上运行的任务,
是否为线程池线程:{Thread.CurrentThread.IsThreadPoolThread}");
}
}
}
接着我们来看一下运行结果,如图所示:
可以看出任务1,2,3,4均为线程池中的线程,也印证了我们此前的概念,TPL为线程池上的一个抽象层。而任务5在实现时被我们标记为需要长时间运行的任务,因此在调度时,并未使用线程池中的线程,而是单独开启一个线程执行,这样可以避免线程池中的线程被长时间占用,无法复用资源。
实现取消
在EAP模型中,我们借助BackgroundWorker组件封装好的取消方法,可以对正在执行的线程进行取消。那么这样的方式毕竟是有很大的局限性的,因此,在Net Framework 4.0中,微软创建了统一的模型来协作取消涉及两个对象的异步操作或长时间运行的同步操作,它就是CancellationTokenSource和CancellationToken。
我们需要创建CancellationTokenSource实例以传入Task,来标识此任务包含外部取消操作,然后使用CancellationToken来传播任务内的应取消操作的通知,如图所示:
相关代码如下:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace TPLDemo
{
class Program
{
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
var longTask = new Task<int>(() => TaskMethod("任务1", 10, cts.Token), cts.Token);
Console.WriteLine(longTask.Status);
cts.Cancel();
Console.WriteLine(longTask.Status);
Console.WriteLine("任务1在执行前已经被取消");
cts = new CancellationTokenSource();
longTask = new Task<int>(() => TaskMethod("任务2", 10, cts.Token), cts.Token);
longTask.Start();
for (int i = 0; i < 5; i++)
{
Thread.Sleep(TimeSpan.FromSeconds(0.5));
Console.WriteLine(longTask.Status);
}
cts.Cancel();
for (int i = 0; i < 5; i++)
{
Thread.Sleep(TimeSpan.FromSeconds(0.5));
Console.WriteLine(longTask.Status);
}
Console.WriteLine($"任务2执行完成,结果:{longTask.Result}");
Console.Read();
}
/// <summary>
/// 任务取消的方法
/// </summary>
/// <param name="name"></param>
/// <param name="seconds"></param>
/// <param name="token"></param>
/// <returns></returns>
private static int TaskMethod(string name, int seconds, CancellationToken token)
{
Console.WriteLine($@"Task {name} 是一个正在线程id为 {Thread.CurrentThread.ManagedThreadId} 上运行的任务,
是否为线程池线程:{Thread.CurrentThread.IsThreadPoolThread}");
for (int i = 0; i < seconds; i++)
{
Thread.Sleep(TimeSpan.FromSeconds(1));
if (token.IsCancellationRequested)
{
return -1;
}
}
return 42 * seconds;
}
}
}
运行后结果如图所示:
从代码中,我们可以看出,我们给Task传递了两次CancellationTokenSource,一次是任务内执行方法,一次是任务本身构造函数,那么为什么要这样做呢?
因为如果我们在任务启动之前进行取消,那么该任务所在的TPL模型,就会“接管”该取消操作,因为这些代码根本不会继续执行。我们查看第一个任务的状态可以得知,它已经被取消了,如果在此时再调用Start方法,那么将会抛出一个异常。
而在第二个任务中,我们先执行任务,再做取消,那么此时我们相当于是在外部对此任务进行取消控制,而且在执行取消之后,任务2的状态依然是RanToCompletion,而不是Canceled。因为从TPL的角度来看,该任务正常完成了它的工作,所以我们在编写代码时需要辨别这两种情况,同时理解它在两种情况下职责的不同。
处理异常
在普通情况下,我们通常使用try-catch代码块来处理异常,但在TPL中,最底层的异常会被封装为一个AggregateException的通用异常,如果需要获取真正的异常,则需要访问InnerException属性,相关实现如图所示:
相关代码如下:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace TPLDemo
{
class Program
{
static void Main(string[] args)
{
Task<int> task;
try
{
task = Task.Run(() => TaskMethod("任务1", 2));
int result = task.Result;
Console.WriteLine($"结果为:{result}");
}
catch (Exception ex)
{
Console.WriteLine($"发生异常:{ex}");
}
Console.WriteLine("----------------------------------------------------------------------------------------");
Console.Read();
}
static int TaskMethod(string name, int seconds)
{
Console.WriteLine($@"Task {name} 是一个正在线程id为 {Thread.CurrentThread.ManagedThreadId} 上运行的任务,
是否为线程池线程:{Thread.CurrentThread.IsThreadPoolThread}");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
throw new Exception("异常!");
}
}
}
运行后结果如图所示:
从代码实现和运行结果中,我们可以看出调用Task的Result属性,会使得当前线程等待直到该任务完成,并将异常传播到当前线程,因此我们可以通过catch捕获到该异常,且该异常的类型为AggregateException,同时我们打印出的结果包含底层真正异常内容。
但在TPL中,还有另外一种方式来处理异常,那就是使用Task的GetAwaiter和GetResult方法来获取结果,相关代码如下:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace TPLDemo
{
class Program
{
static void Main(string[] args)
{
Task<int> task;
try
{
task = Task.Run(() => TaskMethod("任务1", 2));
int result = task.Result;
Console.WriteLine($"结果为:{result}");
}
catch (Exception ex)
{
Console.WriteLine($"发生异常:{ex}");
}
Console.WriteLine("----------------------------------------------------------------------------------------");
Console.WriteLine();
try
{
task = Task.Run(() => TaskMethod("任务2", 2));
int result = task.GetAwaiter().GetResult();
Console.WriteLine($"结果为:{result}");
}
catch (Exception ex)
{
Console.WriteLine($"发生异常:{ex}");
}
Console.WriteLine("----------------------------------------------------------------------------------------");
Console.Read();
}
static int TaskMethod(string name, int seconds)
{
Console.WriteLine($@"Task {name} 是一个正在线程id为 {Thread.CurrentThread.ManagedThreadId} 上运行的任务,
是否为线程池线程:{Thread.CurrentThread.IsThreadPoolThread}");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
throw new Exception("异常!");
}
}
}
运行后结果如图所示:
我们从结果中可以看出,在这种情况下,可以直接捕获到底层异常,而无需再访问InnerException属性,原因是TPL模型会直接提取该异常进行处理。
由上述两种情况我们可以得出结论:如果你需要直接获取并处理底层异常,那么请使用GetAwaiter和GetResult方法来获取Task的结果,反之,则可直接使用Result属性。
任务并行
我们在之前的示例中,都是单独创建任务并执行,每个任务的执行过程和结果都是独立的。那么,如果我们需要多个任务并行,要怎么做呢?可以使用如下方式:
我们分别创建了三个任务,但任务之间并不再是无关联的关系,而是使用了Task.WhenAll与ContineWith来使得它们以某种方式关联起来。
相关代码如下:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace TPLDemo
{
class Program
{
static void Main(string[] args)
{
var firstTask = new Task<int>(() => TaskMethod("任务1", 3));
var secondTask = new Task<int>(() => TaskMethod("任务2", 2));
var whenAllTask = Task.WhenAll(firstTask, secondTask);
whenAllTask.ContinueWith(x =>
{
Console.WriteLine($"任务1结果为:{x.Result[0]},任务2结果为:{x.Result[1]}");
}, TaskContinuationOptions.OnlyOnRanToCompletion);
firstTask.Start();
secondTask.Start();
Console.Read();
}
static int TaskMethod(string name, int seconds)
{
Console.WriteLine($@"Task {name} 是一个正在线程id为 {Thread.CurrentThread.ManagedThreadId} 上运行的任务,
是否为线程池线程:{Thread.CurrentThread.IsThreadPoolThread}");
Thread.Sleep(TimeSpan.FromSeconds(seconds));
return 42 * seconds;
}
}
}
运行后结果如图所示:
分析代码及运行结果,我们可以得知,在前两个任务完成后,第三个任务才开始运行,并且该任务的结果提供了一个结果数组,第一个元素是第一个任务的结果,第二个元素是第二个任务的结果,以此类推。
在TPL中,我们也可以创建另外一系列任务,并使用Task.WhenAny的方式等待这些任务中的任何一个执行完成。当有一个任务完成时,会从列表中移除该任务并继续等待其他任务完成,直到列表为空为止。获取任务的完成进展情况,或在运行任务时使用超时,都可以使用Task.WhenAny方法。例如我们等待一组任务运行,并且使用其中一个任务来记录是否超时,如果该任务先完成,那么我们只需取消其他还未完成的任务即可。
小结
我们在这一篇中,讲解了TPL的发展历程和使用方式,对比APM和EAP模型,TPL显得比较灵活且功能强大,支持取消、异常和并行等操作。
但TPL模型仍有它的不足之处
- 阅读此类程序代码时,仍难以理解程序的实际执行顺序。
- 处理异常时,不得不使用单独的后续操作任务来处理在之前的异步操作中发生的错误,导致了代码比较分散,增加了复杂度。
所以为了解决这些问题,微软直接从语言层面引入了更高级别的抽象,真正简化了异步编程,使得编写异步程序更为容易。那么它又是什么呢?它能为我们提供多少便利性呢?预知后事如何,且听下回分解。