mac docker 单机部署 RocketMQ5 以及 SDK 消息收发测试
2024-02-19/2024-02-20
由于 RocketMQ 官网没有 docker 部署方式推荐,网上的教程看了下写的不是很好,所以自己做一个整理、记录。
1、创建容器共享网络
RocketMQ中有多个服务,需要创建多个容器,创建 docker 网络便于容器间相互通信。
rocker network create rocketmq
# docker network ls #列出所有docker网络
# docker network inspect rocketmq #查看网络信息
2、部署RocketMQ NameServer
# 创建映射目录, 并给rocketmq用户写权限
mkdir -p /Users/buguniao/docker/volumes/rocketmq/logs/
chmod o+w /Users/buguniao/docker/volumes/rocketmq/logs/
# 启动NameServer
docker run -d --name mqnamesrv -p 9876:9876 --network rocketmq
-v /Users/buguniao//docker/volumes/rocketmq/logs/:/home/rocketmq/logs
apache/rocketmq:5.1.3 sh mqnamesrv
随后确认服务是否启动成功
docker logs -f mqnamesrv

如图启动成功
3、部署Broker+Proxy
Local 模式,Broker 和 Proxy 是同进程部署,Proxy本身无状态,不需要额外启动Proxy。
部署命令:
docker run -d --name mqbroker -p 10911:10911 -p 10909:10909 --network rocketmq
-v /Users/buguniao/docker/volumes/rocketmq/log/:/home/rocketmq/logs
apache/rocketmq:5.1.3 sh mqbroker -n mqnamesrv:9876 --enable-proxy autoCreateTopicEnable=true
-c /home/rocketmq/rocketmq-5.1.3/conf/broker.conf
查看日志检查是否启动成功
docker exec -it mqbroker bash -c "tail -n 10 /home/rocketmq/logs/rocketmqlogs/proxy.log"

4、部署 RocketMQ Dashboard
现在来安装RocketMQ Dashboard
RocketMQ Dashboard 是 RocketMQ 的管控利器,为用户提供客户端和应用程序的各种事件、性能的统计信息,支持以可视化工具代替 Topic 配置、Broker 管理等命令行操作。
部署方式可见官网,点击链接
按照官网描述操作
- 拉取镜像
$ docker pull apacherocketmq/rocketmq-dashboard:latest
- 运行容器
docker run -d --name mqconsole -p 8098:8080 --network rocketmq \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=mqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
styletang/rocketmq-console-ng
查看
http://localhost:8098/可见控制台并在Cluster下查看到 broker 代表部署成功。
5、使用RocketMQ SDK实现简单的收发消息
创建一个简单的 maven 工程,引入 junit 和 rocketmq-client-java
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.12</version>
</dependency>
</dependencies>
创建测试类 ProducerExample
package mqtest;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerExample {
private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
public static void main(String[] args) throws ClientException {
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoint = "172.18.0.3:8081";
// 消息发送的目标Topic名称,需要提前创建。
String topic = "test_topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 普通消息发送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
// 设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
// 消息体。
.setBody("messageBody".getBytes())
.build();
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("Failed to send message", e);
}
// producer.close();
}
}
运行后报错了:
Exception in thread "main" java.lang.IllegalStateException: Expected the service ProducerImpl-0 [FAILED] to be RUNNING, but the service has FAILED
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:381)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:305)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService.awaitRunning(AbstractIdleService.java:165)
at org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl.build(ProducerBuilderImpl.java:93)
at mqtest.ProducerExample.main(ProducerExample.java:28)
Caused by: java.util.concurrent.ExecutionException: org.apache.rocketmq.shaded.io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 2.922725542s. [buffered_nanos=2925292375, waiting_for_connection]
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:588)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:567)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:91)
at org.apache.rocketmq.client.java.impl.ClientImpl.startUp(ClientImpl.java:188)
at org.apache.rocketmq.client.java.impl.producer.ProducerImpl.startUp(ProducerImpl.java:114)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService$DelegateService$1.run(AbstractIdleService.java:62)
at org.apache.rocketmq.shaded.com.google.common.util.concurrent.Callables.lambda$threadRenaming$3(Callables.java:103)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.rocketmq.shaded.io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 2.922725542s. [buffered_nanos=2925292375, waiting_for_connection]
at org.apache.rocketmq.shaded.io.grpc.Status.asRuntimeException(Status.java:539)
at org.apache.rocketmq.shaded.io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544)
at org.apache.rocketmq.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:563)
at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:744)
at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
at org.apache.rocketmq.shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.rocketmq.shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
反应过来宿主机和 docker 容器内网络并不相通。
网络隔离导致生产者消息推送失败
为了使宿主机可以访问容器内部网络,这边推荐一个工具
desktop-docker-connector
按照文档操作:
- 先安装Mac端的服务
mac-docker-connector
$ brew tap wenjunxiao/brew
$ brew install docker-connector
- 首次配置通过以下命令把所有Docker所有
bridge子网放入配置文件
$ docker network ls --filter driver=bridge --format "{{.ID}}" | xargs docker network inspect --format "route {{range .IPAM.Config}}{{.Subnet}}{{end}}" >> "$(brew --prefix)/etc/docker-connector.conf"
- 启动 mac 端服务
$ sudo brew services start docker-connector
- 安装Docker端的容器
mac-docker-connector
$ docker pull wenjunxiao/mac-docker-connector
- 启动容器
$ docker run -it -d --restart always --net host --cap-add NET_ADMIN --name mac-connector wenjunxiao/mac-docker-connector
- 配置参数请见github 的使用文档
启动后再次进行测试,结果:
09:43:46.543 [main] INFO mqtest.ProducerExample - Send message successfully, messageId=012EDEFF29778EA86105E59DD200000000
见Dashboard

