工程结构
此处先利用RocketMQ构建一个消息生产者体系,因此下面的结构图不涉及消费者体系;完整的工程结构并不止于图中所示的内容。
.
├── build.gradle
├── mq-lib.iml
└── src
    └── main
        └── java
            └── cn
                └── diudiu
                    └── mq
                        ├── config
                        │   ├── DiuDiuMqConfig.java
                        │   └── DiuDiuMqProducerConfig.java
                        ├── message
                        │   ├── IMessageHandler.java
                        │   ├── Message.java
                        │   └── body
                        │       ├── LoginMessage.java
                        │       └── UserMessage.java
                        ├── producer
                        │   ├── IMessageCallBack.java
                        │   ├── IProducer.java
                        │   ├── DiuDiuMqProducer.java
                        │   └── DiuDiuSendResult.java
                        ├── rocketmq
                        │   └── producer
                        │       ├── DiuDiuDefaultMessageCallBack.java
                        │       └── DiuDiuRocketMqProducer.java
                        ├── selector
                        │   └── DiuDiuMessageQueueSelector.java
                        └── topic
                            ├── OrderTopic.java
                            └── UserTopic.java
依赖管理
在build.gradle中引入rocketMq的依赖。
dependencies {
    // RocketMQ client library used by the producer classes in this project.
    compile 'com.alibaba.rocketmq:rocketmq-client:3.2.6'
}
集群配置
MQ集群的配置,多个集群地址之间使用逗号分隔。后期可以考虑支持list形式的配置。
/**
 * Base MQ configuration holding the cluster address string.
 *
 * <p>Multiple addresses are written as a single comma-separated string.
 */
public class DiuDiuMqConfig {

    /** Comma-separated cluster addresses. */
    private String clusterAddresses;

    /**
     * @return the configured cluster addresses, or {@code null} if none were set
     */
    public String getClusterAddresses() {
        return clusterAddresses;
    }

