4

Bài đăng này sẽ giúp bạn bắt đầu sử dụng  Apache Spark Streaming  với HBase. Spark Streaming là một phần mở rộng của Spark API lõi cho phép xử lý luồng dữ liệu liên tục.

Spark Streaming là gì?

Trước hết, truyền phát là gì? Luồng dữ liệu là một chuỗi dữ liệu không giới hạn đến liên tục. Truyền phân chia dữ liệu đầu vào liên tục chảy thành các đơn vị riêng biệt để xử lý. Xử lý luồng là xử lý độ trễ thấp và phân tích dữ liệu truyền phát. Spark Streaming là một phần mở rộng của Spark API lõi cho phép xử lý luồng dữ liệu trực tiếp có khả năng mở rộng, thông lượng cao, có khả năng chịu lỗi. Spark Streaming dành cho các trường hợp sử dụng đòi hỏi một lượng dữ liệu đáng kể để được xử lý nhanh chóng ngay khi đến. Ví dụ các trường hợp sử dụng thời gian thực là:

  • Giám sát trang web, giám sát mạng
  • Phát hiện gian lận
  • Nhấp chuột trên web
  • Quảng cáo
  • Cảm biến Internet of Things

Spark Streaming hỗ trợ các nguồn dữ liệu như thư mục HDFS, ổ cắm TCP, Kafka, Flume, Twitter, v.v. Luồng dữ liệu có thể được xử lý bằng APIS lõi của Spark, SQL DataFrames hoặc API học máy và có thể được lưu vào hệ thống tệp, HDFS, cơ sở dữ liệu hoặc bất kỳ nguồn dữ liệu nào cung cấp Hadoop OutputFormat.

Cách thức hoạt động của Spark Streaming

Spark Streaming chia luồng dữ liệu thành các lô X giây được gọi là Dòng, trong đó là một chuỗi các RDD. Ứng dụng Spark của bạn xử lý RDD bằng API Spark và kết quả được xử lý của các hoạt động RDD được trả về theo đợt.

Kiến trúc của ví dụ Ứng dụng phát trực tuyến

Mã ví dụ Spark Streaming thực hiện như sau:

  • Đọc dữ liệu trực tuyến.
  • Xử lý dữ liệu phát trực tuyến.
  • Ghi dữ liệu đã xử lý vào Bảng HBase.

Mã ví dụ Spark khác thực hiện như sau:

  • Đọc dữ liệu Bảng HBase được viết bởi mã phát trực tuyến
  • Tính toán thống kê tóm tắt hàng ngày
  • Ghi số liệu thống kê tóm tắt vào bảng HBase

Tập dữ liệu mẫu

Dữ liệu cảm biến bơm dầu xuất hiện dưới dạng các tệp giá trị được phân tách bằng dấu phẩy (csv) được thả trong một thư mục. Spark Streaming sẽ giám sát thư mục và xử lý bất kỳ tệp nào được tạo trong thư mục đó. (Như đã nêu trước đây, Spark Streaming hỗ trợ các nguồn dữ liệu phát trực tuyến khác nhau; để đơn giản, ví dụ này sẽ sử dụng các tệp.) Dưới đây là ví dụ về tệp csv với một số dữ liệu mẫu:

Chúng tôi sử dụng lớp trường hợp Scala để xác định lược đồ cảm biến tương ứng với các tệp csv dữ liệu cảm biến và hàm parseSensor để phân tích các giá trị được phân tách bằng dấu phẩy vào lớp trường hợp cảm biến.

Lược đồ bảng HBase

Lược đồ bảng HBase cho dữ liệu phát trực tuyến như sau:

  • Khóa hàng tổng hợp của ngày và tên tem thời gian
  • Cột Dữ liệu gia đình với các cột tương ứng với các trường dữ liệu đầu vào Cột Cảnh báo gia đình với các cột tương ứng với bất kỳ bộ lọc nào cho các giá trị đáng báo động. Lưu ý rằng các họ dữ liệu và cột cảnh báo có thể được đặt thành hết hạn giá trị sau một khoảng thời gian nhất định.

Lược đồ cho các bản tóm tắt thống kê hàng ngày như sau:

  • Khóa hàng tổng hợp của tên và ngày bơm
  • Số liệu thống kê gia đình cột
  • Các cột cho min, max và avg.

Hàm bên dưới chuyển đổi một đối tượng Cảm biến thành đối tượng HBase Put, được sử dụng để chèn một hàng vào HBase.

Cấu hình để ghi vào bảng HBase

Bạn có thể sử dụng lớp TableOutputFormat với Spark để ghi vào bảng HBase, tương tự như cách bạn sẽ ghi vào bảng HBase từ MapReduce. Dưới đây chúng tôi thiết lập cấu hình để ghi vào HBase bằng lớp TableOutputFormat.

Mã ví dụ Spark Streaming

Đây là các bước cơ bản cho mã Spark Streaming:

  1. Khởi tạo một đối tượng Spark StreamingContext.
  2. Áp dụng các phép biến đổi và các hoạt động đầu ra cho DStreams.
  3. Bắt đầu nhận dữ liệu và xử lý nó bằng streamingContext.start ().
  4. Đợi quá trình xử lý bị dừng sử dụng streamingContext.awaitTermination ().

Chúng ta sẽ đi qua từng bước với mã ứng dụng mẫu.

Đang khởi tạo StreamingContext

Đầu tiên chúng tôi tạo một StreamingContext , điểm vào chính cho chức năng phát trực tuyến, với khoảng thời gian 2 giây .

val sparkConf = new SparkConf().setAppName("HBaseStream")

//  create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))

