京东技术创新实践:Hack Day百期精华案例分享pdf/doc/txt格式电子书下载
本站仅展示书籍部分内容
如有任何咨询
请加微信10090337咨询
书名:京东技术创新实践:Hack Day百期精华案例分享pdf/doc/txt格式电子书下载
推荐语:
作者:京东研发研发管理部,综合管理部
出版社:
出版时间:2015-11-01
书籍编号:30206291
ISBN:
正文语种:中文
字数:79640
版次:1
所属分类:互联网+-产品/运营
京东有尊重技术、尊重人才的文化。我们鼓励分享、鼓励创新。每一次创新,都面临着风险、挑战与困难。如何在巨大的压力下,快速、有效的突破挑战与困难,我相信这本书将让我们受益匪浅。
——京东技术首席执行官 张晨
《京东技术创新》浓缩了Hack Day最精华的部分,是每位研发人员必看的一本书。Hack Day不仅仅是在分享最前沿的技术,还培养了京东研发人善于分享、善于创新的好习惯。一切完美的事物,无一不是创新的结果。希望Hack Day能够越办越好!
——京东营销研发部负责人 马松
技术是京东核心基因之一,技术的强大需要每一个人的努力,不断在实践中总结经验,同时将经验进行分享和传承,让每一个人都有机会成为技术专家,这是每一个研发人的心声!
——京东云平台负责人 何刚
Hack Day是研发内部最具影响力的技术交流、学习平台,在这里研发人共同见证京东技术的成长和进步。我相信,当京东研发的每个人都成为不断学习和自我创新的载体时,一个技术和业务双导向的未来将不再遥远。
——京东运营研发部负责人 肖军
Hack Day现在已经是京东研发人员耳熟能详的一个品牌活动。在这里,研发团队有机会将自己的创新技术和科研成果与更多人分享交流,而参与这样的活动也能认识更多牛人,了解行业前沿现状。现在这本书的出版,是一个很好的里程碑,既是对过去工作的一个总结,更是开启未来新篇章的号角,希望能促进京东优秀技术案例的宣传推广,并激励团队更高质量的技术产出!
——京东首席技术顾问 翁志Dennis
主题:Spark-streaming在云海实时计算项目中的实践
作者:刘永平
职位:架构师
作者简介:云平台资深架构师,多年与数据相关的工作经验。云海实计项目整合了Spark-streaming,Hbase,Flume等大数据相关技术,建立了跨集群,跨数据库的通用实时计算平台,实现将sql思想引入实时计算中,大大降低了大数据实时计算的门槛。
本文所属技术领域:大数据处理中的实时计算
第一章节 项目背景
京东云海项目,致力于开放京东商品、商家、客服绩效、品牌、行业五大主题数据,为ISV提供海量数据的分布式存储计算平台,提供完整的云端数据仓库解决方案。在实时计算上线之前,云海项目只提供T+1的数据解决方案,即今天数据的相关报表只能明天才能提供,主要基于hive技术实现。为了提高用户对数据的体验,云海项目提出实时计算的解决方案,即在秒级内实时提供最终的报表。
在实时计算的技术框架的选择上,我们充分考虑到大数据处理的三个典型场景,即复杂的海量数据处理、基于历史数据的交互式查询、基于实时数据流的数据处理。而Spark作为一个很火的高效的分布式计算系统能比较全面的满足上述的需求。同时,Spark-streaming有很丰富的数据转换处理的API,可大量减少开发成本。
Spark支持两种类型操作:Transformations和Actions。Transformation从一个已知的RDD数据集经过转换得到一个新的RDD数据集,这些Transformation操作包括map、filter、flatMap、union、join等,而且Transformation具有lazy的特性,调用这些操作并没有立刻执行对已知RDD数据集的计算操作,而是在调用了另一类型的Action操作才会真正地执行。Action执行,会真正地对RDD数据集进行操作,返回一个计算结果给Driver程序,或者没有返回结果,如将计算结果数据进行持久化,Action操作包括reduceByKey、count、foreach、collect等。同样、Spark Streaming提供了类似Spark的两种操作类型。
第二章节 项目实践
一 项目框架
输入流:业务系统的数据库或网页的各种点击所生成的每条数据都发送到相应的kafka的源topic中。
输出流:其处理包括如下几步:
1)通过spark-streaming的Transformations或Actions处理后写入到hbase表中。
2)Hbase的结果表部署上协处理器,将更新数据发送到kafka的结果topic中。
3)实时消费结果topic的数据,并写入mysql中,这是供web应用(用户)消费的最终数据。
二 启动命令(spark on yarn)
SPARK_JAR=./jars/spark-assembly-0.9.1-hadoop2.3.0.jar ./bin/spark-class org.apache.spark.deploy.yarn.Client——jar /data1/hadoop/runjars/oderealtime.jar——class com.ode.realtime.app.$1——args $2——num-workers $3——master-memory 2g——worker-memory $5——worker-cores $4——name $6
oderealtime.jar 自行开发实现计算逻辑的jar包。
$1 java主程序
$2 java主程序的运行参数
其他参数见spark的相关文档
三 代码解析
(一)设置system属性,创建JavaStreamingContext对象,设置与kafka消费相关的属性,并以此为参数与kakfa对接,实时接收流式数据。
1)System.setProperty(\"spark.serializer\",\"org.apache.spark.serializer.KryoSerializer\");
其他参数有spark.serializer,spark.default.parallelism,spark.storage.memoryFraction,spark.streaming.concurrentJobs等。
2)JavaStreamingContext ssc=new JavaStreamingContext(runmode,appName,new Duration(3000),System.getenv(\"SPARK_HOME\"),JavaStreamingContext.jarOfClass(App.class));
3)kafkaParams.put(\"zookeeper.connect\",**);kafkaParams.put(\"group.id\",**);
4)JavaPairDStream<byte[],byte[]> messages=KafkaUtils.createStream(ssc,kafkaParams,topicMap,new
MulUtils.DelayCallback(format));
MulUtils.DelayCallback回调函数实现不同格式的kafka消息使用不同的解析逻辑。
(二)实现Transformations处理
messages.mapPartitions(new PairFlatMapFunction<Iterator<byte[]>,String,Double>(){@Override
public Iterable<Tuple2<String,Double>> call(Iterator<byte[]> iterator)throws Exception {
List<Tuple2<String,Double>> list=new ArrayList<Tuple2<String,Double>>();
todo();
list.add(new Tuple2<String,Double>(\"received\",Double.valueOf(data.size())));
return list;
} }).filter(new Function<Tuple2<String,Double>,Boolean>(){
@Override
public Boolean call(Tuple2<String,Double> stringDoubleTuple2){
if(stringDoubleTuple2==null){
return false;
}
return true;
}
}).reduceByKey(new Function2<Double,Double,Double>(){
@Override
public Double call(Double i1,Double i2){
return i1 + i2;
}
});
Todo方法的说明,实现计算逻辑,可将sql计算的实现逻辑在此封装。
除了mapPartitions这个api,还有其他可用的api,见spark相关文档。
(三)Actions操作
javaPairDStream.foreachRDD(new Function2<JavaPairRDD<String,List<Double>>,Time,Void>(){@Override
public Void call(JavaPairRDD<String,List<Double>> stringIntegerJavaPairRDD,Time time){
List<Tuple2<String,List<Double>>> list=stringIntegerJavaPairRDD.collect();
for(Tuple2 t:list){
todo();
}
return null;
}
});
Todo方法的说明,项目中是根据不同的指标与维度,通过Put或Increment写入hbase的结果表中。
除了foreachRDD这个api,还有其他可用的api,见spark相关文档。
第三章节 项目中遇到的问题与解决
1、block 还未进行计算就被ttl的设置而删除,导致block ** not found错误
System.setProperty(“spark.cleaner.ttl”,“3600”); //单位:秒
以上指自动清理一小时以前的RDD,如果你的计算延迟了一小时,就会报以下的错,并导致应用失败。
解决办法:
1)提高性能,以减少延迟的时间,使延迟一直在安全范围内。
2)对输入流进行限速,如每秒只允许多少条,这样,在生产者进行大量数据发送时,延迟还在可控范围之内
2、hbase读写速度过慢,此框架的实时计算严重依赖Hbase的性能。
一般是hbase配置未优化或进行了相应的后台操作,如Minor compact等。
解决办法:
1)对rowkey进行预分区表,提高写入速度。
2)多线程处理(使用队列或mappartition)
3)对hbase进行优化配置,提高写入速度,减少队列等待的时间。
4)对数据进行限流。
因为项目中除了使用Spark-streaming的reduceByKey的机制,还充分使用了hbase的计数器(Increment)。使用场景,即同一批(Duration)数据的汇总用Spark-streaming的reduceByKey实现,但不同批次的数据,则使用hbase的计数器Increment来实现,由于Increment 只处理整型的数值,所以数值指标都进行10000倍的处理,即可支持小数点4位的精度计算。
另外,实时计算过程中不仅仅只有事实表的实时数据流需要消费,还有维度表的实时数据流要实时的写入到维度表中,实时地参与到事实表的指标计算中。
下图能比较清晰的反映出“hbase写入速度”与“计算总延迟”的影响关系。
3、Waiting for application to be successfully unregistered。
一般是由于配置文件不正确引起的,比如线上环境与测试环境混淆了。
4、消费kafka数据时,已消费的offset未正常的写入zookeeper中或offset已正常的写入到zookeeper中但还
....
本站仅展示书籍部分内容
如有任何咨询
请加微信10090337咨询