Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.samples.springboot.consumer;

import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.api.consumer.ConsumeResult;
import org.apache.rocketmq.client.api.message.MessageView;
import org.apache.rocketmq.client.support.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
* SQL92 Message Filter Consumer Example
*
* This consumer uses SQL92 expression to filter messages, consuming only messages that meet specific conditions:
* - type = 'vip': VIP type messages
* - amount > 500: Amount greater than 500
*
* Configuration properties that need to be set in application.properties:
* demo.sql92.rocketmq.endpoints=localhost:8081
* demo.sql92.rocketmq.topic=orderTopic
* demo.sql92.rocketmq.consumer-group=sql92VipConsumerGroup
* demo.sql92.rocketmq.tag=(type = 'vip' AND amount > 500)
* demo.sql92.rocketmq.filter-expression-type=sql92
*/
@Service
@RocketMQMessageListener(
endpoints = "${demo.sql92.rocketmq.endpoints:}",
topic = "${demo.sql92.rocketmq.topic:}",
consumerGroup = "${demo.sql92.rocketmq.consumer-group:}",
tag = "${demo.sql92.rocketmq.tag:}",
filterExpressionType = "${demo.sql92.rocketmq.filter-expression-type:sql92}"
)
public class SQL92FilterConsumer implements RocketMQListener {

private static final Logger log = LoggerFactory.getLogger(SQL92FilterConsumer.class);

@Override
public ConsumeResult consume(MessageView messageView) {
log.info("Received SQL92 filtered message - ID: {}, Topic: {}, Tag: {}",
messageView.getMessageId(),
messageView.getTopic(),
messageView.getTag().orElse(""));

// Print message properties
log.info("Message properties: {}", messageView.getProperties());

// Business logic can be added here
// For example: parse message content, process orders, etc.

return ConsumeResult.SUCCESS;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.samples.springboot.producer;

import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.core.RocketMQClientTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;

/**
* SQL92 Message Producer Example
*
* Demonstrates how to send messages with attributes so consumers can use SQL92 for filtering
*
* Configuration properties that need to be set in application.properties:
* rocketmq.producer.endpoints=localhost:8081
* rocketmq.producer.topic=orderTopic
* demo.rocketmq.order-topic=orderTopic
*/
@SpringBootApplication
public class SQL92ProducerApplication implements CommandLineRunner {

private static final Logger log = LoggerFactory.getLogger(SQL92ProducerApplication.class);

@Resource
private RocketMQClientTemplate rocketMQClientTemplate;

public static void main(String[] args) {
SpringApplication.run(SQL92ProducerApplication.class, args);
}

@Override
public void run(String... args) throws ClientException {
log.info("Starting to send SQL92 test messages...");

// Send VIP high-value orders (will be matched by SQL92 filter)
sendVipOrder(1L, 600.0, "Beijing");
sendVipOrder(2L, 800.0, "Shanghai");

// Send normal orders (will not be matched by SQL92 filter)
sendNormalOrder(3L, 200.0, "Guangzhou");
sendNormalOrder(4L, 300.0, "Shenzhen");

// Send VIP low-value orders (will not be matched by SQL92 filter)
sendVipOrder(5L, 100.0, "Hangzhou");

log.info("All messages have been sent");
}

/**
* Send VIP order
*/
private void sendVipOrder(Long orderId, Double amount, String region) throws ClientException {
Order order = new Order();
order.setId(orderId);
order.setAmount(amount);
order.setType("vip");
order.setRegion(region);

Message<?> message = MessageBuilder.withPayload(order)
.setHeader("type", "vip")
.setHeader("amount", amount)
.setHeader("region", region)
.build();

rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message);
log.info("VIP order sent - ID: {}, Amount: {}, Region: {}", orderId, amount, region);
}

/**
* Send normal order
*/
private void sendNormalOrder(Long orderId, Double amount, String region) throws ClientException {
Order order = new Order();
order.setId(orderId);
order.setAmount(amount);
order.setType("normal");
order.setRegion(region);

Message<?> message = MessageBuilder.withPayload(order)
.setHeader("type", "normal")
.setHeader("amount", amount)
.setHeader("region", region)
.build();

rocketMQClientTemplate.syncSendNormalMessage("orderTopic", message);
log.info("Normal order sent - ID: {}, Amount: {}, Region: {}", orderId, amount, region);
}

/**
* Order entity class
*/
public static class Order {
private Long id;
private Double amount;
private String type;
private String region;

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public Double getAmount() {
return amount;
}

public void setAmount(Double amount) {
this.amount = amount;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

public String getRegion() {
return region;
}

public void setRegion(String region) {
this.region = region;
}

@Override
public String toString() {
return "Order{" +
"id=" + id +
", amount=" + amount +
", type='" + type + '\'' +
", region='" + region + '\'' +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@

String ACCESS_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.accessKey:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.simple-consumer.secretKey:}";
String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:}";
String TAG_PLACEHOLDER = "${rocketmq.simple-consumer.tag:*}";
String TOPIC_PLACEHOLDER = "${rocketmq.simple-consumer.topic:}";
String ENDPOINTS_PLACEHOLDER = "${rocketmq.simple-consumer.endpoints:}";
String CONSUMER_GROUP_PLACEHOLDER = "${rocketmq.simple-consumer.consumerGroup:}";
String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:}";
String FILTER_EXPRESSION_TYPE_PLACEHOLDER = "${rocketmq.simple-consumer.filterExpressionType:tag}";

/**
* The component name of the Consumer configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}";
String ENDPOINTS_PLACEHOLDER = "${rocketmq.push-consumer.endpoints:}";
String TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.topic:}";
String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:}";
String TAG_PLACEHOLDER = "${rocketmq.push-consumer.tag:*}";

/**
* The property of "access-key".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public static class SimpleConsumer {
/**
* Tag of consumer.
*/
private String tag;
private String tag = "*";

/**
* Topic name of consumer.
Expand Down
Loading