package
com.kafka.spout;
import
java.util.HashMap;
import
java.util.Map;
import
com.google.common.collect.Maps;
//import org.apache.storm.guava.collect.Maps;
import
backtype.storm.Config;
import
backtype.storm.LocalCluster;
import
backtype.storm.StormSubmitter;
import
backtype.storm.generated.AlreadyAliveException;
import
backtype.storm.generated.InvalidTopologyException;
import
backtype.storm.spout.SchemeAsMultiScheme;
import
backtype.storm.topology.TopologyBuilder;
import
backtype.storm.tuple.Fields;
import
backtype.storm.utils.Utils;
import
storm.kafka.BrokerHosts;
import
storm.kafka.KafkaSpout;
import
storm.kafka.SpoutConfig;
import
storm.kafka.ZkHosts;
/**
 * Entry point that wires a Kafka spout to "split", "count" and "hbase" bolts and runs the
 * resulting topology either on a Storm cluster or in an in-process {@link LocalCluster}.
 *
 * <p>NOTE(review): {@code MessageScheme}, {@code LevelSplit}, {@code LevelCount},
 * {@code SimpleHBaseMapper} and {@code HBaseBolt} are not imported by this file, so they must
 * resolve from this package. The last two look like storm-hbase classes - confirm that an
 * import is not simply missing.
 */
public class StormKafkaTopo {

    /** ZooKeeper connection string handed to {@link ZkHosts}. */
    private static final String ZK_HOSTS = "zeb,yjd,ylh";

    /** Second argument of {@link SpoutConfig}: the Kafka topic to consume. */
    private static final String KAFKA_TOPIC = "yjd";

    /** Third argument of {@link SpoutConfig}: the ZooKeeper root path used by the spout. */
    private static final String ZK_ROOT = "/storm";

    /** Fourth argument of {@link SpoutConfig}: the identifier of this spout/consumer. */
    private static final String SPOUT_ID = "kafkaspout";

    /** First argument of the HBase bolt (presumably the target table name - confirm). */
    private static final String HBASE_TABLE = "wc";

    /**
     * Key under which the HBase settings map is stored in the topology {@link Config}.
     * The same key is passed to the bolt via {@code withConfigKey}, so both must match.
     */
    private static final String HBASE_CONFIG_KEY = "hbase.conf";

    /** Topology name used when running in local mode. */
    private static final String LOCAL_TOPOLOGY_NAME = "Topotest1121";

    /** How long the local-mode topology is left running before it is killed, in milliseconds. */
    private static final int LOCAL_RUN_MILLIS = 1000000;

    /**
     * Builds the topology and launches it.
     *
     * @param args when non-empty, {@code args[0]} is used as the topology name and the topology
     *             is submitted to the cluster; otherwise the topology runs in local mode
     * @throws IllegalStateException if the cluster rejects the submission
     */
    public static void main(String[] args) {
        Config conf = buildConfig();
        TopologyBuilder builder = buildTopology();

        if (args != null && args.length > 0) {
            // Submit to the cluster.
            submitToCluster(args[0], conf, builder);
        } else {
            // Run in local mode.
            runLocally(conf, builder);
        }
    }

    /** Creates the topology configuration: debug enabled plus the HBase settings map. */
    private static Config buildConfig() {
        Map<String, Object> hbaseSettings = Maps.newTreeMap();
        hbaseSettings.put("hbase.rootdir", "hdfs://zeb:9000/hbase");
        hbaseSettings.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");

        Config conf = new Config();
        conf.setDebug(true);
        conf.put(HBASE_CONFIG_KEY, hbaseSettings);
        return conf;
    }

    /** Creates the Kafka spout, decoding messages with {@code MessageScheme}. */
    private static KafkaSpout buildKafkaSpout() {
        BrokerHosts brokerHosts = new ZkHosts(ZK_HOSTS);
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, KAFKA_TOPIC, ZK_ROOT, SPOUT_ID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        return new KafkaSpout(spoutConfig);
    }

    /**
     * Creates the HBase bolt: row key from field "word", column field "count",
     * column family "result".
     */
    private static HBaseBolt buildHBaseBolt() {
        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        mapper.withRowKeyField("word");
        mapper.withColumnFields(new Fields("count"));
        mapper.withColumnFamily("result");

        return new HBaseBolt(HBASE_TABLE, mapper).withConfigKey(HBASE_CONFIG_KEY);
    }

    /** Wires spout -> split -> count -> hbase, each bolt with a parallelism hint of 1. */
    private static TopologyBuilder buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", buildKafkaSpout());
        builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout");
        // Group by "word" so that tuples with the same word reach the same count task.
        builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("hbase", buildHBaseBolt(), 1).shuffleGrouping("count");
        return builder;
    }

    /**
     * Submits the topology to the cluster under the given name.
     *
     * <p>Failures are rethrown with the original cause so the JVM exits with a non-zero
     * status instead of printing a stack trace and reporting success.
     */
    private static void submitToCluster(String topologyName, Config conf, TopologyBuilder builder) {
        try {
            StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
        } catch (AlreadyAliveException e) {
            throw new IllegalStateException(
                    "Topology '" + topologyName + "' is already running on the cluster", e);
        } catch (InvalidTopologyException e) {
            throw new IllegalStateException(
                    "Topology '" + topologyName + "' was rejected as invalid", e);
        }
    }

    /**
     * Runs the topology in an in-process cluster for {@link #LOCAL_RUN_MILLIS} milliseconds,
     * then kills it.
     */
    private static void runLocally(Config conf, TopologyBuilder builder) {
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());
            Utils.sleep(LOCAL_RUN_MILLIS);
            cluster.killTopology(LOCAL_TOPOLOGY_NAME);
        } finally {
            // Always release the in-process cluster, even if submit/sleep/kill failed.
            cluster.shutdown();
        }
    }
}