Helpex - Trao đổi & giúp đỡ Đăng nhập

Giới thiệu về SparkSession

Spark 2.0 là phiên bản chính tiếp theo của Apache Spark. Điều này mang lại những thay đổi lớn đối với mức độ trừu tượng cho Spark API và các thư viện. Trong bài đăng trên blog này, tôi sẽ thảo luận về SparkSession.

Giới thiệu về SparkSession

Trước khi vào SparkSession, hãy hiểu điểm vào. Điểm vào là nơi điều khiển được chuyển từ hệ điều hành sang chương trình được cung cấp. Trước 2.0, điểm vào Spark Core là  sparkContext. Apache Spark là một công cụ điện toán cụm mạnh mẽ, do đó nó được thiết kế để tính toán nhanh dữ liệu lớn.

sparkContext trong Apache Spark

Web

Một bước quan trọng đối với bất kỳ ứng dụng trình điều khiển Spark nào là tạo  sparkContext. Nó cho phép ứng dụng Spark của bạn truy cập vào cụm Spark với sự trợ giúp của người quản lý tài nguyên. Người quản lý tài nguyên có thể là một trong ba người sau:

  • SparkStandopol

  • YARN

  • Mesos Apache

Các chức năng của sparkContext trong Apache Spark

  • Nhận trạng thái hiện tại của ứng dụng Spark của bạn.

  • Đặt cấu hình.

  • Truy cập các dịch vụ khác nhau.

  • Hủy bỏ một công việc.

  • Hủy bỏ một giai đoạn.

  • Vệ sinh đóng cửa.

  • Đăng ký SparkListener.

  • Lập trình phân bổ động.

  • Truy cập RDD liên tục.

Trước Spark 2.0,  sparkContext đã được sử dụng như một kênh để truy cập tất cả các chức năng của Spark. Chương trình trình điều khiển Spark sử dụng  sparkContext để kết nối với cụm thông qua trình quản lý tài nguyên.

Cần có SparkConf để tạo các đối tượng sparkContext, lưu trữ các tham số cấu hình như appName (để xác định trình điều khiển Spark của bạn), số lõi và kích thước bộ nhớ của trình thực thi đang chạy trên nút worker.

Để sử dụng API SQL, Hive và phát trực tuyến, cần tạo bối cảnh riêng biệt.

Thí dụ:

val conf = new SparkConf()

.setMaster("local")

.setAppName("Spark Practice2")

val sc = new SparkContext(conf)

SparkSession – New entry-point of Spark

introduction-to-apache-spark-20-12-638

Như chúng ta đã biết, trong các phiên bản trước, sparkContext là điểm vào của Spark. Vì RDD là API chính, nó được tạo và thao tác bằng API ngữ cảnh. Đối với mọi API khác, chúng tôi cần sử dụng một bối cảnh khác nhau.

Đối với streamin, chúng tôi cần  streamingContext. Đối với SQL,  sqlContext, và cho Hive,  hiveContext. Nhưng khi API Data set và DataFrame đang trở thành API độc lập mới, chúng tôi cần xây dựng điểm nhập cảnh cho chúng. Vì vậy, trong Spark 2.0, chúng tôi có một bản dựng điểm nhập cảnh mới cho API Dataset và DataFrame được gọi là SparkSession.

jumpstart-on-apache-spark-22-on-databricks-40-638

Đây là sự kết hợp của SQLContext, HiveContext và streamingContext. Tất cả các API có sẵn trên các bối cảnh đó đều có sẵn trên SparkSession; SparkSession cũng có một SparkContext cho tính toán thực tế.

spark-sql-SessionState

 Bây giờ chúng ta có thể xem cách tạo SparkSession và tương tác với nó.

Tạo SparkSession

Đoạn mã sau có ích khi bạn muốn tạo SparkSession:

val spark = SparkSession.builder()

.master("local")

.appName("example of SparkSession")

.config("spark.some.config.option", "some-value")

.getOrCreate()

SparkSession.builder()

Phương pháp này được tạo để xây dựng SparkSession.

master(“local”)

Đặt URL chính Spark để kết nối với:

“local” to run locally

“local[4]” to run locally with 4 cores

“spark://master:7077” to run on a spark standalone cluster

appName( )

Đặt tên cho ứng dụng sẽ được hiển thị trong UI Web UI.

Nếu không có tên ứng dụng nào được đặt, tên được tạo ngẫu nhiên sẽ được sử dụng.

Cấu hình

Từ khóa này đặt tùy chọn cấu hình bằng phương pháp này được tự động truyền sang cả cấu hình 'SparkConf' và 'SparkSession'. Đối số của nó bao gồm các cặp khóa-giá trị.

Nhận được

Nhận được một SparkSession hiện có hoặc, nếu có một SparkSession luồng cục bộ hợp lệ, nó sẽ trả về cái đó. Sau đó, nó sẽ kiểm tra xem có SparkSession mặc định toàn cầu hợp lệ hay không và nếu có thì trả về cái đó. Nếu không có SparkSession toàn cầu hợp lệ tồn tại, phương thức tạo SparkSession mới và gán SparkSession mới được tạo làm mặc định toàn cầu.

Trong trường hợp SparkSession hiện tại được trả về, tùy chọn cấu hình được chỉ định trong trình tạo này sẽ được áp dụng cho SparkSession hiện có.

Ở trên tương tự như tạo SparkContext với cục bộ và tạo SQLContext bao bọc nó. Nếu bạn có thể cần tạo bối cảnh Hive, bạn có thể sử dụng mã dưới đây để tạo SparkSession với hỗ trợ Hive:

