Grpc应用(二)、搭配commons-pool2连接池

commons-pool2是apache的连接池框架,方便我们创建连接池,实现tcp连接的复用,不需要每次请求都发送握手请求、断开请求等操作。

上一篇的Grpc应用博客只是简单地实现了一个Grpc应用,但是,一个致命的缺点在于每次创建连接、断开连接时的高消耗,这次博客使用commons-pool2来实现一个Grpc的连接池。

首先,我们需要知道,服务端是不需要连接池的,google的Grpc服务端使用的是netty这个nio框架,能有效的处理每一个请求,而不是每次请求创建一个线程进行处理,等下我们会看到具体操作。而客户端则不同,我们需要自己创建一个连接池来维持tcp连接。

废话不多说了,上代码:

pom.xml
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.4.2</version>
</dependency>

首先添加commons-pool2依赖 。

接下来创建一个GrpcClientFactory,以及测试代码:

public class GrpcClientFactory extends BasePooledObjectFactory<GrpcClient> {


    @Override
    public GrpcClient create throws Exception {
        return new GrpcClient("localhost", 38628);
    }

    @Override
    public PooledObject<GrpcClient> wrap(GrpcClient client) {
        return new DefaultPooledObject<>(client);
    }

    public static void main(String[] args) throws Exception {

        /** 连接池的配置 */
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig;
        
        /** 下面的配置均为默认配置,默认配置的参数可以在BaseObjectPoolConfig中找到 */
        poolConfig.setMaxTotal(8); // 池中的最大连接数
        poolConfig.setMinIdle(0); // 最少的空闲连接数
        poolConfig.setMaxIdle(8); // 最多的空闲连接数
        poolConfig.setMaxWaitMillis(-1); // 当连接池资源耗尽时,调用者最大阻塞的时间,超时时抛出异常 单位:毫秒数
        poolConfig.setLifo(true); // 连接池存放池化对象方式,true放在空闲队列最前面,false放在空闲队列最后
        poolConfig.setMinEvictableIdleTimeMillis(1000L * 60L * 30L); // 连接空闲的最小时间,达到此值后空闲连接可能会被移除,默认即为30分钟
        poolConfig.setBlockWhenExhausted(true); // 连接耗尽时是否阻塞,默认为true

        /** 对象池创建 */
        GenericObjectPool<GrpcClient> objectPool = new GenericObjectPool<>(new GrpcClientFactory, poolConfig);

        new Thread(makeTask(objectPool)).start;
        new Thread(makeTask(objectPool)).start;
        new Thread(makeTask(objectPool)).start;
        new Thread(makeTask(objectPool)).start;

    }

    private static Runnable makeTask(GenericObjectPool<GrpcClient> objectPool){
        return  -> {
 GrpcClient client = null;
 try {
 client = objectPool.borrowObject;
 } catch (Exception e) {
 e.printStackTrace;
 }
 try {
 String req = "world!";
 String resp = client.request(req);
 System.out.println(resp);
 } finally {
 try {
 client.shutdown;
 } catch (InterruptedException e) {
 e.printStackTrace;
 }
 }
        };
    }
}

代码很短,但是该有的功能却已经齐全了,如果觉得格式不好看的话,可以自行将main方法、maskTask方法的代码再移出到一个测试类中,这里为了代码的紧凑,所以使用了一个类完成了所有操作。

首 先看GrpcClientFactory这个类,它继承了BasePooledObjectFactory<GrpcClient>,用来实 现创建连接池中连接对象的功能,如果觉得连接池这个名字不容易理解的话,可以将连接池看成对象池,将连接对象看成GrpcClient,这样依赖,便容易 理解些了。

这个类中需要实现两个方法,但其实这就是一个方法,因为BasePooledObjectFactory<GrpcClient>中是这样创建我们的连接对象的:

这个PooledObject,将我们的GrpcClient作为它的一个属性,并另外添加了一些其他的属性,例如创建时间、最后一次调用时间等:

继续看main方法,里面先创建了一个GenericObjectPoolConfig,用来配置连接池的属性,最主要属性已经在代码中了,并且配上了注解,相信大家都能看懂:)

最后一步,连接池的创建,commons-pool2为我们提供了连接池的快捷创建方式。它接受两个参数,分别对应连接池中连接对象创建工厂、连接池配置,如下:

连接池对象 = 连接池所维护的对象的创建工厂 + 连接池对象配置

至此,一个连接池对象创建完毕,接下来测试一下吧。修改一下我们之前的HelloWorldRpcServiceImpl:

public class HelloWorldRpcServiceImpl implements HelloWorldRpcServiceGrpc.HelloWorldRpcService {

    /** Grpc并不是单线程的 */
//    public static int count = 0;

    /** 原子Integer */
    public static AtomicInteger count = new AtomicInteger(0);

    @Override
    public void sayHello(HelloWorldRequest request, StreamObserver<HelloWorldResponse> responseObserver) {
        String req = request.getRequest;
        HelloWorldResponse resp = HelloWorldResponse.newBuilder
 .setResponse("hello " + req + " ")
 .build;
        responseObserver.onNext(resp);
        responseObserver.onCompleted;

        System.out.println(count.incrementAndGet + Thread.currentThread.getName);
    }

}

我们使用了一个共有属性count,
Thread.currentThread.getName来测试线程安全性,毕竟服务端单线程就太坑爹了。

先启动服务端,然后运行我们的GrpcClientFactory,测试结果如下:

可以看到,我们的Grpc服务端至少开了4条线程,你也可以试试将我们的任务线程多开点看看,试试极限在哪,由此可知公有可变属性需要进行同步。

至此,Grpc的commons-pool2连接池篇也结束了,希望对大家有所帮助。

Grpc应用(二)、搭配commons-pool2连接池,
http://zk-chs.iteye.com/blog/2308730

原文链接:,转发请注明来源!