Giới thiệu về API cốt lõi của Apache Spark (Phần II)


Đỗ Thanh Tuyền
10 tháng trước
Hữu ích 5 Chia sẻ Viết bình luận 0
Đã xem 4412

Xin chào các lập trình viên, tôi hy vọng tất cả các bạn đang làm tốt.

Trong bài viết trước của tôi,  Giới thiệu về API lõi của Apache Spark (Phần I) , tôi đã đề cập đến phương pháp RDD thuần túy và như tôi đã hứa sẽ giải thích các hàm hoặc phương thức liên quan đến ghép RDD với nhiều đoạn mã mẫu. Vì vậy, đây là!

Để tạo cặp RDD, vui lòng tham khảo bài viết trước của tôi. Với sự trợ giúp của hướng dẫn đó, bạn có thể tạo cặp RDD (ở đây, tôi giả sử rằng orderPairRdd là cặp RDD của tôi có khóa được gắn nhãn  order_id và đặt giá trị là  order).

  • Ghép nối các API lõi RDD

    • orderPairRdd.join (otherRDD)

      • Phương thức này trả về một RDD chứa tất cả các cặp phần tử có khóa khớp với nhau  otherRDD. Phép nối mặc định hoạt động giống như phép nối bên trong trong SQL. 

      ordersPairRdd.first()
      # (u'1', u'1,2013-07-25 00:00:00.0,11599,CLOSED')
      orderItemsPairRDD.first()
      # (u'1', u'1,1,957,1,299.98,299.98')
      ordersJoinOrderItems = ordersPairRdd.join(orderItemsPairRDD)
      # (u'1', (u'1,2013-07-25 00:00:00.0,11599,CLOSED', u'1,1,957,1,299.98,299.98'))


    • orderPairRdd.leftOuterJoin (otherRDD)

      • Phương pháp này thực hiện nối ngoài bên trái trên  ordersPairRdd và  otherRDD.

      • Giả sử  ordersPairRdd có (k, v) và  otherRDD có (k, w) thì RDD kết quả sẽ có (k, (v, w)) và (k, (v, Không)) nếu không có phần tử nào  otherRDD có khóa gọi là k.

      ordersLeftJoinOrderItems = ordersPairRdd.leftOuterJoin(orderItemsPairRDD)
      ordersLeftJoinOrderItems.first()
      # (u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'87999,35236,365,1,59.99,59.99'))


    • orderPairRdd.rightOuterJoin (otherRDD)

      • Phương pháp này thực hiện một tham gia bên ngoài bên phải trên  ordersPairRdd và otherRDD.

      • Giả sử  ordersPairRdd có (k, v) và  otherRDD có (k, w) thì RDD kết quả sẽ có (k, (v, w)) và (k, (Không, V)) nếu không có phần tử nào  ordersPairRdd có khóa gọi là k.

      ordersRightJoinOrderItems = ordersPairRdd.rightOuterJoin(orderItemsPairRDD)
      ordersRightJoinOrderItems.first()
      # (u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'87999,35236,365,1,59.99,59.99'))


    • orderPairRdd.fullOuterJoin (otherRDD)

      • Phương pháp này thực hiện một tham gia bên ngoài đầy đủ trên  ordersPairRdd và otherRDD.

      • Giả sử  ordersPairRdd có (k, v) và  otherRDDcó (k, w) thì RDD kết quả sẽ có (k, (v, w)) và (k, (v, Không)) nếu không có phần tử nào  otherRDD có khóa k và (k , (Không có, V)) nếu không có phần tử nào  inordersPairRdd có khóa gọi là k.

      ordersFullJoinOrderItems = ordersPairRdd.fullOuterJoin(orderItemsPairRDD)
      ordersFullJoinOrderItems.first()
      # (u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'87999,35236,365,1,59.99,59.99'))


    • orderPairRdd.countByKey ()

      • Phương pháp này được sử dụng để đếm số lượng phần tử cho mỗi khóa và sau đó trả kết quả cho chủ dưới dạng từ điển.

      rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
      rdd.countByKey().items()
      # [('a', 2), ('b', 1)]


    • orderPairRdd.groupByKey ()

      • Phương thức này nhóm giá trị cho mỗi khóa trong RDD thành một khóa duy nhất.

      • Trong trường hợp tổng hợp trên mỗi khóa, sử dụng  reduceByKey và  aggregateByKeysẽ cung cấp hiệu suất tốt hơn nhiều so với  groupByKey.

      ordersPairRdd.groupByKey().mapValues(list).collect()
      #[(u'18065', [u'18065,2013-11-13 00:00:00.0,5684,PROCESSING']), (u'34148', [u'34148,2014-02-20 00:00:00.0,10198,COMPLETE'])]


    • orderPairRdd.reduceByKey (func)

      • Phương pháp này sẽ hợp nhất các giá trị cho mỗi khóa với sự trợ giúp của hàm giảm kết hợp.

      pairRdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
      test = pairRdd.reduceByKey(lambda x, y: x+y)
      test.collect()
      # [('a', 2), ('b', 1)]


    • orderPairRdd.aggregateByKey (zeroValue, seqFunc, combFunc)

      • Theo hướng dẫn lập trình Spark , hàm này tổng hợp các giá trị của từng khóa, sử dụng các hàm kết hợp đã cho và "giá trị 0" trung tính. Hàm này có thể trả về một loại kết quả khác, U, sau đó là loại giá trị trong RDD, V. Do đó, chúng ta cần một thao tác để hợp nhất V thành một U và một thao tác để hợp nhất hai Us. Hoạt động trước được sử dụng để hợp nhất các giá trị trong một phân vùng và hoạt động sau được sử dụng để hợp nhất các giá trị giữa các phân vùng. Để tránh cấp phát bộ nhớ, cả hai hàm này đều được phép sửa đổi và trả về đối số đầu tiên của chúng thay vì tạo một U mới.

      pairRdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
      pairRdd.aggregateByKey((0),lambda x,y: (x+y),lambda r1,r2: (r1+r2)).collect()
      # [('a', 2), ('b', 1)]


    • orderPairRdd.combineByKey (createdCombiner, mergeValue, mergeCombiner)

      • Phương pháp này được sử dụng để kết hợp các giá trị cho mỗi khóa bằng cách sử dụng một tập hợp tùy chỉnh của hàm tổng hợp.

      • Ví dụ: nếu RDD thuộc loại (k, v) thì có thể hàm này trả về RDD loại (k, w) trong đó v và w có thể là một loại khác.

        • createCombiner - mà biến av thành w.

        • mergeValue - để hợp nhất v vào w.

        • mergeCombiner - để kết hợp hai 'w thành một.

        x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        def add(a, b):
        return a + str(b)
        
        sorted(x.combineByKey(str, add, add).collect())
        # [('a', '11'), ('b', '1')]


    • orderPairRdd.sortByKey (tăng dần = Truthy)

      • Như tên cho thấy, chức năng này được sử dụng để sắp xếp cặp RDD dựa trên khóa.

      tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
      sc.parallelize(tmp).sortByKey().first()
      # ('1', 3)
      
      tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
      tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
      sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
      # [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]


Đây là tất cả các hàm hoặc phương thức cho cặp RDD, tức là với cặp key: value. Trong bài đăng tiếp theo của tôi, tôi sẽ giải thích các hàm hoặc phương thức liên quan đến ghép RDD với nhiều đoạn mã ví dụ.

Cảm ơn đã đọc và chúc mừng mã hóa!

Hữu ích 5 Chia sẻ Viết bình luận 0
Đã xem 4412