返回顶部

Java多线程并发工具包漫游指南

[复制链接]
阿莫Lv.2 显示全部楼层 发表于 2022-7-12 11:53:29 |阅读模式 打印 上一主题 下一主题
  多线程安全是一个非常重要的技术点,在Java中提供了很多并发工具用于解决线程安全问题。本文的整理基于Java8,梳理常用的并发工具,包括队列、容器、线程池、锁、原子操作及同步工具。


  一、阻塞队列

  阻塞队列有BlockingQueue、BlockingDueue、TransferQueue三种类型,具有插入/取值操作(put/take)无法成功时阻塞当前线程的特点,也可以在指定的超时时间(offer/poll)后超时返回。阻塞队列的特点是:

  生产者可以持续向队列插入新的元素,直到队列满为止,队列满后生产者线程被阻塞;

  消费者可以持续从队列取出元素,直到队列空为止,队列空后消费者线程被阻塞。

  BlockingQueue是单向的阻塞队列,生产者只能在尾部放入新元素,消费者只能在头部获取已有元素。Java提供了多种类型的BlockingQueue实现类:

  ArrayBlockingQueue:基于数组实现的有界阻塞队列,创建后不能修改队列的大小;

  LinkedBlockingQueue:基于链表实现的有界阻塞队列,默认大小为Integer.MAX_VALUE,有较好的吞吐量,但可预测性差。

  PriorityBlockingQueue:具有优先级的无界阻塞队列,不允许插入null,所有元素都必须可比较(Comparable接口)。

  SynchronousQueue:只有一个元素的同步队列。若队列中有元素插入操作将被阻塞,直到队列中的元素被其他线程取走。

  DelayQueue:无界阻塞队列,每个元素都有一个延迟时间,在延迟时间之后才释放元素。

  BlockingDueue是双向的阻塞队列,头部/尾部都可以放入新元素,也都可以获取已有元素。Dueue是“DoubleEndedQueue”的缩写。调用带有first/last后缀的put/take方法即可实现头部/尾部的插入/取值阻塞操作,同时带有后缀的offer/poll方法可以指定超时时间。Java只提供了一种类型的BlockingDueue实现类:

  LinkedBlockingDeque:基于双向链表实现的有界阻塞队列,默认大小为Integer.MAX_VALUE,有较好的吞吐量,但可预测性差。

  TransferQueue被定义为转移队列,继承了BlockingQueue接口,也是单向的阻塞队列。扩展的转移功能是指通过transfer可以转移一个新元素给到消费者,如果消费者恰好在等待就立即转给消费者,否则阻塞生产者直到该元素被取走。Java提供了一个基于链表的实现类LinkedTransferQueue。

  二、并发容器

  工具包提供了队列的并发实现类ConcurrentLinkedQueue和ConcurrentLinkedDeque,两者都是无界非阻塞线程安全的队列。

  ConcurrentMap接口继承了普通的Map接口,提供了线程安全和原子操作特性。Java8提供了实现类ConcurrentHashMap,ConcurrentHashMap不锁定整个Map,只锁定需要写入的部分,因此并发性能比HashTable要高很多。

  ConcurrentNavigableMap接口继承了ConcurrentMap和NavigableMap接口,支持并发访问NavigableMap,还能让子Map具备并发访问的能力。NavigableMap是扩展的SortedMap,具有了针对给定搜索目标返回最接近匹配项的导航方法。

  Java提供了实现类ConcurrentSkipListMap,并没有使用lock来保证线程的并发访问和修改,而是使用了非阻塞算法来保证并发访问,高并发时相对于TreeMap有明显的优势。

  工具包提供了NavigableSet的并发实现类ConcurrentSkipListSet,是线程安全的有序集合,适用于高并发的场景,通过ConcurrentSkipListMap实现。

  工具包提供了两个写时复制容器,即CopyOnWriteArrayList和CopyOnWriteArraySet。写时复制技术是一种优化策略,多个线程可以并发访问同一份数据,当有线程要修改时才进行复制然后修改。在Linux系统中,fork进程后,子进程先与父进程共享数据,需要修改时才用写时复制得到自己的副本。在Java中,写时复制容器在修改数据后,把原来容器的引用指向新容器,来实现读写分离,在并发读写中不需要加锁。写时复制容器适用于读多写少的场景,在复制时会占用较多内存,能够保证最终一致性,但无法保证瞬时一致性。

  三、线程池

  工具包中Executor接口定义了执行器的基本功能,即execute方法,接收Runnable对象参数并执行Runnable中的操作。ExecutorService接口继承Executor接口后增加了关于执行器服务的定义,如关闭、立即关闭、检查关闭、等待终止、提交有返回值的任务、批量提交任务等。通过Executors的工厂方法获取ExecutorService的具体实现,目前Executors可以返回的实现类型如下:

  FixedThreadPool:固定大小的线程池,创建时指定大小;

  WorkStealingPool:拥有多个任务队列(以便减少连接数)的线程池;

  SingleThreadExecutor:单线程执行器,顾名思义只有一个线程执行任务;

  CachedThreadPool:根据需要创建线程,可以重复利用已存在的线程来执行任务;

  SingleThreadScheduledExecutor:根据时间计划延迟创建单个工作线程或者周期性创建的单线程执行器;

  ScheduledThreadPool:能够延后执行任务,或者按照固定的周期执行任务。

  如果希望在任务执行完成后得到任务的返回值,可以调用submit方法传入Callable任务,并通过返回的Future对象查看任务执行是否完成,并获取返回值。

  四、任务分解与合并

  ForkJoinPool可以对任务进行递归分解与合并,从而充分发挥多线程的潜力。其中:

  分解任务,就是把大任务拆分成小任务,小任务再给到ForkJoinPool再次分解,直到不能分解;

  合并任务,则是小任务执行完成后,把结果给到ForkJoinPool再次合并,直到得到最终结果。

  把任务分割成子任务有一定开销,只有当给的任务过大,把它分割成几个子任务才有意义。

  五、并发锁

  Java提供了读写锁和可重入锁。使用锁实现的同步机制很像synchronized块,但是比synchronized块更灵活。锁和synchronized的主要区别在于:

  Synchronized块不能保证等待进入块的线程的访问顺序;

  Synchronized块无法接收参数,不能在有超时时间限制的情况下尝试访问;

  Synchronized块必须包含在单个方法中,而锁的lock和unlock操作可以在单独的方法中。

  ReadWriteLock读写锁接口,允许多个线程读取某个资源,但是一次只能有一个线程进行写操作。内部有读锁、写锁两个接口,分别保护读操作和写操作。实现类为ReentrantReadWriteLock。

  ReentrantLock可重入锁,具有与使用synchronized方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。ReentrantLock将由最近成功获得锁,并且还没有释放该锁的线程所拥有。当锁没有被另一个线程所拥有时,调用lock的线程将成功获取该锁并返回。如果当前线程已经拥有该锁,此方法将立即返回。内部有一个计数器,拥有锁的线程每锁定一次,计数器加1,每释放一次计数器减1。

  六、原子类型

  工具包提供了一些可以用原子方式进行读写的变量类型,支持无锁线程安全的单变量编程。本质上,这些类都扩展了volatile的概念,使用一个volatile类型的变量来存储实际数据。工具包提供了4种类型的原子变量类型:

  AtomicBoolean:可原子操作的布尔对象;

  AtomicInteger:可原子操作的整形对象;

  AtomicLong:可原子操作的长整形对象;

  AtomicReference:可原子操作的对象引用。

  在此基础上,工具包还提供了原子性的数组类型,包括AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray。

  原子变量可以实现线程安全的i++、i--、++i、--i等类型的操作。

  七、多线程同步

  Java提供了多个线程之间同步的工具,包括CountDownLatch、CyclicBarrier、Exchanger、Semaphore。

  CountDownLatch用于一个或者多个线程等待一系列指定操作的完成。初始化时,给定一个数量,每调用一次countDown()方法数量减一。其他线程调用await方法等待时,线程会阻塞到数量减到0才开始执行。

  CyclicBarrier是一种同步机制,它能够对处理一些算法的线程实现同步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。可以设置多个栅栏。实现多线程多次同步等待。

  Exchanger表示一种会合点,两个线程可以在这里交换对象。两个线程各自调用exchange方法进行交换,当线程A调用exchange()方法后,它会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行。

  Semaphore是一种信号量,可以控制某个资源可被同时访问的个数。通过acquire()获取一个许可,如果没有就等待,调用release()释放一个许可。

  免责声明:文章来源于程序之心,作者丁仪。若涉及侵权联系尽快删除!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

达内教育:成立于2002年。致力于面向IT互联网行业,培养软件开发工程师、测试工程师、系统管理员、智能硬件工程师、UI设计师、网络营销、会计等职场人才 达内使命:缔造年轻人的中国梦、缔造达内员工的中国梦 达内愿景:做管理一流的教育公司
  • 商务合作

  • 微信公众号

  • Powered by Discuz! X3.4 | Copyright © 2002-2021, 达内教育 Tedu.cn
  • 京ICP备08000853号-56 |网站地图 | 京公网安备 11010802029508号