1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
| public class DiuDiuRocketMqProducer { private Logger logger = LoggerFactory.getLogger(DiuDiuRocketMqProducer.class);
public DefaultMQProducer producer; private DiuDiuDefaultMessageCallBack callBack = new DiuDiuDefaultMessageCallBack(); private DiuDiuMessageQueueSelector selector = new DiuDiuMessageQueueSelector();
public DiuDiuRocketMqProducer() {
}
public void send (Message message) { }
public void send(String topic, String key, byte[] message) { Message msg = new Message();
msg.setBody(message); msg.setTopic(topic); msg.setKey(key); String uniqueId = "1234444"; DiuDiuSendResult diuDiuSendResult = new DiuDiuSendResult(topic, uniqueId); try { logger.debug("发送消息:topic " + topic + ", key " + key); producer.send(msg, new SendCallBack() { @Override public void onSuccess(SendResult sendResult) { diuDiuSendResult.setMsgId(sendResult.getMsgId()); diuDiuSendResult.setMessageQueue(diuDiuSendResult.getMessageQueue()); diuDiuSendResult.setQueueOffset(sendResult.getQueueOffset()); callBack.onSendSuccess(diuDiuSendResult); }
@Override public void onException(Throwable e) { diuDiuSendResult.setException(e); callBack.onSendFaild(diuDiuSendResult); } }); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }
public void send(String topic, String key, byte[] message, final IMessageCallBack messageCallBack) { Message msg = new Message();
msg.setBody(message); msg.setTopic(topic); msg.setKey(key); String uniqueId = "1234444"; DiuDiuSendResult diuDiuSendResult = new DiuDiuSendResult(topic, uniqueId); try { logger.debug("发送消息:topic " + topic + ", key " + key); producer.send(msg, new SendCallBack() { @Override public void onSuccess(SendResult sendResult) { diuDiuSendResult.setMsgId(sendResult.getMsgId()); diuDiuSendResult.setMessageQueue(diuDiuSendResult.getMessageQueue()); diuDiuSendResult.setQueueOffset(sendResult.getQueueOffset()); messageCallBack.onSendSuccess(diuDiuSendResult); }
@Override public void onException(Throwable e) { diuDiuSendResult.setException(e); messageCallBack.onSendFaild(diuDiuSendResult); } }); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }
public void send(Integer sequence, DiuDiuMessageQueueSelector selector, String topic, String key, byte[] message) { Message msg = new Message();
msg.setBody(message); msg.setTopic(topic); msg.setKey(key);
String uniqueId = "1234444"; DiuDiuSendResult diuDiuSendResult = new DiuDiuSendResult(topic, uniqueId); try { logger.debug("发送消息:topic " + topic + ", key " + key); producer.send(msg, selector, sequence, new SendCallBack() { @Override public void onSuccess(SendResult sendResult) { diuDiuSendResult.setMsgId(sendResult.getMsgId()); diuDiuSendResult.setMessageQueue(diuDiuSendResult.getMessageQueue()); diuDiuSendResult.setQueueOffset(sendResult.getQueueOffset()); messageCallBack.onSendSuccess(diuDiuSendResult); }
@Override public void onException(Throwable e) { diuDiuSendResult.setException(e); messageCallBack.onSendFaild(diuDiuSendResult); } }); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }
public void send(Message message) { send(message.getTopic(), message.getKey(), message.encoder()); }
public void send(Message message, IMessageCallBack callBack) { send(message.getTopic(), message.getKey(), message.encoder(), callBack); }
public void send(Integer sequence, String topic, String key, byte[] message) { send(sequence, selector, topic, key, message); }
public void send(Integer sequence, Message message) { send(sequence, selector, message.getTopic(), message.getKey(), message.encoder()); }
public void init(DiuDiuMqProducerConfig config) { producer = new DefaultMQProducer(); producer.setNamesrvAddr(config.getClusterAddresses()); producer.setProducerGroup(config.getProducerName()); producer.setInstanceName(config.getProducerName() + new Random(100).nextInt()); try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } }
public void destroy() { producer.shutdown(); } }
|
如果您有任何关于博客内容的相关讨论,欢迎前往微信公众号(网站右上角)后台给我留言。