Async异步队列编程模型
压测通过,未deploy。
应用场景
Before:ContextRequest——>workflow(contextRequest)
contextRequest直接随着本线程进入处理流程
After: ContextRequest——>TaskImpl(contextRequest, asyncInvokeImpl)——>AsyncHandle.offer(task)………Worker——>AsyncHandle.poll(task)——>executor.execute(task)
contextRequest和封装处理它的流程的实例作为参数进入TaskImpl的构造器以获得task实例,再将task放入AsyncHandle里的阻塞队列里。AsyncHandle在实例化时启动独立线程不断将task取出来给Executor的线程组处理。
需求:RPC中,将Acceptor(bossGroup)与workGroup解耦为异步处理,并以队列存放task,以提高性能。压测结果:QPS > 20k。
AsyncHandle设计
public class AsyncHandle {
private static final int DEFAULT_QUEUE_MAX_SIZE = 500;
private static final int DEFAULT_WORKER_MIN_SIZE = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_WORKER_MAX_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private int workerCount = DEFAULT_WORKER_MAX_SIZE;
private BlockingQueue<AsyncTask> tasks = null;
private Executor executor = new ThreadPoolExecutor(
DEFAULT_WORKER_MIN_SIZE,
workerCount,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new VThreadFactory("async-invoker"),
new ThreadPoolExecutor.CallerRunsPolicy());
/**
* 默认队列大小
* @param workerCount 处理任务线程数
*/
public AsyncHandle(int workerCount) {
this(DEFAULT_QUEUE_MAX_SIZE, workerCount);
}
/**
* 默认参数 queueSize 500; workerCount CUP * 2
*/
public AsyncHandle() {
this(DEFAULT_QUEUE_MAX_SIZE, DEFAULT_WORKER_MAX_SIZE);
}
/**
* 默认 FIFRBlockingQueue 作为队列容器
* @param queueMaxSize 队列最大容量
* @param workerCount 处理队列的线程数
*/
public AsyncHandle(int queueMaxSize, int workerCount) {
this.workerCount = workerCount;
tasks = new FIFRBlockingQueue<>(queueMaxSize, new VOfferPolicy());
new Thread(new AsyncWorker()).start();
}
/**
* 可指定队列作为容器
* @param workerCount 处理线程数
* @param queue 队列
*/
public AsyncHandle(int workerCount, BlockingQueue<AsyncTask> queue) {
this.workerCount = workerCount;
this.tasks = queue;
new Thread(new AsyncWorker()).start();
}
/**
* Task 入队
* @param task 任务
* @throws AsyncInvokerException
*/
public void offer(AsyncTask task) throws AsyncInvokerException {
tasks.offer(task);
}
/**
* 处理任务类
*/
class AsyncWorker implements Runnable {
@Override
public void run() {
while (true) {
try {
AsyncTask task = tasks.poll(100, TimeUnit.MILLISECONDS);
if(task == null) {
continue;
}
executor.execute(task);
} catch (Exception e) {
}
}
}
}
}
FIFRBlockingQueue作为容器,是在JDK提供的ArrayBlockingQueue(FIFO阻塞队列)基础上加入移除策略改写的;为了满足日后业务需求,AsyncHandle还提供了可指定容器的构造器。
AsyncTask设计
import lombok.Data;
/**
 * A unit of work that pairs a context object with the {@link AsyncInvoke} that processes it.
 *
 * @param <T> type of the context carried through the invoke callbacks
 */
@Data
public class AsyncTask<T> implements Runnable {

    /** Callbacks that process the context. */
    private AsyncInvoke<T> invoke;

    /** Timestamp supplied to {@link #init}; presumably the enqueue time (confirm with callers). */
    private long currentTime;

    /** Context handed to every callback; replaced by the result of {@code invoke.run}. */
    private T ctx;

    /**
     * Populates this task.
     *
     * @param invoke callbacks used to process the context
     * @param currentTime timestamp stored with the task
     * @param t context to process
     */
    public void init(AsyncInvoke<T> invoke, long currentTime, T t) {
        this.ctx = t;
        this.currentTime = currentTime;
        this.invoke = invoke;
    }

