likes
comments
collection
share

线程同步之互斥锁、读写锁、信号量、条件变量、事件

作者站长头像
站长
· 阅读数 4

线程同步

线程同步是多线程编程中的一个概念,它涉及到多个线程访问共享资源时协调它们的执行以保持数据的一致性和防止竞态条件(race condition)。简单来说,线程同步是确保两个或多个并发执行的线程不会同时操作相同的数据或资源的一种机制。

在没有适当的线程同步机制的情况下,如果多个线程同时读写同一个变量或数据结构,可能会导致数据损坏或程序行为的不可预测。为了避免这类问题,需要使用不同的同步工具和技术来确保线程之间的合作和协调。

线程同步的一些常见机制包括:

  1. 互斥锁(Mutex):互斥锁是一种最基本的线程同步机制,它可以确保在任何时刻只有一个线程能够访问被保护的代码区域或资源。当一个线程获得互斥锁时,其它尝试访问资源的线程将被阻塞,直到锁被释放。
  2. 读写锁(Read-Write Lock):读写锁是一种特殊类型的锁,允许多个线程同时读共享资源,但写操作是互斥的。这意味着当没有线程写资源时,多个线程可以同时读,但写操作需要独占访问。
  3. 信号量(Semaphore):信号量是一个计数器,用于限制对资源的访问。线程在访问资源之前必须获取信号量,如果信号量的值大于0,线程可以继续执行并将信号量减一;如果信号量为0,线程进入等待状态。释放资源时,信号量值会增加。
  4. 条件变量(Condition Variable):条件变量用于在某个条件下阻塞线程,直到另一个线程改变这个条件并通知条件变量。这允许线程在特定条件下暂停执行,并在条件得到满足时恢复。
  5. 事件(Event):事件是线程之间通信的一种机制,通常用于一个线程向其他线程发出信号,比如任务完成或状态改变。

线程同步是一个复杂且容易出错的区域,因此在设计并发程序时。开发者需要仔细考虑如何同步线程,以确保程序的正确性和性能。过度同步可能会导致并发性能下降,而不足同步则可能会导致数据不一致。

竞态条件

竞态条件(Race Condition)是并发编程中的一个术语,描述了一个系统或者进程的输出依赖于不受控的事件序列或者时序的情况。简单来说,当多个线程或进程访问和改变共享数据,并且最终的结果取决于它们的执行顺序时,就可能发生竞态条件。 在没有适当的同步措施的情况下,如果两个或多个并发执行的进程或线程访问相同的资源,并试图同时对其进行读写操作,它们的操作可能会交叉在一起,导致数据损坏或不一致的结果。举一个简单的例子来解释竞态条件:

假设有两个线程A和B,它们都要读取同一个变量V的值,对其加1后再写回去。如果变量V的初始值是10,我们期望的结果是两个线程执行完毕后V的值变成12。但是,如果没有适当的同步措施,可能会出现以下情况:

  1. 线程A读取变量V的值,得到10。
  2. 线程B也读取变量V的值,同样得到10。
  3. 线程A将其读取的值加1,得到11,并将这个值写回变量V。
  4. 线程B也将其读取的值加1,得到11,并将这个值写回变量V。

在上述情况中,两个线程都完成了操作,但是变量V的最终值是11而不是12,因为线程B的操作覆盖了线程A的操作。这就是一个经典的竞态条件。

为了避免竞态条件,可以采取多种同步技术,例如使用互斥锁(mutex)、信号量(semaphores)、读写锁(read-write locks)等机制来协调线程或进程的访问,确保在任意时刻只有一个线程能够修改共享资源。

互斥锁(Mutex)

互斥锁(Mutex)是一种线程同步机制,用来确保在任一时刻只有一个线程可以访问临界区。临界区是指能够修改共享资源的那段代码。通过互斥锁可以避免竞态条件的发生。

在Java中,互斥锁的概念可以通过synchronized关键字或者显式的Lock对象来实现。下面我将用两种方式展示如何在Java中使用互斥锁来同步线程。

使用synchronized关键字

