likes
comments
collection
share

Java基础之并发上篇

作者站长头像
站长
· 阅读数 19

序言

在当前的计算机领域,高效的并发编程对于Java开发人员而言变得越发重要。作为流行的编程语言,Java提供了强大的并发编程支持,使开发人员能够充分发挥多核处理器和线程的潜力,构建高性能、高吞吐量的应用程序。接下来我们一起探讨Java并发编程

关于并发绕不开的两个关键字‘进程’,‘线程’

进程 是程序的基本执行实体,是线程的容器。例如Java程序启动执行之后变成了进程。

线程 是操作系统能够进行运算调度的最小单位。它被包含在进程之中一个进程中可以并发多个线程,每条线程并行执行不同的任务,线程共享进程中的系统资源。

并发 是多个程序在一段重叠的时间段中开始、运行与结束,但这些程序并没有在任何一个时刻同时在执行。

并行 是意味着在同一个时刻,存在两个以上任务在同时运行。补充:并行是要求更严格的并发,同时性要求更高。

探索进程和线程的模型帮助加深理解

进程是线程的容器,设计的初衷为了解决资源分配的问题。 Java基础之并发上篇 在创建一个进程时会有一个主线程同时存在。

目前的操作系统是把计算资源(CPU)分配给了线程。进程是不会直接对接计算资源,进程对接的是内存、文件、用户权限、操作系统的命名空间。

了解内核线程和用户线程

到这里进程和线程介绍完了,开始深处的挖掘。

Java基础之并发上篇

内核空间 进程和硬件的沟通桥梁,存在应用和硬件中间。内核的优先级和权限非常高,是能看到所有的内存,必须给它一个单独的空间。 应用空间 Java程序是应用进程,运行在用户空间。 内核级线程和用户级线程 内核级线程由内核调度,用户级线程由应用自己调度。思考一个问题Java线程线程模型,是由内核线程调用还是用户级线程调用。

Java基础之并发上篇

Java老版本,Java程序运行在虚拟机上,进程创建时会创建一条主线程,主线程是由内控线程调度,Java的主线程是内核级,其它的线程是用户线程,操作系统是不会调用用户级线程,操作系统把CPU的执行权限给了主线程用户线程共享主线程的时间。

Java基础之并发上篇

现在的版本Java创建任何线程,都是由内核调度实现了并发并行。由M个内核线程去响应N个用户级线程执行。

线程常见状态

Java基础之并发上篇
  1. 初始(NEW) :新创建了一个线程对象,但还没有调用start()方法。
  2. 运行(RUNNABLE) :Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。 线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。
  3. 阻塞(BLOCKED) :表示线程阻塞于锁。
  4. 等待(WAITING) :进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。
  5. 超时等待(TIMED_WAITING) :该状态不同于WAITING,它可以在指定的时间后自行返回。
  6. 终止(TERMINATED) :表示该线程已经执行完毕。

注释:

Thread.join 线程进入WAITING状态。

Thread.sleep 线程进入TIMED_WAITING状态。

网络请求线程进入BLOCKED状态。

线程切换CPU如何操作

Context Switch(上下文切换)切换是CPU的上下文。CPU的上下文就是寄存器和程序计数器

Java基础之并发上篇 线程A调用Thread.join、Thread.sleep或者网络请求造成线程A中断,操作系统保存当前线程寄存器。OS调用线程B,OS恢复B寄存器

小结:

对操作系统而言JVM是一种应用,对Java程序来说是真实的机器。JVM作为进程向操作系统申请内存资源文件资源,计算资源分配给了线程。

原子操作

操作不可分,这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch(上下文切换)原子操作很好理解 思考i++是不是原子操作?

而i++不是原子操作而是3个原子操作组合而成的,读取i的值、计算i+1、写入新的值。 既然是组合原子操作会面临以下问题竞争条件,也叫做竞争灾难

竞争条件

造成竞争灾难的原因多个线程并发的执行了非原子操作的操作。举例两个线程执行了 i++ 竞争灾难的结果是不确定的。 竞争条件一般发生在临界区发生问共享资源)

Java基础之并发上篇

1.减少竞争

问题我们清楚了该如何解决,首先我们要做的是减少竞争合理分化线程任务代码示例。

package com.summer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

