Hadoop 2.5.1 Learning Notes 8: A Complete Program Template
Views: 6241
Published: 2019-06-22

This article is about 15,681 characters; estimated reading time: 52 minutes.


1 Pull the data from MongoDB into HDFS.

2 Distribute the HDFS file as side data through the distributed cache.

3 Run one MapReduce job whose output is an HDFS file.

4 Take the HDFS file from step 3 as input and write it to a remote database over HTTP (a driver sketch that chains these steps follows below).
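Before the individual programs, here is a minimal driver sketch that chains the three steps with ToolRunner. It is not part of the original post: the class name PipelineDriver is made up, and the argument layout is only inferred from how each run() method below indexes its args array.

package com.dew.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Sketch only. Assumed argument layout (inferred from the code below):
//   args[0] = mongoHost:port:db:collection          (PullMongoDB)
//   args[1] = HDFS file exported from MongoDB, later added to the distributed cache
//   args[2] = explicit input path (only referenced in commented-out code)
//   args[3] = output directory of ComputeProfileHDFS, input of HttpApiClient
//   args[4] = ip:port:path of the remote HTTP endpoint (HttpApiClient)
public class PipelineDriver {
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new PullMongoDB(), args);
        if (res == 0) {
            res = ToolRunner.run(new Configuration(), new ComputeProfileHDFS(), args);
        }
        if (res == 0) {
            res = ToolRunner.run(new Configuration(), new HttpApiClient(), args);
        }
        System.exit(res);
    }
}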

--------------------------------------------------------------------------

package com.dew.task;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

public class PullMongoDB extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (null == args || args.length < 4) {
            return 0;
        }
        // args[0] = host:port:db:collection
        List<ServerAddress> list = new ArrayList<ServerAddress>();
        String[] array = args[0].split(":");
        list.add(new ServerAddress(array[0], Integer.parseInt(array[1])));
        MongoClient mongoClient = new MongoClient(list);
        DB database = mongoClient.getDB(array[2]);
        DBCollection collection = database.getCollection(array[3]);

        // only documents that have both "pkg" and "tags", and only those two fields
        BasicDBObject query = new BasicDBObject();
        query.put("pkg", new BasicDBObject("$exists", true));
        query.put("tags", new BasicDBObject("$exists", true));
        BasicDBObject fields = new BasicDBObject();
        fields.put("pkg", 1);
        fields.put("tags", 1);

        // open the HDFS output file (args[1] is the target path)
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FSDataOutputStream outHandler = hdfs.create(new Path(args[1]));

        // write one "pkg \t tag1 tag2 ..." line per document
        DBCursor cursor = collection.find(query, fields);
        while (cursor.hasNext()) {
            BasicDBObject record = (BasicDBObject) cursor.next();
            String pkg = record.getString("pkg");
            ArrayList<String> als = (ArrayList<String>) record.get("tags");
            String tags = "";
            for (String s : als) {
                tags += " " + s.trim();
            }
            tags = tags.trim();
            String finalString = pkg + "\t" + tags
                    + System.getProperty("line.separator");
            outHandler.write(finalString.getBytes("UTF8"));
        }

        // release handles
        outHandler.close();
        cursor.close();
        mongoClient.close();
        return 0;
    }
}
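PullMongoDB writes one line per MongoDB document in the form pkg<TAB>tag1 tag2 .... To sanity-check the export you can read the file back from HDFS; the class below is only a sketch (PeekExport is not part of the original, and args[0] is assumed to be the same path that was passed as args[1] above).

package com.dew.task;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: print the first few "pkg \t tags" lines written by PullMongoDB.
public class PeekExport {
    public static void main(String[] args) throws Exception {
        FileSystem hdfs = FileSystem.get(new Configuration());
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                hdfs.open(new Path(args[0])), "UTF-8"));
        String line;
        int shown = 0;
        while ((line = reader.readLine()) != null && shown++ < 10) {
            String[] parts = line.split("\t");
            if (parts.length < 2)
                continue;
            System.out.println("pkg=" + parts[0] + ", tags=" + parts[1]);
        }
        reader.close();
    }
}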

 

package com.dew.task;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Calendar;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ComputeProfileHDFS extends Configured implements Tool {

    // map: joins every input record against the pkg->tags file loaded from the distributed cache
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        private Hashtable<String, String> joinData = new Hashtable<String, String>();

        // load the "pkg \t tags" file produced by PullMongoDB into memory
        private void readFile(String file) {
            BufferedReader joinReader = null;
            String line = null;
            try {
                joinReader = new BufferedReader(new FileReader(file));
                while ((line = joinReader.readLine()) != null) {
                    String[] array = line.split("\t");
                    if (null == array || array.length < 2)
                        continue;
                    String pkg = array[0];
                    if (null == pkg || pkg.length() <= 0)
                        continue;
                    String tagStr = array[1];
                    if (null == tagStr)
                        continue;
                    tagStr = tagStr.trim();
                    if (tagStr.length() <= 0)
                        continue;
                    joinData.put(pkg, tagStr);
                }
            } catch (Exception e) {
                // ignore unreadable cache files
            } finally {
                if (null != joinReader)
                    try {
                        joinReader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
            }
        }

        protected void setup(Context context) throws java.io.IOException,
                java.lang.InterruptedException {
            try {
                URI[] cacheFiles = context.getCacheFiles();
                if (null != cacheFiles && cacheFiles.length > 0) {
                    readFile(cacheFiles[0].getPath().toString());
                }
            } catch (IOException e) {
                // ignore
            }
        }

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // key (byte offset) is not used
            if (null == value)
                return;
            String content = value.toString();
            if (null == content || content.trim().length() == 0)
                return;
            String[] strArray = content.split("\t");
            if (null == strArray || strArray.length < 26)
                return;
            // only category 4 ("app") records are kept
            String cat = strArray[20].trim();
            if (null == cat || !cat.trim().equals("4"))
                return;
            String sender = strArray[11].trim();
            String receiver = strArray[13].trim();
            String pkg = strArray[25].trim();
            if (null == sender || sender.length() == 0 || null == receiver
                    || receiver.length() == 0 || null == pkg
                    || pkg.length() == 0) {
                return;
            }
            String tags = this.joinData.get(pkg);
            if (null == tags || tags.trim().length() == 0) {
                return;
            }
            // emit the tags once for the sender and once for the receiver
            context.write(new Text(sender), new Text(tags));
            context.write(new Text(receiver), new Text(tags));
            Counter c = context.getCounter("ComputerProfileHDFS", "MapWriteRecord");
            c.increment(2);
        }
    }

    public static class Combiner extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String totalTags = "";
            for (Text tag : values) {
                totalTags += " " + tag;
            }
            totalTags = totalTags.trim();
            if (totalTags.length() <= 0)
                return;
            context.write(key, new Text(totalTags));
            Counter c = context.getCounter("ComputerProfileHDFS", "CombineWriteRecord");
            c.increment(1);
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String totalTags = "";
            for (Text tag : values) {
                totalTags += " " + tag;
            }
            totalTags = totalTags.trim();
            if (totalTags.length() <= 0)
                return;
            context.write(new Text(key.toString()), new Text(totalTags));
            Counter c = context.getCounter("ComputerProfileHDFS", "ReduceWriteRecord");
            c.increment(1);
        }
    }

    // build input globs covering the current month and the two months before it
    private static String[] getInputPathsByMonth(int year, int month) {
        // /user/flume/transfer2*/year=*/month=*/day=*/*.tsv
        String[] path = new String[3];
        if (1 == month) {
            path[0] = "/user/f/transfer2*/year=" + (year - 1) + "/month=11/day=*/*.tsv";
            path[1] = "/user/f/transfer2*/year=" + (year - 1) + "/month=12/day=*/*.tsv";
            path[2] = "/user/f/transfer2*/year=" + year + "/month=01/day=*/*.tsv";
        } else if (2 == month) {
            path[0] = "/user/f/transfer2*/year=" + (year - 1) + "/month=12/day=*/*.tsv";
            path[1] = "/user/f/transfer2*/year=" + year + "/month=01/day=*/*.tsv";
            path[2] = "/user/f/transfer2*/year=" + year + "/month=02/day=*/*.tsv";
        } else {
            path[0] = "/user/f/transfer2*/year=" + year + "/month="
                    + ((month - 2) < 10 ? "0" : "") + (month - 2) + "/day=*/*.tsv";
            path[1] = "/user/f/transfer2*/year=" + year + "/month="
                    + ((month - 1) < 10 ? "0" : "") + (month - 1) + "/day=*/*.tsv";
            path[2] = "/user/f/transfer2*/year=" + year + "/month="
                    + (month < 10 ? "0" : "") + month + "/day=*/*.tsv";
        }
        return path;
    }

    private void setInputPathByMonth(Job job) throws Exception {
        // FileInputFormat.setInputPaths(job, new Path(args[2]));
        Calendar cal = Calendar.getInstance();
        int year = cal.get(Calendar.YEAR);
        int month = cal.get(Calendar.MONTH) + 1;
        String[] paths = getInputPathsByMonth(year, month);
        for (String path : paths) {
            FileInputFormat.addInputPaths(job, path);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "ComputeProfileHDFS");
        // distribute the file pulled from MongoDB (args[1]) to every map task
        job.addCacheFile(new Path(args[1]).toUri());
        setInputPathByMonth(job);
        // FileInputFormat.setInputPaths(job, new Path(args[2]));
        FileOutputFormat.setOutputPath(job, new Path(args[3]));
        job.setJobName("ComputeProfileHDFS");
        job.setJarByClass(ComputeProfileHDFS.class);
        job.setMapperClass(MapClass.class);
        job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reduce.class);
        // job.setNumReduceTasks(0);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // execute
        int exitCode = job.waitForCompletion(true) ? 0 : 1;
        return exitCode;
    }

    public static String[] args = null;

    public static void main(String[] a) throws Exception {
        // optionally delete temp files from a previous run
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        try {
            // fs.delete(new Path(args[1]), true);
        } catch (Exception e) {
        }
        try {
            // fs.delete(new Path(args[3]), true);
        } catch (Exception e) {
        }
        args = a;
        int res;
        // res = ToolRunner.run(new Configuration(), new PullMongoDB(), args);
        // res = ToolRunner.run(new Configuration(), new ComputeProfileHDFS(), args);
        res = ToolRunner.run(new Configuration(), new HttpApiClient(), args);
        // optionally delete temp files again
        try {
            // fs.delete(new Path(args[1]), true);
        } catch (Exception e) {
        }
        try {
            // fs.delete(new Path(args[3]), true);
        } catch (Exception e) {
        }
        System.exit(res);
    }
}
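run() builds its input from wildcard paths covering the current month and the two months before it, so it can be worth confirming that those globs actually match files before submitting the job. The class below is a sketch, not part of the original; the glob to test is passed on the command line.

package com.dew.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: list the files matched by a glob such as
//   /user/f/transfer2*/year=2014/month=11/day=*/*.tsv
public class CheckInputGlob {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FileStatus[] matches = fs.globStatus(new Path(args[0]));
        if (null == matches || matches.length == 0) {
            System.out.println("no files matched: " + args[0]);
            return;
        }
        for (FileStatus status : matches) {
            System.out.println(status.getPath());
        }
    }
}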

 

[Note from the original author: this program really should read the data in map() and make the HTTP requests in reduce(); it has not been reworked here, and readers can adapt it themselves.]

package com.dew.task;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;

import com.mongodb.BasicDBObject;

public class HttpApiClient extends Configured implements Tool {

    private static ArrayList<BasicDBObject> persons = new ArrayList<BasicDBObject>();
    private static int lock = 500; // batch size: submit every 500 person documents

    public static void writeRecord(String key, String[] arrays, Context context) {
        if (null == key || key.length() <= 0 || null == arrays
                || arrays.length <= 0)
            return;
        // count how many times each tag appears for this person
        Hashtable<String, Integer> table = new Hashtable<String, Integer>();
        for (String tag : arrays) {
            Integer number = table.get(tag);
            int count = (null == number ? 0 : number.intValue());
            count++;
            table.put(tag, count);
        }
        // tag documents of a single person
        ArrayList<BasicDBObject> tagDocument = new ArrayList<BasicDBObject>();
        Set<String> tagSet = table.keySet();
        for (String tag : tagSet) {
            BasicDBObject doc = new BasicDBObject();
            doc.put("n", tag);
            doc.put("#", table.get(tag).intValue());
            tagDocument.add(doc);
        }
        // person document
        BasicDBObject person = new BasicDBObject();
        person.put("_id", key);
        person.put("t", tagDocument);
        persons.add(person);
        if (persons.size() >= lock) {
            submit(context);
        }
    }

    public static void submit(Context context) {
        context.getCounter("Record", "submit").increment(1);
        try {
            String entityString = persons.toString();
            if (null == entityString || entityString.length() <= 0) {
                return;
            }
            StringEntity se = new StringEntity(entityString, HTTP.UTF_8);
            String[] args = context.getConfiguration().getStrings(HTTP_URL);
            if (null == args) {
                context.getCounter("Record", "args_null").increment(1);
                return;
            }
            if (args.length < 1) {
                return;
            }
            context.getCounter("Record", "args_length_valid").increment(1);
            String httpTarget = args[0];
            if (null == httpTarget) {
                return;
            }
            context.getCounter("Record", "httpTarget_OK").increment(1);
            // httpTarget has the form ip:port:path
            String[] parameters = httpTarget.split(":");
            if (null == parameters || parameters.length < 3) {
                return;
            }
            context.getCounter("Record", "parameters_valid").increment(1);
            String ip = parameters[0];
            int port = Integer.parseInt(parameters[1]);
            String path = parameters[2];
            context.getCounter("Record", "3_parameters_valid").increment(1);
            HttpPut request = new HttpPut("http://" + ip + ":" + port + path);
            request.setHeader("Content-type", "application/json");
            request.setHeader("User-Agent", "Mozilla");
            request.setHeader("Connection", "Close");
            request.setEntity(se);
            HttpClient client = new DefaultHttpClient();
            HttpResponse response = client.execute(request);
            if (null == response) {
                context.getCounter("Record", "response_is_null").increment(1);
                return;
            }
            int code = response.getStatusLine().getStatusCode();
            context.getCounter("Record", "" + code).increment(1);
            String respStr = EntityUtils.toString(response.getEntity(), HTTP.UTF_8);
            context.getCounter("Record", "respStr").increment(1);
        } catch (Exception e) {
            context.getCounter("Record", "exception-" + e.toString()).increment(1);
        } finally {
            persons.clear();
        }
    }

    // map: each input line is "user \t tag1 tag2 ..." as produced by ComputeProfileHDFS
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String content = value.toString();
            String[] arrays = content.split("\t");
            String k = arrays[0];
            String[] tags = arrays[1].split(" ");
            HttpApiClient.writeRecord(k, tags, context);
            context.getCounter("Record", "write").increment(1);
        }

        public void cleanup(Context context) throws java.io.IOException,
                java.lang.InterruptedException {
            // flush the last, possibly partial, batch
            HttpApiClient.submit(context);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "ComputeProfileHDFS_HttpApiClient");
        FileInputFormat.addInputPath(job, new Path(args[3]));
        FileOutputFormat.setOutputPath(job, new Path("dev/null"));
        job.setJobName("ComputeProfileHDFS_HttpApiClient");
        job.setJarByClass(HttpApiClient.class);
        job.setMapperClass(MapClass.class);
        // job.setCombinerClass(Combiner.class);
        // job.setReducerClass(Reduce.class);
        // job.setNumReduceTasks(0);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        if (null != args && null != args[4])
            job.getConfiguration().set(HTTP_URL, args[4]);
        // execute
        int exitCode = job.waitForCompletion(true) ? 0 : 1;
        return exitCode;
    }

    private static String HTTP_URL = "http_url";
}
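The body of the PUT request is simply persons.toString(), i.e. the JSON-style rendering of a list of BasicDBObject documents, one per user, each holding an array of {tag, count} pairs. The class below (PayloadPreview, not part of the original) is a sketch that builds one such document the way writeRecord does and prints it, so you can see the payload shape without running the job.

package com.dew.task;

import java.util.ArrayList;

import com.mongodb.BasicDBObject;

// Sketch only: prints something like
//   { "_id" : "user123" , "t" : [ { "n" : "game" , "#" : 2} , { "n" : "chess" , "#" : 1}]}
public class PayloadPreview {
    public static void main(String[] args) {
        ArrayList<BasicDBObject> tagDocument = new ArrayList<BasicDBObject>();
        tagDocument.add(new BasicDBObject("n", "game").append("#", 2));
        tagDocument.add(new BasicDBObject("n", "chess").append("#", 1));
        BasicDBObject person = new BasicDBObject();
        person.put("_id", "user123");
        person.put("t", tagDocument);
        System.out.println(person);
    }
}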

 

Then simply check the results on the job's Counters page.
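If you would rather check them from code than from the web UI, the same counters can be read from the Job object once waitForCompletion returns. A minimal sketch, assuming you keep a reference to the finished Job:

package com.dew.task;

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

// Sketch only: dump every counter in the "Record" group after the job finishes.
public class CounterDump {
    public static void dump(Job job) throws Exception {
        for (Counter c : job.getCounters().getGroup("Record")) {
            System.out.println(c.getName() + " = " + c.getValue());
        }
    }
}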

Reposted from: https://my.oschina.net/qiangzigege/blog/346275