    /**
     * Abandons the task without running its main step: if {@code enter} accepts the context,
     * an {@link AsyncInvokerException} is reported through {@code exceptionCatch}.
     * {@code exit} is always called.
     */
    public void cancelTask() {
        try {
            if (!invoke.enter(ctx)) {
                return;
            }
            invoke.exceptionCatch(ctx, new AsyncInvokerException("Task was abandon"));
        } catch (Exception failure) {
            invoke.exceptionCatch(ctx, failure);
        } finally {
            invoke.exit(ctx);
        }
    }

    /**
     * Runs the task: {@code enter}, then {@code run} and {@code messageReceiver} when
     * {@code enter} returned true. Any throwable is routed to {@code exceptionCatch};
     * {@code exit} is always called.
     */
    @Override
    public void run() {
        try {
            if (!invoke.enter(ctx)) {
                return;
            }
            ctx = invoke.run(ctx);
            invoke.messageReceiver(ctx);
        } catch (Throwable failure) {
            invoke.exceptionCatch(ctx, failure);
        } finally {
            invoke.exit(ctx);
        }
    }
}
init()用于封装task;cancelTask()在移除策略里使用;run()实现了Runnable接口,表明task可以交给Worker,由Executor的线程组执行。
AsyncInvoke接口设计
Interface
/**
 * Callbacks that process a context object of type {@code T} in separate phases.
 *
 * @param <T> type of the context passed to each phase
 */
public interface AsyncInvoke<T> {

    /**
     * Pre-processing step. The default implementation accepts every context.
     *
     * @param t context to inspect
     * @return {@code true} by default
     * @throws Exception if pre-processing fails
     */
    default boolean enter(T t) throws Exception {
        return true;
    }

    /**
     * Main processing step.
     *
     * @param t context to process
     * @return the resulting context
     * @throws Exception if processing fails
     */
    T run(T t) throws Exception;

    /**
     * Post-processing step. The default implementation returns its argument unchanged.
     *
     * @param t context to post-process
     * @return the given context by default
     * @throws Exception if post-processing fails
     */
    default T messageReceiver(T t) throws Exception {
        return t;
    }

    /**
     * Failure handler.
     *
     * @param t context that was being processed
     * @param e the failure
     */
    void exceptionCatch(T t, Throwable e);

    /**
     * Cleanup step. The default implementation does nothing.
     *
     * @param t context that was processed
     */
    default void exit(T t) {
        // no-op by default
    }
}
enter()作为预处理方法,与主要业务处理方法run()分开,因为即使任务因为超时等原因取消了也需要走流程返回ncp的response。exceptionCatch()是异常处理流程,exit()是放在finally里。
impl
/**
 * {@link AsyncInvoke} implementation that drives an HTTP request through the adapter and the
 * filter chain, then writes the response back on the request's channel.
 *
 * <p>NOTE(review): the response, adapter and close flag are instance fields, so this looks
 * like it expects one instance per request; confirm against the injector bindings.
 */
public class VineAsyncInvokeImpl implements AsyncInvoke<CtxRequest> {
    @Inject
    private Injector injector;
    @Inject
    private RequestScope scope;
    /** Set when a failure was reported; forces the channel to be closed in {@link #exit}. */
    private boolean ctxClose = false;

    @Inject
    public VineAsyncInvokeImpl() {
    }

    private Log logger = VineLogFactory.getLog(VineAsyncInvokeImpl.class);
    FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
    /**
     * Resolved lazily by {@link #resolveAdapter()}. A field initializer cannot use
     * {@code injector}, because injected fields are only populated after construction.
     */
    HttpAdapter adapter;

    /** Returns the adapter, fetching it from the injector on first use. */
    private HttpAdapter resolveAdapter() {
        if (adapter == null) {
            adapter = (HttpAdapter) injector.getInstance(VineGlobal.getGlobal().getAdapterClass());
        }
        return adapter;
    }