public class VocabularyCounter {
    public static void main(String[] args) {
        String[] bookPaths = {
                "book1.txt",
                "book2.txt",
                // ... (add paths to 300 books)
        };

        int numThreads = Math.min(Runtime.getRuntime().availableProcessors(), bookPaths.length);
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        CompletionService<Map<String, Integer>> completionService = new ExecutorCompletionService<>(executor);

        for (String bookPath : bookPaths) {
            completionService.submit(new WordCountTask(bookPath));
        }

        Map<String, Integer> totalWordCount = new HashMap<>();

        for (int i = 0; i < bookPaths.length; i++) {
            try {
                Future<Map<String, Integer>> future = completionService.take();
                Map<String, Integer> wordCount = future.get();

                // 将本书的字数合并到总字数中
                synchronized (totalWordCount) {
                    for (Map.Entry<String, Integer> entry : wordCount.entrySet()) {
                        totalWordCount.merge(entry.getKey(), entry.getValue(), Integer::sum);
                    }
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        executor.shutdown();

        // 打印或处理 totalWordCount 映射
    }
}

class WordCountTask implements Callable<Map<String, Integer>> {
    private String bookPath;

    public WordCountTask(String bookPath) {
        this.bookPath = bookPath;
    }

    @Override
    public Map<String, Integer> call() throws Exception {
        Map<String, Integer> wordCount = new HashMap<>();

        try (BufferedReader reader = new BufferedReader(new FileReader(bookPath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] words = line.split("\s+");
                for (String word : words) {
                    word = word.toLowerCase().replaceAll("[^a-zA-Z]", "");
                    if (!word.isEmpty()) {
                        wordCount.merge(word, 1, Integer::sum);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        return wordCount;
    }
}

2.实现原子操作CAS

CAS(Compare-And-Swap)是CPU底层支持的指令。作用设置一个地址值类似变量赋值,有意思的是如果你想给这个变量赋值,需要知道它原有值的内容。

Java基础之并发上篇 分别买两个物品第一个是1000 ,第二个是2000此时小明的余额是多少,如果发生竞争条件小明余额会是8000或者9000。

避免这类事情发生 CAS 是怎么操作的代码示例。

package com.summer;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCASExample {
    public static void main(String[] args) {
        AtomicInteger money = new AtomicInteger(10000);
        Thread thread1 = new Thread(() -> {
            int price = 1000;
            int currentMoney;
            do {
                currentMoney = money.get();
            } while (!money.compareAndSet(currentMoney, currentMoney - price));
            System.out.println("Thread 1: Bought item worth 1000, remaining money: " + money.get());
        });

        Thread thread2 = new Thread(() -> {
            int price = 2000;
            int currentMoney;
            do {
                currentMoney = money.get();
            } while (!money.compareAndSet(currentMoney, currentMoney - price));
            System.out.println("Thread 2: Bought item worth 2000, remaining money: " + money.get());
        });

        thread1.start();
        thread2.start();

        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Final money: " + money.get());
    }
}

AtomicIntegercompareAndSet方法的原理。当compareAndSet方法被调用时,它首先检查共享变量的值是否等于期望值。如果相等,就将共享变量的值更新为新的值,并返回true表示操作成功。如果不相等,表示其他线程已经修改了共享变量的值,此时返回false表示操作失败。

实际的CAS操作是基于处理器指令的原子性特性来实现的,确保在单个指令周期内执行读取、比较和更新操作,避免了竞态条件。这使得CAS成为一种高效的无锁同步操作,适用于并发编程中保证变量更新的原子性。

3.TAS 互斥锁

TAS(Test-And-Set)是一种基本的原子操作,通常用于实现互斥锁(mutex lock)的机制,用于在多线程环境中保护临界区的访问。TAS操作通过检查并设置一个特定的标志位来实现,以确保只有一个线程可以进入临界区。

boolean flag = false;
while (true) {
    if (!flag) {
        flag = true;
        break; // 进入临界区
    }
}

小结

CAS 解决了部分问题,解决了竞争条件,并没有解决两个线程同时i++这类问题。当两个线程读取到i的值是100时,是无法做到线程1,i++结果是101线程线程2i++结果是102,线程2想要执行成功必须在读取i++之前的值是101。注意CAS还存在一个ABA的问题。

前面接触CAS、TAS接下来让我们一起JAVA锁的世界

同步器

同步分执行同步和数据同步,JAVA中并发控制就是执行同步,缓存和存储的同步就是数据同步的一种了。

Java架构下的同步器

Java基础之并发上篇

Synchronized关键字是依赖于C和C++写的Monitor,而其它全部依赖于ASQ。Monitor存在于JVM层。

Synchronized的缺陷:

  1. 不够灵活: synchronized关键字是内置的,因此它的灵活性有限。你只能使用它来实现基本的同步需求,无法在更高级别上进行自定义操作。
  2. 隐式锁: synchronized使用隐式锁,这意味着锁的获取和释放都是由JVM自动管理的,无法手动控制,可能导致一些不可控的情况。
  3. 性能问题: synchronized在某些情况下可能会引起性能问题。当多个线程竞争同一个锁时,会导致其他线程被阻塞,从而降低并发性能。

ReentrantLock的区别:

  1. 显示锁: ReentrantLock是显式锁,你可以手动控制锁的获取和释放,从而更加灵活地实现你的同步需求。
  2. 公平锁和非公平锁: ReentrantLock可以配置为公平锁或非公平锁。公平锁按照线程请求锁的顺序获得锁,而非公平锁不保证按照顺序。
  3. 可中断锁: ReentrantLock允许使用lockInterruptibly()方法来实现可中断的锁获取,这意味着当一个线程在等待锁的时候,可以响应中断。
  4. 超时锁: ReentrantLock允许使用tryLock(long time, TimeUnit unit)方法来实现超时锁获取,可以在一段时间内尝试获取锁,如果超过时间则放弃。
  5. 更好的性能: 在高度竞争的场景下,ReentrantLock的性能可能会优于synchronized,因为它提供了更多的灵活性和更少的上下文切换。

综上所述,ReentrantLock相对于synchronized提供了更多的灵活性和控制,但使用它需要更多的编程工作,并且需要小心处理锁的获取和释放,以避免死锁等问题。在选择使用哪种机制时,你应该根据具体的情况来决定。

总语

从进程出发介绍线程和进程关系,内核空间与用户空间M对N的线程关系到CPU上下文切换。
通过举例i++了解CAS乐观锁、TAS互斥锁,最后到同步器。并发的基础内容就到这里了,希望我的文章对你有所帮助。
想了解更多相关的内容后续会发出,这里没有更深度的挖掘。