博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
一个简单的消息处理框架
阅读量:5846 次
发布时间:2019-06-18

本文共 3341 字,大约阅读时间需要 11 分钟。

项目里碰到一个不错的消息处理框架,把核心代码抽出来做个备注。核心源码分几块。

1、一个消息管理类,在hashmap里创建了多个阻塞式消息队列

package com.fredric.demo;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class QueueManager {    private Map map;    public QueueManager(){        map = new HashMap();        //创建两个阻塞式队列        map.put("queue_type1", new ArrayBlockingQueue(100));        map.put("queue_type2", new ArrayBlockingQueue(100));    }    public boolean put(String queueType, Map data){        BlockingQueue queue = (BlockingQueue) map.get(queueType);        if(null == queue){            throw new RuntimeException();        }else{            try {                queue.put(data);                return true;            } catch (InterruptedException e) {                e.printStackTrace();            }            return false;        }    }    public Map take(String queueType){        BlockingQueue queue = (BlockingQueue) map.get(queueType);        if(null == queue){            throw new RuntimeException();        }else{            try {                Map map = (Map) queue.take();                return map;            } catch (InterruptedException e) {                e.printStackTrace();            }            return null;        }    }}

2、构筑消息的处理,即消息的消费者。这里要注意的是消息具体的处理业务是单独再起线程的,如下:

package com.fredric.demo;import java.util.Map;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Type1Consumer implements Runnable{    private ExecutorService executorService;    public Type1Consumer(){        //创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程        executorService = Executors.newCachedThreadPool();    }    public void run() {        do{            final Map msg = MsgProducer.queueManager.take("queue_type1");            if(msg != null){                executorService.execute(new Runnable() {                    public void run() {                        System.out.println("queue_type1 do msg handler" + msg.toString());                    }                });            }        }while(true);    }}

3、消息的生产者,即往消息队列管理类中插入数据

package com.fredric.demo;import java.util.Date;import java.util.HashMap;import java.util.Map;import java.util.concurrent.Executors;public class MsgProducer {    public static QueueManager queueManager = new QueueManager();    public void initJobQueue(){        //创建一个单线程化的线程池        Executors.newSingleThreadExecutor().execute(new Type1Consumer());        Executors.newSingleThreadExecutor().execute(new Type2Consumer());    }    public void sendType1Msg(String data){        Map map = new HashMap
(); map.put("time", new Date().toString()); map.put("data", data); queueManager.put("queue_type1", map); } public void sendType2Msg(String data){ Map map = new HashMap
(); map.put("time", new Date().toString()); map.put("data", data); queueManager.put("queue_type2", map); }}

4、Main方法测试如下:

public class App {    public static void main(String[] args){        MsgProducer producer = new MsgProducer();        producer.initJobQueue();        for(int i = 0; i < 10; i++){            producer.sendType1Msg("type1_msg");            producer.sendType2Msg("type2_msg");        }    }}

 

转载于:https://www.cnblogs.com/Fredric-2013/p/8533810.html

你可能感兴趣的文章
091031 T PowerShell Solution
查看>>
Android中SQLite应用详解
查看>>
【hibernate框架】一对多(多对一)双向CRUD-Cascade1
查看>>
【jQuery】2.jquery基础知识2
查看>>
Java线程:创建与启动(二)
查看>>
Android Studio 小技巧合集
查看>>
使用vmime收取邮件
查看>>
JPA2.1 中三个提升应用性能的新功能
查看>>
[程序员面试题精选100题]50.树的子结构
查看>>
Android 架构之高可用移动网络连接
查看>>
django数据模型中null和blank的区分
查看>>
2.理解JavaScript的浮点数
查看>>
Jenkins+XCode9自动打包错误处理
查看>>
Android自定义View-------Canvas动画的误解
查看>>
svg 编辑器的点击事件兼容pc端和移动端方案
查看>>
高性能图片压缩 —— libjpeg-turbo 的编译与集成
查看>>
阿里云新用户:巧用余额预警防止自动扣费
查看>>
Spring+MyBatis实现读写分离
查看>>
spring自己对AOP的运用 -- spring事物(transaction)原理
查看>>
git 常用命令
查看>>