java中Spark中将对象序列化存储到hdfs
java 中Spark中将对象序列化存储到hdfs
创新互联长期为千余家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为酉阳土家族苗族企业提供专业的成都网站制作、成都网站建设,酉阳土家族苗族网站改版等技术服务。拥有10多年丰富建站经验和众多成功案例,为您定制开发。
摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs.
废话不多说, 直接贴代码了. spark1.4 + hbase0.98
import org.apache.spark.storage.StorageLevel import scala.collection.JavaConverters._ import java.io.File import java.io.FileInputStream import java.io.FileOutputStream import java.io.ObjectInputStream import java.io.ObjectOutputStream import java.net.URI import java.util.Date import org.ansj.library.UserDefineLibrary import org.ansj.splitWord.analysis.NlpAnalysis import org.ansj.splitWord.analysis.ToAnalysis import org.apache.hadoop.fs.FSDataInputStream import org.apache.hadoop.fs.FSDataOutputStream import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.FileUtil import org.apache.hadoop.fs.Path import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName} import org.apache.hadoop.hbase.filter.FilterList import org.apache.hadoop.hbase.filter.PageFilter import org.apache.hadoop.hbase.filter.RegexStringComparator import org.apache.hadoop.hbase.filter.SingleColumnValueFilter import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.protobuf.ProtobufUtil import org.apache.hadoop.hbase.util.{Base64, Bytes} import com.feheadline.fespark.db.Neo4jManager import com.feheadline.fespark.util.Env import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd._ import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} import scala.math.log import scala.io.Source object Word2VecDemo { def convertScanToString(scan: Scan) = { val proto = ProtobufUtil.toScan(scan) Base64.encodeBytes(proto.toByteArray) } def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("Word2Vec Demo") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("spark.kryoserializer.buffer", "256m") sparkConf.set("spark.kryoserializer.buffer.max","2046m") sparkConf.set("spark.akka.frameSize", "500") sparkConf.set("spark.rpc.askTimeout", "30") val sc = new SparkContext(sparkConf) val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper") hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled") val scan = new Scan() val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL) val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""") val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter( "data".getBytes, "article".getBytes, CompareOp.EQUAL, comp ) filterList.addFilter(articleFilter) filterList.addFilter(new PageFilter(100)) scan.setFilter(filterList) scan.setCaching(50) scan.setCacheBlocks(false) hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan)) val crawledRDD = sc.newAPIHadoopRDD( hbaseConf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result] ) val articlesRDD = crawledRDD.filter{ case (_,result) => { val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes)) content != null } } val wordsInDoc = articlesRDD.map{ case (_,result) => { val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes)) if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq else Seq("") } } val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty) val word2vec = new Word2Vec() val model = word2vec.fit(fitleredWordsInDoc) //---------------------------------------重点看这里------------------------------------------------------------- //将上面的模型存储到hdfs val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/") val fileSystem = FileSystem.get(hadoopConf) val path = new Path("/user/hadoop/data/mllib/word2vec-object") val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path))) oos.writeObject(model) oos.close //这里示例另外一个程序直接从hdfs读取序列化对象使用模型 val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path))) val sample_model = ois.readObject.asInstanceOf[Word2VecModel] /* * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型 * import java.io._ * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object")) * val sample_model = ois.readObject.asInstanceOf[Word2VecModel] * ois.close */ //-------------------------------------------------------------------------------------------------------------- } }
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
分享标题:java中Spark中将对象序列化存储到hdfs
网页网址:http://scpingwu.com/article/jpdohh.html