Tiếp theo, chúng tôi sử dụng phương thức StreamingContext textFileStream (thư mục) để tạo luồng đầu vào theo dõi hệ thống tệp tương thích Hadoop cho các tệp mới và xử lý bất kỳ tệp nào được tạo trong thư mục đó.

// create a DStream that represents streaming data from a directory source
val linesDStream = ssc.textFileStream("/user/user01/stream")

Các dòngDStream đại diện cho luồng dữ liệu, mỗi bản ghi là một dòng văn bản. Bên trong DStream là một chuỗi các RDD, một RDD cho mỗi khoảng thời gian theo đợt.

Áp dụng chuyển đổi và hoạt động đầu ra cho DStreams

Tiếp theo, chúng tôi phân tích các dòng dữ liệu thành các đối tượng Cảm biến, với thao tác bản đồ trên các dòngDStream.

// parse each line of data in linesDStream  into sensor objects

val sensorDStream = linesDStream.map(Sensor.parseSensor) 

Hoạt động bản đồ áp dụng chức năng Sensor.parseSensor trên RDD trong các dòngDStream, dẫn đến RDD của các đối tượng Cảm biến.

Tiếp theo, chúng tôi sử dụng   phương thức Deachream foreachRDD để áp dụng xử lý cho từng RDD trong DStream này. Chúng tôi lọc các đối tượng cảm biến cho psi thấp để tạo cảnh báo, sau đó chúng tôi ghi cảm biến và dữ liệu cảnh báo vào HBase bằng cách chuyển đổi chúng thành Đặt đối tượng và sử dụng   phương thức PairRDDFifts saveAsHadoopDataset , đưa RDD sang bất kỳ hệ thống lưu trữ nào được Hadoop hỗ trợ bằng Hadoop Đối tượng cấu hình cho hệ thống lưu trữ đó (xem Cấu hình Hadoop cho HBase ở trên).

// for each RDD. performs function on each RDD in DStream
sensorRDD.foreachRDD { rdd =>
        // filter sensor data for low psi
     val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)

      // convert sensor data to put object and write to HBase  Table CF data
      rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)

     // convert alert to put object write to HBase  Table CF alerts
     rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
}

Các đối tượng sensorRDD được chuyển đổi để đặt các đối tượng sau đó được ghi vào HBase.

Bắt đầu nhận dữ liệu

Để bắt đầu nhận dữ liệu, chúng tôi phải gọi start () một cách rõ ràng trên StreamingContext, sau đó gọi awaitTermination để chờ tính toán phát trực tuyến kết thúc.

    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()

Spark Đọc từ và viết cho HBase

Bây giờ chúng tôi muốn đọc dữ liệu bảng cảm biến HBase, tính toán thống kê tóm tắt hàng ngày và viết các thống kê này cho họ cột thống kê.

Mã dưới đây đọc dữ liệu cột psi của bảng cảm biến bảng HBase, tính toán số liệu thống kê trên dữ liệu này bằng  StatCorer , sau đó ghi số liệu thống kê vào họ cột thống kê cảm biến.

     // configure HBase for reading 
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)
    // scan data column family psi column
    conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi") 

// Load an RDD of (row key, row Result) tuples from the table
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // transform (row key, row Result) tuples into an RDD of Results
    val resultRDD = hBaseRDD.map(tuple => tuple._2)

    // transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key
    val keyValueRDD = resultRDD.
              map(result => (Bytes.toString(result.getRow()).
              split(" ")(0), Bytes.toDouble(result.value)))

    // group by rowkey , get statistics for column value
    val keyStatsRDD = keyValueRDD.
             groupByKey().
             mapValues(list => StatCounter(list))

    // convert rowkey, stats to put and write to hbase table stats column family
    keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)

Sơ đồ bên dưới cho thấy rằng đầu ra từ newAPIHadoopRDD là một RDD của khóa hàng, cặp kết quả. PairRDDFifts saveAsHadoopDataset lưu các đối tượng Đặt vào HBase.

Phần mềm

Chạy ứng dụng

Bạn có thể chạy mã dưới dạng một ứng dụng độc lập như được mô tả trong hướng dẫn về  Bắt đầu với Spark trên MapR Sandbox .

Dưới đây là các bước được tóm tắt:

  1. Đăng nhập vào MapR Sandbox, như được giải thích trong  Bắt đầu với Spark trên MapR Sandbox , sử dụng userid user01, mật khẩu mapr.
  2. Xây dựng ứng dụng bằng maven.
  3. Sao chép tệp jar và tệp dữ liệu vào thư mục nhà / người dùng / người dùng hộp cát của bạn bằng scp.
  4. Chạy ứng dụng phát trực tuyến:
     /opt/mapr/spark/spark-<version>/bin/spark-submit --driver-class-path `hbase classpath` 
       --class examples.HBaseSensorStream sparkstreamhbaseapp-1.0.jar

  5. Sao chép tệp dữ liệu phát trực tuyến vào thư mục luồng:cp sensordata.csv /user/user01/stream/
  6. Đọc dữ liệu và tính toán số liệu thống kê cho một cột
       /opt/mapr/spark/spark-<version>/bin/spark-submit --driver-class-path `hbase classpath` 
        --class examples.HBaseReadWrite sparkstreamhbaseapp-1.0.jar

  7. Tính số liệu thống kê cho toàn bộ hàng
      /opt/mapr/spark/spark-<version>/bin/spark-submit --driver-class-path `hbase classpath` 
       --class examples.HBaseReadRowWriteStats sparkstreamhbaseapp-1.0.jar

Tóm lược

Điều này kết thúc hướng dẫn về Spark Streaming với HBase. Bạn có thể tìm thêm thông tin trong phần tài liệu tham khảo.

Tài liệu tham khảo và thêm thông tin:

|