博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ入门-Topic模式
阅读量:7072 次
发布时间:2019-06-28

本文共 4007 字,大约阅读时间需要 13 分钟。

上篇《RabbitMQ入门-Routing直连模式》我们介绍了可以定向发送消息,并可以根据自定义规则派发消息。看起来,这个Routing模式已经算灵活的了,但是,这还不够,我们还有更加多样灵活的Topic模式。

Topic模式

image.png

  • 模型组成相较前几种没有什么变化,一个生产者P,一个交换机X,多个消息队列Q以及多个消费者C

  • 在Exchange派发消息到消息队列Queue所用的规则不同,我们看到了有符号"*"以及"#",可以认为是通配符

  • "*"用于匹配一个单词,比如"a","abc"等;"#"用于匹配0个或者多个单词,比如"", "abc", "abc.def"等

发送端

/** * Created by jackie on 17/8/7. */public class EmitLogDirect {    private static final String EXCHANGE_NAME = "direct_logs";    public static void main(String[] argv) throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("192.168.3.161");        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        String severity = getSeverity(argv);        String message = getMessage(argv);        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");        channel.close();        connection.close();    }    private static String getSeverity(String[] strings){        if (strings.length < 1)            return "info";        return strings[0];    }    private static String getMessage(String[] strings){        if (strings.length < 2)            return "Hello World!";        return joinStrings(strings, " ", 1);    }    private static String joinStrings(String[] strings, String delimiter, int startIndex) {        int length = strings.length;        if (length == 0 ) return "";        if (length < startIndex ) return "";        StringBuilder words = new StringBuilder(strings[startIndex]);        for (int i = startIndex + 1; i < length; i++) {            words.append(delimiter).append(strings[i]);        }        return words.toString();    }}
  • channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);这里指定的Exchagne模式为Topic模式

  • 通过String routingKey = getRouting(argv);实现在Program arguments中填写routing key参数

  • 通过String message = getMessage(argv);实现在Program arguments中填写发送的消息

这时候我们给Program argument赋值如下,并启动发送端程序

image.png

程序运行完,可以在RabbitMQ管理应用中看到名为“topic_logs”的Exchange。

接收端

/** * Created by jackie on 17/8/7. */public class ReceiveLogsDirect {    private static final String EXCHANGE_NAME = "direct_logs";    public static void main(String[] argv) throws Exception {        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("192.168.3.161");        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);        String queueName = channel.queueDeclare().getQueue();        if (argv.length < 1){            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");            System.exit(1);        }        for(String severity : argv){            channel.queueBind(queueName, EXCHANGE_NAME, severity);        }        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");        Consumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");            }        };        channel.basicConsume(queueName, true, consumer);    }}
  • 和Routing模式异曲同工,声明与发送端一样的Exchange名称

  • 通过Program arguments得到的routing key的输入参数,并将其与Exchange绑定,这时候就可以使用灵活的通配符了

运行情况

我们将启动两个消费者,并分别制定两套Routing key的规则。

第一个消费者

image.png

第二个消费者

image.png

启动两个消费者后,使用发送端发送一条消息,我们可以发现两个消费者都通过Routing key规则派发到了消息

31ff000395b3dedc07a2

注意:实际上如果Routing key写成了“#”表示能够接受所有的消息,类似广播模式。

这就是Topic模式,到此为止,几大主要RabbitMQ模式已经讲完了。你是否对于RabbitMQ有了一个基本的了解了?

如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!如果您想持续关注我的文章,请扫描二维码,关注JackieZheng的微信公众号,我会将我的文章推送给您,并和您一起分享我日常阅读过的优质文章。

1240

转载于:https://www.cnblogs.com/bigdataZJ/p/rabbitmq7.html

你可能感兴趣的文章
孟子>正文 活动目录(Active Directory)域故障解决实例(转载)
查看>>
NoSuchMethodError: org.hibernate.SessionFactory.openSession
查看>>
textarea自动调整高宽
查看>>
python基础---面向对象高级
查看>>
vim的分屏
查看>>
windows客户端安装
查看>>
关于大型网站技术演进的思考(十八)--网站静态化处理—反向代理(10)
查看>>
Centos7怎么安装gnome桌面及远程桌面VNC
查看>>
mount挂载报错mount:you must specify the filesystem type
查看>>
yaf 模块与控制器
查看>>
Python 模块调用和global的用法
查看>>
Ubuntu 12.04 修改/etc/resolv.conf重启后还原成修改前状态解决办法
查看>>
Python—redis
查看>>
HPE牵手DDN打造整合的高性能服务器存储产品组合
查看>>
mycat分片规则之范围约定规则(auto-sharding-long)
查看>>
windows配置java环境变量
查看>>
python流程处理
查看>>
<kubernetes in action>看书笔记
查看>>
python密码破解工具patator
查看>>
众筹网站Kickstarter不准备上市:转型公益企业
查看>>