消息进来了,没有问题。
继续编写消费者测试用例
package mqtest;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
public class SimpleConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
@SuppressWarnings({"resource"})
public static void main(String[] args) throws ClientException {
// 消费示例:使用 SimpleConsumer 消费普通消息,主动获取消息处理并提交。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "test_topic";
String tag = "test_tag";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
// 设置消费者分组。
.setConsumerGroup("test_group")
// 设置接入点。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("172.18.0.3:8081").build())
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置从服务端接受消息的最大等待时间
.setAwaitDuration(Duration.ofSeconds(3L))
.build();
try {
// SimpleConsumer 需要主动获取消息,并处理。
List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// 消费处理完成后,需要主动调用 ACK 提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
}
});
} catch (ClientException e) {
// 如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
logger.error("Failed to receive message", e);
}
}
}
测试通过
MessageViewImpl{messageId=012EDEFF29778E9CC505E4F4C200000000, topic=test_topic, bornHost=buguniaodeMacBook-Pro.local, bornTimestamp=1708350146358, endpoints=ipv4:172.18.0.3:8081, deliveryAttempt=1, tag=test_tag, keys=[test_keys], messageGroup=null, deliveryTimestamp=null, properties={}}
MessageViewImpl{messageId=012EDEFF29778E8EE705E496A500000000, topic=test_topic, bornHost=buguniaodeMacBook-Pro.local, bornTimestamp=1708326053159, endpoints=ipv4:172.18.0.3:8081, deliveryAttempt=1, tag=test_tag, keys=[test_keys], messageGroup=null, deliveryTimestamp=null, properties={}}
MessageViewImpl{messageId=012EDEFF29778E8DE405E4934E00000000, topic=test_topic, bornHost=buguniaodeMacBook-Pro.local, bornTimestamp=1708325198285, endpoints=ipv4:172.18.0.3:8081, deliveryAttempt=1, tag=test_tag, keys=[test_keys], messageGroup=null, deliveryTimestamp=null, properties={}}
- 这里只编写了
SimpleConsumer这一类消费者。
到这里就全部结束了,需要注意的点是RocketMQ5 版本Local 模式启动后会开启Proxy 的 8081 端口,你需要连接的是Proxy的地址和端口。还有 docker 容器内部和宿主机网络通信的问题。
参考:
标题:mac docker 单机部署 RocketMQ5 以及 SDK 消息收发测试
作者:buguniao
地址:https://thunderdemon.cn/articles/2024/02/19/1708353710994.html
