trident State应用指南

xiaoxiao2021-02-28  114

trident State应用指南

@(STORM)[storm, 大数据]

trident State应用指南一State基础示例 1主类2Aggregator的用法 1Aggregator接口2init方法3aggregate方法4complete方法 3state的用法 1拓扑定义2工厂类NameSumStateFactory3更新类NameSumUpdater4状态类NameSumState 4state应用步骤总结5state应用的一些注意事项6state与MapState的差异 二MapState 1persistentAggregate2MapStates3Demo 1创建一个实现IBackingMap的类实现multiGet和multiPut方法2创建实现StateFactory的类3定义Count函数4在拓扑中写入state或者查询state 4关于MapState的总结 1基本步骤2全流程逻辑3复杂的情况4其它思考 5MapState读写mysql示例 1MysqlMapStateFactory2MysqlMapStateBacking 三以HBaseMapState为例分析MapState代码调用全过程 零概述 MapState被调用的全流程代码 1调用过程TOTO按着这个流程把代码重头读一遍先了解ITridentBatchBolt2内容概述 一如何使用MapState二如何实现一个MapStateHBaseMapState源码分析 1Option内部类2Factory内部类 1构造函数 2makeState方法3构造函数4返回StateFactory的方法5multiGet6multiPut7序列化器 三MapState框架 TODO补充各个类的关系图参考P3231build方法2构造方法3beginCommitTODO CachedBatchReadsMap分析4commit5multiGet6multiPut7multiUpdate 四storm如何调用MapState的代码 1GroupedStream类

Trident及State的原理请见另一篇文章:http://blog.csdn.net/lujinhong2/article/details/47132305

简单总结: 1、最简单的情况使用IBacking的逻辑,很容易实现k-v格式的state。 2、如果IBacking不够灵活(不能取得txid,不是kv而是多列的格式),则直接实现MapState的接口。 3、最复杂的是使用State接口,最灵活,但真有必要吗?

第一二种方法比较:persistenceAggregate 第一个参数关键定义了如何去更新state(如mysql中的内容),比如先取出数据,更新txid,再写回去之类的,而第二个参数定义了以什么逻辑去更新数据,如求和、计算、还是平均之类的。 因此,反正第一个参数都只是返回一个MapState对象,那使用IBacking接口还是直接使用MapState接口都可以了,只是前者作了一些txid逻辑的封装,对应于几种state的类型,因此使用方便了一点,便事实上,它的代码是很简单的,它就是通过判断txid的关系来定义了update是如何使用get和put的,所以,可以直接实现MapState接口的update方法即可。

一、State基础示例

trident通过spout的事务性与state的事务处理,保证了恰好一次的语义。这里介绍了如何使用state。

完整代码请见 https://github.com/lujinhong/tridentdemo

1、主类

主类定义了拓扑的整体逻辑,这个拓扑通过一个固定的spout循环产生数据,然后统计消息中每个名字出现的次数。

拓扑中先将消息中的内容提取出来成name, age, title, tel4个field,然后通过project只保留name字段供统计,接着按照name分区后,为每个分区进行聚合,最后将聚合结果通过state写入map中。

storm.trident.Stream Origin_Stream = topology .newStream("tridentStateDemoId", spout) .parallelismHint(3) .shuffle() .parallelismHint(3) .each(new Fields("msg"), new Splitfield(), new Fields("name", "age", "title", "tel")) .parallelismHint(3) .project(new Fields("name")) //其实没什么必要,上面就不需要发射BCD字段,但可以示范一下project的用法 .parallelismHint(3) .partitionBy(new Fields("name")); //根据name的值作分区 Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(), new Fields("nameSumKey", "nameSumValue")).partitionPersist( new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"), new NameSumUpdater());

2、Aggregator的用法

这里涉及了一些trident常用的API,但project等相对容易理解,这里只介绍partitionAggregate的用法。

再看看上面代码中对partitionAggregate的使用:

Origin_Stream.partitionAggregate(new Fields("name"), new NameCountAggregator(), new Fields("nameSumKey", "nameSumValue"))

第一,三个参数分别表示输入流的名称与输出流的名称。中间的NameCountAggregator是一个Aggregator的对象,它定义了如何对输入流进行聚合。我们看一下它的代码:

public class NameCountAggregator implements Aggregator<Map<String, Integer>> { private static final long serialVersionUID = -5141558506999420908L; @Override public Map<String, Integer> init(Object batchId,TridentCollector collector) { return new HashMap<String, Integer>(); } //判断某个名字是否已经存在于map中,若无,则put,若有,则递增 @Override public void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) { String key=tuple.getString(0); if(map.containsKey(key)){ Integer tmp=map.get(key); map.put(key, ++tmp); }else{ map.put(key, 1); } } //将聚合后的结果emit出去 @Override public void complete(Map<String, Integer> map,TridentCollector collector) { if (map.size() > 0) { for(Entry<String, Integer> entry : map.entrySet()){ System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue()); collector.emit(new Values(entry.getKey(),entry.getValue())); } map.clear(); } } @Override public void prepare(Map conf, TridentOperationContext context) { } @Override public void cleanup() { } }

(1)Aggregator接口

它实现了Aggregator接口,这个接口有3个方法:

public interface Aggregator<T> extends Operation { T init(Object batchId, TridentCollector collector); void aggregate(T val, TridentTuple tuple, TridentCollector collector); void complete(T val, TridentCollector collector); }

init方法:在处理batch之前被调用。init的返回值是一个表示聚合状态的对象,该对象会被传递到aggregate和complete方法。 aggregate方法:为每个在batch分区的输入元组所调用,更新状态 complete方法:当batch分区的所有元组已经被aggregate方法处理完后被调用。

除了实现Aggregator接口,还可以实现ReducerAggregator或者CombinerAggregator,它们使用更方便。详见《从零开始学storm》或者官方文档 https://storm.apache.org/documentation/Trident-API-Overview.html

下面我们看一下这3个方法的实现。

(2)init方法

@Override public Map<String, Integer> init(Object batchId,TridentCollector collector) { return new HashMap<String, Integer>(); }

仅初始化了一个HashMap对象,这个对象会作为参数传给aggregate和complete方法。对一个batch只执行一次。

(3)aggregate方法

aggregate方法对于batch内的每一个tuple均执行一次。这里将这个batch内的名字出现的次数放到init方法所初始化的map中。

@Override public void aggregate(Map<String, Integer> map,TridentTuple tuple, TridentCollector collector) { String key=tuple.getString(0); if(map.containsKey(key)){ Integer tmp=map.get(key); map.put(key, ++tmp); }else{ map.put(key, 1); } }

(4)complete方法

这里在complete将aggregate处理完的结果发送出去,实际上可以在任何地方emit,比如在aggregate里面。 这个方法对于一个batch也只执行一次。

@Override public void complete(Map<String, Integer> map,TridentCollector collector) { if (map.size() > 0) { for(Entry<String, Integer> entry : map.entrySet()){ System.out.println("Thread.id="+Thread.currentThread().getId()+"|"+entry.getKey()+"|"+entry.getValue()); collector.emit(new Values(entry.getKey(),entry.getValue())); } map.clear(); } }

3、state的用法

(1)拓扑定义

先看一下主类中如何将结果写入state:

partitionPersist( new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"), new NameSumUpdater());

它的定义为:

TridentState storm.trident.Stream.partitionPersist(StateFactory stateFactory, Fields inputFields, StateUpdater updater)

其中的第二个参数比较容易理解,就是输入流的名称,这里是名字与它出现的个数。下面先看一下Facotry。

(2)工厂类:NameSumStateFactory

很简单,它实现了StateFactory,只有一个方法makeState,返回一个State类型的对象。

public class NameSumStateFactory implements StateFactory { private static final long serialVersionUID = 8753337648320982637L; @Override public State makeState(Map arg0, IMetricsContext arg1, int arg2, int arg3) { return new NameSumState(); } }

(3)更新类:NameSumUpdater

这个类继承自BaseStateUpdater,它的updateState对batch的内容进行处理,这里是将batch的内容放到一个map中,然后调用setBulk方法

public class NameSumUpdater extends BaseStateUpdater<NameSumState> { private static final long serialVersionUID = -6108745529419385248L; public void updateState(NameSumState state, List<TridentTuple> tuples, TridentCollector collector) { Map<String,Integer> map=new HashMap<String,Integer>(); for(TridentTuple t: tuples) { map.put(t.getString(0), t.getInteger(1)); } state.setBulk(map); } }

(4)状态类:NameSumState

这是state最核心的类,它实现了大部分的逻辑。NameSumState实现了State接口:

public interface State { void beginCommit(Long txid); void commit(Long txid); }

分别在提交之前与提交成功的时候调用,在这里只打印了一些信息。

另外NameSumState还定义了如何处理NameSumUpdater传递的消息:

public void setBulk(Map<String, Integer> map) { // 将新到的tuple累加至map中 for (Entry<String, Integer> entry : map.entrySet()) { String key = entry.getKey(); if (this.map.containsKey(key)) { this.map.put(key, this.map.get(key) + map.get(key)); } else { this.map.put(key, entry.getValue()); } } System.out.println("-------"); // 将map中的当前状态打印出来。 for (Entry<String, Integer> entry : this.map.entrySet()) { String Key = entry.getKey(); Integer Value = entry.getValue(); System.out.println(Key + "|" + Value); } }

即将NameSumUpdater传送过来的内容写入一个HashMap中,并打印出来。 此处将state记录在一个HashMap中,如果需要记录在其它地方,如mysql,则使用jdbc写入mysql代替下面的map操作即可。

事实上,这个操作不一定要在state中执行,可以在任何类中,但建议还是在state类中实现。

4、state应用步骤总结

partitionPersist( new NameSumStateFactory(), new Fields("nameSumKey", "nameSumValue"), new NameSumUpdater());

state的应用步骤相当简单,原理也很简单: (1)NameSumStateFactory()指定了将结果保存在哪里,如本例中的hashset,还可以是mysql/hbase等。当然还有更新逻辑, (2)NameSumUpdater()指定了更新state的逻辑,如将当前数据和原有数据相加等。

5、state应用的一些注意事项

(1)使用state,你不再需要比较事务id,在数据库中同时写入多个值等内容,而是专注于你的逻辑实现 (2)除了实现State接口,更常用的是实现MapState接口,下次补充。 (3)在拓扑中指定了StateFactory,这个工厂类找到相应的State类。而Updater则每个批次均会调用它的方法。State中则定义了如何保存数据,这里将数据保存在内存中的一个HashMap,还可以保存在mysql, hbase等等。 (4)trident会自动比较txid的值,如果和当前一样,则不更改状态,如果是当前txid的下一个值,则更新状态。这种逻辑不需要用户处理。 (5)如果需要实现透明事务状态,则需要保存当前值与上一个值,在update的时候2个要同时处理。即逻辑由自己实现。在本例子中,大致思路是在NameSumState中创建2个HashMap,分别对应当前与上一个状态的值,而NameSumUpdater每次更新这2个Map。

6、state与MapState的差异

(1)由上面可以看出,state需要自己指定如何更新数据

if (this.map.containsKey(key)) { this.map.put(key, this.map.get(key) + map.get(key)); } else { this.map.put(key, entry.getValue()); } }

这里是将原有的值,加上新到的值。而MapState会根据你选择的类型(Transactional, Opaque, NonTransactional)定义好逻辑,只要定义如果向state中读写数据即可。 (2)MapState将State的aggreate与persistent 2部分操作合在一起了,由方法名也可以看出。在State中最后2步是partitionAggregate()与partitionPersistent(),而在MapState中最后1步是persistentAggregate() 事实上,查看persistentAggregate()的实现,它最终也是分成aggregate和persistent 2个步骤的。

public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) { return aggregate(inputFields, agg, functionFields) .partitionPersist(spec, TridentUtils.fieldsUnion(_groupFields, functionFields), new MapCombinerAggStateUpdater(agg, _groupFields, functionFields), TridentUtils.fieldsConcat(_groupFields, functionFields)); }

二、MapState

1、persistentAggregate

Trident有另外一种更新State的方法叫做persistentAggregate。如下:

TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))

persistentAggregate是在partitionPersist之上的另外一层抽象。它知道怎么去使用一个Trident 聚合器来更新State。在这个例子当中,因为这是一个group好的stream,Trident会期待你提供的state是实现了MapState接口的。用来进行group的字段会以key的形式存在于State当中,聚合后的结果会以value的形式存储在State当中。MapState接口看上去如下所示:

public interface MapState<T> extends State { List<T> multiGet(List<List<Object>> keys); List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters); void multiPut(List<List<Object>> keys, List<T> vals); }

当你在一个未经过group的stream上面进行聚合的话,Trident会期待你的state实现Snapshottable接口:

public interface Snapshottable<T> extends State { T get(); T update(ValueUpdater updater); void set(T o); }

MemoryMapState 和 MemcachedState 都实现了上面的2个接口。

2、MapStates

在Trident中实现MapState是非常简单的,它几乎帮你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 类实现了所有相关的逻辑,包括容错的逻辑。你只需要将一个IBackingMap 的实现提供给这些类就可以了。IBackingMap接口看上去如下所示:

public interface IBackingMap<T> { List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals); }

OpaqueMap’s会用OpaqueValue的value来调用multiPut方法,TransactionalMap’s会提供TransactionalValue中的value,而NonTransactionalMaps只是简单的把从Topology获取的object传递给multiPut。

Trident还提供了一种CachedMap类来进行自动的LRU cache。

另外,Trident 提供了 SnapshottableMap 类将一个MapState 转换成一个 Snapshottable 对象.

大家可以看看 MemcachedState的实现,从而学习一下怎样将这些工具组合在一起形成一个高性能的MapState实现。MemcachedState是允许大家选择使用opaque transactional, transactional, 还是 non-transactional 语义的。

实现一个MapState,可以实现IBackingMap接口(mutliGet()/multiPut),并且实现StateFactory接口(makeState()),返回一个State对象,这是常见的用法 * 但如果有一引起高级需求,可以直接实现MapState接口,这样可以覆盖一些如beginCommit(Long txid);commit(Long txid);这些方法,还有multiUpdate()。*

3、Demo

完整代码请见 https://github.com/lujinhong/tridentdemo

更详细的可以参考trident-memcached(很全面,但较旧) https://github.com/nathanmarz/trident-memcached或者storm-hbase的State实现等

在Trident中实现MapState是非常简单的,它和单纯的State不同点在于:OpaqueMap, TransactionalMap 和 NonTransactionalMap会实现相关的容错逻辑,只需为这些类提供一个IBackingMap接口实现,调用multiGet和multiPut方法访问各自的K/V值。

public interface IBackingMap<T> { List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals); }

详细的步骤如下:

(1)创建一个实现IBackingMap的类,实现multiGet和multiPut方法

主要实现multiGet和multiPut的方法,实现如何从state中读写数据。 multiGet 的参数是一个List,可以根据key来查询数据,key本身也是一个List,以方便多个值组合成key的情形。 multiPut的参数是一个List类型的keys和一个List类型的values,它们的size应该是相等的,把这些值写入state中。

public class MemoryMapStateBacking<T> implements IBackingMap<T> { Map<List<Object>, T> db = new HashMap<List<Object>, T>(); @Override public List<T> multiGet(List<List<Object>> keys) { List<T> ret = new ArrayList(); for (List<Object> key : keys) { ret.add(db.get(key)); } return ret; } @Override public void multiPut(List<List<Object>> keys, List<T> vals) { for (int i = 0; i < keys.size(); i++) { List<Object> key = keys.get(i); T val = vals.get(i); db.put(key, val); } } }

这里将k/v写入了一个HashMap中,如果需要写入mysql,则只需要使用jdbc,把db.put改为写入mysql即可,查询类似。

(2)创建实现StateFactory的类

public class MemoryMapStateFacotry implements StateFactory{ @Override public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { return TransactionalMap.build((IBackingMap<TransactionalValue>) new MemoryMapStateBacking()); } }

很简单,就返回一个实现了MapState接口的类对象,通过把上面定义的MemoryMapStateBacking对象传入TransactionalMap.build作参数即可。当然还可以使用:

NonTransactionalMap.build(state);b OpaqueMap.build(state);

(3)定义Count函数

用于说明如果将新来的数据与原来state中的数据组合更新。这里使用了storm提供的一个工具类,它将新来到的值与原有的值相加。

public class Count implements CombinerAggregator<Long> { @Override public Long init(TridentTuple tuple) { return 1L; } @Override public Long combine(Long val1, Long val2) { return val1 + val2; } @Override public Long zero() { return 0L; } }

(4)在拓扑中写入state,或者查询state

//这个流程用于统计单词数据,结果将被保存在wordCounts中 TridentState wordCounts = topology.newStream("spout1", spout) .parallelismHint(16) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapStateFacotry(), new Count(), new Fields("count")).parallelismHint(16); //这个流程用于查询上面的统计结果 topology.newDRPCStream("words", drpc) .each(new Fields("args"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum")); return topology.build();

4、关于MapState的总结

(1)基本步骤

(1)创建一个实现IBackingMap的类,实现multiGet和multiPut方法

(2)创建实现StateFactory的类,它的makeState返回一个实现了MapState接口的对象,可以通过:

mapState = TransactionalMap.build(_iBacking);

其中_iBacking就是第一步实现类的对象。当然还可以使用

mapState = NonTransactionalMap.build(state); mapState = OpaqueMap.build(state);

TransactionalMap,OpaqueMap, NonTransactionalMap已经通过判断txid的值实现了相应的事务逻辑,以TransactionalMap为例,它的源码中会判断batch中的txid与state中已经存储的是否相同,或者同的话则新值等于旧值即可: if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached) (3)在拓扑中使用persistentAggregate写入state

(2)全流程逻辑

以事务型状态为例,我们看一下整个存储过程的逻辑: * 首先,persistentAggregate收到一批数据,它的第一个参数返回的是事务型的MapState * 然后,TransactionalMap在multiUpdate中会判断这个事务的txid与当前state中的txid是否一致。 * 如果txid一致的话,则保持原来的值即可,如果txid不一致,则更新数值。 * 如果更新数据呢?它是拿新来的值和state中的原有的值,使用persistentAggregate中第2个参数定义的类方法作聚合计算。

* 第一个参数关键定义了如何去更新state(如mysql中的内容),比如先取出数据,更新txid,再写回去之类的,而第二个参数定义了以什么逻辑去更新数据,如求和、计算、还是平均之类的。* 因此,反正第一个参数都只是返回一个MapState对象,那使用IBacking接口还是直接使用MapState接口都可以了,只是前者作了一些txid逻辑的封装,对应于几种state的类型,因此使用方便了一点,便事实上,它的代码是很简单的,它就是通过判断txid的关系来定义了update是如何使用get和put的,所以,可以直接实现MapState接口的update方法即可。

persistentAggregate的第2个参数定义了数据是如何更新的,而IBackingMap中的multiGet和multiPut只定义了如何向state中存取数据。 比如此处的Count,它会将将2个数据相加:

@Override public Long combine(Long val1, Long val2) { return val1 + val2; }

因此新来的统计次数与原有的统计次数加起来即是新的总和。

而对于透明事务状态,不管txid是否一致,都需要修改state,同时将当前state保存一下,成为preState。非事务型就简单了,不管你来什么,我都直接更新。

(3)复杂的情况

当然,如果觉得TransactionalMap,OpaqueMap, NonTransactionalMap不能满足业务需求,则可以自定义一个实现了MapState接口的类,而不是直接使用它们。

反正这三个类的实现逻辑非常简单,当不能满足业务需要时,看一下源码,然后参考它创建自己的类即可,此时,关键是multiUpdate的实现。

(4)其它思考

key可以是一个很复杂的List,包括多个字段。

5、MapState读写mysql示例

(1)MysqlMapStateFactory

public class MysqlMapStateFactory<T> implements StateFactory { private static final long serialVersionUID = 1987523234141L; @Override public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { return TransactionalMap.build((IBackingMap<TransactionalValue>) new MysqlMapStateBacking()); } }

很简单,就一行,返回一个IBacking对象。这里使用的Transactioal,当然还可以使用NonTransactional和Opaque。

(2)MysqlMapStateBacking

最核心的还是multiGet()和multiPut:

@Override public List<TransactionalValue> multiGet(List<List<Object>> keys) { if (stmt == null) { stmt = getStatment(); } List<TransactionalValue> values = new ArrayList<TransactionalValue>(); for (List<Object> key : keys) { String sql = "SELECT req_count FROM edt_analysis where id='" + key.get(0) + "'"; LOG.debug("============sql: " + sql); try (ResultSet rs = stmt.executeQuery(sql)) { if (rs.next()) { LOG.info("Get value:{} by key:{}", rs.getObject(1), key); values.add(derialize(rs.getObject(1))); } else { values.add(null); } } catch (SQLException e) { e.printStackTrace(); } } return values; } @Override public void multiPut(List<List<Object>> keys, List<TransactionalValue> vals) { if (stmt == null) { stmt = getStatment(); } for (int i = 0; i < keys.size(); i++) { String sql = "replace into edt_analysis values('" + keys.get(i).get(0) + "','" + serialize(vals.get(i)) + "')"; LOG.debug("===================put sql " + sql); try { stmt.execute(sql); } catch (SQLException e) { e.printStackTrace(); } } }

但mysql与redis之类的不同,它需要将一个TransactionalValue对象转换为mysql中的一行数据,同理,需要将mysql中的一行数据转换为一个TransactionalValue对象:

// 将数据库中的varchar转换为TransactionalValue对象 private TransactionalValue derialize(Object object) { String value[] = object.toString().split(","); return new TransactionalValue(Long.parseLong(value[0]), Long.parseLong(value[1])); } // 将TransactionalValue转换为String private String serialize(TransactionalValue transactionalValue) { return transactionalValue.getTxid() + "," + transactionalValue.getVal(); }

这是使用了最简单的方式,只有2列,一行是key,一列是value,value中保存了txid及真实的value,之间以逗号分隔。

三、以HBaseMapState为例分析MapState代码调用全过程

(零)概述 & MapState被调用的全流程代码

1、调用过程

(1)SubtopologyBolt implements ITridentBatchBolt这个bolt在完成一个batch的处理后会调用finishBatch(BatchInfo batchInfo) (2)然后调用PartitionPersistProcessor implements TridentProcessor这个处理器的finishBatch(ProcessorContext processorContext) (3)接着调用MapCombinerAggStateUpdater implements StateUpdater<MapState>的updateState(MapState map, List<TridentTuple> tuples, TridentCollector collector) (4)再接着调用TransactionalMap<T> implements MapState<T>的 multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) (5)最后就是调用用户定义的MapState类(如HBaseMapState)的multiGet()和multiPut()方法了。

TOTO:按着这个流程把代码重头读一遍,先了解ITridentBatchBolt。

简单的说就是一个blot被处理完后,会调用finishBatch()方法,然后这个方法会调用MapState()框架的updateState(),接着调用mutliUpdate(),最后调用用户定义的multiGet()和multiPut()。

2、内容概述

本部分我们将以HBaseMapState为例,介绍使用MapState保证数据完整唯一的全流程代码调用,主要分成这几个部分: (1)我们先介绍用户如何在构建代码中使用这个MapState (2)然后介绍HBaseMapState的源代码,这也是用户需要实现一个MapState的基本方法。 (3)接着介绍MapState框架如何调用用户定义的代码形成事务性。 (4)最后介绍storm的内部机制,如何调用MapState。 这也是用户如何要看源码的逐步深入的过程。

(一)如何使用MapState

详细DEMO请见:https://github.com/lujinhong/stormhbasedemo

1、指定一些配置

HBaseMapState.Options option = new HBaseMapState.Options(); option.tableName = "ljhtest2"; option.columnFamily = "f1"; option.mapMapper = new SimpleTridentHBaseMapMapper("ms");

SimpleTridentHBaseMapMapper主要用于获取Rowkey和qualifier。Option的完整选项见下面的源码分析。

2、指定state

topology.newStream("kafka", kafkaSpout).shuffle(). each(new Fields("str"), new WordSplit(), new Fields("word")). groupBy(new Fields("word")). persistentAggregate(HBaseMapState.transactional(option), new Count(), new Fields("aggregates_words")).parallelismHint(1);

这里使用Option对象来构建一个HBaseMapStateFactory。 还可以通过

HBaseMapState.nonTransactional(option) HBaseMapState.opaque(option)

分别创建非事务与透明的state。

这里使用了storm内建的Count()方法,如果使用Sum,用法如下:

.persistentAggregate(HBaseMapState.transactional(option), new Fields("cash"), new Sum(), new Fields("state")).parallelismHint(1);

当然还可以自定义方法,这里自定义方法也就可以自定义保存在hbase的数据类型了。

(二)如何实现一个MapState:HBaseMapState源码分析

HBaseMapState的主要代码都在HBaseMapState类中。一个MapState的实现关键在于 * 构建一个实现StateFactory的类,实现makeState() 方法,返回一个State对象。 * 一个MapState,实现IBackingMap接口的multiGet()和multiPut(),指定如何从hbase中读写数据。 关于mapstate的基础介绍请参考上面。

1、Option内部类

HBaseMapState有一个内部类:Option,用于指定一些配置项。

public static class Options<T> implements Serializable { public Serializer<T> serializer = null; public int cacheSize = 5000; public String globalKey = "$HBASE_STATE_GLOBAL$"; public String configKey = "hbase.config"; public String tableName; public String columnFamily; public TridentHBaseMapMapper mapMapper; }

分别意思为: * 序列化器,即以什么格式写入hbase,storm-hbase自带了JSON格式的序列化实现。 * 缓冲大小 * 未知 * 指定hbase-site.xml位置的变量 * 表名 * family名 * 用于获取rowkey和qualifier,创建对象时需要指定一个参数作为qualifier。

2、Factory内部类

(1)构造函数

构造函数接收2个参数,分别为state的类型以及Option对象。 除些以外,还指定了序列化器:

if (this.options.serializer == null) { this.options.serializer = DEFAULT_SERIALZERS.get(stateType); }

(2)makeState()方法

就是返回一个State对象。

3、构造函数

构造函数用于加载配置文件,安全机制等。

4、返回StateFactory的方法

没什么好介绍的,就是返回各种类型的staStateFactory,具体的说就是返回上面Factory的一个对象。这里只保留了透明型的。

@SuppressWarnings("rawtypes") public static StateFactory opaque() { Options<OpaqueValue> options = new Options<OpaqueValue>(); return opaque(options); } @SuppressWarnings("rawtypes") public static StateFactory opaque(Options<OpaqueValue> opts) { return new Factory(StateType.OPAQUE, opts); }

5、multiGet

根据一个List<List<Object>> keys列表获取到一个返回值的列表List。注意key本身也是一个List<Object>。 代码主要是三部分: (1)创建List<Get> gets

List<Get> gets = new ArrayList<Get>(); for(List<Object> key : keys){ byte[] hbaseKey = this.options.mapMapper.rowKey(key); String qualifier = this.options.mapMapper.qualifier(key); LOG.info("Partition: {}, GET: {}", this.partitionNum, new String(hbaseKey)); Get get = new Get(hbaseKey); get.addColumn(this.options.columnFamily.getBytes(), qualifier.getBytes()); gets.add(get); }

(2)查询hbase:根据gets获取Result[]

List<T> retval = new ArrayList<T>(); Result[] results = this.table.get(gets);

(3)将results封装成一个List<T> retval并返回

for (int i = 0; i < keys.size(); i++) { String qualifier = this.options.mapMapper.qualifier(keys.get(i)); Result result = results[i]; byte[] value = result.getValue(this.options.columnFamily.getBytes(), qualifier.getBytes()); if(value != null) { retval.add(this.serializer.deserialize(value)); } else { retval.add(null); } } return retval;

当返回值为空时,则加上null。

6、multiPut

它将一个List<List<Object>> keys, List<T> values的数据写入hbase,注意keys.size()与values.size()必须相等。

List<Put> puts = new ArrayList<Put>(keys.size()); for (int i = 0; i < keys.size(); i++) { byte[] hbaseKey = this.options.mapMapper.rowKey(keys.get(i)); String qualifier = this.options.mapMapper.qualifier(keys.get(i)); LOG.info("Partiton: {}, Key: {}, Value: {}", new Object[]{this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i)))}); Put put = new Put(hbaseKey); T val = values.get(i); put.add(this.options.columnFamily.getBytes(), qualifier.getBytes(), this.serializer.serialize(val)); puts.add(put); this.table.put(puts);

7、序列化器

序列化器指定了以何种格式将数据写入hbase(序列化),以及取出数据后如何进行解释(反序列化),即关键是serialize()与deserialize()这2个方法。

storm默认提供了json的实现,以Transactional为例:

public class JSONTransactionalSerializer implements Serializer<TransactionalValue>

它的内部只有2个方法:

@Override public byte[] serialize(TransactionalValue obj) { List toSer = new ArrayList(2); toSer.add(obj.getTxid()); toSer.add(obj.getVal()); try { return JSONValue.toJSONString(toSer).getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } }

它将一个TransactionalValue转化为json格式,TransactionalValue只有2个变量,是一个典型的bean:

T val; Long txid;

而另一个方法deserialize()则刚好相反,它将一个json格式字节流解释为一个TransactionalValue对象:

@Override public TransactionalValue deserialize(byte[] b) { try { String s = new String(b, "UTF-8"); List deser = (List) JSONValue.parse(s); return new TransactionalValue((Long) deser.get(0), deser.get(1)); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } }

(三)MapState框架

//TODO:补充各个类的关系图,参考P323

上述介绍了用户如何通过实现IBackingMap接口来创建自己的MapState实现,这里我们将介绍MapState框架是如何调用用户写的mutliGet()和multiPut方法的。

* 另外,如果上述实现iBackingMap的方法不能满足你的要求,你可以实现自己的MapState框架,按照这里介绍的方法即可 *

我们主要以Transactional为例,再简单介绍一下NonTransactional和Opaque的情形。在上面的Factory.makeState()方法中:

IBackingMap state = new HBaseMapState(options, conf, partitionIndex); mapState = TransactionalMap.build(state);

state就是用户代码定义的MapState实现,此此处是HBaseMapState。我们下面看一下TransactionalMap是如何调用HBaseMapState的mutliGet()和multiPut方法的。

1、build()方法

我们从build方法开始,因为这是用户创建MapState所调用的API。

public static <T> MapState<T> build(IBackingMap<TransactionalValue> backing) { return new TransactionalMap<T>(backing); }

它使用用户定义的IBackingMap对象创建一个MapState对象,主要通过构造方法来实现。

2、构造方法

protected TransactionalMap(IBackingMap<TransactionalValue> backing) { _backing = new CachedBatchReadsMap(backing); }

3、beginCommit()

@Override public void beginCommit(Long txid) { _currTx = txid; _backing.reset(); }

当开始处理一个事务时,设置当前正在处理的txid,reset()是CachedBatchReadsMap类中清空缓存的方法。

TODO: CachedBatchReadsMap分析

4、commit()

@Override public void commit(Long txid) { _currTx = null; _backing.reset(); }

当一个事务处理完成后,将txid设置为null。

5、multiGet

@Override public List<T> multiGet(List<List<Object>> keys) { List<CachedBatchReadsMap.RetVal<TransactionalValue>> vals = _backing.multiGet(keys); List<T> ret = new ArrayList<T>(vals.size()); for(CachedBatchReadsMap.RetVal<TransactionalValue> retval: vals) { TransactionalValue v = retval.val; if(v!=null) { ret.add((T) v.getVal()); } else { ret.add(null); } } return ret; }

通过调用用户的_backing.multiGet(keys)来实现具体逻辑,作了一些类型转换。

6、multiPut()

@Override public void multiPut(List<List<Object>> keys, List<T> vals) { List<TransactionalValue> newVals = new ArrayList<TransactionalValue>(vals.size()); for(T val: vals) { newVals.add(new TransactionalValue<T>(_currTx, val)); } _backing.multiPut(keys, newVals); }

同样只是调用用户定位的multiPut()。

7、multiUpdate()

核心的逻辑在于这几行:

if(val==null) { newVal = new TransactionalValue<T>(_currTx, updater.update(null)); changed = true; } else { if(_currTx!=null && _currTx.equals(val.getTxid()) && !retval.cached) { newVal = val; } else { newVal = new TransactionalValue<T>(_currTx, updater.update(val.getVal())); changed = true; } } ret.add(newVal.getVal()); if(changed) { newVals.add(newVal); newKeys.add(keys.get(i)); } }

在这之前,先把数据get出来,然后判断:

如果key对应的value为空,则changed为true如果key对应的value不为空,而且当前的txid与value中的txid相同,则changed保持为false。如果key对应的value不为空,但当前的txid与value中的txid不同,则changed为true。

这部分逻辑就是Transactional, NonTransactional和Opaque的差别。 NonTransactional不会判断txid,只要来一批就更新一次。 Opaque基于之前的值作更新。

(四)storm如何调用MapState的代码

根据前面的分析,用户在拓扑定义中通过以下类似的代码来指定state:

topology.newStream("wordsplit", spout).shuffle(). each(new Fields("sentence"), new WordSplit(), new Fields("word")). groupBy(new Fields("word")). persistentAggregate(HBaseMapState.transactional(option), new Count(), new Fields("aggregates_words")).parallelismHint(1);

主要看第3、4行,先对数据根据”word”这个流进行分组,然后再调用persistentAggregate()方法。再简单解释一下这个方法,3个参数分别为: * 返回一个StateFactory对象,它有一个makeState()方法,返回一个State对象。这个state对象就是用户定义的MapState,主要定义了如何从state中读写数据。 * 第二个参数表示如何对取出的数据进行什么操作,这里使用的是Count,如是其它类,如Sum,则多一个参数:

persistentAggregate(HBaseMapState.transactional(option), new Fields("cash"), new Sum(), new Fields("state")).parallelismHint(1);

* 发送的消息流。

好,我们下面开始分析GroupedStream#persistentAggregate()做了什么东西。

1、GroupedStream类

public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) { return persistentAggregate(new StateSpec(stateFactory), agg, functionFields); } public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) { return persistentAggregate(spec, null, agg, functionFields); } public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) { return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields); }

很简单的代码逻辑,先使用StateFactory对象创建一个StateSpec对象,然后继续调用,从第3个方法可以看出,这里还有一个参数是表示inputFields,即输入的field,即对哪个field执行CombinerAggregator的操作。StateSpec类的定义非常简单:

public class StateSpec implements Serializable { public StateFactory stateFactory; public Integer requiredNumPartitions = null; public StateSpec(StateFactory stateFactory) { this.stateFactory = stateFactory; } }

最终真正调用的方法是这个:

public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) { return aggregate(inputFields, agg, functionFields) .partitionPersist(spec, TridentUtils.fieldsUnion(_groupFields, functionFields), new MapCombinerAggStateUpdater(agg, _groupFields, functionFields), TridentUtils.fieldsConcat(_groupFields, functionFields)); }

这个方法主要分成2个步骤 * 第一个是调用aggregate()方法,主要如何对数据进行操作。这部分我们以后再分析,反正把它理解为一个数据的更新就好了。 * 第二个是调用partitionPersist()方法,如何将数据写入state。

public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) { projectionValidation(inputFields); String id = _topology.getUniqueStateId(); ProcessorNode n = new ProcessorNode(_topology.getUniqueStreamId(), _name, functionFields, functionFields, new PartitionPersistProcessor(id, inputFields, updater)); n.committer = true; n.stateInfo = new NodeStateInfo(id, stateSpec); return _topology.addSourcedStateNode(this, n); }

构建一个ProcessorNode,然后将它添加进_topology中。

转载请注明原文地址: https://www.6miu.com/read-63685.html

最新回复(0)