Transactions: uses IPartitionedTridentSpout
DRPC: the web front end fetches result data via DRPC
topN: ranking by sales amount
HBase-backed Trident state: no data loss when the topology restarts, and the web side can read results from it directly
Data ingestion
Provided by the open-source storm-kafka project: we use the TransactionalTridentKafkaSpout that storm-kafka supplies.
Development notes
1. Business logic is implemented in bolts;
2. The format of the persisted data can be chosen to match what the front-end HighCharts needs; sometimes the landing data is stored specifically in HighCharts' expected format, as in project three. A memory-plus-disk scheme avoids losing data on power failure or restart.
3. Front end and back end must be separated, which improves stability;
4. Web-side development is straightforward: hold a long-lived connection, read the data, and push it to HighCharts;
5. For HighCharts development there are plenty of examples and sample code, so it can be learned on the fly.
Advantages:
1. Front end and back end are separated, so each can be restarted and maintained without affecting the other;
2. Restarting Storm does not lose result data and does not interrupt the HighCharts charts;
3. Restarting Tomcat does not affect data processing;
4. Data moves between front end and back end via HBase or DRPC; note that DRPC is a service call, so it is less stable than reading from the DB.
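The DRPC streams in the topologies below take a space-separated argument string and split each token on ":" into a (create_date, province_id) key. The Split and SplitBy functions referenced in the code are not shown in this post; the following is a minimal pure-Java sketch of the parsing they are assumed to perform:

```java
import java.util.ArrayList;
import java.util.List;

public class DrpcArgParser {
    /**
     * Splits a DRPC argument string such as "2014-08-19:1 2014-08-19:2"
     * into [create_date, province_id] pairs, mirroring the assumed
     * behavior of Split(" ") followed by SplitBy(":").
     */
    public static List<String[]> parse(String args) {
        List<String[]> keys = new ArrayList<>();
        for (String arg : args.split(" ")) { // one key per space-separated token
            keys.add(arg.split(":"));        // split the token into its key parts
        }
        return keys;
    }

    public static void main(String[] a) {
        for (String[] k : parse("2014-08-19:1 2014-08-19:2")) {
            System.out.println(k[0] + " -> " + k[1]);
        }
    }
}
```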
package cn.wh.trident;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FirstN;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import cn.wh.storm.KafkaProperties;
public class TridentTopo {
/**
* @param args
*/
public static void main(String[] args) {
BrokerHosts zkHosts = new ZkHosts(KafkaProperties.zkConnect);
String topic = "track";
TridentKafkaConfig config = new TridentKafkaConfig(zkHosts, topic);
config.forceFromStart = false; // true for testing; must be false in production
// input format
config.scheme = new SchemeAsMultiScheme(new StringScheme());
config.fetchSizeBytes = 100; // batch size
LocalDRPC drpc = new LocalDRPC();
TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(config);
TridentTopology topology = new TridentTopology() ;
// sales amount
TridentState amtState = topology.newStream("spout", spout)
.parallelismHint(3)
.each(new Fields(StringScheme.STRING_SCHEME_KEY),new OrderSplit("\\t"), new Fields("order_id","order_amt","create_date","province_id"))
.shuffle()
.groupBy(new Fields("create_date","province_id"))
.persistentAggregate(new MemoryMapState.Factory(), new Fields("order_amt"), new Sum(), new Fields("sum_amt"));
topology.newDRPCStream("getOrderAmt", drpc)
.each(new Fields("args"), new Split(" "), new Fields("arg"))
.each(new Fields("arg"), new SplitBy("\\:"), new Fields("create_date","province_id"))
.groupBy(new Fields("create_date","province_id"))
.stateQuery(amtState, new Fields("create_date","province_id"), new MapGet(), new Fields("sum_amt"))
// .applyAssembly(new FirstN(5, "sum_amt", true))
;
// order count
TridentState orderState = topology.newStream("orderSpout", spout)
.parallelismHint(3)
.each(new Fields(StringScheme.STRING_SCHEME_KEY),new OrderSplit("\\t"), new Fields("order_id","order_amt","create_date","province_id"))
.shuffle()
.groupBy(new Fields("create_date","province_id"))
.persistentAggregate(new MemoryMapState.Factory(), new Fields("order_id"), new Count(), new Fields("order_num"));
topology.newDRPCStream("getOrderNum", drpc)
.each(new Fields("args"), new Split(" "), new Fields("arg"))
.each(new Fields("arg"), new SplitBy("\\:"), new Fields("create_date","province_id"))
.groupBy(new Fields("create_date","province_id"))
.stateQuery(orderState, new Fields("create_date","province_id"), new MapGet(), new Fields("order_num"))
// .applyAssembly(new FirstN(5, "order_num", true))
;
Config conf = new Config() ;
conf.setDebug(false);
LocalCluster cluster = new LocalCluster() ;
cluster.submitTopology("myTopo", conf, topology.build());
while(true){
// System.err.println("sales: " + drpc.execute("getOrderAmt", "2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5"));
System.err.println("orders: " + drpc.execute("getOrderNum", "2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5"));
Utils.sleep(5000);
}
}
}
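The topology above depends on an OrderSplit function (not shown in this post) that splits each tab-separated Kafka message into the emitted fields order_id, order_amt, create_date, province_id. A hedged pure-Java sketch of the field extraction that function is assumed to perform, with the field order inferred from the topology's output Fields declaration:

```java
public class OrderLineParser {
    /**
     * Assumed message layout, inferred from the topology's output fields:
     * order_id \t order_amt \t create_date \t province_id
     */
    public static String[] split(String line) {
        String[] fields = line.split("\\t"); // same separator the topology passes to OrderSplit
        if (fields.length < 4) {
            throw new IllegalArgumentException("malformed order line: " + line);
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] f = split("1001\t99.5\t2014-08-19\t3");
        System.out.println("amt=" + f[1] + " date=" + f[2] + " province=" + f[3]);
    }
}
```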
DRPC backed by HBase state
package cn.wh.stormtest;
import hbase.state.HBaseAggregateState;
import hbase.state.TridentConfig;
import kafka.productor.KafkaProperties;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.FirstN;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.state.StateFactory;
import storm.trident.testing.MemoryMapState;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
public class TridentTopo {
public static StormTopology builder(LocalDRPC drpc)
{
TridentConfig tridentConfig = new TridentConfig("state");
StateFactory state = HBaseAggregateState.transactional(tridentConfig);
BrokerHosts zkHosts = new ZkHosts(KafkaProperties.zkConnect);
String topic = "track";
TridentKafkaConfig config = new TridentKafkaConfig(zkHosts, topic);
config.forceFromStart = false; // true for testing; must be false in production
config.scheme = new SchemeAsMultiScheme(new StringScheme());
config.fetchSizeBytes = 100; // batch size
TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(config) ;
TridentTopology topology = new TridentTopology() ;
// sales amount
TridentState amtState = topology.newStream("spout", spout)
.parallelismHint(3)
.each(new Fields(StringScheme.STRING_SCHEME_KEY),new OrderAmtSplit("\\t"), new Fields("order_id","order_amt","create_date","province_id","cf"))
.shuffle()
.groupBy(new Fields("create_date","cf","province_id"))
.persistentAggregate(state, new Fields("order_amt"), new Sum(), new Fields("sum_amt"));
// .persistentAggregate(new MemoryMapState.Factory(), new Fields("order_amt"), new Sum(), new Fields("sum_amt"));
topology.newDRPCStream("getOrderAmt", drpc)
.each(new Fields("args"), new Split(" "), new Fields("arg"))
.each(new Fields("arg"), new SplitBy("\\:"), new Fields("create_date","cf","province_id"))
.groupBy(new Fields("create_date","cf","province_id"))
.stateQuery(amtState, new Fields("create_date","cf","province_id"), new MapGet(), new Fields("sum_amt"))
.each(new Fields("sum_amt"),new FilterNull())
.applyAssembly(new FirstN(5, "sum_amt", true))
;
// order count
TridentState orderState = topology.newStream("orderSpout", spout)
.parallelismHint(3)
.each(new Fields(StringScheme.STRING_SCHEME_KEY),new OrderNumSplit("\\t"), new Fields("order_id","order_amt","create_date","province_id","cf"))
.shuffle()
.groupBy(new Fields("create_date","cf","province_id"))
.persistentAggregate(state, new Fields("order_id"), new Count(), new Fields("order_num"));
// .persistentAggregate(new MemoryMapState.Factory(), new Fields("order_id"), new Count(), new Fields("order_num"));
topology.newDRPCStream("getOrderNum", drpc)
.each(new Fields("args"), new Split(" "), new Fields("arg"))
.each(new Fields("arg"), new SplitBy("\\:"), new Fields("create_date","cf","province_id"))
.groupBy(new Fields("create_date","cf","province_id"))
.stateQuery(orderState, new Fields("create_date","cf","province_id"), new MapGet(), new Fields("order_num"))
.each(new Fields("order_num"),new FilterNull())
// .applyAssembly(new FirstN(5, "order_num", true))
;
return topology.build() ;
}
/**
* @param args
*/
public static void main(String[] args) {
LocalDRPC drpc = new LocalDRPC();
Config conf = new Config() ;
conf.setNumWorkers(5);
conf.setDebug(false);
if (args.length > 0) {
try {
StormSubmitter.submitTopology(args[0], conf, builder(null));
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
}else{
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mytopology", conf, builder(drpc));
}
// Utils.sleep(60000);
// while (true) {
// System.err.println("sales: " + drpc.execute("getOrderAmt", "2014-09-13:cf:amt_3 2014-09-13:cf:amt_2 2014-09-13:cf:amt_1 2014-09-13:cf:amt_7 2014-09-13:cf:amt_6 2014-09-13:cf:amt_5 2014-09-13:cf:amt_4 2014-09-13:cf:amt_8"));
// Utils.sleep(5000);
// }
/**
 * Sample getOrderAmt result (one row per queried key):
* [["2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5","2014-08-19:1","2014-08-19","1",821.9000000000001],
* ["2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5","2014-08-19:2","2014-08-19","2",631.3000000000001],
* ["2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5","2014-08-19:3","2014-08-19","3",240.7],
* ["2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5","2014-08-19:4","2014-08-19","4",340.4],
* ["2014-08-19:1 2014-08-19:2 2014-08-19:3 2014-08-19:4 2014-08-19:5","2014-08-19:5","2014-08-19","5",460.8]]
*/
}
}
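In the HBase-backed version, the grouped fields create_date, cf, province_id together form the state key, and the DRPC client queries use keys like "2014-09-13:cf:amt_5". A small sketch of how such a request string could be assembled; the ":" separator and key layout are assumptions read off the client examples, not a documented storm-hbase API:

```java
import java.util.Arrays;
import java.util.List;

public class DrpcRequestBuilder {
    /** One key per (create_date, cf, column) triple, ':'-separated. */
    public static String key(String createDate, String cf, String column) {
        return createDate + ":" + cf + ":" + column;
    }

    /** Space-separated request string, matching the client.execute(...) calls. */
    public static String request(List<String> keys) {
        return String.join(" ", keys);
    }

    public static void main(String[] args) {
        String req = request(Arrays.asList(
                key("2014-09-13", "cf", "amt_5"),
                key("2014-09-13", "cf", "amt_8")));
        System.out.println(req); // 2014-09-13:cf:amt_5 2014-09-13:cf:amt_8
    }
}
```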
---------------------------- DRPC Client ----------------------------
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.utils.Utils;
public class TridentDRPCclient {
public static void main(String[] args) {
DRPCClient client = new DRPCClient("192.168.1.107", 3772);
// LocalDRPC client = new LocalDRPC();
try {
while (true)
{
System.err.println("sales: " + client.execute("getOrderAmt", "2014-09-13:cf:amt_5 2014-09-13:cf:amt_8"));
System.err.println("orders: " + client.execute("getOrderNum", "2014-09-13:cf:amt_1 2014-09-13:cf:amt_2"));
Utils.sleep(5000);
}
} catch (Exception e) {
e.printStackTrace() ;
}
}
}