RabbitMQ生产者模型

Posted by Alexander Wang on June 13, 2016

RabbitMQ生产者模型

经过压测,未实际业务使用。

RabbitMQ连接池设计

采用阻塞队列作为连接池的存储结构,避免使用代理,再生产者那一层完成归还连接的操作,提高性能。连接池connection总数固定,使用委托的自动恢复连接的机制。

初始化

public RabbitConnectionPool(String uri, int rabbitPoolSize, long waitTimes) {
        try {
            rtnConnTask();
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri(uri);
            factory.setAutomaticRecoveryEnabled(true);
            factory.setRequestedHeartbeat(5);
            factory.setConnectionTimeout(2000);
            int poolSize = rabbitPoolSize == 0 ? DEFAULT_POOL_SIZE : rabbitPoolSize;
            waitTime = waitTimes == 0 ? DEFAULT_WAIT_TIME : waitTimes;
            for (int i = 0; i < poolSize; i++) {
                connList.add(factory.newConnection());
            }
        } catch (Exception e) {
            logger.error("create rabbitmq pool fail.", e);
            throw new RuntimeException(e);
        }
    }

通过两个队列分别存放正常的和异常的connection

   private LinkedBlockingDeque<Connection> connList = new LinkedBlockingDeque<Connection>();
    private LinkedBlockingDeque<Connection> connClosedList = new LinkedBlockingDeque<Connection>();

获得connection的实例

通过pool的getConnection()

    public Connection getConnection() throws VineRMQException {
        Connection connection = null;
        try {
            connection = connList.poll(waitTime, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new VineRMQException(e);
        }
        if (connection == null) {
            logger.error("rabbitMQ connections pool full and conn are all in using");
            throw new VineRMQException("rabbitMQ connections pool full and conn are all in using");
        }
        if (!connection.isOpen()) {
            connClosedList.offer(connection);
            return getConnection();
        }
        return connection;
    }

将恢复的异常connection放入正常的queue的task

    public void rtnConnTask() {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    rtnConnToClosedConnList();
                } catch (VineRMQException e) {
                    logger.error("rtnConnToClosedConnList failed", e);
                    throw new RuntimeException(e);
                }
            }
        }, DEFAULT_RETURN_INTERVAL_MILLS, DEFAULT_RETURN_INTERVAL_MILLS);
    }
    private void rtnConnToClosedConnList() throws VineRMQException {
        Connection connection = connClosedList.poll();
        if (connection == null) {
            return;
        } else {
            boolean flag = connection.isOpen() ? connList.offer(connection) : connClosedList.offer(connection);
            if (!flag) {
                logger.error("RabbitMQConnectionLost : rtnConnToClosedConnList cause ERROR");
            }
        }

    }

归还connection的方法

    public void returnConnection(Connection connection) {
        if (!connList.offer(connection)) logger.error("RabbitMQConnectionLost : returnConnection cause ERROR");
    }

生产者设计

初始化

    public VineMQProducer() {
        if (!inited) {
            try {
                initialized(false);
            } catch (Exception e) {
                logger.error("MQ Producer init failed, configKey:" + getConfigKey(), e);
            }
        }
    }

    public void initialized(boolean throwExc) {
        try {
            Config conf = DefaultConfig.getConfig();
            config = conf.get(getConfigKey(), MQProducerConfig.class);
            connPool = new RabbitConnectionPool(config.getUri(), config.getPoolSize(), config.getWaitTime());
            inited = true;
        } catch (Exception e) {
            if (throwExc) {
                throw e;
            }
            logger.warn("MQ Producer init failed, configKey:" + getConfigKey(), e);
            inited = true;
        }
    }

发送消息

消息的发送可以传入routingKey进行路由

    public void send(String message, String routingKey) throws VineRMQException {
        Channel channel = null;
        try {
            Connection conn = connPool.getConnection();

            try {
                channel = conn.createChannel();
                if (channel == null) {
                    throw new VineRMQException("connection is null");
                }
            } finally {
                connPool.returnConnection(conn);
            }

            if (config.getExchange() != null) {
                if (routingKey == null) {
                    channel.basicPublish(config.getExchange(), config.getRoutingKey(), null, message.getBytes());
                } else {
                    channel.basicPublish(config.getExchange(), routingKey, null, message.getBytes());
                }
            }

        } catch (Exception e) {
            logger.error("send message fail. message:" + message);
            throw new VineRMQException(e);
        } finally {
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    throw new VineRMQException(e);
                }
            }
        }
    }

线程组发送

每个线程都单独获得一个channel,避免了排队,虽让channel本身是线程安全的

    private ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors(),
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(10_000));
    public void asyncSend(String message, String routingKey) {
        executorService.execute(() -> {
            try {
                send(message, routingKey);
            } catch (VineRMQException e) {
            logger.error(message + "send failed, exception: " + e);
            }
        });
    }

踩过的坑

坑1: 一开始是设计了,connection和channel的双连接池的设计,但是实际测试发现channel处理能力非常强大,2000q/s也只需要一个connection产生平均1.2个channel就足够使用了,所以舍弃了该方案。

坑2:连接池使用了代理的方式,在压测的时候没发现问题,上了生产,结果订单发生了错发!错发!非常可怕的bug,我仔细看了代码,没发现问题,在并发量小时候是没有问题的,结果量大就出现这种问题了,推测可能是RabbitMQ的问题。

心得

越是简单的设计越是好设计。


Creative Commons License
This work is licensed under a CC A-S 4.0 International License.