val spark = SparkSession.builder()

.master("local")

.master("local")

.appName("example of SparkSession")

.config("spark.some.config.option", "some-value")

.enableHiveSupport()

.getOrCreate()

enableHiveSupport trên nhà máy cho phép hỗ trợ Hive, tương tự như  HiveContextSparkSession đã tạo và chúng ta có thể sử dụng nó để đọc dữ liệu.

Đọc dữ liệu bằng SparkSession

SparkSession là điểm vào để đọc dữ liệu, tương tự như SQLContext.read cũ.

Đoạn mã dưới đây đang đọc dữ liệu từ CSV bằng SparkSession.

Trong Spark 2.0 trở đi, tốt hơn là sử dụng SparkSession vì nó cung cấp quyền truy cập vào tất cả các chức năng của Spark  sparkContext cung cấp. Ngoài ra, nó cung cấp API để hoạt động với DataFrames và DataSets

val df = spark.read.format("com.databricks.spark.csv")

.schema(customSchema)

.load("data.csv")

Chạy các truy vấn SQL

SparkSession có thể được sử dụng để thực hiện các truy vấn SQL trên dữ liệu, lấy lại kết quả dưới dạng DataFrame (tức là Bộ dữ liệu [ROW]).

display(spark.sql("Select * from TimeStamp"))
+--------------------+-----------+----------+-----+

| TimeStamp|Temperature| date| Time|

+--------------------+-----------+----------+-----+

|2010-02-25T05:42:...| 79.48|2010-02-25|05:42|

|2010-02-25T05:42:...| 59.27|2010-02-25|05:42|

|2010-02-25T05:42:...| 97.98|2010-02-25|05:42|

|2010-02-25T05:42:...| 91.41|2010-02-25|05:42|

|2010-02-25T05:42:...| 60.67|2010-02-25|05:42|

|2010-02-25T05:42:...| 61.41|2010-02-25|05:42|

|2010-02-25T05:42:...| 93.6|2010-02-25|05:42|

|2010-02-25T05:42:...| 50.32|2010-02-25|05:42|

|2010-02-25T05:42:...| 64.69|2010-02-25|05:42|

|2010-02-25T05:42:...| 78.57|2010-02-25|05:42|

|2010-02-25T05:42:...| 66.89|2010-02-25|05:42|

|2010-02-25T05:42:...| 62.87|2010-02-25|05:42|

|2010-02-25T05:42:...| 74.32|2010-02-25|05:42|

|2010-02-25T05:42:...| 96.55|2010-02-25|05:42|

|2010-02-25T05:42:...| 71.93|2010-02-25|05:42|

|2010-02-25T05:42:...| 79.17|2010-02-25|05:42|

|2010-02-25T05:42:...| 73.89|2010-02-25|05:42|

|2010-02-25T05:42:...| 80.97|2010-02-25|05:42|

|2010-02-25T05:42:...| 81.04|2010-02-25|05:42|

|2010-02-25T05:42:...| 53.05|2010-02-25|05:42|

+--------------------+-----------+----------+-----+

Lưu ý: Chỉ hiển thị 20 hàng đầu.

Làm việc với các tùy chọn cấu hình

SparkSession cũng có thể được sử dụng để đặt các tùy chọn cấu hình thời gian chạy có thể chuyển đổi hành vi tối ưu hóa hoặc hành vi I / O (tức là Hadoop).

Spark.conf.get(“Spark.Some.config”,”abcd”)

Spark.conf.get(“Spark.Some.config”)

Bộ tùy chọn cấu hình cũng có thể được sử dụng trong SQL bằng cách sử dụng thay thế biến.

%Sql select “${spark.some.config}”

Làm việc trực tiếp với siêu dữ liệu

SparkSession cũng bao gồm một  catalog phương thức chứa các phương thức để làm việc với metastore (tức là danh mục dữ liệu). Phương thức trả về DataSets để bạn có thể sử dụng cùng API dữ liệu để chơi với chúng.

Để có được danh sách các bảng trong cơ sở dữ liệu hiện tại, hãy sử dụng mã sau:

val tables =spark.catalog.listTables()

display(tables)



+----+--------+-----------+---------+-----------+

|name|database|description|tableType|isTemporary|

+----+--------+-----------+---------+-----------+

|Stu |default |null |Managed |false |

+----+--------+-----------+---------+-----------+

use the dataset API to filter on names



display(tables.filter(_.name contains “son”)))



+----+--------+-----------+---------+-----------+

|name|database|description|tableType|isTemporary|

+----+--------+-----------+---------+-----------+

|Stu |default |null |Managed |false |

+----+--------+-----------+---------+-----------+

Get the list of the column for a table



display(spark.catalog.listColumns(“smart”))



+-----+----------+----------+-----------+-------------+--------+

|name |description|dataType |nullable |isPartitioned|isbucket|

+-----+-----------+---------+-----------+-------------+--------+

|email|null |string |true |false |false |

+-----+-----------+---------+-----------+-------------+--------+

|iq |null |bigInt |true |false |false |

+-----+-----------+---------+-----------+-------------+--------+

Truy cập SparksContext

SparkSession.sparkContext trả về cơ sở  sparkContext, được sử dụng để tạo RDD cũng như quản lý tài nguyên cụm.

Spark.sparkContext

res17: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2debe9ac
2 hữu ích 0 bình luận 9.2k xem chia sẻ

Có thể bạn quan tâm

loading