当前位置:首页(Home) >> 文件内容显示(File Show)

试读《C#并发编程经典实例》(文前+Rx基础+测试)

图 灵 程 序 设 计 丛 书  
C#并发编程经典实例  
Concurrency in C# Cookbook  
[
美]Stephen Cleary  
相银初  
Beijing ? Cambridge ? Farnham ? K?ln ? Sebastopol ? Tokyo  
人 民 邮 电 出 版 社  
??京  
内 容 提 要  
本书全面讲解 C# 并发编程技术,侧重于.NET 平台上较新、较实用的方法。全书分为几大部分:  
首先介绍几种并发编程技术,包括异步编程、并行编程、TPL 数据流、响应式编程 ;然后阐述一  
些重要的知识点,包括测试技巧、互操作、取消并发、函数式编程与 OOP、同步、调度 ;最后介  
绍了几个实用技巧。全书共包含 70 多个有配套源码的实用方法,可用于服务器程序、桌面程序和  
移动应用的开发。  
本书适合具有 .NET 基础,希望学习最新并发编程技术的开发人员阅读。  
著????[] Stephen Cleary  
译????相银初  
责任编辑?李松峰  
执行编辑?李?静?曹静雯  
责任印制?杨林杰  
人民邮电出版社出版发行??北京市丰台区成寿寺路11号  
邮编?100164??电子邮件?315@ptpress.com.cn  
网址?http://www.ptpress.com.cn  
北京??????印刷  
开本:800×1000?1/16  
印张:11.75  
字数:237千字  
印数:1 3 000册  
20151 月第 1 版  
20151 月北京第 1次印刷  
著作权合同登记号 图字:01-2014-6523号  
定价:49.00元  
读者服务热线:(010)51095186600?印装质量热线:(010)81055316  
反盗版热线:(010)81055315  
广告经营许可证:京崇工商广字第 0021 号  
版权声明  
?
2014 by O’Reilly Media, Inc.  
Simplified Chinese Edition, jointly published by O’Reilly Media, Inc. and Posts & Telecom  
Press, 2015. Authorized translation of the English edition, 2014 O’Reilly Media, Inc., the  
owner of all rights to publish and sell the same.  
All rights reserved including the rights of reproduction in whole or in part in any form.  
英文原版由 O’Reilly Media, Inc. 出版,2014。  
简体中文版由人民邮电出版社出版,2015。英文原版的翻译得到 O’Reilly Media, Inc.  
的授权。此简体中文版的出版和销售得到出版权和销售权的所有者 —— O’Reilly  
Media, Inc. 的许可。  
版权所有,未得书面许可,本书的任何部分和全部不得以任何形式重制。  
III  
O’Reilly Media, Inc.绍  
O’Reilly Media 通过图书、杂志、在线服务、调查研究和会议等方式传播创新知识。  
1978 年开始,O’Reilly 一直都是前沿发展的见证者和推动者。超级极客们正在开创  
着未来,而我们关注真正重要的技术趋势——通过放大那些“细微的信号”来刺激社  
会对新科技的应用。作为技术社区中活跃的参与者,O’Reilly 的发展充满了对创新的  
倡导、创造和发扬光大。  
O’Reilly 为软件开发人员带来革命性的“动物书”;创建第一个商业网站(GNN);组  
织了影响深远的开放源代码峰会,以至于开源软件运动以此命名;创立了 Make 杂志,  
从而成为 DIY 革命的主要先锋;公司一如既往地通过多种形式缔结信息与人的纽带。  
O’Reilly 的会议和峰会集聚了众多超级极客和高瞻远瞩的商业领袖,共同描绘出开创  
新产业的革命性思想。作为技术人士获取信息的选择,O’Reilly 现在还将先锋专家的  
知识传递给普通的计算机用户。无论是通过书籍出版,在线服务或者面授课程,每一  
O’Reilly 的产品都反映了公司不可动摇的理念——信息是激发创新的力量。  
业界评论  
OReilly Radar 博客有口皆碑。”  
Wired  
OReilly 凭借一系列(真希望当初我也想到了)非凡想法建立了数百万美元的业务。”  
Business 2.0  
OReilly Conference 是聚集关键思想领袖的绝对典范。”  
CRN  
一本 OReilly 的书就代表一个有用、有前途、需要学习的主题。”  
Irish Times  
Tim 是位特立独行的商人,他不光放眼于最长远、最广阔的视野并且切实地按照  
Yogi Berra 的建议去做了:‘如果你在路上遇到岔路口,走小路(岔路)。’回顾过去  
Tim 似乎每一次都选择了小路,而且有几次都是一闪即逝的机会,尽管大路也不错。”  
Linux Journal  
目录  
译者序 ....................................................................................................................................................IX  
前言.........................................................................................................................................................XI  
1 章?并发编程概述.......................................................................................................................  
1
.1?并发编程简介.............................................................................................................................  
.2?异步编程简介.............................................................................................................................  
.3?并行编程简介.............................................................................................................................  
.4?响应式编程简介.........................................................................................................................  
1
1
1
1
1
1
1
1
1
1
3
7
9
.5?数据流简介...............................................................................................................................11  
.6?多线程编程简介.......................................................................................................................13  
.7?并发编程的集合.......................................................................................................................13  
.8?现代设计...................................................................................................................................14  
.9?技术要点总结...........................................................................................................................14  
2 章?异步编程基础.....................................................................................................................17  
.1?暂停一段时间...........................................................................................................................18  
2
.2?返回完成的任务.......................................................................................................................19  
2
.3?报告进度...................................................................................................................................21  
2
.4?等待一组任务完成...................................................................................................................22  
2
.5?等待任意一个任务完成...........................................................................................................25  
2
.6?任务完成时的处理...................................................................................................................26  
2
.7?避免上下文延续.......................................................................................................................29  
2
2.8?处理 async Task方法的异常.  
2.9?处理 async void方法的异常.  
.................................................................................................30  
.................................................................................................32  
V
3 章?并行开发的基础 ................................................................................................................35  
.1?数据的并行处理.......................................................................................................................35  
3
.2?并行聚合...................................................................................................................................37  
3
.3?并行调用...................................................................................................................................38  
3
.4?动态并行...................................................................................................................................40  
3
.5?并行 LINQ ................................................................................................................................41  
3
4 章?数据流基础 .........................................................................................................................43  
.1?链接数据流块...........................................................................................................................44  
4
.2?传递出错信息...........................................................................................................................45  
4
.3?断开链接...................................................................................................................................47  
4
.4?限制流量...................................................................................................................................48  
4
.5?数据流块的并行处理...............................................................................................................48  
4
.6?创建自定义数据流块...............................................................................................................49  
4
5 章?Rx 基础................................................................................................................................51  
.1?转换 .NET 事件........................................................................................................................52  
5
.2?发通知给上下文.......................................................................................................................54  
5
.3?用窗口和缓冲对事件分组.......................................................................................................56  
5
.4?用限流和抽样抑制事件流.......................................................................................................58  
5
.5?超时...........................................................................................................................................60  
5
6 章?测试技巧..............................................................................................................................63  
6.1?async方法的单元测试 .  
...........................................................................................................64  
6.2?预计失败的 async方法的单元测试 .  
......................................................................................65  
6.3?async void方法的单元测试.  
..................................................................................................67  
.4?数据流网格的单元测试...........................................................................................................68  
6
.5?Rx Observable 对象的单元测试..............................................................................................70  
6
.6?用虚拟时间测试 Rx Observable 对象.....................................................................................72  
6
7 章?互操作..................................................................................................................................75  
7.1?用 async代码封装 Async方法与 Completed事件 .  
...............................................................75  
.......................................................................................77  
..........................................................................................78  
..................................................................................................80  
7.2?用 async代码封装 Begin/End方法.  
7.3?用 async代码封装所有异步操作 .  
7.4?用 async代码封装并行代码 .  
7.5?用 async代码封装 Rx Observable 对象 .  
................................................................................80  
................................................................................82  
7.6?用 Rx Observable 对象封装 async代码 .  
.7?Rx Observable 对象和数据流网格..........................................................................................83  
7
VI 目录  
8 章?集合.......................................................................................................................................85  
.1?不可变栈和队列.......................................................................................................................87  
8
.2?不可变列表...............................................................................................................................89  
8
.3?不可变 Set 集合........................................................................................................................91  
8
.4?不可变字典...............................................................................................................................93  
8
.5?线程安全字典...........................................................................................................................94  
8
.6?阻塞队列...................................................................................................................................96  
8
.7?阻塞栈和包...............................................................................................................................99  
8
.8?异步队列.................................................................................................................................100  
8
.9?异步栈和包.............................................................................................................................102  
8
.10?阻塞 / 异步队列....................................................................................................................104  
8
9 章?取消.....................................................................................................................................109  
.1?发出取消请求.........................................................................................................................110  
9
.2?通过轮询响应取消请求.........................................................................................................112  
9
.3?超时后取消.............................................................................................................................114  
9
9.4?取消 async代码 .  
....................................................................................................................115  
.5?取消并行代码.........................................................................................................................116  
9
9
9
9
9
.6?取消响应式代码.....................................................................................................................117  
.7?取消数据流网格.....................................................................................................................119  
.8?注入取消请求.........................................................................................................................120  
.9?与其他取消体系的互操作.....................................................................................................122  
10 章?函数式 OOP...................................................................................................................125  
0.1?异步接口和继承...................................................................................................................125  
1
0.2?异步构造:工厂...................................................................................................................127  
1
0.3?异步构造:异步初始化模式...............................................................................................129  
1
0.4?异步属性...............................................................................................................................132  
1
0.5?异步事件...............................................................................................................................134  
1
0.6?异步销毁...............................................................................................................................137  
1
11 章?同步..................................................................................................................................143  
1.1?阻塞锁...................................................................................................................................148  
1
1.2?异步锁...................................................................................................................................149  
1
1.3?阻塞信号...............................................................................................................................151  
1
1.4?异步信号...............................................................................................................................152  
1
1.5?限流.......................................................................................................................................154  
1
目录 | VII  
12 章?调度..................................................................................................................................157  
2.1?调度到线程池.......................................................................................................................157  
1
2.2?任务调度器...........................................................................................................................159  
1
2.3?调度并行代码.......................................................................................................................161  
1
2.4?用调度器实现数据流的同步...............................................................................................161  
1
13 章?实用技巧 .........................................................................................................................163  
3.1?初始化共享资源...................................................................................................................163  
1
3.2?Rx 延迟求值 .........................................................................................................................165  
1
3.3?异步数据绑定.......................................................................................................................166  
1
3.4?隐式状态...............................................................................................................................168  
1
封面介绍..............................................................................................................................................170  
VIII 目录  
译者序  
关于并发编程的几个误解  
关于并发编程,很多人都有一些误解。  
误解一并发就是多线程  
实际上多线程只是并发编程的一种形式,在 C# 中还有很多更实用、更方便的并发编程技  
术,包括异步编程、并行编程、TPL 数据流、响应式编程等。  
误解二只有大型服务器程序才需要考虑并发  
服务器端的大型程序要响应大量客户端的数据请求,当然要充分考虑并发。但是桌面程序  
和手机、平板等移动端应用同样需要考虑并发编程,因为它们是直接面向最终用户的,而  
现在用户对使用体验的要求越来越高。程序必须能随时响应用户的操作,尤其是在后台处  
理时(读写数据、与服务器通信等),这正是并发编程的目的之一。  
误解三并发编程很复杂必须掌握很多底层技术  
C# .NET 提供了很多程序库,并发编程已经变得简单多了。尤其是 .NET 4.5 推出了全新  
asyncawait关键字,使并发编程的代码减少到了最低限度。并行处理和异步开发已  
经不再是高手们的专利,只要使用本书中的方法,每个开发人员都能写出交互性良好、高  
效、可靠的并发程序。  
本书的特色  
本书全面讲解 C# 并发编程技术,侧重于 .NET 平台上较新、较实用的方法。全书分为几大  
IX  
部分:首先介绍几种并发编程技术,包括异步编程、并行编程、TPL 数据流、响应式编程  
等;然后是一些重要的知识点,包括测试技巧、互操作、取消并发、函数式编程与 OOP、  
同步、调度等;最后介绍了几个实用技巧。书中包含 70 多个配有源码的实用方法,可用  
于服务器程序、桌面程序和移动端应用的开发。  
本书填补了一个市场空白:它是一本用最新方法进行并发编程的入门指引和参考书。  
本书作者 Stephen Cleary 是美国著名的软件开发者和技术书作家、C# MVP,在 C#/C++/  
JavaScript 等方面均有丰富的经验。我非常有幸能翻译他的著作。  
翻译中的一点感受  
过去的十多年我一直在从事软件开发和设计工作。相信国内很多开发人员都和我一样,心  
中存在着一个疑惑:我国的软件人员很多(绝对数量不会比美国少),但为什么软件技术  
总体上落后欧美国家那么多?确定翻译《C# 并发编程经典实例》这本书后,我一边仔细  
阅读原书,一边遵循作者的思路,逐渐发现作者思考问题的一个理念。这就是按软件的不  
同层次进行明确分工,我只负责我所实现的这个层次,底层技术是为上层服务的,我只负  
责选择和调用,不管内部的实现过程;同样,我负责的层次为更高一层的软件提供服务,  
供上层调用,也不需要上层关心我的内部实现。  
由此想到,这正好反映出国内开发人员中的一个通病,即分工不够细、技术关注不够精。  
很多公司和团队在开发时都喜欢大包大揽,从底层到应用层全部自己实现;很多开发人员  
也热衷于“大而全”地学习技术,试图掌握软件开发中的各种技术,而不是精通某一方  
面。甚至流行这样一种观点,实现底层软件、写驱动的才是高级开发人员,做上层应用的  
人仅仅是“码农”。本书作者明确地反对了这种看法,书中强调如何利用好现成的库,而  
不是全部采用底层技术自己实现。利用现成的库开发出高质量的软件,对技术能力的考验  
并不低于开发底层库。  
感谢  
在本书的翻译过程中,得到了图灵公司李松峰老师的支持和帮助,在此表示感谢。由于本  
人水平有限,书中难免有疏忽和错误,恳请读者朋友们批评指正。  
2
014 10 月于深圳  
X | 译者序  
5 章  
Rx基础  
LINQ 能。 LINQ to Objects 于  
IEnumerable<T>) 和 LINQ to Entities IQueryableT>) 是 LINQ 供  
者。另外还有很多提供者,并且大多数都采用相同的基本架构。查询是延后执行(lazily  
evaluated)的,只有在需要时才会从序列中获取数据。从概念上讲,这是一种拉取模式。  
在查询过程中数据项是被逐个拉取出来的。  
Reactive ExtensionsRx)把事件看作是依次到达的数据序列。因此,将 Rx 认作是 LINQ  
to events(基于 IObservable<T>)也是可以的,它与其他 LINQ 提供者的主要区别在于,  
Rx 采用“推送”模式。就是说,Rx 的查询规定了在事件到达时程序该如何响应。Rx 在  
LINQ 的基础上构建,增加了一些功能强大的操作符,作为扩展方法。  
本章介绍一些更常用的 Rx 操作。需要注意的是,所有的 LINQ 操作都可以在 Rx 中使用。  
从概念上看,过滤(Where)、投影(Select)等简单操作,和其他 LINQ 提供者的操作是  
一样的。本章不介绍那些常见的 LINQ 操作,而将重点放在 Rx LINQ 基础上增加的新  
功能,尤其是与时间有关的功能。  
要使用 Rx,需要在应用中安装一个 NuGet Rx-Main。支持 Reactive Extensions 的平台非  
常丰富(参见表 5-1)。  
5-1:支持Reactive Extensions的平台  
平??台  
Rx支持情况  
.
NET 4.5  
NET 4.0  
.
5
1
续)  
平??台  
Rx支持情况  
Mono iOS/Droid  
Windows Store  
Windows Phone Apps 8.1  
Windows Phone SL 8.0  
Windows Phone SL 7.1  
Silverlight 5  
5
.1?转换.NET事件  
问题  
把一个事件作为 Rx 输入流,每次事件发生时通过 OnNext生成数据。  
解决方案  
Observable类定义了一些事件转换器。大部分 .NET 框架事件与 FromEventPattern兼容,  
对于不遵循通用模式的事件,需要改用 FromEvent。  
FromEventPattern最适合使用委托类型为 EventHandler<T>的事件。很多较新框架类的事  
件都采用了这种委托类型。例如,Progress<T>类定义了事件 ProgressChanged,这个事件  
的委托类型就是 EventHandler<T>,因此,它就很容易被封装到 FromEventPattern:  
var progress = new Progress<int>();  
var progressReports = Observable.FromEventPattern<int>(  
handler => progress.ProgressChanged += handler,  
handler => progress.ProgressChanged -= handler);  
progressReports.Subscribe(data => Trace.WriteLine("OnNext:" + data.EventArgs));  
请注意,data.EventArgs是强类型的 intFromEventPattern的类型参数(上例中为 int)  
EventHandler<T>T相同。Rx FromEventPattern中的两个 Lambda 参数来实现订阅  
和退订事件。  
较新的 UI 框架采用 EventHandler<T>,可以很方便地应用在 FromEventPattern中。但是有  
些较旧的类常为每个事件定义不同的委托类型。这些事件也能在 FromEventPattern中使用,  
但需要做一些额外的工作。例如,System.Timers.Timer类有一个事件 Elapsed,它的类型是  
ElapsedEventHandler。对此旧类事件,可以用下面的方法封装进 FromEventPattern:  
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };  
var ticks = Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(  
handler => (s, a) => handler(s, a),  
handler => timer.Elapsed += handler,  
5
2 | 第 5 章  
handler => timer.Elapsed -= handler);  
ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime));  
注意,data.EventArgs仍然是强类型的。现在 FromEventPattern的类型参数是对应的事件  
处理程序和 EventArgs的派生类。FromEventPattern的第一个 Lambda 参数是一个转换器,  
它将 EventHandler<ElapsedEventArgs>转换成 ElapsedEventHandler。除了传递事件,这个  
转换器不应该做其他处理。  
上面代码的语法明显有些别扭。另一个方法是使用反射机制:  
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };  
var ticks = Observable.FromEventPattern(timer, "Elapsed");  
ticks.Subscribe(data => Trace.WriteLine("OnNext: "  
+
((ElapsedEventArgs)data.EventArgs).SignalTime));  
采用这种方法后,调用 FromEventPattern就简单多了。但是这种方法也有缺点:出现了  
一个怪异的字符串("Elapsed"),并且消息的使用者不是强类型了。就是说,这时 data.  
EventArgsobject类型,需要人为地转换成 ElapsedEventArgs。  
讨论  
事件是 Rx 流数据的主要来源。本节介绍如何封装遵循标准模式的事件(标准事件模式:  
第一个参数是事件发送者,第二个参数是事件的类型参数)。对于不标准的事件类型,可  
以用重载 Observable.FromEvent的办法,把事件封装进 Observable对象。  
把 事 件 封 装 进 Observable 象 后, 每 次 引 发 该 事 件 都 会 调 用 OnNext 在 处 理  
AsyncCompletedEventArgs 时会发生令人奇怪的现象,所有的异常信息都是通过数据  
形 式 传 递 的(OnNext), 而 不 是 通 过 错 误 传 递(OnError)。 看 一 个 封 装 WebClient.  
DownloadStringCompleted的例子:  
var client = new WebClient();  
var downloadedStrings = Observable.FromEventPattern(client,  
"DownloadStringCompleted");  
downloadedStrings.Subscribe(  
data =>  
{
var eventArgs = (DownloadStringCompletedEventArgs)data.EventArgs;  
if (eventArgs.Error != null)  
Trace.WriteLine("OnNext: (Error) " + eventArgs.Error);  
else  
Trace.WriteLine("OnNext: " + eventArgs.Result);  
}
,
ex => Trace.WriteLine("OnError: " + ex.ToString()),  
) => Trace.WriteLine("OnCompleted"));  
(
client.DownloadStringAsync(new Uri("http://invalid.example.com/"));  
WebClient.DownloadStringAsync出错并结束时,引发带有异常 AsyncCompletedEventArgs.Error  
Rx基础 | 53  
的事件。可惜 Rx 会把这作为一个数据事件,因此这个程序的结果是显示OnNext:(Error),  
而不是OnError:。  
有些事件的订阅和退订必须在特定的上下文中进行。例如,很多 UI 控件的事件必须在 UI  
线程中订阅。Rx 提供了一个操作符 SubscribeOn,可以控制订阅和退订的上下文。大多数  
情况下没必要使用这个操作符,因为基于 UI 的事件订阅通常就是在 UI 线程中进行的。  
参阅  
5
.2 节介绍如何修改引发事件的上下文。  
5
.4 节介绍如何对事件限流,以免订阅者因事件太多而崩溃。  
5
.2?发通知给上下文  
问题  
Rx 尽量做到了线程不可知(thread agnostic)。因此它会在任意一个活动线程中发出通知  
例如 OnNext)。  
但是我们通常希望通知只发给特定的上下文。例如 UI 元素只能被它所属的 UI 线程控制,  
因此,如果要根据 Rx 的通知来修改 UI,就应该把通知“转移”到 UI 线程。  
解决方案  
Rx 提供了 ObserveOn操作符,用来把通知转移到其他线程调度器。  
看下面的例子,使用 Interval,每秒钟产生一个 OnNext通知:  
private void Button_Click(object sender, RoutedEventArgs e)  
{
Trace.WriteLine("UI thread is " + Environment.CurrentManagedThreadId);  
Observable.Interval(TimeSpan.FromSeconds(1))  
.
Subscribe(x => Trace.WriteLine("Interval " + x + " on thread " +  
Environment.CurrentManagedThreadId));  
}
用我的电脑测试,显示结果为:  
UI thread is 9  
Interval 0 on thread 10  
Interval 1 on thread 10  
Interval 2 on thread 11  
Interval 3 on thread 11  
Interval 4 on thread 10  
5
4 | 第 5 章  
Interval 5 on thread 11  
Interval 6 on thread 11  
因为 Interval基于一个定时器(没有指定的线程),通知会在线程池线程中引发,而不是  
UI 线程中。要更新 UI 元素,可以通过 ObserveOn输送通知,并传递一个代表 UI 线程  
的同步上下文:  
private void Button_Click(object sender, RoutedEventArgs e)  
{
var uiContext = SynchronizationContext.Current;  
Trace.WriteLine("UI thread is " + Environment.CurrentManagedThreadId);  
Observable.Interval(TimeSpan.FromSeconds(1))  
.
.
ObserveOn(uiContext)  
Subscribe(x => Trace.WriteLine("Interval " + x + " on thread " +  
Environment.CurrentManagedThreadId));  
}
ObserveOn的另一个常用功能是可以在必要时离开 UI 线程。假设有这样的情况:鼠标一移  
动,就意味着需要进行一些 CPU 密集型的计算。默认情况下,所有的鼠标移动事件都发  
生在 UI 线程,因此可以使用 ObserveOn把通知移动到一个线程池线程,在那里进行计算,  
然后再把表示结果的通知返回给 UI 线程:  
private void Button_Click(object sender, RoutedEventArgs e)  
{
?
var uiContext = SynchronizationContext.Current;  
Trace.WriteLine("UI thread is " + Environment.CurrentManagedThreadId);  
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(  
handler => (s, a) => handler(s, a),  
handler => MouseMove += handler,  
handler => MouseMove -= handler)  
.
.
.
{
Select(evt => evt.EventArgs.GetPosition(this))  
ObserveOn(Scheduler.Default)  
Select(position =>  
// 复杂的计算过程。  
Thread.Sleep(100);  
var result = position.X + position.Y;  
Trace.WriteLine("Calculated result " + result + " on thread " +  
Environment.CurrentManagedThreadId);  
return result;  
}
.
.
)
ObserveOn(uiContext)  
Subscribe(x => Trace.WriteLine("Result " + x + " on thread " +  
Environment.CurrentManagedThreadId));  
}
运行这段代码的话,就会发现计算过程是在线程池线程中进行的,计算结果在 UI 线程中  
显示。另外,还会发现计算和结果会滞后于输入,形成等待的队列,这种现象出现的原因  
在于,比起 100 1 次的计算,鼠标移动的更新频率更高。Rx 中有几种技术可以处理这  
种情况,其中一个常用方法是对输入流速进行限制,具体会在 5.4 节介绍。  
Rx基础 | 55  
讨论  
实际上,ObserveOn是把通知转移到一个 Rx 调度器上了。本节介绍了默认调度器(即线程  
池)和一种创建 UI 调度器的方法。ObserveOn最常用的功能是移到或移出 UI 线程,但调  
度器也能用于别的场合。6.6 节介绍高级测试时,将再次关注调度器。  
参阅  
5
5
6
.1 节介绍如何利用事件创建序列。  
.4 节介绍如何限制事件流的流速。  
.6 节介绍测试 Rx 代码的特殊流程。  
5
.3?用窗口和缓冲对事件分组  
问题  
有一系列事件,需要在它们到达时进行分组。举个例子,需要对一些成对的输入作出响  
应。第二个例子,需要在 2 秒钟的窗口期内,对所有输入进行响应。  
解决方案  
Rx 提供了两个对到达的序列进行分组的操作:BufferWindowBuffer会留住到达的  
事件,直到收完一组事件,然后会把这一组事件以一个集合的形式一次性地转送过去。  
Window会在逻辑上对到达的事件进行分组,但会在每个事件到达时立即传递过去。Buffer  
的返回类型是 IObservable<IList<T>>(由若干个集合组成的事件流);Window的返回类型  
IObservable<IObservable<T>>(由若干个事件流组成的事件流)。  
下面的例子使用 Interval,每秒创建 1 OnNext通知,然后每 2 个通知做一次缓冲:  
private void Button_Click(object sender, RoutedEventArgs e)  
{
Observable.Interval(TimeSpan.FromSeconds(1))  
.
.
Buffer(2)  
Subscribe(x => Trace.WriteLine(  
DateTime.Now.Second + ": Got " + x[0] + " and " + x[1]));  
}
用我的电脑测试,每 2 秒产生 2 个输出:  
1
1
1
3: Got 0 and 1  
5: Got 2 and 3  
7: Got 4 and 5  
5
6 | 第 5 章  
1
2
9: Got 6 and 7  
1: Got 8 and 9  
下面的例子有些类似,使用 Window创建一些事件组,每组包含 2 个事件:  
private void Button_Click(object sender, RoutedEventArgs e)  
{
Observable.Interval(TimeSpan.FromSeconds(1))  
.
.
{
Window(2)  
Subscribe(group =>  
Trace.WriteLine(DateTime.Now.Second + ": Starting new group");  
group.Subscribe(  
x => Trace.WriteLine(DateTime.Now.Second + ": Saw " + x),  
(
) => Trace.WriteLine(DateTime.Now.Second + ": Ending group"));  
}
);  
}
用我的电脑测试,输出的结果就是这样:  
1
1
1
1
1
2
2
2
2
2
2
2
2
7: Starting new group  
8: Saw 0  
9: Saw 1  
9: Ending group  
9: Starting new group  
0: Saw 2  
1: Saw 3  
1: Ending group  
1: Starting new group  
2: Saw 4  
3: Saw 5  
3: Ending group  
3: Starting new group  
这几个例子说明了 BufferWindow的区别。Buffer等待组内的所有事件,然后把所有事  
件作为一个集合发布。Window用同样的方法进行分组,但它是在每个事件到达时就发布。  
BufferWindow都可以使用时间段作为参数。在下面的例子中,所有的鼠标移动事件被  
收集进窗口,每秒一个窗口:  
private void Button_Click(object sender, RoutedEventArgs e)  
{
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(  
handler => (s, a) => handler(s, a),  
handler => MouseMove += handler,  
handler => MouseMove -= handler)  
Buffer(TimeSpan.FromSeconds(1))  
Subscribe(x => Trace.WriteLine(  
DateTime.Now.Second + ": Saw " + x.Count + " items."));  
.
.
}
输出的结果依赖于怎么移动鼠标,类似于这样:  
Rx基础 | 57  
4
5
5
5
5
5
5
9: Saw 93 items.  
0: Saw 98 items.  
1: Saw 39 items.  
2: Saw 0 items.  
3: Saw 4 items.  
4: Saw 0 items.  
5: Saw 58 items.  
讨论  
BufferWindow可用来抑制输入信息,并把输入塑造成我们想要的样子。另一个实用技  
术是限流(throttling),将在 5.4 节介绍。  
BufferWindows都有其他重载,可用在更高级的场合。参数为 skiptimeShift的重载  
能创建互相重合的组,还可跳过组之间的元素。还有一些重载可使用委托,可对组的边界  
进行动态定义。  
参阅  
5
.1 节介绍如何利用事件创建序列。  
5
.4 节介绍对事件流进行限流。  
5
.4?用限流和抽样抑制事件流  
问题  
有时事件来得太快,这是编写响应式代码时经常碰到的问题。一个速度太快的事件流可导  
致程序的处理过程崩溃。  
解决方案  
Rx 专门提供了几个操作符,用来对付大量涌现的事件数据。ThrottleSample这两个操  
作符提供了两种不同方法来抑制快速涌来的输入事件。  
Throttle建立了一个超时窗口,超时期限可以设置。当一个事件到达时,它就重新开始计  
时。当超时期限到达时,它就把窗口内到达的最后一个事件发布出去。  
下面的例子也是监视鼠标移动,但使用了 Throttle,在鼠标保持静止 1 秒后才报告最近一  
条移动事件。  
private void Button_Click(object sender, RoutedEventArgs e)  
{
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(  
5
8 | 第 5 章  
handler => (s, a) => handler(s, a),  
handler => MouseMove += handler,  
handler => MouseMove -= handler)  
.
.
.
Select(x => x.EventArgs.GetPosition(this))  
Throttle(TimeSpan.FromSeconds(1))  
Subscribe(x => Trace.WriteLine(  
DateTime.Now.Second + ": Saw " + (x.X + x.Y)));  
}
输出结果依赖于鼠标的实际动作,我的测试结果是这样:  
4
4
5
5
7: Saw 139  
9: Saw 137  
1: Saw 424  
6: Saw 226  
Throttle常用于类似“文本框自动填充”这样的场合,用户在文本框中输入文字,当他停  
止输入时,才需要进行真正的检索。  
为抑制快速运动的事件序列,Sample操作符使用了另一种方法。Sample建立了一个有规律  
的超时时间段,每个时间段结束时,它就发布该时间段内最后的一条数据。如果这个时间  
段没有数据,就不发布。  
下面的例子捕获鼠标移动,每隔一秒采样一次。 与 Throttle不同,使用 Sample的例子中,  
不需要让鼠标静止一段时间,就可要看到结果。  
private void Button_Click(object sender, RoutedEventArgs e)  
{
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(  
handler => (s, a) => handler(s, a),  
handler => MouseMove += handler,  
handler => MouseMove -= handler)  
.
.
.
Select(x => x.EventArgs.GetPosition(this))  
Sample(TimeSpan.FromSeconds(1))  
Subscribe(x => Trace.WriteLine(  
DateTime.Now.Second + ": Saw " + (x.X + x.Y)));  
}
我先让鼠标静止几秒钟,然后连续移动,得到了下面的输出结果:  
1
1
1
1
2
2
2: Saw 311  
7: Saw 254  
8: Saw 269  
9: Saw 342  
0: Saw 224  
1: Saw 277  
讨论  
对于快速涌来的输入,限流和抽样是很重要的两种工具。别忘了还有一个过滤输入的简单  
Rx基础 | 59  
方法,就是采用标准 LINQ Where操作符。可以这样说,ThrottleSample操作符与  
Where基本差不多,唯一的区别是 ThrottleSample根据时间段过滤,而 Where根据事件  
的数据过滤。在抑制快速涌来的输入流时,这三种操作符提供了三种不同的方法。  
参阅  
5
.1 节介绍如何创建事件序列。  
5
.2 节介绍如何修改引发事件的上下文。  
5
.5?超时  
问题  
我们希望事件能在预定的时间内到达,即使事件不到达,也要确保程序能及时进行响应。  
通常此类事件是单一的异步操作(例如,等待 Web 服务请求的响应)。  
解决方案  
Timeout操作符在输入流上建立一个可调节的超时窗口。一旦新的事件到达,就重置超  
时窗口。如果超过期限后事件仍没到达,Timeout操作符就结束流,并产生一个包含  
TimeoutExceptionOnError通知。  
下面的代码向一个域名发出 Web 请求,并使用 1 秒作为超时值:  
private void Button_Click(object sender, RoutedEventArgs e)  
{
var client = new HttpClient();  
client.GetStringAsync("http://www.example.com/").ToObservable()  
.
.
Timeout(TimeSpan.FromSeconds(1))  
Subscribe(  
x => Trace.WriteLine(DateTime.Now.Second + ": Saw " + x.Length),  
ex => Trace.WriteLine(ex));  
}
Timeout非常适用于异步操作,例如 Web 请求,但它也能用于任何事件流。下面的例子在  
监视鼠标移动时使用 Timeout,使用起来更加简单:  
private void Button_Click(object sender, RoutedEventArgs e)  
{
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(  
handler => (s, a) => handler(s, a),  
handler => MouseMove += handler,  
handler => MouseMove -= handler)  
Select(x => x.EventArgs.GetPosition(this))  
.
6
0 | 第 5 章  
.
.
Timeout(TimeSpan.FromSeconds(1))  
Subscribe(  
x => Trace.WriteLine(DateTime.Now.Second + ": Saw " + (x.X + x.Y)),  
ex => Trace.WriteLine(ex));  
}
我移动了一下鼠标,然后停止 1 秒,得到如下结果:  
1
1
1
1
6: Saw 180  
6: Saw 178  
6: Saw 177  
6: Saw 176  
System.TimeoutException: The operation has timed out.  
值得注意的是,一旦向 OnError发送 TimeoutException,整个事件流就结束了,不会继续  
传来鼠标移动事件。为了阻止这种情况出现,Timeout操作符具有重载方式,在超时发生  
时用另一个流来替代,而不是抛出异常并结束流。  
下面的例子,在超时之前观察鼠标移动,超时发生后进行切换,观察鼠标点击:  
private void Button_Click(object sender, RoutedEventArgs e)  
{
var clicks = Observable.FromEventPattern  
<
MouseButtonEventHandler, MouseButtonEventArgs>(  
handler => (s, a) => handler(s, a),  
handler => MouseDown += handler,  
handler => MouseDown -= handler)  
Select(x => x.EventArgs.GetPosition(this));  
.
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(  
handler => (s, a) => handler(s, a),  
handler => MouseMove += handler,  
handler => MouseMove -= handler)  
.
.
.
Select(x => x.EventArgs.GetPosition(this))  
Timeout(TimeSpan.FromSeconds(1), clicks)  
Subscribe(  
x => Trace.WriteLine(  
DateTime.Now.Second + ": Saw " + x.X + "," + x.Y),  
ex => Trace.WriteLine(ex));  
}
我先移动一下鼠标,停止 1 秒,然后在两个不同的位置点击。下面的输出表明,超时发生  
前鼠标移动事件在进行快速移动,超时后变成两个鼠标点击事件:  
4
4
4
4
5
5
9: Saw 95,39  
9: Saw 94,39  
9: Saw 94,38  
9: Saw 94,37  
3: Saw 130,141  
5: Saw 469,4  
Rx基础 | 61  
讨论  
Timeout操作符对优秀的程序来说是十分必要的,因为我们总是希望程序能及时响应,即  
使外部环境不理想。它可用于任何事件流,尤其是在异步操作时。需要注意,此时内部的  
操作并没有真正取消,操作将继续执行,直到成功或失败。  
参阅  
5
7
9
9
.1 节介绍如何利用事件创建序列。  
.6 节介绍如何把异步代码封装成 Observable 对象事件流。  
.6 节介绍收到 CancellationToken时如何从序列中退订。  
.3 节介绍用 CancellationToken来实现超时功能。  
6
2 | 第 5 章  
6 章  
测试技巧  
测试是保证软件质量必不可少的环节。近年来,提倡单元测试的人越来越多,到处都能听  
到有关单元测试的讨论。有人提倡测试驱动型的开发模式,以保证软件测试和开发同步进  
行、同时完成。大家都知道单元测试在保证代码质量和整个开发过程中的作用,然而大多  
数开发人员直到今天都没有真正编写过单元测试。  
我建议大家至少写一些单元测试,首先从自己觉得最没信心的代码开始。根据我个人的经  
验,单元测试主要有两大好处。  
(
1) 更好地理解代码。你是否遇到过这种情况:你了解程序的某个部分能正常运行,却对  
它的实现原理一无所知。当软件出现了令你不可思议的错误时,这种疑问常常占据你  
的内心深处。要理解那些特别“难”的代码的内部机理,编写单元测试就是一个很好  
的办法。编写描述代码行为的单元测试之后,就不会觉得这部分代码神秘了。编写一  
批单元测试后,最终就能搞清那些代码的行为,以及它们和其他代码之间的依赖关系。  
(
2) 修改代码时更有把握。迟早会有那么一天,你会因为有功能需求而必须修改那些“恐  
怖”的代码,你将无法继续假装它不存在。(我了解那种感觉。我经历过!)最好提前  
做好准备:在此类需求到来之前,为那些恐怖的代码编写单元测试。提前准备,以免  
以后麻烦。如果你的单元测试是完整的,你就相当于有了一个早期预警系统,如果修  
改后的代码影响到已有功能时,它就会立即发出警告。  
不管是你自己还是其他人的代码,都能获得上述好处。我敢肯定单元测试还能带来其他好  
处。单元测试能减少错误出现的频率吗?很有可能。单元测试能减少项目的整体时间吗?  
有可能。但是我在上面列出的几条好处是肯定会有的。我每次编写单元测试时都能感受到。  
6
3
因此,我强烈推荐单元测试。  
本章的内容全部是关于测试的。很多开发人员(甚至包括经常编写单元测试的人)都逃避  
并发代码的单元测试,因为他们总觉得非常难。然而本章的内容将会告诉大家,并发代码  
的单元测试并没有想象中那么难。现在的语言功能和开发库,例如 asyncRx,在测试的  
方便性方面做了很多考虑,并且确实能体现出这点。我建议大家使用本章的方法编写单元  
测试,尤其是并发编程的新手(就是认为新并发代码“很难”或“可怕”的人)。  
6
.1?async方法的单元测试  
问题  
需要对 async方法进行单元测试。  
解决方案  
现在大多数单元测试框架都支持 async Task类型的单元测试,包括 MSTestNUnit、  
xUnit。从 Visual Studio 2012 开始,MSTest 才支持 async Task类型的单元测试,因此需要  
将老版本升级到最新版本。  
下面是一个 async类型 MSTest 单元测试的例子:  
[TestMethod]  
public async Task MyMethodAsync_ReturnsFalse()  
{
var objectUnderTest = ...;  
bool result = await objectUnderTest.MyMethodAsync();  
Assert.IsFalse(result);  
}
单元测试框架检测到方法的返回类型是 Task,会自动 加上 await等待任务完成,然后将测  
试结果标记为“成功”或“失败”。  
如果单元测试框架不支持 async Task类型的单元测试,就需要做一些额外的修改才能等待  
异步操作。其中一种做法是使用 Task.Wait,并在有错误时拆开 AggregateException对象。  
我的建议是使用 NuGet Nito.AsyncEx中的 AsyncContext类:  
[TestMethod]  
public void MyMethodAsync_ReturnsFalse()  
{
AsyncContext.Run(async () =>  
{
var objectUnderTest = ...;  
bool result = await objectUnderTest.MyMethodAsync();  
Assert.IsFalse(result);  
6
4 | 第 6 章  
}
);  
}
AsyncContext.Run