Java中的synchronized关键字可以用来标记方法或者代码块。当一个线程访问一个对象的synchronized方法或代码块时,它将获得该对象的锁,其他线程将无法同时进入这个对象的所有synchronized块。

public class Counter {
    private int count = 0;

    // 使用synchronized关键字标记方法
    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}

public class SynchronizedExample {
    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.increment();
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.increment();
            }
        });

        t1.start();
        t2.start();

        // 等待两个线程执行完成
        t1.join();
        t2.join();

        // 输出结果
        System.out.println("Count: " + counter.getCount());
    }
}

在上面的代码中,我们定义了一个Counter类,它有一个方法increment用于递增内部的count变量。这个方法被标记为synchronized,所以当线程进入这个方法时,其他线程必须等待直到该方法完成并释放对象锁。

main方法中,我们创建了两个线程,它们都对同一个Counter对象调用increment方法1000次。由于increment方法是synchronized的,所以即使两个线程同时尝试访问,每次也只有一个线程能够进入该方法。因此,count的最终结果将正确地被更新到2000。

使用Lock接口

Java的java.util.concurrent.locks.Lock接口提供了更复杂的锁操作。它允许更灵活的结构,可以尝试非阻塞地获得锁,尝试获得锁并可中断,或者在获得锁时等待超时。

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CounterWithLock {
    private final Lock lock = new ReentrantLock();
    private int count = 0;

    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    public int getCount() {
        return count;
    }
}

public class LockExample {
    public static void main(String[] args) throws InterruptedException {
        CounterWithLock counter = new CounterWithLock();

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.increment();
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.increment();
            }
        });

        t1.start();
        t2.start();

        // 等待两个线程执行完成
        t1.join();
        t2.join();

        // 输出结果
        System.out.println("Count: " + counter.getCount());
    }
}

在上述代码中,我们使用了ReentrantLock类来实现Lock接口。与synchronized不同,lockunlock操作是显式的,并且为了确保总是能释放锁,unlock方法被放置在try块之后的finally块中。这样可以防止因异常而导致的死锁情况。

main方法的其余部分,代码逻辑与使用synchronized关键字的例子非常相似。我们创建了两个线程对共享的CounterWithLock对象执行递增操作,并确保最终结果是正确的。

这两种方式都能够确保在多线程环境下对共享资源的访问是安全的,但在某些场合下Lock接口提供的灵活性可能更受欢迎。例如,当你需要尝试锁定,或者在等待锁时对中断做出响应时,就可以使用ReentrantLock

读写锁(Read-Write Lock)

读写锁(Read-Write Lock)是一种特殊的锁,用于优化对共享资源的访问。与互斥锁相比,读写锁允许多个线程同时读取资源,但写操作必须是独占的。这意味着如果没有线程正在写入资源,多个线程可以同时读取资源;如果一个线程正在写入资源,那么其他所有读写操作都必须等待。

在Java中,读写锁由java.util.concurrent.locks.ReadWriteLock接口表示,它有两个主要的实现类:ReentrantReadWriteLock。使用读写锁时,你会获得两个锁:一个读锁(共享锁)和一个写锁(排他锁)。

下面是使用ReentrantReadWriteLock的Java代码示例:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SharedResource {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Object resource = new Object(); // 假设这是需要同步访问的共享资源

    // 读取资源的方法
    public void readResource() {
        readWriteLock.readLock().lock(); // 获取读锁
        try {
            // 执行读取操作
            // 例如: System.out.println(resource);
        } finally {
            readWriteLock.readLock().unlock(); // 释放读锁
        }
    }

    // 写入资源的方法
    public void writeResource(Object newValue) {
        readWriteLock.writeLock().lock(); // 获取写锁
        try {
            // 执行写入操作
            // 例如: resource = newValue;
        } finally {
            readWriteLock.writeLock().unlock(); // 释放写锁
        }
    }
}

