1 Pull the data from MongoDB into HDFS
2 Distribute that HDFS file as background (side) data via the distributed cache
3 Run a MapReduce computation whose output is an HDFS file
4 Take the HDFS file from step 3 as input and write it to the remote database over HTTP. A driver sketch that chains the three stages follows this list.
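The main() method in the code below runs only one stage at a time, with the other ToolRunner calls commented out. As a minimal sketch (the ProfilePipeline class name is made up; the argument layout simply mirrors the args indices used by the three Tool implementations below), one way to chain the stages is:

package com.dew.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver: run the three stages back to back, passing the same args
// vector through. args[0] = Mongo source, args[1] = HDFS dump / cache file,
// args[2] = unused, args[3] = MR output dir, args[4] = HTTP target "ip:port:/path".
public class ProfilePipeline {
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new PullMongoDB(), args);
        if (res == 0) {
            res = ToolRunner.run(new Configuration(), new ComputeProfileHDFS(), args);
        }
        if (res == 0) {
            res = ToolRunner.run(new Configuration(), new HttpApiClient(), args);
        }
        System.exit(res);
    }
}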
--------------------------------------------------------------------------
package com.dew.task;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

public class PullMongoDB extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (null == args || args.length < 4) {
            return 0;
        }

        // args[0] is "host:port:db:collection"
        List<ServerAddress> list = new ArrayList<ServerAddress>();
        String[] array = args[0].split(":");
        list.add(new ServerAddress(array[0], Integer.parseInt(array[1])));
        MongoClient mongoClient = new MongoClient(list);
        DB database = mongoClient.getDB(array[2]);
        DBCollection collection = database.getCollection(array[3]);

        // only pull documents that have both "pkg" and "tags", and project those two fields
        BasicDBObject query = new BasicDBObject();
        query.put("pkg", new BasicDBObject("$exists", true));
        query.put("tags", new BasicDBObject("$exists", true));
        BasicDBObject fields = new BasicDBObject();
        fields.put("pkg", 1);
        fields.put("tags", 1);

        // open the HDFS output file (args[1])
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FSDataOutputStream outHandler = hdfs.create(new Path(args[1]));

        // write one "pkg \t tag1 tag2 ..." line per document
        DBCursor cursor = collection.find(query, fields);
        while (cursor.hasNext()) {
            BasicDBObject record = (BasicDBObject) cursor.next();
            String pkg = record.getString("pkg");
            ArrayList<String> als = (ArrayList<String>) record.get("tags");
            String tags = "";
            for (String s : als) {
                tags += " " + s.trim();
            }
            tags = tags.trim();
            String finalString = pkg + "\t" + tags + System.getProperty("line.separator");
            outHandler.write(finalString.getBytes("UTF-8"));
        }

        // release resources
        outHandler.close();
        cursor.close();
        mongoClient.close();
        return 0;
    }
}
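For a quick check of this stage on its own, a hypothetical invocation could look like the following (every host, database, collection and path value here is made up; the five-slot layout matches the driver sketch above):

package com.dew.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical example: run only the pull stage with made-up argument values.
public class PullMongoDBExample {
    public static void main(String[] a) throws Exception {
        String[] args = {
                "mongo-host:27017:appdb:apps",  // args[0]: Mongo "host:port:db:collection"
                "/tmp/pkg_tags.txt",            // args[1]: HDFS dump, later the cache file
                "unused",                       // args[2]: not used by these jobs
                "/tmp/profile_out",             // args[3]: MR output dir / HTTP stage input
                "10.0.0.1:8080:/api/profiles"   // args[4]: remote HTTP endpoint "ip:port:path"
        };
        // each line written to /tmp/pkg_tags.txt has the form: pkg<TAB>tag1 tag2 ...
        int res = ToolRunner.run(new Configuration(), new PullMongoDB(), args);
        System.exit(res);
    }
}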
package com.dew.task;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Calendar;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ComputeProfileHDFS extends Configured implements Tool {

    // map: join each TSV log line against the pkg->tags table from the distributed cache
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        private Hashtable<String, String> joinData = new Hashtable<String, String>();

        private void readFile(String file) {
            BufferedReader joinReader = null;
            String line = null;
            try {
                joinReader = new BufferedReader(new FileReader(file));
                while ((line = joinReader.readLine()) != null) {
                    String[] array = line.split("\t");
                    if (null == array || array.length < 2)
                        continue;
                    String pkg = array[0];
                    if (null == pkg || pkg.length() <= 0)
                        continue;
                    String tagStr = array[1];
                    if (null == tagStr)
                        continue;
                    tagStr = tagStr.trim();
                    if (tagStr.length() <= 0)
                        continue;
                    joinData.put(pkg, tagStr);
                }
            } catch (Exception e) {
                // ignore malformed cache-file lines
            } finally {
                if (null != joinReader)
                    try {
                        joinReader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
            }
        }

        protected void setup(Context context)
                throws java.io.IOException, java.lang.InterruptedException {
            try {
                URI[] cacheFiles = context.getCacheFiles();
                if (null != cacheFiles && cacheFiles.length > 0) {
                    readFile(cacheFiles[0].getPath().toString());
                }
            } catch (IOException e) {
                // without the cache file the map simply emits nothing
            }
        }

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // key (byte offset) is not used
            if (null == value)
                return;
            String content = value.toString();
            if (null == content || content.trim().length() == 0)
                return;
            // split the TSV record
            String[] strArray = content.split("\t");
            if (null == strArray || strArray.length < 26)
                return;
            // only keep category 4 (app) records
            String cat = strArray[20].trim();
            if (!"4".equals(cat)) {
                return;
            }
            String sender = strArray[11].trim();
            String receiver = strArray[13].trim();
            String pkg = strArray[25].trim();
            if (null == sender || sender.length() == 0 || null == receiver
                    || receiver.length() == 0 || null == pkg || pkg.length() == 0) {
                return;
            }
            String tags = this.joinData.get(pkg);
            if (null == tags || tags.trim().length() == 0) {
                return;
            }
            // okay, output it for both sides of the transfer
            context.write(new Text(sender), new Text(tags));
            context.write(new Text(receiver), new Text(tags));
            Counter c = context.getCounter("ComputeProfileHDFS", "MapWriteRecord");
            c.increment(2);
        }
    }

    public static class Combiner extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String totalTags = "";
            for (Text tag : values) {
                totalTags += " " + tag;
            }
            totalTags = totalTags.trim();
            if (totalTags.length() <= 0)
                return;
            context.write(key, new Text(totalTags));
            Counter c = context.getCounter("ComputeProfileHDFS", "CombineWriteRecord");
            c.increment(1);
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String totalTags = "";
            for (Text tag : values) {
                totalTags += " " + tag;
            }
            totalTags = totalTags.trim();
            if (totalTags.length() <= 0)
                return;
            context.write(new Text(key.toString()), new Text(totalTags));
            Counter c = context.getCounter("ComputeProfileHDFS", "ReduceWriteRecord");
            c.increment(1);
        }
    }

    // /user/flume/transfer2*/year=*/month=*/day=*/*.tsv : the three most recent months
    private static String[] getInputPathsByMonth(int year, int month) {
        String[] path = new String[3];
        if (1 == month) {
            path[0] = "/user/f/transfer2*/year=" + (year - 1) + "/month=11/day=*/*.tsv";
            path[1] = "/user/f/transfer2*/year=" + (year - 1) + "/month=12/day=*/*.tsv";
            path[2] = "/user/f/transfer2*/year=" + year + "/month=01/day=*/*.tsv";
        } else if (2 == month) {
            path[0] = "/user/f/transfer2*/year=" + (year - 1) + "/month=12/day=*/*.tsv";
            path[1] = "/user/f/transfer2*/year=" + year + "/month=01/day=*/*.tsv";
            path[2] = "/user/f/transfer2*/year=" + year + "/month=02/day=*/*.tsv";
        } else {
            path[0] = "/user/f/transfer2*/year=" + year + "/month="
                    + ((month - 2) < 10 ? "0" : "") + (month - 2) + "/day=*/*.tsv";
            path[1] = "/user/f/transfer2*/year=" + year + "/month="
                    + ((month - 1) < 10 ? "0" : "") + (month - 1) + "/day=*/*.tsv";
            path[2] = "/user/f/transfer2*/year=" + year + "/month="
                    + (month < 10 ? "0" : "") + month + "/day=*/*.tsv";
        }
        return path;
    }

    private void setInputPathByMonth(Job job) throws Exception {
        // FileInputFormat.setInputPaths(job, new Path(args[2]));
        Calendar cal = Calendar.getInstance();
        int year = cal.get(Calendar.YEAR);
        int month = cal.get(Calendar.MONTH) + 1;
        String[] paths = getInputPathsByMonth(year, month);
        for (String path : paths) {
            FileInputFormat.addInputPaths(job, path);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "ComputeProfileHDFS");

        // distribute the pkg->tags file produced by PullMongoDB
        job.addCacheFile(new Path(args[1]).toUri());

        // input: the last three months of TSV logs; output: args[3]
        setInputPathByMonth(job);
        // FileInputFormat.setInputPaths(job, new Path(args[2]));
        FileOutputFormat.setOutputPath(job, new Path(args[3]));

        job.setJobName("ComputeProfileHDFS");
        job.setJarByClass(ComputeProfileHDFS.class);
        job.setMapperClass(MapClass.class);
        job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reduce.class);
        // job.setNumReduceTasks(0);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // execute
        int exitCode = job.waitForCompletion(true) ? 0 : 1;
        return exitCode;
    }

    public static String[] args = null;

    public static void main(String[] a) throws Exception {
        // delete temp files if needed
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        try {
            // fs.delete(new Path(args[1]), true);
        } catch (Exception e) {
        }
        try {
            // fs.delete(new Path(args[3]), true);
        } catch (Exception e) {
        }

        args = a;
        int res;
        // res = ToolRunner.run(new Configuration(), new PullMongoDB(), args);
        // res = ToolRunner.run(new Configuration(), new ComputeProfileHDFS(), args);
        res = ToolRunner.run(new Configuration(), new HttpApiClient(), args);

        // delete temp files if needed
        try {
            // fs.delete(new Path(args[1]), true);
        } catch (Exception e) {
        }
        try {
            // fs.delete(new Path(args[3]), true);
        } catch (Exception e) {
        }
        System.exit(res);
    }
}
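To make the three-month input window concrete, the sketch below spells out what setInputPathByMonth(job) amounts to for an assumed run date of March 2015 (the demo class and the dates are hypothetical; the glob pattern is the one built by getInputPathsByMonth):

package com.dew.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Hypothetical demo of the sliding three-month window of input globs.
public class InputWindowExample {
    public static void main(String[] a) throws Exception {
        Job job = new Job(new Configuration(), "window-demo");
        // a run in March 2015 adds the January, February and March globs
        FileInputFormat.addInputPaths(job, "/user/f/transfer2*/year=2015/month=01/day=*/*.tsv");
        FileInputFormat.addInputPaths(job, "/user/f/transfer2*/year=2015/month=02/day=*/*.tsv");
        FileInputFormat.addInputPaths(job, "/user/f/transfer2*/year=2015/month=03/day=*/*.tsv");
        // in January or February the window wraps into the previous year,
        // e.g. 2015-11, 2015-12, 2016-01 for a run in January 2016
    }
}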
(Strictly speaking, this last job should read the data in the map phase and issue the HTTP requests in the reduce phase; that change has not been made here and is left to the reader.)

package com.dew.task;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;

import com.mongodb.BasicDBObject;

public class HttpApiClient extends Configured implements Tool {

    // buffered person documents, flushed once the batch reaches "lock" entries
    private static ArrayList<BasicDBObject> persons = new ArrayList<BasicDBObject>();
    private static int lock = 500;
    private static String HTTP_URL = "http_url";

    public static void writeRecord(String key, String[] arrays, Context context) {
        if (null == key || key.length() <= 0 || null == arrays || arrays.length <= 0)
            return;

        // count how many times each tag appears for this person
        Hashtable<String, Integer> table = new Hashtable<String, Integer>();
        for (String tag : arrays) {
            Integer number = table.get(tag);
            int count = (null == number ? 0 : number.intValue());
            count++;
            table.put(tag, count);
        }

        // single person tag list: [{n: tag, #: count}, ...]
        ArrayList<BasicDBObject> tagDocument = new ArrayList<BasicDBObject>();
        Set<String> tagSet = table.keySet();
        for (String tag : tagSet) {
            BasicDBObject doc = new BasicDBObject();
            doc.put("n", tag);
            doc.put("#", table.get(tag).intValue());
            tagDocument.add(doc);
        }

        // person document
        BasicDBObject person = new BasicDBObject();
        person.put("_id", key);
        person.put("t", tagDocument);

        // buffer it and flush once the batch is full
        persons.add(person);
        if (persons.size() >= lock) {
            submit(context);
        }
    }

    public static void submit(Context context) {
        context.getCounter("Record", "submit").increment(1);
        try {
            // serialize the batched person documents
            String entityString = persons.toString();
            if (null == entityString || entityString.length() <= 0) {
                return;
            }
            StringEntity se = new StringEntity(entityString, HTTP.UTF_8);

            // args[4] carries "ip:port:path", passed in via the HTTP_URL configuration key
            String[] args = context.getConfiguration().getStrings(HTTP_URL);
            if (null == args) {
                context.getCounter("Record", "args_null").increment(1);
                return;
            }
            if (args.length < 1) {
                return;
            }
            context.getCounter("Record", "args_length_valid").increment(1);
            String httpTarget = args[0];
            if (null == httpTarget) {
                return;
            }
            context.getCounter("Record", "httpTarget_OK").increment(1);
            String[] parameters = httpTarget.split(":");
            if (null == parameters || parameters.length < 3) {
                return;
            }
            context.getCounter("Record", "parameters_valid").increment(1);
            String ip = parameters[0];
            int port = Integer.parseInt(parameters[1]);
            String path = parameters[2];
            context.getCounter("Record", "3_parameters_valid").increment(1);

            HttpPut request = new HttpPut("http://" + ip + ":" + port + path);
            request.setHeader("Content-type", "application/json");
            request.setHeader("User-Agent", "Mozilla");
            request.setHeader("Connection", "Close");
            request.setEntity(se);

            HttpClient client = new DefaultHttpClient();
            HttpResponse response = client.execute(request);
            if (null == response) {
                context.getCounter("Record", "response_is_null").increment(1);
                return;
            }
            int code = response.getStatusLine().getStatusCode();
            context.getCounter("Record", "" + code).increment(1);
            // consume the response body
            String respStr = EntityUtils.toString(response.getEntity(), HTTP.UTF_8);
            context.getCounter("Record", "respStr").increment(1);
        } catch (Exception e) {
            context.getCounter("Record", "exception-" + e.toString()).increment(1);
        } finally {
            persons.clear();
        }
    }

    // map: each input line is "person \t tag1 tag2 ...", batch it and PUT it out
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String content = value.toString();
            String[] arrays = content.split("\t");
            String k = arrays[0];
            String[] tags = arrays[1].split(" ");
            HttpApiClient.writeRecord(k, tags, context);
            context.getCounter("Record", "write").increment(1);
        }

        public void cleanup(Context context)
                throws java.io.IOException, java.lang.InterruptedException {
            // flush whatever is still buffered
            HttpApiClient.submit(context);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "ComputeProfileHDFS_HttpApiClient");

        // input is the output of ComputeProfileHDFS; the job itself writes nothing
        FileInputFormat.addInputPath(job, new Path(args[3]));
        FileOutputFormat.setOutputPath(job, new Path("dev/null"));

        job.setJobName("ComputeProfileHDFS_HttpApiClient");
        job.setJarByClass(HttpApiClient.class);
        job.setMapperClass(MapClass.class);
        // job.setCombinerClass(Combiner.class);
        // job.setReducerClass(Reduce.class);
        // job.setNumReduceTasks(0);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(NullOutputFormat.class);

        if (null != args && args.length > 4 && null != args[4])
            job.getConfiguration().set(HTTP_URL, args[4]);

        // execute
        int exitCode = job.waitForCompletion(true) ? 0 : 1;
        return exitCode;
    }
}
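For reference, the request body that submit() PUTs is just persons.toString() on the batched BasicDBObject documents, i.e. roughly a JSON array of {_id, t:[{n, #}]} entries. A small sketch that prints the same shape (the person name, tags and counts are made up):

package com.dew.task;

import java.util.ArrayList;

import com.mongodb.BasicDBObject;

// Hypothetical illustration of the PUT payload shape built by writeRecord()/submit().
public class PayloadShapeExample {
    public static void main(String[] a) {
        ArrayList<BasicDBObject> tags = new ArrayList<BasicDBObject>();
        tags.add(new BasicDBObject("n", "game").append("#", 2));
        tags.add(new BasicDBObject("n", "puzzle").append("#", 1));
        BasicDBObject person = new BasicDBObject("_id", "alice").append("t", tags);
        ArrayList<BasicDBObject> persons = new ArrayList<BasicDBObject>();
        persons.add(person);
        // prints something like:
        // [{ "_id" : "alice" , "t" : [ { "n" : "game" , "#" : 2} , { "n" : "puzzle" , "#" : 1}]}]
        System.out.println(persons.toString());
    }
}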
Then simply check the results on the job's Counters page.
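If you prefer to read the counters from the driver rather than the web UI, something like the following works; it is only a sketch (the CounterReport class is made up), and the group/name strings match the ones incremented in the code above, e.g. called right after job.waitForCompletion(true) in HttpApiClient.run():

package com.dew.task;

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical helper: print a few of the job counters from the driver.
public class CounterReport {
    public static void print(Job job) throws Exception {
        Counters counters = job.getCounters();
        System.out.println("submit = " + counters.findCounter("Record", "submit").getValue());
        System.out.println("write  = " + counters.findCounter("Record", "write").getValue());
    }
}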