17370845950

并发处理共享列表并收集结果的方案

本文旨在介绍如何利用 Java 并行流高效地处理大型列表,尤其是在每个元素的处理过程耗时较长的情况下。并行流能够将列表分割成多个子任务,并在多个线程上并发执行,从而显著提升处理速度。但同时,并发编程也带来了共享资源同步的问题,需要谨慎处理。

使用并行流并发处理列表

假设我们有一个 Foo 类,其 process 方法需要处理一个 Bar 类型的列表,并且 handle 方法的处理过程比较耗时。为了提高效率,我们可以将列表分割成多个子列表,然后使用并行流并发处理每个子列表。

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class Foo {
    private int len;

    public Foo(int len) {
        this.len = len;
    }

    public void process(List list) {
        List> sublists = new ArrayList<>();
        for (int i = 0; i < list.size(); i += len) {
            sublists.add(list.subList(i, Math.min(i + len, list.size())));
        }

        // 并行处理子列表
        sublists.parallelStream()
                .forEach(this::handle);
    }

    private void handle(List sublist) {
        // 耗时的处理逻辑
        System.out.println("Processing sublist: " + sublist);
        try {
            Thread.sleep(100); // 模拟耗时操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Bar {
    private int id;

    public Bar(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "Bar{" +
                "id=" + id +
                '}';
    }
}

public class Main {
    public static void main(String[] args) {
        List list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add(new Bar(i));
        }

        Foo foo = new Foo(3);
        foo.process(list);
    }
}

在这个例子中,我们首先将原始列表分割成多个大小为 len 的子列表。然后,我们使用 sublists.parallelStream().forEach(this::handle) 并行处理每个子列表。parallelStream() 方法将列表转换为并行流,forEach() 方法对流中的每个元素执行指定的操作。

收集并行处理的结果

如果 handle 方法返回一个结果,并且我们需要收集所有结果,可以使用 map 和 collect 方法。

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class Foo {
    private int len;

    public Foo(int len) {
        this.len = len;
    }

    public List process(List list) {
        List> sublists = new ArrayList<>();
        for (int i = 0; i < list.size(); i += len) {
            sublists.add(list.subList(i, Math.min(i + len, list.size())));
        }

        // 并行处理子列表并收集结果
        return sublists.parallelStream()
                .map(this::handle)
                .collect(Collectors.toList());
    }

    private String handle(List sublist) {
        // 耗时的处理逻辑,并返回结果
        System.out.println("Processing sublist: " + sublist);
        try {
            Thread.sleep(100); // 模拟耗时操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Result of " + sublist;
    }
}

class Bar {
    private int id;

    public Bar(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "Bar{" +
                "id=" + id +
                '}';
    }
}

public class Main {
    public static void main(String[] args) {
        List list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add(new Bar(i));
        }

        Foo foo = new Foo(3);
        List results = foo.process(list);
        System.out.println("Results: " + results);
    }
}

在这个例子中,handle 方法返回一个字符串结果。我们使用 sublists.parallelStream().map(this::handle).collect(Collectors.toList()) 并行处理每个子列表,并将结果收集到一个列表中。map 方法将流中的每个元素转换为另一个元素,collect 方法将流中的所有元素收集到一个集合中。

并发环境下的同步问题

需要注意的是,当 handle 方法访问共享资源时,需要进行同步处理,以避免出现线程安全问题。例如,如果 handle 方法需要修改一个共享的变量,可以使用 synchronized 关键字或 java.util.concurrent 包中的并发工具类来保证线程安全。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class Foo {
    private int len;
    private AtomicInteger counter = new AtomicInteger(0); // 使用 AtomicInteger 保证线程安全

    public Foo(int len) {
        this.len = len;
    }

    public void process(List list) {
        List> sublists = new ArrayList<>();
        for (int i = 0; i < list.size(); i += len) {
            sublists.add(list.subList(i, Math.min(i + len, list.size())));
        }

        // 并行处理子列表
        sublists.parallelStream()
                .forEach(this::handle);
    }

    private void handle(List sublist) {
        // 耗时的处理逻辑,并访问共享资源
        System.out.println("Processing sublist: " + sublist);
        try {
            Thread.sleep(100); // 模拟耗时操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 原子性地增加计数器
        counter.addAndGet(sublist.size());
    }

    public int getCounter() {
        return counter.get();
    }
}

class Bar {
    private int id;

    public Bar(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "Bar{" +
                "id=" + id +
                '}';
    }
}

public class Main {
    public static void main(String[] args) {
        List list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add(new Bar(i));
        }

        Foo foo = new Foo(3);
        foo.process(list);

        // 等待所有任务完成
        try {
            Thread.sleep(2000); // 确保所有任务都已完成
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Counter: " + foo.getCounter());
    }
}

在这个例子中,我们使用 AtomicInteger 来保证计数器的线程安全。AtomicInteger 提供了原子性的 addAndGet 方法,可以安全地增加计数器的值。

总结

并行流是 Java 中一种强大的并发处理工具,可以显著提高列表处理的效率。但是,在使用并行流时,需要注意共享资源的同步问题,并选择合适的并发工具类来保证线程安全。同时,需要合理分割任务,避免过多的线程切换带来的性能损耗。