全网整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:400-708-3566

java 中Spark中将对象序列化存储到hdfs

java 中Spark中将对象序列化存储到hdfs

摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs.

废话不多说, 直接贴代码了. spark1.4 + hbase0.98

import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

 def convertScanToString(scan: Scan) = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
 }

 def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.set("spark.kryoserializer.buffer", "256m")
  sparkConf.set("spark.kryoserializer.buffer.max","2046m")
  sparkConf.set("spark.akka.frameSize", "500")
  sparkConf.set("spark.rpc.askTimeout", "30")
  

  val sc = new SparkContext(sparkConf)
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")

  hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

  val scan = new Scan()
  val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
  
  val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""")
  
  val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter(
  "data".getBytes,
  "article".getBytes,
  CompareOp.EQUAL,
  comp
  )
  
  filterList.addFilter(articleFilter)
  filterList.addFilter(new PageFilter(100))
  
  scan.setFilter(filterList)
  scan.setCaching(50)
  scan.setCacheBlocks(false)
  hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))

  val crawledRDD = sc.newAPIHadoopRDD(
   hbaseConf,
   classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result]
  )
 
  val articlesRDD = crawledRDD.filter{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     content != null
   }
  }

  val wordsInDoc = articlesRDD.map{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq
     else Seq("")
   }
  }
  
  val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty)
  
  val word2vec = new Word2Vec()
  val model = word2vec.fit(fitleredWordsInDoc)
  
  //---------------------------------------重点看这里-------------------------------------------------------------
  //将上面的模型存储到hdfs
  val hadoopConf = sc.hadoopConfiguration
  hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
  val fileSystem = FileSystem.get(hadoopConf)
  val path = new Path("/user/hadoop/data/mllib/word2vec-object")
  val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
  oos.writeObject(model)
  oos.close
  
  //这里示例另外一个程序直接从hdfs读取序列化对象使用模型
  val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
  val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  
  /*
  * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型
  * import java.io._
  * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
  * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
  * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  * ois.close
  */
  //--------------------------------------------------------------------------------------------------------------
 }
}


感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!


# java  # 中Spark中将对象序列化存储到hdfs  # 在IntelliJ IDEA中创建和运行java/scala/spark程序的方法  # java-spark中各种常用算子的写法示例  # Java和scala实现 Spark RDD转换成DataFrame的两种方法小结  # 详解Java编写并运行spark应用程序的方法  # 如何使用Java调用Spark集群  # 序列化  # 尤其是  # 你还  # 希望能  # 这样一个  # 另外一个  # 谢谢大家  # 多说  # 中经  # CompareOp  # SingleColumnValueFilter  # PageFilter  # RegexStringComparator  # CompareFilter  # ProtobufUtil  # Bytes  # mapreduce  # TableInputFormat  # protobuf  # Path 


相关文章: javascript基本数据类型及类型检测常用方法小结  建站主机空间推荐 高性价比配置与快速部署方案解析  开封网站制作公司,网络用语开封是什么意思?  如何在西部数码注册域名并快速搭建网站?  行程制作网站有哪些,第三方机票电子行程单怎么开?  SAX解析器是什么,它与DOM在处理大型XML文件时有何不同?  整人网站在线制作软件,整蛊网站退不出去必须要打我是白痴才能出去?  油猴 教程,油猴搜脚本为什么会网页无法显示?  简单实现Android验证码  深圳防火门网站制作公司,深圳中天明防火门怎么编码?  网站制作公司广州有几家,广州尚艺美发学校网站是多少?  阿里云网站搭建费用解析:服务器价格与建站成本优化指南  建站10G流量真的够用吗?如何应对访问高峰?  建站VPS能否同时实现高效与安全翻墙?  免费视频制作网站,更新又快又好的免费电影网站?  香港网站服务器数量如何影响SEO优化效果?  企业微网站怎么做,公司网站和公众号有什么区别?  学生网站制作软件,一个12岁的学生写小说,应该去什么样的网站?  香港代理服务器配置指南:高匿IP选择、跨境加速与SEO优化技巧  建站之星安装后如何自定义网站颜色与字体?  网页设计与网站制作内容,怎样注册网站?  品牌网站制作公司有哪些,买正品品牌一般去哪个网站买?  济南网站建设制作公司,室内设计网站一般都有哪些功能?  江苏网站制作公司有哪些,江苏书法考级官方网站?  建站之星微信建站一键生成小程序+多端营销系统  如何在新浪SAE免费搭建个人博客?  公司门户网站制作流程,华为官网怎么做?  西安专业网站制作公司有哪些,陕西省建行官方网站?  rsync同步时出现rsync: failed to set times on “xxxx”: Operation not permitted  如何通过主机屋免费建站教程十分钟搭建网站?  相亲简历制作网站推荐大全,新相亲大会主持人小萍萍资料?  Dapper的Execute方法的返回值是什么意思 Dapper Execute返回值详解  如何访问已购建站主机并解决登录问题?  nginx修改上传文件大小限制的方法  官网自助建站平台指南:在线制作、快速建站与模板选择全解析  音乐网站服务器如何优化API响应速度?  上海网站制作网站建设公司,建筑电工证网上查询系统入口?  如何快速使用云服务器搭建个人网站?  公司网站建设制作费用,想建设一个属于自己的企业网站,该如何去做?  如何高效利用亚马逊云主机搭建企业网站?  建站之星收费标准详解:套餐费用及年费价格表一览  如何零基础在云服务器搭建WordPress站点?  如何制作网站标识牌,动态网站如何制作(教程)?  巅云智能建站系统:可视化拖拽+多端适配+免费模板一键生成  建站主机系统SEO优化与智能配置核心关键词操作指南  道歉网站制作流程,世纪佳缘致歉小吴事件,相亲网站身份信息伪造该如何稽查?  天津个人网站制作公司,天津网约车驾驶员从业资格证官网?  c# Task.Yield 的作用是什么 它和Task.Delay(1)有区别吗  武汉网站制作费用多少,在武汉武昌,建面100平方左右的房子,想装暖气片,费用大概是多少啊?  C++如何编写函数模板?(泛型编程入门) 

您的项目需求

*请认真填写需求信息,我们会在24小时内与您取得联系。