public class ReadWriteLockExample {
    public static void main(String[] args) {
        SharedResource sharedResource = new SharedResource();

        // 创建读取线程
        Thread readerThread = new Thread(() -> {
            while (true) { // 一个简单的无限循环,表示不断读取操作
                sharedResource.readResource();
            }
        });

        // 创建写入线程
        Thread writerThread = new Thread(() -> {
            while (true) { // 一个简单的无限循环,表示不断写入操作
                sharedResource.writeResource(new Object());
            }
        });

        readerThread.start();
        writerThread.start();
    }
}

在这个示例中,SharedResource类封装了一个假想的共享资源,提供了readResourcewriteResource方法来读取和写入资源。为了同步访问,我们使用ReentrantReadWriteLock实例来创建读锁和写锁。

当一个线程想要读取资源时,它通过readResource方法获取读锁,执行读取操作,然后释放读锁。如果有多个线程试图获取读锁,它们都可以同时获取,因为读操作是共享的。

相反,当一个线程想要写入资源时,它通过writeResource方法获取写锁,执行写入操作,然后释放写锁。当写锁被持有时,其他尝试读取或写入的线程都将被阻塞,直到写锁被释放。

main方法中,我们创建了一个读取线程和一个写入线程分别执行读取和写入操作。由于使用了读写锁,这个程序能够允许多个读取线程并行执行,但只有在没有写入线程活动的情况下。

读写锁对于那些读操作远多于写操作的场景特别有用,因为它可以显著提高并发性能。然而,需要注意的是,如果写操作频繁,或者读操作很耗时,则读写锁可能导致写饥饿(写线程长时间等待)的问题。

信号量(Semaphore)

信号量(Semaphore)是一种同步工具,用于控制对共享资源的访问。它维护了一组许可(permits),线程在访问共享资源之前必须先获得许可,如果许可不可用,线程将被阻塞直到许可变为可用。信号量可以用于实现资源池或给定数量的资源槽。

在Java中,信号量通过java.util.concurrent.Semaphore类实现。这个类提供了 acquire()release() 方法来获取和释放许可。

下面是使用Semaphore的Java代码示例:

import java.util.concurrent.Semaphore;

public class SemaphoreExample {

    // 假设我们有一个打印机资源池,只有两台打印机可用
    public static class PrinterPool {
        private final Semaphore semaphore;

        public PrinterPool(int printerCount) {
            // 创建一个信号量,并设置许可数量等于可用打印机数量
            semaphore = new Semaphore(printerCount);
        }

        public void printJob(Object document) {
            try {
                // 从信号量获取一个许可。如果没有许可可用,这个方法将阻塞直到有许可
                semaphore.acquire();

                // 模拟打印工作
                System.out.println(Thread.currentThread().getName() + ": 打印工作正在进行");
                Thread.sleep(1000); // 假设打印任务需要1秒钟

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 打印完成后,释放许可,允许其他线程打印
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + ": 打印工作完成");
            }
        }
    }

    public static void main(String[] args) {
        // 创建一个有两台打印机的打印机池
        final PrinterPool printerPool = new PrinterPool(2);

        // 创建10个打印任务,模拟打印
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> printerPool.printJob(new Object()));
            thread.start();
        }
    }
}

在这个示例中,我们定义了一个PrinterPool类,表示了一个打印机资源池。我们使用了一个Semaphore来限制对打印机资源的访问。PrinterPool构造函数接受一个printerCount参数,这是打印机的数量,它被用来初始化Semaphore的许可数量。

printJob方法尝试从信号量获取一个许可。如果没有许可可用,调用acquire()方法的线程将阻塞,直到其他线程释放许可。一旦获取了许可,该线程就可以执行打印任务。打印任务完成后,线程必须通过调用release()方法释放许可,这样其他线程才能获取许可来执行它们的打印任务。

main方法中,我们创建了一个拥有两台打印机的PrinterPool对象。然后,我们启动了10个线程来模拟打印任务。由于打印机池只提供了两个许可,任何时候最多只有两个打印任务可以同时进行。其他的任务将会等待,直到其中一个正在进行的打印任务完成,释放出许可。

通过这种方式,信号量保证了不会有超过两个线程同时进入打印任务的临界区,这模拟了一个真实的资源池,其中的资源数是有限的。

条件变量(Condition Variable)

