likes
comments
collection
share

JAVA并发课题

作者站长头像
站长
· 阅读数 69

守护线程

public class DaemonThreadTest {
    public static void main(String[] args) throws InterruptedException {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> System.out.println("jvm exit success!! ")));
        Thread testThread = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(5000);
                    System.out.println("thread still running ....");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        testThread.setDaemon(true);
        testThread.start();
    }
}

1.使用arthas查看线上的程序线程情况 deamon为守护线程

JAVA并发课题

  1. redis分布式锁续命用守护线程实现

public Boolean tryLock(String key, String value, long expireTime) {
        try {
            //自旋上限
            int waitCount = timeout;
            while (waitCount > 0) {
                //SET命令返回OK ,则证明获取锁成功
                Boolean setIfAbsent = redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, TimeUnit.SECONDS);
                if (setIfAbsent) {
                    //续命
                    Thread demo = new Thread(new Runnable() {
                        @Override
                        public void run() {
                            while (true) {
                                Boolean expire = redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
//有可能已经主动删除key,不需要在续命
                                if(!expire){
                                    return;
                                }
                                try {
                                    Thread.sleep(1000);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    });
                    demo.setDaemon(true);
                    demo.start();
                    return true;
                }
                //否则循环等待,在timeout时间内仍未获取到锁,则获取失败
                Thread.sleep(3000);
                waitCount--;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
 
        }
        //没设置到锁,即表示超时了
        return false;
    }

redission使用的就是这种形式,比如上锁10s,10s后就解锁了,守护线程会一直给锁续命,当主线程退出的时候,守护线程也会跟着退出。

  1. 那OOM时会不会调用钩子方法?

JAVA并发课题 OOM后钩子方法仍然生效,spring的销毁方法也是生效,分布式锁也可使用,nocas服务取消的时候也在使用

线程优先级

“优先级”这个参数通常并不是那么地“靠谱”,理论上说线程的优先级越高,分配到时间片的几率也就越高,但是在实际运行过程中却并非如此,优先级只能作为一个参考数值,而且具体的线程优先级还和操作系统有关

终止线程

  1. interrupt方法

 static class TestThread implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                System.out.print(i+" ");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    static class TestThreadWithSync implements Runnable {
        @Override
        public void run() {
            synchronized (this) {
                for (int i = 20; i < 30; i++) {
                    System.out.print(i+" ");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    static class TestThreadWithLock implements Runnable {
        ReentrantLock reentrantLock = new ReentrantLock();
        @Override
        public void run() {
            reentrantLock.lock();
            try {
                for (int i = 30; i < 40; i++) {
                    System.out.print(i+" ");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                reentrantLock.unlock();
            }
        }
    }

    /**
     * 增加判断标识 如果标记中断就暂停执行
    */
    static class TestInterruptedStop implements Runnable {

        @Override
        public void run() {
            System.out.println("开始执行");
            synchronized (this) {
                //如果当前线程被中断,这里需要主动退出
                while (!Thread.currentThread().isInterrupted()) {
                }
                System.out.println("end");
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
//        Thread testThread = new Thread(new TestThread());
//        testThread.start();
        Thread testThreadWithSync = new Thread(new TestThreadWithSync());
        testThreadWithSync.start();
//        Thread testThreadWithLock = new Thread(new TestThreadWithLock());
//        testThreadWithLock.start();
//        Thread forEverThread = new Thread(new ForEverThread());
//        forEverThread.start();
//        Thread testInterruptedStop = new Thread(new TestInterruptedStop());
        Thread.sleep(2000);
//        testInterruptedStop.interrupt();
        // 如果线程正常执行 那么调用这个函数是无效的
//        forEverThread.interrupt();
//        testThread.interrupt();
        testThreadWithSync.interrupt();
//        testThreadWithLock.interrupt();

    }
  1. shutdownshutDownNow 网上说的shutDownNow会终止线程池中正在执行的线程,实际操作并不是,其实区别是shutDownNow将队列中没有执行的任务放入到一个 List 集合中,并且返回给调用线程。

  1. 锁升级 当加锁后一个线程访问时,会进入偏向锁状态,当多个线程访问会进入轻量级锁,当多个竞争的线程抢夺该 monitor 的时候,会采用 CAS 的方式,当抢夺次数超过 10 次,或者当前 CPU 资源占用大于 50% 的时候,该锁就会从轻量级锁的状态上升为了重量级锁。
  2. synchronized与lock
    • 支持获取锁超时机制;
    public void tryLockMethod_2() {
    try {
        if (reentrantLock.tryLock(1, TimeUnit.SECONDS)) {
            try {
                i++;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }
        } else {
            //todo
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    }
    
    
    • 支持非阻塞方式获取锁;
    • 支持可中断方式获取锁。

线程池

JAVA并发课题 1 spring 内部的异步注解


@Service
public class AsyncService {

    //加入注解之后,该方法自动就带有了异步的效果
    @Async
    public void testAsync(){
        try {
            Thread.sleep(1000*2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+" async test");
    }
}

直接使用会有一个问题,会无限创建线程,所以要增加配置


@Configuration
public class AsyncExecuteConfig extends AsyncConfigurerSupport {

    @Bean
    public ThreadPoolTaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        threadPool.setCorePoolSize(3);
        threadPool.setMaxPoolSize(3);
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        threadPool.setAwaitTerminationSeconds(60 * 15);
        return threadPool;
    }

    @Override
    public Executor getAsyncExecutor() {
        return asyncExecutor();
    }
}

@Async会失效吗? 其实他也是通过代理来实现的,如果同一个类中使用@Async也会失效

线程本地变量

ThreadLocal 对象中提供了线程局部变量,它使得每个线程之间互不干扰,一般可以通过重写它的 initialValue 方法机械能赋值。当线程第一次访问它的时候,就会直接触发 initialValue 方法。 是典型的以空间换时间的处理

  1. 原理
public void set(T value) {
    //获取当前请求的线程    
    Thread t = Thread.currentThread();
    //取出Thread类内部的threadLocals变量,这个变量是一个哈希表结构
    ThreadLocalMap map = getMap(t);
    if (map != null)
        //将需要存储的值放入到这个哈希表中
        map.set(this, value);
    else
        createMap(t, value);
}

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}


static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

Java中当一个对象仅被一个弱引用引用时,如果GC运行, 那么这个对象就会被回收。 弱引用的一个特点是它何时被回收是不可确定的; 思考了一下,如果key是弱引用,那么被回收先不说内存泄漏问题,数据本身就回丢失呀,所以操作了一下


public class Demo {
    static ThreadLocal<OOMObject> local = new ThreadLocal<>();
    public static void main(String[] args) {
        local.set(new OOMObject("千云"));
        System.gc();
        OOMObject oomObject = local.get();

        System.out.println(oomObject);
    }
}

赋值GC后仍然可以得到结果,就很奇怪,弱引用并没有被回收,还要进一步的思考

JAVA并发课题

  1. 应用 获取路径内方法的执行时长
@Configuration
public class TimeCountInterceptor implements HandlerInterceptor
{
    static class CommonThreadLocal<Long> extends ThreadLocal{
        @Override
        protected Object initialValue() {
            return -1;
        }
    }

    private static ThreadLocal<Long> timeCount = new CommonThreadLocal<>();


    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        System.out.println("提前赋值的获取:"+ timeCount.get());
        //中间写逻辑代码,比如判断是否登录成功,失败则返回false
        timeCount.set(System.currentTimeMillis());
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) {
        long currentTime = System.currentTimeMillis();
        long startTime = timeCount.get();
        long timeUse = currentTime - startTime;
        System.out.println(Thread.currentThread().getName() + "耗时:" + timeUse + "ms");
        timeCount.remove();
    }
}

多线程优化查询速度

CompletableFutureFuture都可以进行优化查询,java8的parallelStream很实用

public List<UserInfoDTO> batchQueryWithParallelV1(List<Long> userIdList) {
   List<UserInfoDTO> resultList = new ArrayList<>();
   //并发调用
   userIdList.parallelStream().forEach(userId -> {
       UserInfoDTO userInfoDTO = userQueryService.queryUserInfoWrapper(userId);
       resultList.add(userInfoDTO);
   });
   return resultList;
}

线程限流

单机版限流,使用Semaphore来实现,如果超过定义的数量那么就丢弃,如果是分布式的服务部署,这种形式就不ok了,要采用redis的形式了

/**
 * @Description:单机版限流
 * @author: yjw
 * @date: 2021/12/20
 */
@Slf4j
@RestController
public class SimpleLimitController {
    private Semaphore semaphore = new Semaphore(2);

    @GetMapping("do-test-limit")
    public void doTest() {
        boolean status = false;
        try {
            //限制流量速度
            status = semaphore.tryAcquire();
            if (status) {
                this.doSomeBiz();
            }
        } catch (Exception e) {
            log.error("[doTest] error is ", e);
        } finally {
            if (status) {
                semaphore.release();
            }
        }
    }


    /**
     * 执行业务逻辑
     */
    private void doSomeBiz() throws InterruptedException {
        System.out.println(Thread.currentThread().getName());
        Thread.sleep(20000);
    }
}
转载自:https://juejin.cn/post/7188820402896896061
评论
请登录