欢迎光临
我们一直在努力

MapReduce将文本数据导入到HBase中

  1. 整体描述:将本地文件的数据整理之后导入到hbase中

  2. 在HBase中创建表

  3. 数据格式

  4. MapReduce程序

    map程序

    package com.hadoop.mapreduce.test.map;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class WordCountHBaseMapper extends Mapper<Object, Text, Text, Text>{
        
        public Text keyValue = new Text();
        public Text valueValue = new Text();
        //数据类型为:key@addressValue#ageValue#sexValue
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            
            if(lineValue != null){
                String[] valuesArray = lineValue.split("@");
                context.write(new Text(valuesArray[0]), new Text(valuesArray[1]));
            }
        }
    }

    Reduce程序

    package com.hadoop.mapreduce.test.reduce;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    
    public class WordCountHBaseReduce extends TableReducer<Text, Text, NullWritable>{
    
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context out)
                throws IOException, InterruptedException {
            String keyValue = key.toString();
            Iterator<Text> valueIterator = value.iterator();
            while(valueIterator.hasNext()){
                Text valueV = valueIterator.next();
                String[] valueArray = valueV.toString().split("#");
                
                Put putRow = new Put(keyValue.getBytes());
                putRow.add("address".getBytes(), "baseAddress".getBytes(), 
                            valueArray[0].getBytes());
                putRow.add("sex".getBytes(), "baseSex".getBytes(), 
                            valueArray[1].getBytes());
                putRow.add("age".getBytes(), "baseAge".getBytes(), 
                            valueArray[2].getBytes());
                
                out.write(NullWritable.get(), putRow);
            }
        }
    }

    主程序

    package com.hadoop.mapreduce.test;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import com.hadoop.mapreduce.test.map.WordCountHBaseMapper;
    import com.hadoop.mapreduce.test.reduce.WordCountHBaseReduce;
    
    /**
     * 将hdfs上的内容读取到,并插入到hbase的表中,然后读取hbase表中的内容,将统计结果插入到hbase中 
     */
    public class WordCountHBase {
        public static void main(String args[]) throws IOException, 
            InterruptedException, ClassNotFoundException{
            
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.192.137"); 
            Job job = Job.getInstance(conf, "MapReduceHbaseJob");
            //各种class
            job.setJarByClass(WordCountHBase.class);
            job.setMapperClass(WordCountHBaseMapper.class);
            TableMapReduceUtil.initTableReducerJob("userInfo3", 
                   WordCountHBaseReduce.class, job);
            
            FileInputFormat.addInputPath(job, new Path(args[0]));
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    结果:

  5. 注:如果运行的client没有hbase,需要在hadoop里面的lib中加入hbase的lib

赞(0)
【声明】:本博客不参与任何交易,也非中介,仅记录个人感兴趣的主机测评结果和优惠活动,内容均不作直接、间接、法定、约定的保证。访问本博客请务必遵守有关互联网的相关法律、规定与规则。一旦您访问本博客,即表示您已经知晓并接受了此声明通告。