    /**
     * Enters the request scope, initializes the adapter with the request, the response and
     * the remote host address, and runs the adapter's pre-processing.
     *
     * @param ctxRequest request together with its channel context
     * @return the result of {@code adapter.preProcess()}
     * @throws ServiceException if pre-processing fails
     */
    @Override
    public boolean enter(CtxRequest ctxRequest) throws ServiceException {
        scope.enter();
        InetSocketAddress remote = (InetSocketAddress) ctxRequest.getChannelHandlerContext().channel()
                .remoteAddress();
        HttpAdapter httpAdapter = resolveAdapter();
        httpAdapter.initialize(ctxRequest.getRequest(), response, remote.getAddress().getHostAddress());
        return httpAdapter.preProcess();
    }

    /**
     * Executes the filter chain obtained from the injector.
     *
     * @param ctxRequest request being processed
     * @return the same {@code ctxRequest}
     * @throws ServiceException if the filter chain fails
     */
    @Override
    public CtxRequest run(CtxRequest ctxRequest) throws ServiceException {
        VineFilterChain filterChain = injector.getInstance(VineFilterChain.class);
        filterChain.execute();
        return ctxRequest;
    }

    /**
     * No post-processing; returns the request unchanged.
     *
     * @param ctxRequest request being processed
     * @return the same {@code ctxRequest}
     */
    @Override
    public CtxRequest messageReceiver(CtxRequest ctxRequest) throws ServiceException {
        return ctxRequest;
    }

    /**
     * Records the failure on the adapter's response and marks the channel for closing.
     * Anything other than an {@link InvalidInvocationException} is reported as a
     * {@link ServerErrorException}.
     *
     * @param ctxRequest request being processed
     * @param e the failure
     */
    @Override
    public void exceptionCatch(CtxRequest ctxRequest, Throwable e) {
        ctxClose = true;
        if (e instanceof InvalidInvocationException) {
            resolveAdapter().getResponse().setError((InvalidInvocationException) e);
        } else {
            resolveAdapter().getResponse().setError((new ServerErrorException()));
        }
    }

    /**
     * Runs the adapter's post-processing, writes the response, closes the channel when the
     * connection is not keep-alive or a failure was reported, and always leaves the scope.
     *
     * @param ctxRequest request being processed
     */
    @Override
    public void exit(CtxRequest ctxRequest) {
        try {
            try {
                // Resolved inside the try so a resolution failure also produces a 500.
                resolveAdapter().postProcess();
            } catch (InvalidMethodException e) {
                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                logger.error("Invalid Method Exception ", e);
            } catch (InvalidNotFindPathException e) {
                response.setStatus(HttpResponseStatus.NOT_FOUND);
                logger.error("Invalid Not Find Path Exception", e);
            } catch (Exception e) {
                response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
                logger.error("adapter post process error", e);
            }
            boolean isKeepAlive = HttpHeaders.isKeepAlive(ctxRequest.getRequest());
            if (isKeepAlive) {
                response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            ChannelFuture future = ctxRequest.getChannelHandlerContext().writeAndFlush(response);
            if (!isKeepAlive) {
                try {
                    future.sync();
                } catch (InterruptedException e) {
                    // Restore the flag so the owning pool can observe the interruption.
                    Thread.currentThread().interrupt();
                    logger.error("interrupted while waiting for response write", e);
                }
            }
            // Close once, whether the cause is a non-keep-alive connection or a failure.
            if (!isKeepAlive || ctxClose) {
                ctxRequest.getChannelHandlerContext().close();
            }
        } finally {
            scope.exit();
        }
    }
}
杂项
FIFRBlockingQueue简要
/**
 * Bounded blocking queue whose {@link #offer(Object)} never refuses an element: when the
 * queue is full the element returned by {@code rpqueue} is handed to the {@link OfferPolicy}.
 *
 * <p>Abridged listing: the fields ({@code items}, {@code count}, {@code lock},
 * {@code notEmpty}, {@code notFull}) and the helpers ({@code enqueue}, {@code dequeue},
 * {@code rpqueue}, {@code checkNotNull}) are declared outside this excerpt.
 *
 * @param <T> element type
 */
public class FIFRBlockingQueue<T> extends AbstractQueue<T> implements BlockingQueue<T> {

