定时/延时消息

发布日期:2026-01-05 16:30:05 分类:365bet官方平台开户 浏览:2309

定时/延时消息发送import org.apache.rocketmq.client.apis.*;

import org.apache.rocketmq.client.apis.message.Message;

import org.apache.rocketmq.client.apis.producer.Producer;

import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class ProducerExample {

public static void main(String[] args) throws ClientException {

/**

* 实例接入点,从控制台实例详情页的接入点页签中获取。

* 如果是在阿里云ECS内网访问,建议填写VPC接入点。

* 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。

*/

String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";

//消息发送的目标Topic名称,需要提前在控制台创建,如果不创建直接使用会返回报错。

String topic = "Your Topic";

ClientServiceProvider provider = ClientServiceProvider.loadService();

ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);

/**

* 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。

* 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。

* 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。

*/

//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));

ClientConfiguration configuration = builder.build();

Producer producer = provider.newProducerBuilder()

.setTopics(topic)

.setClientConfiguration(configuration)

.build();

//定时/延时消息发送

//以下示例表示:延迟时间为10分钟之后的Unix时间戳。

long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;

Message message = provider.newMessageBuilder()

.setTopic("topic")

//设置消息索引键,可根据关键字精确查找某条消息。

.setKeys("messageKey")

//设置消息Tag,用于消费端根据指定Tag过滤消息。

.setTag("messageTag")

.setDeliveryTimestamp(deliverTimeStamp)

//消息体

.setBody("messageBody".getBytes())

.build();

try {

//发送消息,需要关注发送结果,并捕获失败等异常。

SendReceipt sendReceipt = producer.send(message);

System.out.println(sendReceipt.getMessageId());

} catch (ClientException e) {

e.printStackTrace();

}

}

}PushConsumer消费import org.apache.rocketmq.client.apis.*;

import org.apache.rocketmq.client.apis.consumer.ConsumeResult;

import org.apache.rocketmq.client.apis.consumer.FilterExpression;

import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;

import org.apache.rocketmq.client.apis.consumer.PushConsumer;

import org.apache.rocketmq.shaded.org.slf4j.Logger;

import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;

import java.util.Collections;

public class PushConsumerExample {

private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

private PushConsumerExample() {

}

public static void main(String[] args) throws ClientException, IOException, InterruptedException {

/**

* 实例接入点,从控制台实例详情页的接入点页签中获取。

* 如果是在阿里云ECS内网访问,建议填写VPC接入点。

* 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。

*/

String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";

//指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。

String topic = "Your Topic";

//为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。

String consumerGroup = "Your ConsumerGroup";

final ClientServiceProvider provider = ClientServiceProvider.loadService();

ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()

.setEndpoints(endpoints)

/**

* 如果使用公网接入点访问Serverless实例,需要设置实例ID。

*/

//.setNamespace("InstanceId")

/**

* 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台访问控制的智能身份识别页签中获取。

* 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。

* 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。

*/

//.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))

.build();

//订阅消息的过滤规则,表示订阅所有Tag的消息。

String tag = "*";

FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);

//初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。

PushConsumer pushConsumer = provider.newPushConsumerBuilder()

.setClientConfiguration(clientConfiguration)

//设置消费者分组。

.setConsumerGroup(consumerGroup)

//设置预绑定的订阅关系。

.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))

//设置消费监听器。

.setMessageListener(messageView -> {

//处理消息并返回消费结果。

// LOGGER.info("Consume message={}", messageView);

System.out.println("Consume Message: " + messageView);

return ConsumeResult.SUCCESS;

})

.build();

Thread.sleep(Long.MAX_VALUE);

//如果不需要再使用PushConsumer,可关闭该进程。

//pushConsumer.close();

}

}

SimpleConsumer消费import org.apache.rocketmq.client.apis.*;

import org.apache.rocketmq.client.apis.consumer.FilterExpression;

import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;

import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;

import org.apache.rocketmq.client.apis.message.MessageId;

import org.apache.rocketmq.client.apis.message.MessageView;

import org.apache.rocketmq.shaded.org.slf4j.Logger;

import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;

import java.time.Duration;

import java.util.Collections;

import java.util.List;

public class SimpleConsumerExample {

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

private SimpleConsumerExample() {

}

public static void main(String[] args) throws ClientException, IOException {

/**

* 实例接入点,从控制台实例详情页的接入点页签中获取。

* 如果是在阿里云ECS内网访问,建议填写VPC接入点。

* 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。

*/

String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";

//指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。

String topic = "Your Topic";

//为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。

String consumerGroup = "Your ConsumerGroup";

final ClientServiceProvider provider = ClientServiceProvider.loadService();

ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()

.setEndpoints(endpoints)

/**

* 如果使用公网接入点访问Serverless实例,需要设置实例ID。

*/

//.setNamespace("InstanceId")

/**

* 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台访问控制的智能身份识别页签中获取。

* 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。

* 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。

*/

//.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"))

.build();

Duration awaitDuration = Duration.ofSeconds(10);

//订阅消息的过滤规则,表示订阅所有Tag的消息。

String tag = "*";

FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);

//初始化SimpleConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。

SimpleConsumer consumer = provider.newSimpleConsumerBuilder()

.setClientConfiguration(clientConfiguration)

//设置消费者分组。

.setConsumerGroup(consumerGroup)

//设置长轮询超时时间。

.setAwaitDuration(awaitDuration)

//设置预绑定的订阅关系。

.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))

.build();

//设置本次拉取的最大消息条数。

int maxMessageNum = 16;

//设置消息的不可见时间。

Duration invisibleDuration = Duration.ofSeconds(10);

//SimpleConsumer需要客户端一直主动循环获取消息,并进行消费处理。

//如果需要提高消费实时性,建议多线程并发拉取。

while (true) {

final List messages = consumer.receive(maxMessageNum, invisibleDuration);

messages.forEach(messageView -> {

// LOGGER.info("Received message: {}", messageView);

System.out.println("Received message: " + messageView);

});

for (MessageView message : messages) {

final MessageId messageId = message.getMessageId();

try {

//消费处理完成后,需要主动调用ACK向服务端提交消费结果。

consumer.ack(message);

System.out.println("Message is acknowledged successfully, messageId= " + messageId);

//LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);

} catch (Throwable t) {

t.printStackTrace();

//LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);

}

}

}

// 如果不需要再使用SimpleConsumer,可关闭该进程。

// consumer.close();

}

}