NazcaSpace
navigation

Java Stream在Android上的应用

notion image
对于一名Android开发者而言,在代码中使用Lambda表达式或是Method References应该是常规操作了,毕竟IDE也会提示你replace的嘛。同样作为Java8的重要特性,Java Stream出现的频率明显更低。在日常的开发中,合理的使用Java Stream可以简化代码,提高代码的可读性,同时Java Stream也可作为RxJava的补充。本文将对Java Stream的概念以及使用做一个说明,同时给出Java Stream在常见的手机机型上的性能数据。
 

Java中的函数式编程

函数式编程,定义的是输入和输出之间的映射关系。在Java 8中,Java为函数式编程引入了三个新的语法概念:Stream类、Lambda表达式和函数接口(Functional Inteface)。更多关于函数式的说明,可参考
 
Java Stream作为Java函数式编程的重要应用,提供了大量的api,函数式接口,让开发者能够以函数式编程的方式对数据流进行处理。Java Stream有几个显著的特点(From JavaDoc)
  1. Stream是延迟操作的。Stream由0或多个中间操作符(Intermediate Operaions)和一个终态操作符(Terminal Operation)组成,只有当初始化终态操作符时,对数据源的操作才会触发。
  1. Stream本身不存储数据。数据从数据源到终态操作符的传递都是通过管道(Stream Pipeline)进行。
  1. Stream不改变数据源中的数据。例如使用filter对数据源筛选时,不会真的将不满足条件的数据从数据源中删去
  1. Stream的数据只能被访问一次。比如数据源是一个int型数组[1,2,3], 当Stream遍历到2后,当前Stream便不被允许再次访问1,只能重新创建一个Stream,重新将数据源作为输入传递给新创建的Stream。

使用Java Stream简化代码