    /**
     * Sets the cluster addresses.
     *
     * @param clusterAddresses comma-separated cluster addresses
     */
    public void setClusterAddresses(String clusterAddresses) {
        // A setter returns nothing; the original declared String without returning a value.
        this.clusterAddresses = clusterAddresses;
    }
}
生产者
生产者配置
生产者配置类,需要指定生产者的名字,发送超时时间(默认3000ms),消息压缩(默认超过4k才压缩)以及大小限制(默认最大128k),发送失败重试次数(默认2次)等参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 pulic class DiuDiuMqProducerConfig extends DiuDiuMqConfig { private static final int DEFAULT_SEND_MSG_TIMEOUT = 3000 ; private String producerName; private int sendMsgTimeout = DEFAULT_SEND_MSG_TIMEOUT; private int compressMsgBodyOverHowMuch = 1024 * 4 ; private int retryTimesWhenSendFailed = 2 ; private int maxMessageSize = 1024 * 128 ; }
生产消息
消息体
封装一个包含topic、key以及消息体Body的Message,主要用于发送消息的接口可以将消息组装好,调用IProducer.send接口发送即可。
为了方便消息传输,该类中增加了基于ProtostuffSerialize对消息体进行序列化的方法encoder。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class Message <T extends Base > extends Base { private static final Log LOGGER = LogFactory.getLog(Message.class); private String topic; private String key; private T body; public Message () { } private static final Map<Class, Object> SERIALIZERS = new ConcurrentHashMap <>(); @SuppressWarnings("unchecked") private ProtostuffSerialize<T> getSerialize () { Class clazz = this .body.getClass(); if (SERIALIZERS.containsKey(clazz)) { return (ProtostuffSerialize) SERIALIZERS.get(clazz); } ProtostuffSerialize serialize = ProtostuffSerialize.getInstance(clazz); SERIALIZERS.put(clazz, serialize); return serialize; } public byte [] encoder() { return getSerialize().encoder(body); } }
接口定义
消息发送方式分为两类:有序发送(sendByOrder)和随机发送(send)。这样做的目的是为了满足消息消费时可能需要有序消费或随机消费的需求。
关于消息有序消费请看下面的有序消息队列。
清单:IProducer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public interface IProducer { void send (Message message) ; void sendByOrder (Integer sequence, Message message) ; void send (String topic, String key, byte [] message) ; void sendByOrder (Integer sequence, String topic, String key, byte [] message) ; void send (Message message, IMessageCallBack callBack) ; void send (String topic, String key, byte [] message, IMessageCallBack callBack) ; }
实现生产
清单:DiuDiuMqProducer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class DiuDiuMqProducer extends DiuDiuMqProducerConfig implements IProducer { private DiuDiuRocketMqProducer delegate = new DiuDiuRocketMqProducer (); @Override public void send (Message message) { delegate.send(message); } @Override public void sendOrder (Integer sequence, Message message) { delegate.send(sequence, message); } @Override public void send (String topic, String key, byte [] message) { delegate.send(topic, key, message); } @Override public void sendOrder (Integer sequence, String topic, String key, byte [] message) { delegate.send(sequence, topic, key, message); } @Override public void send (String topic, String key, byte [] message, IMessageCallBack callBack) { delegate.send(topic, key, message, callBack); } @Override public void send (Message message, IMessageCallBack callBack) { delegate.send(message, callBack); } public void init () { delegate.init(this ); } public void destory () { delegate.destroy(); } }
清单:DiuDiuRocketMqProducer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 public class DiuDiuRocketMqProducer { private Logger logger = LoggerFactory.getLogger(DiuDiuRocketMqProducer.class); public DefaultMQProducer producer; private DiuDiuDefaultMessageCallBack callBack = new DiuDiuDefaultMessageCallBack (); private DiuDiuMessageQueueSelector selector = new DiuDiuMessageQueueSelector (); public DiuDiuRocketMqProducer () { } public void send (Message message) { } public void send (String topic, String key, byte [] message) { Message msg = new Message (); msg.setBody(message); msg.setTopic(topic); msg.setKey(key); String uniqueId = "1234444" ; DiuDiuSendResult diuDiuSendResult = new DiuDiuSendResult (topic, uniqueId); try { logger.debug("发送消息:topic " + topic + ", key " + key); producer.send(msg, new SendCallBack () { @Override public void onSuccess (SendResult sendResult) { diuDiuSendResult.setMsgId(sendResult.getMsgId()); diuDiuSendResult.setMessageQueue(diuDiuSendResult.getMessageQueue()); diuDiuSendResult.setQueueOffset(sendResult.getQueueOffset()); callBack.onSendSuccess(diuDiuSendResult); } @Override public void onException (Throwable e) { diuDiuSendResult.setException(e); callBack.onSendFaild(diuDiuSendResult); } }); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public void send (String topic, String key, byte [] message, final IMessageCallBack messageCallBack) { Message msg = new Message (); 
msg.setBody(message); msg.setTopic(topic); msg.setKey(key); String uniqueId = "1234444" ; DiuDiuSendResult diuDiuSendResult = new DiuDiuSendResult (topic, uniqueId); try { logger.debug("发送消息:topic " + topic + ", key " + key); producer.send(msg, new SendCallBack () { @Override public void onSuccess (SendResult sendResult) { diuDiuSendResult.setMsgId(sendResult.getMsgId()); diuDiuSendResult.setMessageQueue(diuDiuSendResult.getMessageQueue()); diuDiuSendResult.setQueueOffset(sendResult.getQueueOffset()); messageCallBack.onSendSuccess(diuDiuSendResult); } @Override public void onException (Throwable e) { diuDiuSendResult.setException(e); messageCallBack.onSendFaild(diuDiuSendResult); } }); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public void send (Integer sequence, DiuDiuMessageQueueSelector selector, String topic, String key, byte [] message) { Message msg = new Message (); msg.setBody(message); msg.setTopic(topic); msg.setKey(key); String uniqueId = "1234444" ; DiuDiuSendResult diuDiuSendResult = new DiuDiuSendResult (topic, uniqueId); try { logger.debug("发送消息:topic " + topic + ", key " + key); producer.send(msg, selector, sequence, new SendCallBack () { @Override public void onSuccess (SendResult sendResult) { diuDiuSendResult.setMsgId(sendResult.getMsgId()); diuDiuSendResult.setMessageQueue(diuDiuSendResult.getMessageQueue()); diuDiuSendResult.setQueueOffset(sendResult.getQueueOffset()); messageCallBack.onSendSuccess(diuDiuSendResult); } @Override public void onException (Throwable e) { diuDiuSendResult.setException(e); messageCallBack.onSendFaild(diuDiuSendResult); } }); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public void send (Message message) { send(message.getTopic(), message.getKey(), message.encoder()); } public 
void send (Message message, IMessageCallBack callBack) { send(message.getTopic(), message.getKey(), message.encoder(), callBack); } public void send (Integer sequence, String topic, String key, byte [] message) { send(sequence, selector, topic, key, message); } public void send (Integer sequence, Message message) { send(sequence, selector, message.getTopic(), message.getKey(), message.encoder()); } public void init (DiuDiuMqProducerConfig config) { producer = new DefaultMQProducer (); producer.setNamesrvAddr(config.getClusterAddresses()); producer.setProducerGroup(config.getProducerName()); producer.setInstanceName(config.getProducerName() + new Random (100 ).nextInt()); try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } public void destroy () { producer.shutdown(); } }
发送消息回调处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class DiuDiuDefaultMessageCallBack implements IMessageCallBack { Logger interfaceLogger = LoggerFactory.getLogger(LoggerName.INTERFACE); Logger mqLogger = LoggerFactory.getLogger(LoggerName.MQ); @Override public void onSendSuccess (DiuDiuSendResult diuDiuSendResult) { String loggerMsg = "send message successfully, " + "topic " + diuDiuSendResult.getTopic() + " msgId " + diuDiuSendResult.getMsgId() + " queue " + diuDiuSendResult.getMessageQueue() + " offset " + diuDiuSendResult.getQueueOffset(); interfaceLogger.info(loggerMsg,diuDiuSendResult.getUniqueId()); mqLogger.info(loggerMsg,diuDiuSendResult.getMsgId()); } @Override public void onSendFaild (DiuDiuSendResult diuDiuSendResult) { String loggerMsg = "send message faild, cause by " + diuDiuSendResult.getException().getMessage(); interfaceLogger.info(loggerMsg,diuDiuSendResult.getUniqueId()); } }
DiuDiuSendResult
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class DiuDiuSendResult { private String uniqueId; private String topic; private String msgId; private MessageQueue messageQueue; private long queueOffset; private Throwable exception; public DiuDiuSendResult (String topic, String uniqueId) { this .topic = topic; this .uniqueId = uniqueId; } }
有序消息队列
1 2 3 4 5 6 7 8 9 public class DiuDiuMessageQueueSelector implements MessageQueueSelector { @Override public MessageQueue select (List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }