java輕量級消息中間件。https://github.com/uncodecn/uncode-mq
說明:目前只在部分項目中使用,歡迎學(xué)習(xí)交流。
說明:集群由多個Group組成一個隊列,每個Group由Master和Salve兩個Broker組成,整體無中心架構(gòu)。
1 下載文件解壓umq-*.tar.gz到任意目錄。
2 配置信息
在conf/config.properties文件中填寫相關(guān)信息。
mq.host=192.168.1.43 #本機ip mq.port=9000 #端口 mq.replica.host=192.168.7.131 #本機作為備機的主機ip mq.replica.fetch.size=100 #每次備份時同步的數(shù)據(jù)條數(shù),默認30 mq.replica.fetch.interval=2 #備份同步時間間隔,默認2秒 mq.log.dir=./data #數(shù)據(jù)存儲目錄,默認data,不建議修改 mq.data.persistence.interval=2 #數(shù)據(jù)持久化的時間間隔,默認2秒 mq.enable.zookeeper=true #是否使用zk,集群環(huán)境下必須使用 mq.zk.connect=192.168.1.14:2181 #zk地址 mq.zk.username=admin #zk用戶名 mq.zk.password=password #zk密碼 mq.zk.connectiontimeout.ms=6000 #zk連接超時時間 mq.zk.sessiontimeout.ms=6000 #zk連接session過期時間 mq.zk.data.persistence.interval=6000 #zk數(shù)據(jù)同步時間,默認6秒
3 啟動執(zhí)行startup.sh,停止執(zhí)行shutdown.sh,查看運行狀態(tài)執(zhí)行status.sh,查看主題信息執(zhí)行info.sh,清除zk相關(guān)信息執(zhí)行zkclear.sh。
4 目錄
umq/conf 配置
umq/data 數(shù)據(jù)存儲
umq/logs 日志
umq/lib 依賴jar
生產(chǎn)者為單例,必須最少執(zhí)行一次connect操作,連接成功后不會重復(fù)connect。
String cfg = "file:/gitlib/uncode-mq/conf/config.properties"; Producer.getInstance().connect(cfg); for(int i=0;i<10000;i++){ List<Topic> list = new ArrayList<Topic>(); Topic topic = new Topic(); topic.setTopic("umq"); topic.addContent("umq作者juny=>"+i); list.add(topic); Producer.getInstance().send(list); } 或 Properties config = new Properties(); config.setProperty("mq.port", "9000"); config.setProperty("mq.zk.connect", "192.168.1.14:2181"); config.setProperty("mq.enable.zookeeper", "true"); ServerConfig serverConfig = new ServerConfig(config); Producer.getInstance().connect(serverConfig); for(int i=0;i<10000;i++){ List<Topic> list = new ArrayList<Topic>(); Topic topic = new Topic(); topic.setTopic("umq"); topic.addContent("umq作者juny=>"+i); list.add(topic); Producer.getInstance().send(list); }
1 普通方式
String cfg = "file:/gitlib/uncode-mq/conf/config.properties"; Consumer.runningConsumerRunnable(cfg); Consumer.addSubscriber(new ConsumerSubscriber(){ //訂閱主題 @Override public List<String> subscribeToTopic() { List<String> tps = new ArrayList<String>(); tps.add("umq"); return tps; } //通知 @Override public void notify(Topic topic) { System.err.println("consumer subscriber:"+topic.toString()); } });
2 與spring集成
@Service public class MyConsumerSubscriber implements ConsumerSubscriber { public static final String CFG = "file:/gitlib/uncode-mq/conf/config.properties"; @Autowired LogService logServiceImpl; public ExpressRecordConsumerSubscriber() { //注冊訂閱者 try { Consumer.runningConsumerRunnable(CFG); Consumer.addSubscriber(this); } catch (ConnectException e) { e.printStackTrace(); } } //訂閱主題 @Override public List<String> subscribeToTopic() { List<String> tps = new ArrayList<String>(); tps.add("umq"); return tps; } @Override public void notify(Topic topic) { //處理邏輯 } }
更多建議: