我用ChatGPT,给RabbitMQ加了个连接池
大家好呀,我是楼仔。
上次我把 RabbitMQ 集成到项目中,但是每次使用 RabbitMQ 时都去 New 一个连接,导致并发起不来,所以这次我们就给 RabbitMQ 加一个连接池。
为了偷个懒,我直接用 ChatGPT 教我加。
01 ChatGPT 生成的代码
我输入的关键字如下:
rabbitmq 实现一个连接池 java
下面是 GhatGPT 4.0 给我的回复:
以下是在Java中使用连接池创建RabbitMQ连接的示例:
首先,需要添加pika客户端依赖项,例如:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
创建一个RabbitMQ连接类:
import com.rabbitmq.client.Connection;
public class RabbitMQConnection {
private Connection connection;
public RabbitMQConnection(String host, int port, String username, String password) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
try {
connection = factory.newConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
public Connection getConnection() {
return connection;
}
public void close() {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
创建一个RabbitMQ连接池类:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class RabbitMQConnectionPool {
private final BlockingQueue<RabbitMQConnection> pool;
public RabbitMQConnectionPool(String host, int port, String username, String password, int poolSize) {
pool = new LinkedBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize; i++) {
pool.add(new RabbitMQConnection(host, port, username, password));
}
}
public RabbitMQConnection getConnection() throws InterruptedException {
return pool.take();
}
public void returnConnection(RabbitMQConnection connection) {
pool.add(connection);
}
public void close() {
pool.forEach(RabbitMQConnection::close);
}
}
创建一个消费者类:
import com.rabbitmq.client.*;
public class RabbitMQConsumer implements Runnable {
private final RabbitMQConnectionPool connectionPool;
private final String queueName;
public RabbitMQConsumer(RabbitMQConnectionPool connectionPool, String queueName) {
this.connectionPool = connectionPool;
this.queueName = queueName;
}
@Override
public void run() {
try {
Connection connection = connectionPool.getConnection().getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
channel.basicConsume(queueName, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received message: " + message);
}, consumerTag -> {});
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
}
创建一个生产者类:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
public class RabbitMQProducer {
private final RabbitMQConnectionPool connectionPool;
private final String queueName;
public RabbitMQProducer(RabbitMQConnectionPool connectionPool, String queueName) {
this.connectionPool = connectionPool;
this.queueName = queueName;
}
public void send(String message) {
try {
Connection connection = connectionPool.getConnection().getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("Sent message: " + message);
channel.close();
connection.close();
} catch (InterruptedException|IOException e) {
e.printStackTrace();
}
}
}
创建一个RabbitMQ连接池和多个消费者来消费队列:
public class Main {
public static void main(S
回复