Stream基本概念
- Stream 流是来自数据源的元素队列并支持聚合操作: 元素: 是特定类型的对象,是数据源形成的一个队列.Java中的Stream不会存储元素,而是按需计算 数据源: Stream流数据的来源.可以是集合,数组,I/O Channel,产生器Generator等 聚合操作: 类似SQL语句的一系列操作,比如filter,map,reduce,match和sorted等
- Stream 不是集合元素,不是数据结构来保存数据,是一个有关数据算法和计算的流: 获取一个数据源 source => 数据转换 => 执行操作获取需要的结果 每次转换后原来的 Stream 对象不变,返回一个新的 Stream 对象 这样可以使得对 Stream 的操作链式排列,形成一个管道
- Stream是对集合Collection对象功能的增强: Stream 可以对集合对象进行便利高效的聚合操作和大批量的数据操作 集合类持有的元素都是存储在内存中,数据量很大的集合类会占用大量的内存 .Stream 的元素是延迟计算的,只有在访问时才会进入内存进行计算 集合类的迭代逻辑是调用者负责的,通常使用 for 循环 .Stream 中的迭代是隐含在对 Stream 的各种操作当中
- Stream操作的两个基础特征: Pipelining: 中间操作都会返回流本身.多个操作可以串成一个管道.这样就可以对操作进行优化,比如延迟执行 laziness 和短路 short-circuiting 内部迭代: 通常对集合的遍历都是通过 Iterator 或者 forEach 操作,显式地在集合外部进行迭代 .Stream 提供了内部迭代的方式,通过访问者模式 Visitor 实现
- Stream的特点: 不存储: 数据流 Stream 不是存储元素的数据结构 数据流 Stream 将数据元素从源数据结构,数组,生成器函数和输入输出管道中传递到计算操作的管道中 功能性: 一个数据流 Stream 中的流操作都会产生一个结果,不会修改源数据流 Stream 中的元素 延迟计算: 许多流操作中的各种操作,比如过滤,映射,去重等,只有访问时才会进行计算,这样可以有更多的机会进行优化 无界性: 集合的大小是有界的,但是数据流 Stream 可以是无界的 短路操作等可以允许无限流计算在有限的时间内完成
- 示例: 将流程转化为Java代码
+--------------------+ +------+ +------+ +---+ +-------+
| Stream of elements +-----> |filter+-> |sorted+-> |map+-> |collect|
+--------------------+ +------+ +------+ +---+ +-------+
List<Integer> transactionsIds = widgets.stream()
.filter(b -> b.getColor == RED)
.sorted((x, y) -> x.getWeight() - y.getWeight())
.mapToInt(Widget :: getWeight)
.sum();
生成流
- 集合接口有两种方法来生成流:
- stream(): 为集合创建串行流
- parallelStream(): 为集合创建并行流 并行流可以充分发挥多核处理器的优势 使用fork和join并行方式来拆分任务和加速处理过程
List<String> strings = Arrays.asList("I", "want", "to", "be", "", "great");
List<String> filter = strings.stream().filter(string -> !string.isEmpty()).collect(Collectors.toList());
Stream的创建方式
数组
- 通过数组来生成Stream流
String[] arrayStr = {"a", "b", "c"};
// 使用Stream中的静态方法
Stream<String> stream = Stream.of(arrayStr);
集合
- 通过集合来生成Stream流
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> strem = list.stream();
Stream.generate()
- 通过Stream.generate()方法来生成Stream流
Stream<Integer> generate = Stream.generate(() ->);
Stream.iterate()
- 通过Stream.iterate()方法来生成Stream流
Stream<Integer> iterate = Stream.iterate(, x -> x + 1);
API
- 通过其余相关的API进行创建
String str = "api";
IntStream stream = str.chars();
Stream常用方法
中间操作
- 中间操作: 如果调用方法之后返回的结果是 Stream 对象的操作是一个中间操作
filter
- filter方法用于通过设置的条件过滤出元素
/*
* 过滤出空字符串的个数
*/List<String> strings = Arrays.asList("I", "want", "to", "be", "", "great");
long count = Strings.stream().filter(string -> string.isEmpty()).count();
/*
* 过滤出偶数
*/ Arrays.asList(, 2, 3, 4, 5).stream()
.filter(x -> x % == 0)
//, 4
. forEach (System.out :: println)
/*
* 求出所有的偶数和
*/ int count = Arrays.asList(, 2, 3, 4, 5, 6, 7, 8, 9).stream()
.filter(x -> x % == 0)
.mapToInt(x -> x)
.sum();
System.out.println(count);
distinct
- distinct方法通过元素的hashcode()和equals()去除重复元素
Arrays.asList(, 2, 3, 3, 3, 4, 5).stream()
.distinct()
//, 2, 3, 4, 5
.forEach(System.out :: println);
sorted
- sorted方法根据自然排序返回Stream流
- sorted(Comparator comparator)方法根据提供的Comparator进行排序
/*
* 使用sorted方法实现min求最小值和max求最大值
*/ List<Integer> list = Arrays.asList(, 2, 3, 4, 5);
// 数字的自然排序就是从小到大
Optional<Integer> min = list.stream().sorted().findFirst();
System.out.println(min :: get());
Optional<Integer> max = list.stream.sorted((a, b) -> b - a).findFirst();
System.out.println(max :: get());
/*
* 按照字母顺序a - z排序
*/ Arrays.asList("java", "python", "c").stream().sorted().forEach(System.out :: println);
/*
* 按照字母的长度进行排序
*/ Arrays.asList("java", "python", "c").stream().sorted((a, b) -> a.length() - b.length()).forEach(System.out :: println);
limit
- limit方法用来截取指定长度的Stream流
- skip方法用来丢弃指定长度的流数据,可以与limit配合实现分页
/*
* 打印出集合中 - 30的数据
*/ Stream.iterate(x -> x +).limit(50)
// 跳过前个
.skip()
// 输出 - 30
.forEach(System.out :: println);
map
- map用来将Stream流中的元素按照指定的函数映射成一个新的元素的Stream流
- flatMap用来将Stream流中的每个元素转换成另一个流,然后将所有流连接成一个新的流
List<String> list = Arrays.asList("a, b, c", ", 2, 3");
/*
* 将字符串转换为不带逗号的元素
*/ Stream<String> mapList = list.stream().map(s -> s.replace(",", ""));
// abc
mapList.forEach(System.out :: println);
Stream<String> flatMapList = list.stream().flatMap(s -> {
// 将每一个元素转换为流数据
String[] arrayStr = s.split(",");
Stream<String> stream = Arrays.stream(arrayStr);
return stream;
});
// a b c 2 3
flatMapList.forEach(System.out :: println);
peek
- peek方法用来获取Stream流中的每一个元素 类似于 map, 但是 map 中接收的是一个 Function 表达式,有返回值 peek 中接收的是一个 Consumer 表达式,没有返回值
String str = ", 22, 33, 44, 55";
// 22 33 44 55 165
System.out.println(Stream.of(str.split(",")).peek(System.out :: println).mapToInt(Integer :: valueOf).sum());
终止操作
循环-forEach
- forEach 用来循环遍历每一个元素
// 创建一个实体类,包含有一个属性为num属性.并且有一个build()方法用于为num赋值
String str = ", 2, 3"
/*
* 输出 num =, num = 2, num = 3
*/Stream.of(str.split(",")).map(x -> new User(x)).forEach(System.out :: println);
Stream.of(str.split(",")).map(User :: new).forEach(System.out :: println);
Stream.of(str.split(",")).map(x -> User.build(x)).forEach(System.out :: println);
Stream.of(str.split(",")).map(User :: build).forEach(System.out :: println);
计算-min,max,count, sum
- min: 返回 Stream 流元素中的最小值
- max: 返回 Stream 流元素中的最大值
- count: 返回 Stream 流元素中的总个数
- sum: 对 Stream 流元素进行求和
List<Integer> list = Arrays.asList(, 2, 3, 4, 5, 6);
// 求集合的最大值 -
System.out.println(list.stream().max((a, b) -> a - b).get());
// 求集合的最小值 -
System.out.println(list.stream().min((a, b) -> a - b).get());
// 统计元素的个数 -
System.out.println(list.stream().count());
// 元素求和
String str = ", 22, 33, 44, 55";
System.out.println(Stream.of(str.split(",")).mapToInt(x -> Integer.valueOf(x)).sum());
System.out.println(Stream.of(str.split(",")).mapToInt(Integer :: valueOf).sum());
System.out.println(Stream.of(str.split(",")).map(x -> Integer.valueOf(x)).mapToInt(x -> x).sum());
System.out.println(Stream.of(str.split(",")).map(Integer :: valueOf).mapToInt(x -> x).sum());
匹配-anyMatch,allMatch,noneMatch,findFirst,findAny
- anyMatch: 接收一个 Predicate 函数,只要 Stream 流中有一个元素满足该断言则返回 true, 否则返回 false
- allMatch: 接收一个 Predicate 函数,当 Stream 流中每一个元素都满足该断言则返回 true, 否则返回 false
- noneMatch: 接收一个 Predicate 函数,当 Stream 流中每一个元素都不符合该断言则返回 true, 否则返回 false
- findFirst: 返回 Stream 流中的第一个元素
- findAny: 返回Stream流中的任意一个元素
List<Integer> list = Arrays.asList(, 2, 3, 4, 5, 6);
// 如果集合的元素都大于等于则返回true
System.out.println(list.stream().allMatch(x -> x >=));
// 如果集合中有大于的元素,返回false
Systemm.out.println(list.stream().noneMatch(x -> x >));
// 如果集合中有大于的元素则返回true
System.out.println(list.stream().anyMatch(x -> x >));
// 取第一个偶数
System.out.println(list.stream().filter(x -> x % == 0).findFirst().get());
// 取任意一个偶数
System.out.println(list.stream().filter(x -> x % == 0).findAny().get());
收集器-toArray,collect
- collect: 接收一个 Collector 实例,将流中元素转变为另外一种数据结构
- Collector<T, A, R> 是一个接口,包含 5 个抽象方法: Supplier< A > supplier(): 创建一个结果容器 BiConsumer<A, T> accumulator(): 消费型接口. 第一个参数为容器 A, 第二个参数为流中元素 T BinaryOperator< A > combiner(): 函数接口. 该函数的作用是将各个子流程的运行结果,即 accumulator 函数操作后的容器 A 进行合并 Function<A, R> finisher(): 函数式接口. 参数为容器 A, 返回类型为 collect 方法需要的结果类型 R Set< Chracteristics > characteristics(): 返回一个不可变的 Set 集合,用来表明该 Collector 的特性
// 将对象值转化为List
List<Integer> ageList = userList.stream().map(User :: getAge).collect(Collectors.toList());
// 将对象值转化为Set
Set<Integer> ageSet = userList.stream().map(User :: getAge).collect(Collectors.toMap());
// 将对象值转换为Map. 要求key的取值不能相同
Map<String, Integer> userMap = userList.stream().collect(Colletors.toMap(User :: getName, User :: getAge));
// 字符串分隔符连接
String joinName = userList.stream().map(User :: getName).collect(Collectors.join(",", "(", ")"))
// 聚合操作 - 计算对象总数
Long count = userList.stream().collect(Collectors.counting());
// 聚合操作 - 计算最大值
Integer maxAge = userList.stream().map(User :: getAge).collect(Collectors.maxBy(Integer :: compare)).get();
// 聚合操作 - 计算对象值总数
Integer sumAge = userList.stream().collect(Collectors.summingInt(User :: getAge));
// 聚合操作 - 计算对象值的平均数
Double averageAge = userList.stream().collet(Collectors.averagingDouble(User :: getAge));
// 根据Age分组
Map<Integer, List<User>> ageMap = userList.stream().collect(Collectors.groupingBy(User :: getAge));
// 根据条件分区: 一部分大于, 一部分小于10
Map<Boolean, List<User>> partMap = userList.stream().collect(Collectors.partitionBy(x -> x.getAge() >));
// 规约
Integer value = userList.stream().map(User :: getAge).collect(Collectors.reducing(Integer :: sum)).get();
parallelStream
- parallelStream是流并行处理程序的代替方法
List<String> strings = Arrays.asList("I", "want", "to", "be", "", "great");
long count = strings.parallelStream().filter(string -> string.isEmpty()).count();
- 可以很容易的在顺序运行和并行之间进行直接切换
Collectors
- Collectors 类实现很多归约操作,比如将流转换成集合和聚合元素
- Collectors 可用于返回列表和字符串
List<String> strings = Arrays.asList("I", "want", "to", "be", "", "great");
// 过滤掉空字符串
List<String> filtered = strings.stream().filter(string -> !string.isEmpty()).collect(Collectors.toList());
// 合并字符串
String mergedString = strings.stream().filter(string -> !string.isEmpty()).collect(Collectors.joining(","));
averagingDouble
/**
* 返回一个Collector,生成应用于输入元素double值函数的算术平均值
* 结果可能会存在误差
*
* @param mapper 获取需要计算算数平均值的值的映射器
* @return Collector<T, ?, Double> 计算算术平均值的Collector
*/ public static <T> Collector<T, ?, Double> averagingDouble(ToDoubleFunction<? super T> mapper) {
/*
* 在收集操作的数组中:
* 索引 - 运行总和的高位
* 索引 - 补偿计算总和的低位
* 索引 - 可见值的数量
*/ return new CollectorImpl<>(
() -> new double[],
(a, t) -> { double val = mapper.applyAsDouble(t); sumWithCompensation(a, val); a[]++; a[3]+= val;},
(a, b) -> { sumWithCompensation(a, b[]); sumWithCompensation(a, b[1]); a[2] += b[2]; a[3] += b[3]; return a; },
a -> (a[] == 0) ? 0.0d : (computeFinalSum(a) / a[2]),
CH_NOID);
}
averagingInt
/**
* 返回一个Collector,生成应用于输入元素int值函数的算术平均值
*
* @param mapper 获取需要计算算数平均值的值的映射器
* @return Collector<T, ?, Double> 计算算术平均值的Collector
*/ public static <T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper) {
return new CollectorImpl<>(
() -> new long[],
(a, t) -> { a[] += mapper.applyAsInt(t); a[1]++; },
(a, b) -> { a[] += b[0]; a[1] += b[1]; return a; },
a -> (a[] == 0) ? 0.0d : (double) a[0] / a[1], CH_NOID);
}
averagingLong
/**
* 返回一个Collector,生成应用于输入元素long值函数的算术平均值
*
* @param mapper 获取需要计算算数平均值的值的映射器
* @return Collector<T, ?, Double> 计算算术平均值的Collector
*/ public static <T> Collector<T, ?, Double> averagingLong(ToLongFunction<? super T> mapper) {
return new CollectorImpl<>(
() -> new long[],
(a, t) -> { a[] += mapper.applyAsLong(t); a[1]++; },
(a, b) -> { a[] += b[0]; a[1] += b[1]; return a; },
a -> (a[] == 0) ? 0.0d : (double) a[0] / a[1], CH_NOID);
}
collectingAndThen
/**
* 调整Collector进行额外的转换
*
* @param downstream 生成的下游收集器
* @param finisher 额外执行的操作
* @return Collector<T,A,RR> 进行额外操作的下游收集器
*/public static<T,A,R,RR> Collector<T,A,RR> collectingAndThen(Collector<T,A,R> downstream, Function<R,RR> finisher) {
Set<Collector.Characteristics> characteristics = downstream.characteristics();
if (characteristics.contains(Collector.Characteristics.IDENTITY_FINISH)) {
if (characteristics.size() ==)
characteristics = Collectors.CH_NOID;
else {
characteristics = EnumSet.copyOf(characteristics);
characteristics.remove(Collector.Characteristics.IDENTITY_FINISH);
characteristics = Collections.unmodifiableSet(characteristics);
}
}
return new CollectorImpl<>(downstream.supplier(),
downstream.accumulator(),
downstream.combiner(),
downstream.finisher().andThen(finisher),
characteristics);
}
counting
/**
* 返回Collector中指定类型元素的数量
*
* @return Collector<T, ?, Long> 指定输入元素数量的收集器
*/public static <T> Collector<T, ?, Long> counting() {
return summingLong(e ->L);
}
groupingBy
/**
* 返回一个对输入的指定类型的元素执行分组操作的Collector
* - 根据分组函数进行分组操作
* - 并将结果以Map类型返回
*
* @param classifier 指定的分组函数
* @return Collector<T, ?, Map<K, List<T>>> Map类型的返回结果
*/public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(Function<? super T, ? extends K> classifier) {
return groupingBy(classifier, toList());
}
/**
* 返回一个对输入的指定类型的元素执行级联分组操作的Collector
* - 根据分组函数进行分组操作
* - 然后根据下游收集器对关联key的value值执行指定的规约操作
*
* @param classifier 指定的分组函数
* @param downstream 指定执行规约操作的下游收集器
* @return Collector<T, ?, Map<K, D>> 实现了级联分组操作的结果
*/public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream) {
return groupingBy(classifier, HashMap::new, downstream);
}
/**
* 返回一个对输入的指定类型的元素执行级联分组操作的,并以指定的Map实现方式保存结果返回的Collector
* - 根据分组函数进行分组操作
* - 再根据下游收集器对关联key的value值执行指定的规约操作
* - 然后将结果保存到指定方式实现的Map中
*
* @param classifier 指定的分组函数
* @param mapFactory 指定的Map实现
* @param downstream 指定执行规约操作的下游收集器
* @return Collector<T, ?, M> 实现了级联分组操作的Collector结果
*/public static <T, K, D, A, M extends Map<K, D>> Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier, Supplier<M> mapFactory, Collector<? super T, A, D> downstream) {
Supplier<A> downstreamSupplier = downstream.supplier();
BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
downstreamAccumulator.accept(container, t);
};
BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
@SuppressWarnings("unchecked")
Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
}
else {
@SuppressWarnings("unchecked")
Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
Function<Map<K, A>, M> finisher = intermediate -> {
intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
@SuppressWarnings("unchecked")
M castResult = (M) intermediate;
return castResult;
};
return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
}
}
groupingByConcurrent
/**
* 返回一个对输入的指定类型的元素执行分组操作的并行Collector
* - 根据分组函数进行分组操作
* - 并将结果以Map类型返回
*
* @param classifier 指定的分组函数
* @return Collector<T, ?, ConcurrentMap<K, List<T>>> 实现分组操作的并发无序的Collector
*/public static <T, K> Collector<T, ?, ConcurrentMap<K, List<T>>> groupingByConcurrent(Function<? super T, ? extends K> classifier) {
return groupingByConcurrent(classifier, ConcurrentHashMap::new, toList());
}
/**
* 返回一个对输入的指定类型的元素执行级联分组操作的并行Collector
* - 根据分组函数进行分组操作
* - 然后根据下游收集器对关联key的指定value值执行指定的规约操作
*
* @param classifier 指定的分组函数
* @downstream 指定执行规约操作的下游收集器
* @return Collector<T, ?, ConcurrentMap<K, D>> 实现分组操作的并发无序的Collector
*/public static <T, K, A, D> Collector<T, ?, ConcurrentMap<K, D>> groupingByConcurrent(Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream) {
return groupingByConcurrent(classifier, ConcurrentHashMap::new, downstream);
}
/**
* 返回一个对输入的指定类型的元素执行级联分组操作的并行Collector
* - 根据分组函数进行分组操作
* - 再根据下游收集器对关联key的指定value值执行指定的规约操作
* - 然后将结果保存到指定方式实现的Map中
*
* @param classifier 指定的分组函数
* @param mapFactory 指定的Map实现
* @downstream 指定执行规约操作的下游收集器
* @return Collector<T, ?, ConcurrentMap<K, D>> 实现分组操作的并发无序的Collector
*/public static <T, K, A, D, M extends ConcurrentMap<K, D>> Collector<T, ?, M> groupingByConcurrent(Function<? super T, ? extends K> classifier, Supplier<M> mapFactory, Collector<? super T, A, D> downstream) {
Supplier<A> downstreamSupplier = downstream.supplier();
BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
BinaryOperator<ConcurrentMap<K, A>> merger = Collectors.<K, A, ConcurrentMap<K, A>>mapMerger(downstream.combiner());
@SuppressWarnings("unchecked")
Supplier<ConcurrentMap<K, A>> mangledFactory = (Supplier<ConcurrentMap<K, A>>) mapFactory;
BiConsumer<ConcurrentMap<K, A>, T> accumulator;
if (downstream.characteristics().contains(Collector.Characteristics.CONCURRENT)) {
accumulator = (m, t) -> {
K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
downstreamAccumulator.accept(resultContainer, t);
};
}
else {
accumulator = (m, t) -> {
K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
synchronized (resultContainer) {
downstreamAccumulator.accept(resultContainer, t);
}
};
}
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_CONCURRENT_ID);
}
else {
@SuppressWarnings("unchecked")
Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
Function<ConcurrentMap<K, A>, M> finisher = intermediate -> {
intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
@SuppressWarnings("unchecked")
M castResult = (M) intermediate;
return castResult;
};
return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_CONCURRENT_NOID);
}
}
joining
/**
* 返回一个根据接收元素的顺序将元素连接成一个String字符串的Collector
*
* @return Collector<CharSequence, ?, String> 根据接收元素的顺序将元素连接成一个字符串的Collector
*/public static Collector<CharSequence, ?, String> joining() {
return new CollectorImpl<CharSequence, StringBuilder, String>(
StringBuilder::new, StringBuilder::append,
(r, r2) -> { r1.append(r2); return r1; },
StringBuilder::toString, CH_NOID);
}
/**
* 返回一个根据接收元素的顺序将元素连接成一个String字符串并以指定的分隔符分割的Collector
*
* @param delimiter 用于每个元素之间的分割符
* @return Collector<CharSequence, ?, String> 根据接收元素的顺序将元素连接成一个字符串并以指定分割符分割的Collector
*/public static Collector<CharSequence, ?, String> joining(CharSequence delimiter) {
return joining(delimiter, "", "");
}
/**
* 返回一个根据接收元素的顺序将元素连接成一个String字符串并以指定的分隔符分割和指定前缀和后缀的Collector
*
* @param delimiter 用于每个元素之间的分割符
* @param prefix 指定的前缀
* @param suffix 指定的后缀
* @return Collector<CharSequence, ?, String> 根据接收元素的顺序将元素连接成一个字符串并以指定分割符分割和指定的前缀和后缀的Collector
*/public static Collector<CharSequence, ?, String> joining(CharSequence delimiter, CharSequence prefix, CharSequence suffix) {
return new CollectorImpl<>(
() -> new StringJoiner(delimiter, prefix, suffix),
StringJoiner::add, StringJoiner::merge,
StringJoiner::toString, CH_NOID);
}
mapping
/**
* 在累积元素之前应用映射函数将类型为U的收集器调整为类型为T的收集器
* mapping操作对多级规约操作最有用
*
* @param mapper 映射函数
* @param downstream 下游收集器
* @return Collector<T, ?, R> 执行映射函数操作后的收集器
*/public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper, Collector<? super U, A, R> downstream) {
BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
return new CollectorImpl<>(downstream.supplier(),
(r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)),
downstream.combiner(), downstream.finisher(),
downstream.characteristics());
}
maxBy
/**
* 返回一个根据给定的比较器Comparator生成最大元素的Collector,使用Optional<T>描述
*
* @param comparator 指定的比较器
* @return Collector<T, ?, Optional<T>> 根据比较器生成最大元素的Collector
*/public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator) {
return reducing(BinaryOperator.maxBy(comparator));
}
minBy
/**
* 返回一个根据给定的比较器Comparator生成最小元素的Collector,使用Optional<T>描述
*
* @param comparator 指定的比较器
* @return Collector<T, ?, Optional<T>> 根据比较器生成最小元素的Collector
*/public static <T> Collector<T, ?, Optional<T>> (Comparator<? super T> comparator) {
return reducing(BinaryOperator.minBy(comparator));
}
partitioningBy
/**
* 返回一个根据指定规则分类的Collector,结果保存到Map<Boolean, List>中
*
* @param predicate 指定的分类规则
* @return Collector<T, ?, Map<Boolean, List<T>>> 按照指定规则进行分类,并将结果保存到Map<Boolean, List>的收集器
*/public static <T> Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {
return partitioningBy(predicate, toList());
}
/**
* 返回一个根据指定规则分类的Collector,并将结果值按照下游收集器执行规约操作,然后将规约后的结果保存到Map<Boolean, D>中
*
* @param predicate 指定的分类规则
* @param downstream 指定的执行规约操作的下游收集器
* @return Collector<T, ?, Map<Boolean, D>> 按照指定规则进行分类,并将结果值按照下游收集器执行规约操作,然后将规约后的结果保存到Map<Boolean, D>的收集器
*/public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate, Collector<? super T, A, D> downstream) {
BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
BiConsumer<Partition<A>, T> accumulator = (result, t) ->
downstreamAccumulator.accept(predicate.test(t) ? result.forTrue : result.forFalse, t);
BinaryOperator<A> op = downstream.combiner();
BinaryOperator<Partition<A>> merger = (left, right) ->
new Partition<>(op.apply(left.forTrue, right.forTrue),
op.apply(left.forFalse, right.forFalse));
Supplier<Partition<A>> supplier = () ->
new Partition<>(downstream.supplier().get(),
downstream.supplier().get());
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
return new CollectorImpl<>(supplier, accumulator, merger, CH_ID);
}
else {
Function<Partition<A>, Map<Boolean, D>> finisher = par ->
new Partition<>(downstream.finisher().apply(par.forTrue),
downstream.finisher().apply(par.forFalse));
return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID);
}
}
reducing
/**
* 返回一个按照指定的规约操作获得结果的Colletor.结果以Optional<T>描述
*
* @param op 指定的规约操作
* @return Collector<T, ?, Optional<T>> 按照指定的规约操作执行的结果
*/public static <T> Collector<T, ?, Optional<T>> reducing(BinaryOperator<T> op) {
class OptionalBox implements Consumer<T> {
T value = null;
boolean present = false;
@Override
public void accept(T t) {
if (present) {
value = op.apply(value, t);
}
else {
value = t;
present = true;
}
}
}
return new CollectorImpl<T, OptionalBox, Optional<T>>(
OptionalBox::new, OptionalBox::accept,
(a, b) -> { if (b.present) a.accept(b.value); return a; },
a -> Optional.ofNullable(a.value), CH_NOID);
}
/**
* 返回一个使用指定标识,按照指定的规约操作获得结果的Colletor
*
* @param identity 指定的标识,也是结果为空时的返回值
* @param op 指定的规约操作
* @return Collector<T, ?, T> 按照指定的规约操作执行的结果,当结果为空时返回指定的标识identity
*/public static <T> Collector<T, ?, T> reducing(T identity, BinaryOperator<T> op) {
return new CollectorImpl<>(
boxSupplier(identity),
(a, t) -> { a[] = op.apply(a[0], t); },
(a, b) -> { a[] = op.apply(a[0], b[0]); return a; },
a -> a[],
CH_NOID);
}
/**
* 返回一个使用指定标识,按照指定的映射操作和规约操作获得结果的Colletor
*
* @param identity 指定的标识,也是结果为空时的返回值
* @param mapper 指定的映射操作
* @param op 指定的规约操作
* @return Collector<T, ?, Optional<T>> 按照指定的映射操作和规约操作执行的结果,当结果为空时返回指定的标识identity
*/public static <T, U> Collector<T, ?, U> reducing(U identity, Function<? super T, ? extends U> mapper, BinaryOperator<U> op) {
return new CollectorImpl<>(
boxSupplier(identity),
(a, t) -> { a[] = op.apply(a[0], mapper.apply(t)); },
(a, b) -> { a[] = op.apply(a[0], b[0]); return a; },
a -> a[], CH_NOID);
}
summarizingDouble
/**
* 返回一个使用double类型映射每一个输入元素并返回结果值汇总信息的Collector
*
* @param mapper 指定的映射操作
* @return Collector<T, ?, DoubleSummaryStatistics> 汇总信息的规约操作
*/public static <T> Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(ToDoubleFunction<? super T> mapper) {
return new CollectorImpl<T, DoubleSummaryStatistics, DoubleSummaryStatistics>(
DoubleSummaryStatistics::new,
(r, t) -> r.accept(mapper.applyAsDouble(t)),
(l, r) -> { l.combine(r); return l; }, CH_ID);
}
summarizingInt
/**
* 返回一个使用int类型映射每一个输入元素并返回结果值汇总信息的Collector
*
* @param mapper 指定的映射操作
* @return Collector<T, ?, IntSummaryStatistics> 汇总信息的规约操作
*/public static <T> Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper) {
return new CollectorImpl<T, IntSummaryStatistics, IntSummaryStatistics>(
IntSummaryStatistics::new,
(r, t) -> r.accept(mapper.applyAsInt(t)),
(l, r) -> { l.combine(r); return l; }, CH_ID);
}
summarizingLong
/**
* 返回一个使用long类型映射每一个输入元素并返回结果值汇总信息的Collector
*
* @param mapper 指定的映射操作
* @return Collector<T, ?, IntSummaryStatistics> 汇总信息的规约操作
*/public static <T> Collector<T, ?, LongSummaryStatistics> summarizingLong(ToLongFunction<? super T> mapper) {
return new CollectorImpl<T, LongSummaryStatistics, LongSummaryStatistics>(
LongSummaryStatistics::new,
(r, t) -> r.accept(mapper.applyAsLong(t)),
(l, r) -> { l.combine(r); return l; }, CH_ID);
}
summingDouble
/**
* 返回一个使用double类型映射每一个输入元素并返回元素之和的Collector
* 如果不存在元素则返回
*
* @param mapper 指定的映射操作
* @return Collector<T, ?, Double> 执行元素求和规约操作的Collector
*/public static <T> Collector<T, ?, Double> summingDouble(ToDoubleFunction<? super T> mapper) {
/*
* 在为收集操作分配的数组中:
* 索引 - 运行和的高位
* 索引 - 补偿计算的和的低阶位求和
* 索引 - 当存在无穷大结果和时正确的符号
*/ return new CollectorImpl<>(
() -> new double[],
(a, t) -> { double val = mapper.applyAsDouble(t);
sumWithCompensation(a, val);
a[] += val;},
(a, b) -> { sumWithCompensation(a, b[]);
a[] += b[2];
return sumWithCompensation(a, b[]); },
a -> computeFinalSum(a),
CH_NOID);
}
summingInt
/**
* 返回一个使用int类型映射每一个输入元素并返回元素之和的Collector
* 如果不存在元素则返回
*
* @param mapper 指定的映射操作
* @return Collector<T, ?, Integer> 执行元素求和规约操作的Collector
*/public static <T> Collector<T, ?, Integer> summingInt(ToIntFunction<? super T> mapper) {
return new CollectorImpl<>(
() -> new int[],
(a, t) -> { a[] += mapper.applyAsInt(t); },
(a, b) -> { a[] += b[0]; return a; },
a -> a[], CH_NOID);
}
summingLong
/**
* 返回一个使用long类型映射每一个输入元素并返回元素之和的Collector
* 如果不存在元素则返回
*
* @param mapper 指定的映射操作
* @return Collector<T, ?, Integer> 执行元素求和规约操作的Collector
*/public static <T> Collector<T, ?, Long> summingLong(ToLongFunction<? super T> mapper) {
return new CollectorImpl<>(
() -> new long[],
(a, t) -> { a[] += mapper.applyAsLong(t); },
(a, b) -> { a[] += b[0]; return a; },
a -> a[], CH_NOID);
}
toCollection
/**
* 返回一个将接收的元素按照接收顺序放入生成的集合的Collector
*
* @param collectionFactory 指定的生成的集合
* @return Collector<T, ?, C> 将接收的元素按照接收顺序生成集合的Collector
*/public static <T, C extends Collection<T>> Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) {
return new CollectorImpl<>(collectionFactory, Collection<T>::add,
(r, r2) -> { r1.addAll(r2); return r1; },
CH_ID);
}
toConcurrentMap
/**
* 返回一个将接收的元素放入并发的Map中的并发Collector,其中Map的key和value按照指定的映射函数生成
*
* @param keyMapper 键的映射器
* @param valueMapper 值的映射器
* @return Collector<T, ?, ConcurrentMap<K,U>> 将接收的元素按照放入并发的Map中的并发无序Collector
*/public static <T, K, U> Collector<T, ?, ConcurrentMap<K,U>> toConcurrentMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper) {
return new CollectorImpl<>(ConcurrentHashMap::new,
uniqKeysMapAccumulator(keyMapper, valueMapper),
uniqKeysMapMerger(),
CH_CONCURRENT_ID);
}
/**
* 返回一个将接收的元素放入并发的Map中的并发Collector,其中Map的key和value按照指定的映射函数生成
* 如果存在重复的键值,则会应用指定的合并函数
*
* @param keyMapper 键的映射器
* @param valueMapper 值的映射器
* @param mergeFunction 指定的合并函数
* @return Collector<T, ?, ConcurrentMap<K,U>> 将接收的元素按照放入并发的Map中的并发Collector
*/public static <T, K, U> Collector<T, ?, ConcurrentMap<K,U>> toConcurrentMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper, BinaryOperator<U> mergeFunction) {
return toConcurrentMap(keyMapper, valueMapper, mergeFunction, ConcurrentHashMap::new);
}
/**
* 返回一个将接收的元素放入指定方式实现的并发Map中的并发Collector,其中Map的key和value按照指定的映射函数生成
* 如果存在重复的键值,则会应用指定的合并函数
*
* @param keyMapper 键的映射器
* @param valueMapper 值的映射器
* @param mergeFunction 指定的合并函数
* @param mapFactory 指定的Map实现方式
* @return Collector<T, ?, M> 将接收的元素按照放入指定方式实现的并发Map中的并发Collector
*/public static <T, K, U, M extends ConcurrentMap<K, U>> Collector<T, ?, M> toConcurrentMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper,BinaryOperator<U> mergeFunction,Supplier<M> mapFactory) {
BiConsumer<M, T> accumulator
= (map, element) -> map.merge(keyMapper.apply(element),
valueMapper.apply(element), mergeFunction);
return new CollectorImpl<>(mapFactory, accumulator, mapMerger(mergeFunction), CH_CONCURRENT_ID);
}
toList
/**
* 返回一个将接收的元素按照接收的顺序放入List中的收集器Collector
*
* @return Collector<T, ?, List<T>> 包含输入元素的List类型的收集器Collector
*/public static <T> Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
}
toMap
/**
* 返回一个将接收的元素放入Map中的Collector,其中Map的key和value按照指定的映射函数生成
*
* @param keyMapper 键映射函数
* @param valueMapper 值映射函数
* @return Collector<T, ?, Map<K,U>> 将接收的元素放入Map中的Collector
*/public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper) {
return new CollectorImpl<>(HashMap::new,
uniqKeysMapAccumulator(keyMapper, valueMapper),
uniqKeysMapMerger(),
CH_ID);
}
/**
* 返回一个将接收的元素放入Map中的Collector,其中Map的key和value按照指定的映射函数生成
* 如键重复,则使用指定的合并函数合并
*
* @param keyMapper 键映射函数
* @param valueMapper 值映射函数
* @param mergeFunction 指定的合并函数
* @return Collector<T, ?, Map<K,U>> 将接收的元素放入Map中的Collector
*/public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper, BinaryOperator<U> mergeFunction) {
return toMap(keyMapper, valueMapper, mergeFunction, HashMap::new);
}
/**
* 返回一个将接收的元素放入指定实现方式的Map中的Collector,其中Map的key和value按照指定的映射函数生成
* 如键重复,则使用指定的合并函数合并
*
* @param keyMapper 键映射函数
* @param valueMapper 值映射函数
* @param mergeFunction 指定的合并函数
* @param mapFactory 指定的Map实现方式
* @return Collector<T, ?, Map<K,U>> 将接收的元素放入指定实现方式的Map中的Collector
*/public static <T, K, U, M extends Map<K, U>> <T, ?, M> toMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends U> valueMapper, BinaryOperator<U> mergeFunction, <M> mapFactory) {
BiConsumer<M, T> accumulator
= (map, element) -> map.merge(keyMapper.apply(element),
valueMapper.apply(element), mergeFunction);
return new CollectorImpl<>(mapFactory, accumulator, mapMerger(mergeFunction), CH_ID);
}
toSet
/**
* 返回一个将接收的元素放入Set中的收集器Collector
*
* @return Collector<T, ?, Set<T>> 将接收的元素放入Set中的Collector
*/public static <T> Collector<T, ?, Set<T>> toSet() {
return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add,
(left, right) -> {
if (left.size() < right.size()) {
right.addAll(left); return right;
} else {
left.addAll(right); return left;
}
},
CH_UNORDERED_ID);
}
统计
- 产生统计结果的收集器,主要用于 int,double,long 等类型
List<Integer> numbers = Arrays.asList(, 2, 3, 7, 3, 5);
IntSummaryStatistics stats = numbers.stream().mapToInt((x) -> x).summaryStatistics();
System.out.println("列表中最大的数:" + stats.getMax());
System.out.println("列表中最小的数:" + stats.getMin());
System.out.println("所有数之和:" + stats.getSum());
System.out.println("平均数:" + stats.getAverage());