diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/SQL92FilterConsumer.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/SQL92FilterConsumer.java new file mode 100644 index 00000000..4dceb658 --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/SQL92FilterConsumer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.samples.springboot.consumer; + +import org.apache.rocketmq.client.annotation.RocketMQMessageListener; +import org.apache.rocketmq.client.api.consumer.ConsumeResult; +import org.apache.rocketmq.client.api.message.MessageView; +import org.apache.rocketmq.client.support.RocketMQListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +/** + * SQL92 Message Filter Consumer Example + * + * This consumer uses SQL92 expression to filter messages, consuming only messages that meet specific conditions: + * - type = 'vip': VIP type messages + * - amount > 500: Amount greater than 500 + * + * Configuration properties that need to be set in application.properties: + * demo.sql92.rocketmq.endpoints=localhost:8081 + * demo.sql92.rocketmq.topic=orderTopic + * demo.sql92.rocketmq.consumer-group=sql92VipConsumerGroup + * demo.sql92.rocketmq.tag=(type = 'vip' AND amount > 500) + * demo.sql92.rocketmq.filter-expression-type=sql92 + */ +@Service +@RocketMQMessageListener( + endpoints = "${demo.sql92.rocketmq.endpoints:}", + topic = "${demo.sql92.rocketmq.topic:}", + consumerGroup = "${demo.sql92.rocketmq.consumer-group:}", + tag = "${demo.sql92.rocketmq.tag:}", + filterExpressionType = "${demo.sql92.rocketmq.filter-expression-type:sql92}" +) +public class SQL92FilterConsumer implements RocketMQListener { + + private static final Logger log = LoggerFactory.getLogger(SQL92FilterConsumer.class); + + @Override + public ConsumeResult consume(MessageView messageView) { + log.info("Received SQL92 filtered message - ID: {}, Topic: {}, Tag: {}", + messageView.getMessageId(), + messageView.getTopic(), + messageView.getTag().orElse("")); + + // Print message properties + log.info("Message properties: {}", messageView.getProperties()); + + // Business logic can be added here + // For example: parse message content, process orders, etc. + + return ConsumeResult.SUCCESS; + } +} diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/producer/SQL92ProducerApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/producer/SQL92ProducerApplication.java new file mode 100644 index 00000000..40692cd0 --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/src/main/java/org/apache/rocketmq/samples/springboot/producer/SQL92ProducerApplication.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.samples.springboot.producer; + +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.core.RocketMQClientTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import javax.annotation.Resource; + +/** + * SQL92 Message Producer Example + * + * Demonstrates how to send messages with attributes so consumers can use SQL92 for filtering + * + * Configuration properties that need to be set in application.properties: + * rocketmq.producer.endpoints=localhost:8081 + * rocketmq.producer.topic=orderTopic + * demo.rocketmq.order-topic=orderTopic + */ +@SpringBootApplication +public class SQL92ProducerApplication implements CommandLineRunner { + + private static final Logger log = LoggerFactory.getLogger(SQL92ProducerApplication.class); + + @Resource + private RocketMQClientTemplate rocketMQClientTemplate; + + public static void main(String[] args) { + SpringApplication.run(SQL92ProducerApplication.class, args); + } + + @Override + public void run(String... args) throws ClientException { + log.info("Starting to send SQL92 test messages..."); + + // Send VIP high-value orders (will be matched by SQL92 filter) + sendVipOrder(1L, 600.0, "Beijing"); + sendVipOrder(2L, 800.0, "Shanghai"); + + // Send normal orders (will not be matched by SQL92 filter) + sendNormalOrder(3L, 200.0, "Guangzhou"); + sendNormalOrder(4L, 300.0, "Shenzhen"); + + // Send VIP low-value orders (will not be matched by SQL92 filter) + sendVipOrder(5L, 100.0, "Hangzhou"); + + log.info("All messages have been sent"); + } + + /** + * Send VIP order + */ + private void sendVipOrder(Long orderId, Double amount, String region) throws ClientException { + Order order = new Order(); + order.setId(orderId); + order.setAmount(amount); + order.setType("vip"); + order.setRegion(region); + + Message message = MessageBuilder.withPayload(order) + .setHeader("type", "vip") + .setHeader("amount", amount) + .setHeader("region", region) + .build(); + + rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message); + log.info("VIP order sent - ID: {}, Amount: {}, Region: {}", orderId, amount, region); + } + + /** + * Send normal order + */ + private void sendNormalOrder(Long orderId, Double amount, String region) throws ClientException { + Order order = new Order(); + order.setId(orderId); + order.setAmount(amount); + order.setType("normal"); + order.setRegion(region); + + Message message = MessageBuilder.withPayload(order) + .setHeader("type", "normal") + .setHeader("amount", amount) + .setHeader("region", region) + .build(); + + rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message); + log.info("Normal order sent - ID: {}, Amount: {}, Region: {}", orderId, amount, region); + } + + /** + * Order entity class + */ + public static class Order { + private Long id; + private Double amount; + private String type; + private String region; + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public Double getAmount() { + return amount; + } + + public void setAmount(Double amount) { + this.amount = amount; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + + @Override + public String toString() { + return "Order{" + + "id=" + id + + ", amount=" + amount + + ", type='" + type + '\'' + + ", region='" + region + '\'' + + '}'; + } + } +} diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java index f0b942f5..4f7dd2f3 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java @@ -31,11 +31,11 @@ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.accessKey:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.secretKey:}"; - String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:}"; + String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:*}"; String TOPIC_PLACEHOLDER = "${rocketmq.simple-consumer.topic:}"; String ENDPOINTS_PLACEHOLDER = "${rocketmq.simple-consumer.endpoints:}"; String CONSUMER_GROUP_PLACEHOLDER = "${rocketmq.simple-consumer.consumerGroup:}"; - String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:}"; + String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:tag}"; /** * The component name of the Consumer configuration. diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java index 6f0f836d..2c9588e9 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/RocketMQMessageListener.java @@ -31,7 +31,7 @@ String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}"; String ENDPOINTS_PLACEHOLDER = "${rocketmq.push-consumer.endpoints:}"; String TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.topic:}"; - String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:}"; + String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:*}"; /** * The property of "access-key". diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index 8f3d4941..38df9324 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -187,7 +187,7 @@ public static class SimpleConsumer { /** * Tag of consumer. */ - private String tag; + private String tag = "*"; /** * Topic name of consumer.