Example 1 (例子来自文章)
假设你有一个业务需求,要求筛选出年龄大于等于60的用户,然后将他们按照年龄从大到小排序并将他们的名字放在 List 中返回。
// 非Stream实现 public List<String> collect(List<User> users) { List<User> userList = new ArrayList<>(); // 筛选出年龄大于等于 60 的用户 for (User user : users) if (user.age >= 60) userList.add(user); // 将他们的年龄从大到小排序 userList.sort(Comparator.comparing(User::getAge).reversed()); List<String> result = new ArrayList<>(); for (User user : userList) result.add(user.name); return result; }
使用Stream实现:代码更简洁且提升了可读性
public List<String> collect(List<User> users) { return users.stream() .filter(user -> user.age >= 60) .sorted(comparing(User::getAge).reversed()) .map(User::getName) .collect(Collectors.toList()); }
Example 2
我们还可以将Java Stream与CompletableFuture结合一起使用,构建出更加清晰易懂的异步代码。
假设有这样一段业务需求:
输入一个String数组,将数组中的每一个String作为请求报文的参数,向服务器发起请求,等所有结果返回后,组装成字符串列表返回。
没有使用Stream时,我们实现这个需求需要使用并发控制等待所有结果执行完成后,再将结果拼装到字符转列表中。为了实现这一点,我们可以使用信号量进行并发控制,也可以用CountDownLatch控制,当然更可以用ForkJoin框架完成,但这些都需要写较多的代码,并且需要进行显式的并发控制。
如果我们使用Java Stream的并发流搭配CompletableFuture,那么实现会如下所示,链式调用一目了然,针对每一个元素,请求网络,等网络返回后获取结果,并使用filter过滤异常结果,最后使用collect操作符将结果组装成List返回。
List<String> read(List<String> fileList) { return fileList.parallelStream() .map(this::getStringFromNet) .map(it -> { try { return it.get(); // 这一段代码也可以写入到getStringFromNet中 } catch (ExecutionException|InterruptedException e) { return null; } }) .filter(Objects::nonNull) .collect(Collectors.toList()); } CompletableFuture<String> getStringFromNet(String file) { // 耗时操作 }
 

Android下使用Java Stream

第一步:环境配置。
  1. 启用JAVA8
  1. SDK版本24以上支持Stream
第二步: 创建一个Stream
  • List自带方法创建Stream,如上示例代码。
  • 对于数组,也有对应方法: Arrays.stream(xxx)生成
第三步:加入多个中间操作符
常见的有: map(), filter()等
第四步:加入一个终态操作符
满足简单需求可以使用reduce()
若是业务复杂,可使用collect()

中间操作符

map
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
map()方法定义如上,看起来很复杂,但实际等价于: R = F(T),即通过传入的Function将T类型的入参处理成R类型的出参。
在理解map方法时,出参可以直接认为获得了R,之所以代码中返回的是Stream,一方面是链式调用所需,另一方面是因为中间操作符本质上是在构建Stream链表,这里的出参Stream<R>最终会由终态操作符来触发操作,获得R。
filter
Stream<T> filter(Predicate<? super T> predicate);
等价于函数:pass = P(T) 若返回true,则pass。
flatMap
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
flatMap与map一样等价于 R = F(T),但与map不同的是,flatMap对于每一个输入T,会生成一个新的Stream。如下图所示,map并不改变流本身,而flatmap会改变流本身,在处理的过程中,flatmap先改变一个流,然后flatmap遍历所有改变后的流依次执行后续操作符。
notion image

终态操作符

查找:findAny(),findFirst 找到流中的元素。找到即可返回(短路Short-circuiting)
计数: count()
归并: reduce()
T reduce(T identity, BinaryOperator<T> accumulator);
最简单的一种使用方式如上, 等价于 T = F(T, U), 遍历流中的每一个元素,对每一个元素U和上一次执行F得到的结果T继续执行函数F,得到最新的结果T。

使用流需要注意几下几点:

  1. 不要改变数据源本身,即不能再中间操作符中执行对数据源本身的更改,例如add,remove数据源数组
  1. 在stream中不要操作管道外的数据。左下面的代码在并发流中是有问题的,我们完全可以通过改变stream的写法达到一样的效果,如右下。例子来自于
ArrayList<String> results = new ArrayList<>(); stream.filter(s -> pattern.matcher(s).matches()) .forEach(s -> results.add(s));
List<String>results = stream.filter(s -> pattern.matcher(s).matches()) .collect(Collectors.toList());
 

使用并发流(Parallel Stream)

Stream是一个串行的流,只会在单线程下顺序遍历数据源中的数据依次执行操作符。一旦我们使用Parallel Stream,就可以使用多线程并发的遍历数据源中的数据,在各自的线程下依次执行操作符,最后讲结果合并到一起。
因此,相比于Stream,Parallel Stream需要更多的解决以下2个问题:
  1. 多线程下,数据源中的分片规则是什么?这个规则需要保证每个子线程获取的数据不包含重复部分,又要注重效率。
  1. 子线程执行完成后,如何保证结果能够正确的合并?
一般情况下,开发者不需要关心第一个问题,Java提供的IterSpliterator已经帮助我们处理好了第一个问题。本篇文章不会涉及Stream的内部原理,下面将从Collector接口类说明如何解决第二个问题。
Collector
Collector是一个接口,用于collect终态操作符完成数据的归集。
<R, A> R collect(Collector<? super T, A, R> collector); public interface Collector<T, A, R> { Supplier<A> supplier(); BiConsumer<A, T> accumulator(); BinaryOperator<A> combiner(); Function<A, R> finisher(); Set<Characteristics> characteristics(); }
对于一个Stream,无论是否为并发流,都携带了Characteristics,不同的Characteristics包含了不同的Stream特性。对于并发流,有一个关键的Characteristics——CONCURRENT,是否设置了该特性,决定了不同的使用方式,默认不设置CONCURRENT。
设置COCURRENT
第一个方法supplier。 他的作用是提供数据容器。我们知道Collector的作用是数据归集,那自然需要有一个容器来存放数据。无论有多少个子线程并发从数据源取数据,supplier提供的数据容器只有一个,所以这种情况下supplier提供的数据容器应当是线程安全的。
第二个方法是归集方法,与上文所述reduce()一样。 A = F(A, T),当然,其中的A就是supplier提供的。
第三个方法在本情况下无需使用
第四个方法,如果需要返回的数据类型是R,但是归集到的数据类型是A,则最后通过finisher进行转换,如果A和R类型一致,就不需要使用。当然需要使用IDENTITY_FINISH告诉Strean无需转换。
未设置COCURRENT
第一个方法supplier。 有多少个子线程就会有多少个数据容器,因此每个线程互不打扰,可以使用非线程安全的容器。
第二个方法与上文一致。
第三个方法因为多个子线程有多个数据容器,因此这个方法需要完成多个容器之间的数据合并,即解决上文所述的第二个问题。
第四个方法与上文一致。
 
举例说明,知道了上面的用法, 我们来一个StackOverflow上的提问:
提问者说到,针对以下代码,为何reduce()不能起到预期的结果?但只要不适用并发流,就可以得到预期的结果。
public class App { public static void main(String[] args) { String[] grades = {"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"}; StringBuilder concat = Arrays.stream(grades).parallel() .reduce(new StringBuilder(), (sb, s) -> sb.append(s), (sb1, sb2) -> sb1.append(sb2)); System.out.println(concat); } }
collect()函数是最为底层的归集函数,reduce()函数虽然并不是collect()函数的简化实现,但基本原理是类似的。代码中我们看到创建了一个StringBuilder的非线程安全类,另外reduce()不支持子线程创建各自的数据归集容器,因此这里一定会发生并发问题。
那是不是把StringBuilder换成StringBuffer就可以了呢?也不行,因为全局只有一个数据归集容器,所以就不必要Combiner了,也就是reduce()第三个参数一定为null,但很可惜,reduce()不支持第三个参数为null,也就是无论如何,reduce()都会执行Combiner,既然这样,我们就知道了reduce()不太适合在并发流中使用。那解决的思路也就有了,使用collect(),改成如下代码,即可正常运行。
StringBuilder concat = Arrays.stream(grades).parallel() .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append);
 
[1]
[2]
[3]
badge