条件变量是一种线程同步机制,允许线程在某个条件不满足时挂起(等待),直到另一个线程修改了这个条件并通知条件变量后才被唤醒。条件变量通常与一个互斥锁结合使用,以确保对共享资源的一致性访问。

在Java中,条件变量可以通过Object类的wait(), notify(), 和 notifyAll()方法来实现,或者使用java.util.concurrent.locks.Condition接口与ReentrantLock一起使用。

使用Object的内置监视器方法

public class ProducerConsumerExample {
    private static final Object lock = new Object();
    private static int[] buffer;
    private static int count;

    static {
        // 初始化缓冲区
        buffer = new int[10];
        count = 0;
    }

    // 生产方法
    static void produce() throws InterruptedException {
        synchronized (lock) {
            while (isFull(buffer)) {
                // 缓冲区已满,等待消费者消费
                lock.wait();
            }
            buffer[count++] = 1; // 生产一个单位的数据
            lock.notify(); // 通知正在等待的消费者
        }
    }

    // 消费方法
    static void consume() throws InterruptedException {
        synchronized (lock) {
            while (isEmpty(buffer)) {
                // 缓冲区为空,等待生产者生产
                lock.wait();
            }
            buffer[--count] = 0; // 消费一个单位的数据
            lock.notify(); // 通知正在等待的生产者
        }
    }

    // 检查缓冲区是否已满
    static boolean isFull(int[] buffer) {
        return count == buffer.length;
    }