    /** Receives every element that {@code rpqueue} returns during an offer on a full queue. */
    final OfferPolicy<T> policy;

    /**
     * @param capacity number of slots in the backing array
     * @param policy callback invoked with displaced elements
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public FIFRBlockingQueue(int capacity, OfferPolicy<T> policy) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.items = new Object[capacity];
        this.lock = new ReentrantLock(false);
        this.notEmpty = this.lock.newCondition();
        this.notFull = this.lock.newCondition();
        this.policy = policy;
    }

    /**
     * Adds {@code obj}. When the queue is full, {@code rpqueue} is used instead of
     * {@code enqueue} and its non-null result is passed to the policy.
     *
     * @param obj element to add; checked with {@code checkNotNull}
     * @return always {@code true}
     */
    @Override
    public boolean offer(T obj) {
        checkNotNull(obj);
        T displaced = null;
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            if (count != items.length) {
                enqueue(obj);
            } else {
                displaced = rpqueue(obj);
            }
        } finally {
            guard.unlock();
        }
        // The policy callback runs after the lock has been released.
        if (displaced != null) {
            policy.cancel(displaced);
        }
        return true;
    }

    /**
     * Removes the next element, waiting up to the given time while the queue is empty.
     *
     * @param timeout how long to wait
     * @param unit unit of {@code timeout}
     * @return the dequeued element, or {@code null} if the wait time elapsed first
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    @Override
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        long remainingNanos = unit.toNanos(timeout);
        final ReentrantLock guard = this.lock;
        guard.lockInterruptibly();
        try {
            while (count == 0) {
                if (remainingNanos <= 0) {
                    return null;
                }
                remainingNanos = notEmpty.awaitNanos(remainingNanos);
            }
            return dequeue();
        } finally {
            guard.unlock();
        }
    }
}
OfferPolicy接口
/**
 * Callback invoked with an element that is being cancelled.
 *
 * @param <T> element type
 */
public interface OfferPolicy<T> {

    /** Policy whose {@link #cancel(Object)} does nothing. */
    OfferPolicy<Object> Default = t -> { };

    /**
     * Handles a cancelled element.
     *
     * @param t the cancelled element
     */
    void cancel(T t);
}
VOfferPolicy实现
/**
 * {@link OfferPolicy} for {@link AsyncTask}s: a cancelled task is told to cancel itself.
 */
public class VOfferPolicy implements OfferPolicy<AsyncTask> {

    /**
     * Delegates to {@link AsyncTask#cancelTask()}.
     *
     * @param asyncTask the task being cancelled
     */
    @Override
    public void cancel(AsyncTask asyncTask) {
        asyncTask.cancelTask();
    }
}
VThreadFactory
/**
 * {@link ThreadFactory} that names its threads {@code <prefix>-<n>}, where {@code n} starts
 * at 0 and increases by one for every thread created by this factory.
 */
public class VThreadFactory implements ThreadFactory {

    /** Text placed before the dash in every thread name. */
    private final String prefix;

    /** Index that the next created thread will receive. */
    private final AtomicInteger sequence = new AtomicInteger(0);

    /**
     * @param threadNamePrefix prefix used for the names of created threads
     */
    public VThreadFactory(String threadNamePrefix) {
        this.prefix = threadNamePrefix;
    }

    /**
     * Creates an unstarted thread for {@code r} and assigns it the next sequential name.
     *
     * @param r work the thread will run
     * @return the new, named thread
     */
    @Override
    public Thread newThread(Runnable r) {
        final Thread created = new Thread(r);
        final int index = sequence.getAndIncrement();
        created.setName(prefix + "-" + index);
        return created;
    }
}
This work is licensed under a CC A-S 4.0 International License.