异步转同步的几种实现

异步转同步的几种实现

Posted by Link on October 11, 2022

异步转同步的几种实现

  1. 定时轮询
  2. 等待通知

定时轮询

定时轮询的方法实现非常简单,也易于实现,对于简单场景 优点在于实现简单,而缺点在于耗时受限于轮询周期而且不太优雅。

public class LongPollSyncer<T, R> implements Syncer<T, R> {
    private final Map<T, R> resourceMap;

    private final ExecutorService executor;

    public LongPollSyncer(int size) {
        resourceMap = new ConcurrentHashMap<>(size);
    }

    @Override
    public R tryAcquireWith(T key, int timeOutMilliSec) throws InterruptedException {
        long start = System.currentTimeMillis();
        while(true){
            if(System.currentTimeMillis() - start > timeOutMilliSec){
                break;
            }

            if(isDone(key)){
                break;
            }
            Thread.sleep(200);
        }

        return resourceMap.getOrDefault(key, null);
    }

    @Override
    public void setResource(T key, R resource) {
        resourceMap.putIfAbsent(key, resource);
    }

    public boolean isDone(T key){
        return resourceMap.containsKey(key);
    }
}

等待通知

利用Lock与Condition等待通知模式来实现 以下是一个并发量不高时的简单的实现

  1. 同步调用:请求线程发起异步请求后,通过调用tryAcquireWith方法,等待key的资源,新建锁与condition,等待await
  2. 结果返回:等待另一个线程返回结果,通过同一个key关联,设置资源,signalAll唤醒等待线程。
  3. 同步返回:请求线程被唤醒获取资源返回结果。
  4. 超时返回:请求线程等待超时返回。
  5. 删除锁与Condition
public class Syncer<T, R> {
private final Map<T, ReentrantLock> locks;

    private final Map<T, Condition> conditions;

    private final Map<T, R> result;


    public NotifySyncer() {
        int initSize = 16;
        locks = new ConcurrentHashMap<>(initSize);
        conditions = new ConcurrentHashMap<>(initSize);
        result = new ConcurrentHashMap<>(initSize);
    }

    /**
     * 同步等待资源
     *
     * @param key             资源key
     * @param timeOutMilliSec 超时时间
     * @return R 返回资源,如果超时返回null
     */
    @Override
    public R tryAcquireWith(T key, int timeOutMilliSec) {
        if (locks.containsKey(key)) {
            throw new IllegalArgumentException("该key在等待结果中");
        }

        //create lock and condition
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        locks.put(key, lock);
        conditions.put(key, condition);

        try {
            lock.lock();
            while (!isDone(key)) {
                boolean inTime = condition.await(timeOutMilliSec, TimeUnit.MILLISECONDS);
                if (isDone(key) || !inTime) {
                    break;
                }
            }
            return result.get(key);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
            locks.remove(key);
            conditions.remove(key);
        }
    }

    /**
     * 异步资源设置
     *
     * @param key      资源key
     * @param resource 资源
     */
    @Override
    public void setResource(T key, R resource) {
        if (!locks.containsKey(key)) {
            throw new IllegalArgumentException("key未找到,请检查任务");
        }
        ReentrantLock lock = locks.get(key);
        Condition condition = conditions.get(key);

        lock.lock();
        result.put(key, resource);
        condition.signalAll();
        lock.unlock();
    }

    /**
     * 检查资源是否设置完成
     * @param key
     * @return
     */
    public boolean isDone(T key) {
        return result.containsKey(key);
    }
}

实现future接口,将每个资源的请求放到一个future中,更符合java异步编程规范。 同时Future内可设置异常的情况下结束,调用方能在异常发生时就退出,而不是等待超时后才获取失败的结果。

public class SyncFuture<T> implements Future<T> {


    private final ReentrantLock lock;

    private final Condition condition;

    private Exception exception;

    private T value;
    private Integer state;

    private static final Integer RUNNING = 1;

    private static final Integer CANCELED = 2;

    private static final Integer ERROR = 3;

    private static final Integer FINISHED = 4;

    public SyncFuture() {
        this.state = RUNNING;
        this.lock = new ReentrantLock();
        this.condition = lock.newCondition();
    }

    /**
     * 这种实现实际上不是中断,而是signalAll,因为没有condition.interruptAll
     * @param mayInterruptIfRunning {@code true} if the thread executing this
     * task should be interrupted; otherwise, in-progress tasks are allowed
     * to complete
     * @return
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        //当前已经完成,无法取消
        if (isDone()) {
            return false;
        }

        try {
            lock.lock();
            state = CANCELED;
            if(mayInterruptIfRunning && lock.hasWaiters(condition)) {
                condition.signalAll();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

    @Override
    public boolean isCancelled() {
        return CANCELED.equals(state);
    }

    @Override
    public boolean isDone() {
        return !RUNNING.equals(state);
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        try {
            lock.lock();
            while (!isDone()) {
                condition.await();
            }
        } finally {
            lock.unlock();
        }

        return returnValue();
    }

    @Override
    public T get(long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            lock.lock();
            while (!isDone()) {
                boolean inTime = condition.await(timeout, unit);
                if (isDone() || !inTime) {
                    break;
                }
            }
        } finally {
            lock.unlock();
        }

        if (!isDone()) {
            throw new TimeoutException();
        }
        return value;
    }

    public void set(T value) {
        try {
            lock.lock();
            if (RUNNING.equals(state)) {
                state = FINISHED;
                this.value = value;
            }
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void completeException(Exception ex) {
        try {
            lock.lock();
            if (RUNNING.equals(state)) {
                exception = ex;
                state = ERROR;
            }
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    private T returnValue() {
        if (CANCELED.equals(state)) {
            throw new RuntimeException("cancel");
        }

        if (ERROR.equals(state)) {
            throw new RuntimeException(exception);
        }

        return value;
    }
}

也可以使用CountDownLatch和CyclicBarrier实现,但本质也是等待通知机制。在此就不贴代码了。