    // 检查缓冲区是否为空
    static boolean isEmpty(int[] buffer) {
        return count == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        // 创建生产者和消费者线程
        Thread producerThread = new Thread(() -> {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread consumerThread = new Thread(() -> {
            try {
                consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        consumerThread.join();
    }
}

在上面的代码中,我们实现了一个生产者-消费者问题的解决方案。produce方法和consume方法分别对应生产数据和消费数据的逻辑。生产者在生产数据之前会检查缓冲区是否已满,如果是,则调用lock.wait()陷入等待,直到消费者消费数据后使用lock.notify()唤醒它。类似地,消费者在消费数据之前会检查缓冲区是否为空,如果是,则陷入等待,直到生产者生产数据后被唤醒。

这里的lock对象是用来同步对缓冲区的访问的。当调用lock.wait()时,线程会释放掉它持有的lock对象上的锁,并进入等待状态。当另一个线程调用lock.notify()lock.notifyAll()时,一个或所有等待的线程将被唤醒,并且试图重新获得lock对象上的锁。

使用ReentrantLockCondition

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerWithLock {
    private static final Lock lock = new ReentrantLock();
    private static final Condition notFull = lock.newCondition();
    private static final Condition notEmpty = lock.newCondition();
    private static int[] buffer;
    private static int count;

    static {
        // 初始化缓冲区
        buffer = new int[10];
        count = 0;
    }

    // 生产方法
    static void produce() throws InterruptedException {
        lock.lock();
        try {
            while (isFull(buffer)) {
                // 缓冲区已满,等待直到不满
                notFull.await();
            }
            buffer[count++] = 1;
            notEmpty.signal(); // 通知notEmpty条件的消费者线程可以继续执行
        } finally {
            lock.unlock();
        }
    }

    // 消费方法
    static void consume() throws InterruptedException {
        lock.lock();
        try {
            while (isEmpty(buffer)) {
                // 缓冲区为空,等待直到不为空
                notEmpty.await();
            }
            buffer[--count] = 0;
            notFull.signal(); // 通知notFull条件的生产者线程可以继续执行
        } finally {
            lock.unlock();
        }
    }

    // 检查缓冲区是否已满
    static boolean isFull(int[] buffer) {
        return count == buffer.length;
    }

    // 检查缓冲区是否为空
    static boolean isEmpty(int[] buffer) {
        return count == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        // 创建生产者和消费者线程
        Thread producerThread = new Thread(() -> {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread consumerThread = new Thread(() -> {
            try {
                consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producerThread.start();
        consumerThread.start();

        producerThread.join();
        consumerThread.join();
    }
}

以上代码与之前的Object监视器方法类似,不过这里我们使用了ReentrantLock和它提供的Condition接口。notFullnotEmpty是两个条件,分别代表缓冲区不满和不为空的状态。生产者在缓冲区满时等待notFull条件,消费者在缓冲区空时等待notEmpty条件。

使用await()方法使线程等待特定条件的满足。当条件得到满足时,可以通过signal()signalAll()方法唤醒一个或所有等待该条件的线程。这种方式提供了与内置监视器方法类似的功能,但提供了更高的灵活性和更好的性能。

事件(Event)

在并发编程中,事件(Event)通常指一种机制,它允许一个线程通知一个或多个其他线程有关某些重要状态变更的发生。当事件发生时,等待该事件的线程将被唤醒并继续执行。

Java中并没有直接名为"Event"的类,但是可以通过各种同步机制来实现事件通知的功能,例如通过Object类的wait()notify() 方法,以及java.util.concurrent包中的CountDownLatchCyclicBarrierFuture 等。

使用 Object 类的 wait()notify() 方法

public class EventExample {
    private final Object lock = new Object();
    private boolean eventOccurred = false;

    public void eventOccurred() {
        synchronized (lock) {
            eventOccurred = true;
            lock.notifyAll(); // 通知所有等待事件的线程
        }
    }

    public void waitForEvent() throws InterruptedException {
        synchronized (lock) {
            while (!eventOccurred) { // 循环检查以处理虚假唤醒
                lock.wait(); // 等待事件发生
            }
        }
        // 处理事件
        System.out.println(Thread.currentThread().getName() + " received the event notification.");
    }

    public static void main(String[] args) {
        EventExample eventExample = new EventExample();
        
        // 创建线程等待事件
        Thread waitingThread = new Thread(() -> {
            try {
                eventExample.waitForEvent();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        waitingThread.start();

        // 模拟在另一个线程中事件发生
        Thread triggeringThread = new Thread(() -> {
            try {
                Thread.sleep(2000); // 假设这里我们等待2秒钟模拟一些操作
                eventExample.eventOccurred(); // 事件发生,通知等待线程
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        triggeringThread.start();
    }
}

在这个例子中,EventExample 类中有一个名为 waitForEvent 的方法,它调用 lock.wait() 方法来等待事件的发生。当事件发生时,eventOccurred 方法会设置 eventOccurred 标志为 true 并调用 lock.notifyAll() 来通知所有等待的线程。

使用 CountDownLatch

CountDownLatch 是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

import java.util.concurrent.CountDownLatch;

public class EventWithCountDownLatch {
    private final CountDownLatch latch = new CountDownLatch(1);

    public void eventOccurred() {
        latch.countDown(); // 触发事件,倒数减一
    }

    public void waitForEvent() throws InterruptedException {
        latch.await(); // 等待事件发生
        // 处理事件
        System.out.println(Thread.currentThread().getName() + " received the event notification.");
    }

    public static void main(String[] args) {
        EventWithCountDownLatch eventExample = new EventWithCountDownLatch();
        
        // 创建线程等待事件
        Thread waitingThread = new Thread(() -> {
            try {
                eventExample.waitForEvent();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        waitingThread.start();

        // 模拟在另一个线程中事件发生
        Thread triggeringThread = new Thread(() -> {
            try {
                Thread.sleep(2000); // 假设这里我们等待2秒钟模拟一些操作
                eventExample.eventOccurred(); // 事件发生,通知等待线程
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        triggeringThread.start();
    }
}

在这个例子中,EventWithCountDownLatch 类使用一个 CountDownLatch,其计数器初始化为 1。waitForEvent 方法通过调用 latch.await() 来等待事件。当事件发生时,eventOccurred 方法通过调用 latch.countDown() 来减少计数器,一旦计数器达到零,等待的线程将被释放并继续执行。

CountDownLatch 是一种简单而强大的同步工具,非常适用于这种一次性事件通知的场景。不过需要注意,CountDownLatch 的计数器不能被重置,一旦计数器到达零,它就不能再次被使用了。如果需要能够重置计数器的版本,可以考虑使用 CyclicBarrier