异步转同步的几种实现
- 定时轮询
- 等待通知
定时轮询
定时轮询的方法实现非常简单,也易于实现,对于简单场景 优点在于实现简单,而缺点在于耗时受限于轮询周期而且不太优雅。
public class LongPollSyncer<T, R> implements Syncer<T, R> {
private final Map<T, R> resourceMap;
private final ExecutorService executor;
public LongPollSyncer(int size) {
resourceMap = new ConcurrentHashMap<>(size);
}
@Override
public R tryAcquireWith(T key, int timeOutMilliSec) throws InterruptedException {
long start = System.currentTimeMillis();
while(true){
if(System.currentTimeMillis() - start > timeOutMilliSec){
break;
}
if(isDone(key)){
break;
}
Thread.sleep(200);
}
return resourceMap.getOrDefault(key, null);
}
@Override
public void setResource(T key, R resource) {
resourceMap.putIfAbsent(key, resource);
}
public boolean isDone(T key){
return resourceMap.containsKey(key);
}
}
等待通知
利用Lock与Condition等待通知模式来实现 以下是一个并发量不高时的简单的实现
- 同步调用:请求线程发起异步请求后,通过调用tryAcquireWith方法,等待key的资源,新建锁与condition,等待await
- 结果返回:等待另一个线程返回结果,通过同一个key关联,设置资源,signalAll唤醒等待线程。
- 同步返回:请求线程被唤醒获取资源返回结果。
- 超时返回:请求线程等待超时返回。
- 删除锁与Condition
public class Syncer<T, R> {
private final Map<T, ReentrantLock> locks;
private final Map<T, Condition> conditions;
private final Map<T, R> result;
public NotifySyncer() {
int initSize = 16;
locks = new ConcurrentHashMap<>(initSize);
conditions = new ConcurrentHashMap<>(initSize);
result = new ConcurrentHashMap<>(initSize);
}
/**
* 同步等待资源
*
* @param key 资源key
* @param timeOutMilliSec 超时时间
* @return R 返回资源,如果超时返回null
*/
@Override
public R tryAcquireWith(T key, int timeOutMilliSec) {
if (locks.containsKey(key)) {
throw new IllegalArgumentException("该key在等待结果中");
}
//create lock and condition
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
locks.put(key, lock);
conditions.put(key, condition);
try {
lock.lock();
while (!isDone(key)) {
boolean inTime = condition.await(timeOutMilliSec, TimeUnit.MILLISECONDS);
if (isDone(key) || !inTime) {
break;
}
}
return result.get(key);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
locks.remove(key);
conditions.remove(key);
}
}
/**
* 异步资源设置
*
* @param key 资源key
* @param resource 资源
*/
@Override
public void setResource(T key, R resource) {
if (!locks.containsKey(key)) {
throw new IllegalArgumentException("key未找到,请检查任务");
}
ReentrantLock lock = locks.get(key);
Condition condition = conditions.get(key);
lock.lock();
result.put(key, resource);
condition.signalAll();
lock.unlock();
}
/**
* 检查资源是否设置完成
* @param key
* @return
*/
public boolean isDone(T key) {
return result.containsKey(key);
}
}
实现future接口,将每个资源的请求放到一个future中,更符合java异步编程规范。 同时Future内可设置异常的情况下结束,调用方能在异常发生时就退出,而不是等待超时后才获取失败的结果。
public class SyncFuture<T> implements Future<T> {
private final ReentrantLock lock;
private final Condition condition;
private Exception exception;
private T value;
private Integer state;
private static final Integer RUNNING = 1;
private static final Integer CANCELED = 2;
private static final Integer ERROR = 3;
private static final Integer FINISHED = 4;
public SyncFuture() {
this.state = RUNNING;
this.lock = new ReentrantLock();
this.condition = lock.newCondition();
}
/**
* 这种实现实际上不是中断,而是signalAll,因为没有condition.interruptAll
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return
*/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
//当前已经完成,无法取消
if (isDone()) {
return false;
}
try {
lock.lock();
state = CANCELED;
if(mayInterruptIfRunning && lock.hasWaiters(condition)) {
condition.signalAll();
}
} finally {
lock.unlock();
}
return true;
}
@Override
public boolean isCancelled() {
return CANCELED.equals(state);
}
@Override
public boolean isDone() {
return !RUNNING.equals(state);
}
@Override
public T get() throws InterruptedException, ExecutionException {
try {
lock.lock();
while (!isDone()) {
condition.await();
}
} finally {
lock.unlock();
}
return returnValue();
}
@Override
public T get(long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
try {
lock.lock();
while (!isDone()) {
boolean inTime = condition.await(timeout, unit);
if (isDone() || !inTime) {
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return value;
}
public void set(T value) {
try {
lock.lock();
if (RUNNING.equals(state)) {
state = FINISHED;
this.value = value;
}
condition.signalAll();
} finally {
lock.unlock();
}
}
public void completeException(Exception ex) {
try {
lock.lock();
if (RUNNING.equals(state)) {
exception = ex;
state = ERROR;
}
condition.signalAll();
} finally {
lock.unlock();
}
}
private T returnValue() {
if (CANCELED.equals(state)) {
throw new RuntimeException("cancel");
}
if (ERROR.equals(state)) {
throw new RuntimeException(exception);
}
return value;
}
}
也可以使用CountDownLatch和CyclicBarrier实现,但本质也是等待通知机制。在此就不贴代码了。