Just a journey


• Spark


对超过单机内存容量的(K, V)对进行TopN排序,在键值K是否重复的情况

Spark Implementation: Unique Keys

1. 建立和Spark的连接

/*create a connection to the Spark master*/
JavaSparkContext ctx = new JavaSparkContext();

2. 从HDFS读取数据并建立RDD

JavaRDD<String> lines = ctx.textFile(inputPath, 1);

3. 从初始RDD转化为pair RDD

 JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
         public Tuple2<String,Integer> call(String s) {
            String[] tokens = s.split(","); // cat7,234
            return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));

使用mapToPair函数用于创建pair RDD.

4. 在每个partition中建立本地TopN

      * <U> JavaRDD<U> mapPartitions(FlatMapFunction<java.util.Iterator<T>,U> f)
      * Return a new RDD by applying a function to each partition of this RDD.
      * mapPartitions(func)	Similar to map, but runs separately on each partition   
      * (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when   
      * running on an RDD of type T.
      * public interface FlatMapFunction<T,R> extends java.io.Serializable
      * A function that returns zero or more output records from each input record.
      * java.lang.Iterable<R> call(T t) throws java.lang.Exception
      * */
      // STEP-5: create a local top-10
      JavaRDD<SortedMap<Integer, String>> partitions = pairs.mapPartitions(
         new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
             public Iterable<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
             SortedMap<Integer, String> top10 = new TreeMap<Integer, String>();
             while (iter.hasNext()) {
                Tuple2<String,Integer> tuple = iter.next();
                top10.put(tuple._2, tuple._1);
                // keep only top N 
                if (top10.size() > 10) {
             //return Collections.singletonList(top10).iterator();
             return Collections.singletonList(top10);

根据《Learning Spark》的4.5节Data Patitioning的论述:

Spark’s partitioning is available on all RDDs of key/value pairs, and causes the system to group elements based on a function of each key. Although Spark does not give explicit control of which worker node each key goes to (partly because the system is designed to work even if specific nodes fail), it lets the program ensure that a set of keys will appear together on some node.


5. 通过对各个分区的TopN处理,获得最终的TopN

  SortedMap<Integer, String> finaltop10 = new TreeMap<Integer, String>();
      List<SortedMap<Integer, String>> alltop10 = partitions.collect();
      for (SortedMap<Integer, String> localtop10 : alltop10) {
          //System.out.println(tuple._1 + ": " + tuple._2);
          //weight/count = tuple._1
          //catname/URL = tuple._2
          for (Map.Entry<Integer, String> entry : localtop10.entrySet()) {
              //System.out.println(entry.getKey() + "--" + entry.getValue());
              finaltop10.put(entry.getKey(), entry.getValue());
              // keep only top 10 
              if (finaltop10.size() > 10) {


Spark Implementation: Nonunique Keys


   JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
         public Integer call(Integer i1, Integer i2) {
            return i1 + i2;


comments powered by Disqus