Single-node RocketMQ 5 deployment with Docker on macOS, plus an SDK send/receive test

2024-02-19/2024-02-20

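At the time of writing, the RocketMQ website did not recommend a Docker-based deployment method, and the tutorials I found online were not well written, so I put together my own notes here.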

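1. Create a shared network for the containers

RocketMQ consists of several services, each running in its own container. Creating a dedicated Docker network lets the containers reach each other by container name.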

docker network create rocketmq
# docker network ls                # list all Docker networks
# docker network inspect rocketmq  # show details of the network

2. Deploy the RocketMQ NameServer

# Create the host directory to mount, and give the rocketmq user write permission
mkdir -p  /Users/buguniao/docker/volumes/rocketmq/logs/
chmod o+w /Users/buguniao/docker/volumes/rocketmq/logs/

# Start the NameServer
docker run -d --name mqnamesrv -p 9876:9876 --network rocketmq \
-v /Users/buguniao/docker/volumes/rocketmq/logs/:/home/rocketmq/logs \
apache/rocketmq:5.1.3 sh mqnamesrv

Then confirm that the service started successfully:

docker logs -f mqnamesrv

(Screenshot: the NameServer log shows that it started successfully.)

3. Deploy Broker + Proxy

In Local mode, the Broker and the Proxy run in the same process. The Proxy itself is stateless, so there is no need to start a separate Proxy.
Deployment command:

docker run -d --name mqbroker -p 10911:10911 -p 10909:10909 --network rocketmq \
-v /Users/buguniao/docker/volumes/rocketmq/logs/:/home/rocketmq/logs \
apache/rocketmq:5.1.3 sh mqbroker -n mqnamesrv:9876 --enable-proxy autoCreateTopicEnable=true \
-c /home/rocketmq/rocketmq-5.1.3/conf/broker.conf

Check the log to confirm that it started successfully:

docker exec -it mqbroker bash -c "tail -n 10 /home/rocketmq/logs/rocketmqlogs/proxy.log"


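4. Deploy RocketMQ Dashboard

Next, install RocketMQ Dashboard.

RocketMQ Dashboard is the management console for RocketMQ. It provides statistics on events and performance for clients and applications, and it lets you use a visual tool instead of command-line operations for tasks such as Topic configuration and Broker management.

The deployment procedure is documented on the official website. Following those instructions: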

  1. Pull the image
$ docker pull apacherocketmq/rocketmq-dashboard:latest
  2. Run the container
$ docker run -d --name mqconsole -p 8098:8080 --network rocketmq \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=mqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
apacherocketmq/rocketmq-dashboard:latest

Open http://localhost:8098/ to see the console. If the broker is listed under Cluster, the deployment succeeded.
(Screenshot: Dashboard Cluster page listing the broker.)

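5. Send and receive messages with the RocketMQ SDK

Create a simple Maven project and add junit and rocketmq-client-java as dependencies, along with slf4j-api and logback-classic for logging: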

<dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.5</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.12</version>
        </dependency>
</dependencies>

Create the test class ProducerExample:

package mqtest;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerExample {
    private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);

    public static void main(String[] args) throws ClientException {
        // Endpoint address: set this to the Proxy's address and port list, typically xxx:8081;xxx:8081.
        String endpoint = "172.18.0.3:8081";
        // Name of the target Topic; it must be created in advance.
        String topic = "test_topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        // Initializing the Producer requires the client configuration and the pre-bound Topic.
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        // Send a normal message.
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // Message index key, which can be used to look up a specific message by keyword.
                .setKeys("test_keys")
                // Message tag, used by consumers to filter messages; it must match the consumer's filter expression.
                .setTag("test_tag")
                // Message body.
                .setBody("messageBody".getBytes())
                .build();
        try {
            // Send the message; check the send result and catch failures.
            SendReceipt sendReceipt = producer.send(message);
            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            logger.error("Failed to send message", e);
        }
        // producer.close();
    }
}

Running it produced an error:

Exception in thread "main" java.lang.IllegalStateException: Expected the service ProducerImpl-0 [FAILED] to be RUNNING, but the service has FAILED
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:381)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:305)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService.awaitRunning(AbstractIdleService.java:165)
	at org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl.build(ProducerBuilderImpl.java:93)
	at mqtest.ProducerExample.main(ProducerExample.java:28)
Caused by: java.util.concurrent.ExecutionException: org.apache.rocketmq.shaded.io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 2.922725542s. [buffered_nanos=2925292375, waiting_for_connection]
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:588)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:567)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:91)
	at org.apache.rocketmq.client.java.impl.ClientImpl.startUp(ClientImpl.java:188)
	at org.apache.rocketmq.client.java.impl.producer.ProducerImpl.startUp(ProducerImpl.java:114)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService$DelegateService$1.run(AbstractIdleService.java:62)
	at org.apache.rocketmq.shaded.com.google.common.util.concurrent.Callables.lambda$threadRenaming$3(Callables.java:103)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.rocketmq.shaded.io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 2.922725542s. [buffered_nanos=2925292375, waiting_for_connection]
	at org.apache.rocketmq.shaded.io.grpc.Status.asRuntimeException(Status.java:539)
	at org.apache.rocketmq.shaded.io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544)
	at org.apache.rocketmq.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:563)
	at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:744)
	at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
	at org.apache.rocketmq.shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.rocketmq.shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

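The trace ends in DEADLINE_EXCEEDED with waiting_for_connection, which means the client never managed to open a connection. It then became clear why: the host machine and the Docker container network cannot reach each other. On macOS, Docker containers run inside a Linux VM, so a container IP such as 172.18.0.3 is not routable from the host.

Network isolation caused the producer's send to fail.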
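
Before touching the SDK again, it can help to confirm the diagnosis with a plain TCP check from the host. The following is a minimal sketch using only the JDK; the class name EndpointProbe and its methods are hypothetical helpers of mine, not part of the RocketMQ client, and the default endpoint is simply the one used in this article. It accepts the same "host:port;host:port" list format that the SDK's endpoint setting uses.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the RocketMQ SDK): checks whether a TCP
// connection to each Proxy endpoint can be opened from this machine.
public class EndpointProbe {

    /** Returns true if a TCP connection to host:port opens within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, unroutable, or unresolvable host.
            return false;
        }
    }

    /** Probes every entry of a "host:port;host:port" endpoint list. */
    public static Map<String, Boolean> probeAll(String endpoints, int timeoutMillis) {
        Map<String, Boolean> results = new LinkedHashMap<>();
        for (String endpoint : endpoints.split(";")) {
            String trimmed = endpoint.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port, got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            results.put(trimmed, isReachable(host, port, timeoutMillis));
        }
        return results;
    }

    public static void main(String[] args) {
        // Default is the Proxy endpoint from this article; pass your own as the first argument.
        String endpoints = args.length > 0 ? args[0] : "172.18.0.3:8081";
        probeAll(endpoints, 3000).forEach((endpoint, ok) ->
                System.out.println(endpoint + " -> " + (ok ? "reachable" : "NOT reachable")));
    }
}
```

If the endpoint is reported as not reachable from the host, the problem is in the network path rather than in the producer code.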

To let the host reach the container network, I recommend this tool:
desktop-docker-connector

Following its documentation:

  1. Install the Mac-side service, mac-docker-connector
$ brew tap wenjunxiao/brew
$ brew install docker-connector
  2. For first-time setup, run the following command to add the subnets of all Docker bridge networks to the configuration file
$ docker network ls --filter driver=bridge --format "{{.ID}}" | xargs docker network inspect --format "route {{range .IPAM.Config}}{{.Subnet}}{{end}}" >> "$(brew --prefix)/etc/docker-connector.conf"
  3. Start the Mac-side service
$ sudo brew services start docker-connector
  4. Pull the Docker-side container image, mac-docker-connector
$ docker pull wenjunxiao/mac-docker-connector
  5. Start the container
$ docker run -it -d --restart always --net host --cap-add NET_ADMIN --name mac-connector wenjunxiao/mac-docker-connector
  • For configuration parameters, see the usage documentation on GitHub
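
The command in step 2 writes one "route <subnet>" line per bridge network into docker-connector.conf, and the Proxy's container IP has to fall inside one of those subnets for the host to reach it. Here is a small sketch for checking that. It assumes IPv4 subnets in CIDR notation; the class RouteCheck is a hypothetical helper, and the subnet 172.18.0.0/16 is an assumed example value (only the address 172.18.0.3 comes from this article).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: checks whether an IPv4 address is covered by any
// "route <cidr>" line, the format written to docker-connector.conf above.
public class RouteCheck {

    /** Converts a dotted IPv4 literal such as "172.18.0.3" to a 32-bit value. */
    public static int toInt(String ipv4) {
        String[] parts = ipv4.trim().split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not an IPv4 address: " + ipv4);
        }
        int value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("Octet out of range in: " + ipv4);
            }
            value = (value << 8) | octet;
        }
        return value;
    }

    /** Returns true if ip lies inside the CIDR block, e.g. "172.18.0.0/16". */
    public static boolean contains(String cidr, String ip) {
        String[] parts = cidr.trim().split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not a CIDR block: " + cidr);
        }
        int prefix = Integer.parseInt(parts[1]);
        if (prefix < 0 || prefix > 32) {
            throw new IllegalArgumentException("Bad prefix length in: " + cidr);
        }
        // A shift by 32 is a no-op in Java, so /0 needs an explicit zero mask.
        int mask = prefix == 0 ? 0 : -1 << (32 - prefix);
        return (toInt(parts[0]) & mask) == (toInt(ip) & mask);
    }

    /** Returns true if any "route <cidr>" line covers ip; unparsable lines are skipped. */
    public static boolean isRouted(List<String> confLines, String ip) {
        for (String line : confLines) {
            String trimmed = line.trim();
            if (!trimmed.startsWith("route ")) {
                continue;
            }
            String cidr = trimmed.substring("route ".length()).trim();
            try {
                if (!cidr.isEmpty() && contains(cidr, ip)) {
                    return true;
                }
            } catch (IllegalArgumentException e) {
                // Skip entries this sketch does not understand (for example IPv6 subnets).
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Assumed example content; read your real docker-connector.conf instead.
        List<String> conf = Arrays.asList("route 172.17.0.0/16", "route 172.18.0.0/16");
        System.out.println("172.18.0.3 routed: " + isRouted(conf, "172.18.0.3"));
    }
}
```

If the check fails for your endpoint, the rocketmq network was probably created after the configuration file was generated, in which case its subnet still needs to be added.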

After starting it, I ran the test again. Result:

09:43:46.543 [main] INFO mqtest.ProducerExample - Send message successfully, messageId=012EDEFF29778EA86105E59DD200000000

See the Dashboard:
(Screenshot: message view in the Dashboard.)

The message arrived, so everything works.

Next, write the consumer test case:

package mqtest;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    @SuppressWarnings({"resource"})
    public static void main(String[] args) throws ClientException {
        // Consumption example: use SimpleConsumer to consume normal messages by actively receiving, processing, and acknowledging them.
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        String topic = "test_topic";
        String tag = "test_tag";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
                // Set the consumer group.
                .setConsumerGroup("test_group")
                // Set the endpoint.
                .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("172.18.0.3:8081").build())
                // Set the pre-bound subscription.
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // Maximum time to wait for messages from the server.
                .setAwaitDuration(Duration.ofSeconds(3L))
                .build();
        try {
            // SimpleConsumer must actively fetch messages and process them.
            List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
            messageViewList.forEach(messageView -> {
                System.out.println(messageView);
                // After processing, explicitly ACK to commit the consumption result.
                try {
                    simpleConsumer.ack(messageView);
                } catch (ClientException e) {
                    logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
                }
            });
        } catch (ClientException e) {
            // If receiving fails, for example because of server-side flow control, issue the receive request again.
            logger.error("Failed to receive message", e);
        }
    }
}
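
The comment in the catch block says a failed receive should be issued again, but the example only logs the error. One possible way to do that is a small generic retry wrapper. This is a sketch of my own using only the JDK, not something the RocketMQ SDK provides; the class RetryingCall, its method name, and the linear backoff policy are all assumptions.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not part of the RocketMQ SDK): re-runs an action that
// may fail transiently, waiting a little longer after each failed attempt.
public class RetryingCall {

    /**
     * Calls action up to maxAttempts times and returns its first successful result.
     * Sleeps backoffMillis * attemptNumber between attempts (linear backoff).
     * Rethrows the last failure if every attempt fails.
     */
    public static <T> T callWithRetry(Callable<T> action, int maxAttempts, long backoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry when the thread has been asked to stop.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        throw last;
    }
}
```

In the consumer above, the receive call could then be wrapped as RetryingCall.callWithRetry(() -> simpleConsumer.receive(10, Duration.ofSeconds(30)), 3, 500L), with main declared to throw Exception. Whether three attempts and a 500 ms step are sensible depends on your setup.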

The test passes:

MessageViewImpl{messageId=012EDEFF29778E9CC505E4F4C200000000, topic=test_topic, bornHost=buguniaodeMacBook-Pro.local, bornTimestamp=1708350146358, endpoints=ipv4:172.18.0.3:8081, deliveryAttempt=1, tag=test_tag, keys=[test_keys], messageGroup=null, deliveryTimestamp=null, properties={}}
MessageViewImpl{messageId=012EDEFF29778E8EE705E496A500000000, topic=test_topic, bornHost=buguniaodeMacBook-Pro.local, bornTimestamp=1708326053159, endpoints=ipv4:172.18.0.3:8081, deliveryAttempt=1, tag=test_tag, keys=[test_keys], messageGroup=null, deliveryTimestamp=null, properties={}}
MessageViewImpl{messageId=012EDEFF29778E8DE405E4934E00000000, topic=test_topic, bornHost=buguniaodeMacBook-Pro.local, bornTimestamp=1708325198285, endpoints=ipv4:172.18.0.3:8081, deliveryAttempt=1, tag=test_tag, keys=[test_keys], messageGroup=null, deliveryTimestamp=null, properties={}}
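  • Only the SimpleConsumer type is covered here.

That is everything. Two points to remember: when RocketMQ 5 starts in Local mode it opens the Proxy on port 8081, and the Proxy's address and port are what clients must connect to; and Docker containers and the host cannot reach each other over the network without extra setup.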

References:

  1. Deploying RocketMQ 5.x with Docker
  2. Apache RocketMQ
  3. Accessing the Docker container network from a Mac host
  4. desktop-docker-connector documentation

Author: buguniao
URL: https://thunderdemon.cn/articles/2024/02/19/1708353710994.html

       
       