博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka滞销瓶颈解决方案
阅读量:6112 次
发布时间:2019-06-21

本文共 4757 字,大约阅读时间需要 15 分钟。

hot3.png

一、问题背景

       kafka消费者程序在即时消费时,发现下午仍在消费当天上午数据,这或多或少发生了kafka消费程序的延时或阻塞,需要寻找相关解决方案,解放kafka消费程序。

二、解决方案

        1、方案1:解放kafka消费者,剥离kafka数据业务处理程序并异步调用。

        kafka消费者:

public class GateWayPushInoListener implements MessageListener
{ private static final Logger logger = LoggerFactory.getLogger(GateWayPushInoListener.class); @Autowired private IPushService pushService; @Override public void onMessage(ConsumerRecord
msg) { String jsonString = msg.value(); logger.warn("消费kafka数据:" + jsonString); //异步调用业务方法 pushService.pushMsg(jsonString); }}

        业务实现类:

//public class PushService extends BaseService implements IPushService    @Async("jpushExecutor")	@Override	public void pushMsg(String jsonString) {		// 解析进出站包、gps包		/********************* 泰国警报推送业务代码新增 ********************/		// 解析报警异常信息		// 所有		List
allRes = findRegIdsByType(); // 所有 List
all = null; // 异常开关门 List
openCloseDoors = null; // 离线 List
leaveLines = null; // 超速 List
overSpeeds = null; // 疑似抛锚 List
breakDowns = null; // 超载 List
overloadAlarms = null; // 紧急报警 List
emergencyAlarms = null; for (PushPlatformGroup pushPlatformGroup : allRes) { if (pushPlatformGroup.getFlag() == 0) { all = pushPlatformGroup.getPhonePlats(); } else if (pushPlatformGroup.getFlag() == 1) { openCloseDoors = pushPlatformGroup.getPhonePlats(); } else if (pushPlatformGroup.getFlag() == 2) { leaveLines = pushPlatformGroup.getPhonePlats(); } else if (pushPlatformGroup.getFlag() == 3) { overSpeeds = pushPlatformGroup.getPhonePlats(); } else if (pushPlatformGroup.getFlag() == 4) { breakDowns = pushPlatformGroup.getPhonePlats(); } else if (pushPlatformGroup.getFlag() == 5) { overloadAlarms = pushPlatformGroup.getPhonePlats(); } else if (pushPlatformGroup.getFlag() == 99) { emergencyAlarms = pushPlatformGroup.getPhonePlats(); } } JSONObject jsonObj = JSONObject.parseObject(jsonString); String type = null; String lineName = null; String carName = null; String occurDate = null; // 修订警报描述 type = alarmTypeRevise(jsonObj.getString("type")); lineName = jsonObj.getString("lineName"); carName = jsonObj.getString("carName"); Date date = jsonObj.getDate("occurDate"); occurDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date); StringBuilder sb = new StringBuilder(); sb.append("{\"msg_content\": \"").append("Busline:").append(lineName).append(",").append("Vehicle number:") .append(carName).append(",").append("Time:").append(occurDate).append(" trigger ").append(type) .append(" alarm!").append("\",\"title\": \"").append(type).append("\"}"); String alarmMsg = sb.toString(); StringBuilder sb2 = new StringBuilder(); sb2.append("Busline:").append(lineName).append(",").append("Vehicle number:").append(carName).append(",") .append("Time:").append(occurDate).append(" trigger ").append(type).append(" alarm!"); String alarmMsg2 = sb2.toString(); // 向设置所有推送消息 jpush2Group(all, type, alarmMsg, alarmMsg2); // 向自定义设置推送消息 switch (type) { case "off route": jpush2Group(leaveLines, type, alarmMsg, alarmMsg2); break; case "overtime parking": jpush2Group(breakDowns, type, alarmMsg, alarmMsg2); break; case "overspeed": jpush2Group(overSpeeds, type, alarmMsg, alarmMsg2); break; case "abnormal door open": jpush2Group(openCloseDoors, type, alarmMsg, alarmMsg2); break; case "overload": jpush2Group(overloadAlarms, type, alarmMsg, alarmMsg2); break; case "SOS alarm": jpush2Group(emergencyAlarms, type, alarmMsg, alarmMsg2); break; default: break; } } /** *

* Title: 给用户组发送消息推送 *

*

* Description: *

*/ private void jpush2Group(List
phonePlats, String title, String msgContent, String msgContent2) { if (!CollectionUtils.isEmpty(phonePlats)) { phonePlats.forEach(g -> { if (g.getType() == 1 && !StringUtils.isEmpty(g.getRegId())) { JpushHelper.sendMessageAndroidWithExtra(title, msgContent, g.getRegId()); } else if (g.getType() == 2 && !StringUtils.isEmpty(g.getRegId())) { JpushHelper.sendNotifyIOSWithExtra(title, msgContent2, g.getRegId()); } }); } } // 修订警报提示 private String alarmTypeRevise(String type) { switch (type) { case "leaveLine": return "off route"; case "breakDown": return "overtime parking"; case "overSpeed": return "overspeed"; case "openCloseDoor": return "abnormal door open"; case "overLoad": return "overload"; case "emergencyAlarm": return "SOS alarm"; default: return ""; } } /*************************** end ***************************/

       线程池配置(仅配置1个活跃线程,极光推送免费版有限制):

queue-capacity="10" keep-alive="5" rejection-policy="DISCARD_OLDEST" />

       

        2、方案2:自定义一个ArrayBlockingQueue队列,  大小为10,kafka消费者消费时将数据存入此队列中,超过数量10则删除最先前的;另起一个单独线程,对此队列数据进行业务处理。    

         待续...

        

三、总结

        将kafka消费程序和业务处理程序分离,串行并行化,各司其职,提高了吞吐率。

          

转载于:https://my.oschina.net/Howard2016/blog/991558

你可能感兴趣的文章
CAP定理
查看>>
Permutations
查看>>
Android布局技巧——合并布局
查看>>
基于python的性能测试工具–locust
查看>>
Windows服务监视,如果停止则启动
查看>>
maven 一个phase绑定多个插件
查看>>
Node.js程序在node-windows中不能运行
查看>>
java编程 I/O编程--字节流
查看>>
UVA 11995 - I Can Guess the Data Structure!
查看>>
2016-12-04
查看>>
Mysql优化
查看>>
Lua的各种资源1
查看>>
了解大数据的特点、来源与数据呈现方式
查看>>
第十二周作业
查看>>
Server Develop (四) select实现非阻塞sever
查看>>
【转载】程序员新年要做的10个决定
查看>>
自学之jQuery
查看>>
CV、PR方向的资源
查看>>
使用FusionCharts Free显示图表(JSP)
查看>>
如何根据日志查看删除的数据(转译)
查看>>