Phân tích thời gian thực về các vị trí phổ biến của Uber bằng API Apache


Phan Gia Khánh
1 năm trước
Hữu ích 4 Chia sẻ Viết bình luận 0
Đã xem 4986

Theo Gartner , các thành phố thông minh sẽ sử dụng khoảng 1,39 tỷ ô tô, cảm biến IoT và thiết bị được kết nối vào năm 2020. Việc phân tích vị trí và mô hình hành vi trong các thành phố sẽ cho phép tối ưu hóa lưu lượng, quyết định quy hoạch tốt hơn và quảng cáo thông minh hơn. Ví dụ: phân tích dữ liệu xe GPS có thể cho phép các thành phố tối ưu hóa lưu lượng giao thông dựa trên thông tin giao thông thời gian thực. Các công ty viễn thông đang sử dụng dữ liệu vị trí của điện thoại di động để cung cấp thông tin chuyên sâu bằng cách xác định và dự đoán xu hướng hoạt động vị trí và mô hình của dân số trong một khu vực đô thị lớn. Ứng dụng của Machine Learning dữ liệu định vị địa lý đang được sử dụng trong viễn thông, du lịch, tiếp thị và sản xuất để xác định các mô hình và xu hướng cho các dịch vụ như khuyến nghị, phát hiện bất thường và gian lận.

Trong bài viết này, chúng tôi thảo luận về việc sử dụng Spark Structured Streaming trong một đường ống xử lý dữ liệu để phân tích cụm trên dữ liệu sự kiện của Uber để phát hiện và trực quan hóa các địa điểm phổ biến của Uber.

Chúng tôi bắt đầu với việc xem xét một số khái niệm Truyền có cấu trúc, sau đó khám phá trường hợp sử dụng từ đầu đến cuối. (Lưu ý mã trong ví dụ này không phải từ Uber, chỉ là dữ liệu.)

Truyền khái niệm

Xuất bản-Đăng ký Luồng sự kiện với MapR-ES

MapR-ES là một hệ thống phát trực tuyến sự kiện đăng ký phân phối cho phép các nhà sản xuất và người tiêu dùng trao đổi các sự kiện trong thời gian thực theo cách song song và chịu lỗi thông qua API Kafka của Apache.

Một luồng đại diện cho một chuỗi các sự kiện liên tục từ nhà sản xuất đến người tiêu dùng, trong đó một sự kiện được xác định là một cặp khóa-giá trị.

Chủ đề là một dòng logic của các sự kiện. Các chủ đề tổ chức các sự kiện thành các danh mục và tách các nhà sản xuất từ ​​người tiêu dùng. Các chủ đề được phân vùng cho thông lượng và khả năng mở rộng. MapR-ES có thể mở rộng đến mức thông lượng rất cao, dễ dàng phân phối hàng triệu tin nhắn mỗi giây bằng phần cứng rất khiêm tốn.

Bạn có thể nghĩ về một phân vùng như nhật ký sự kiện: các sự kiện mới được thêm vào cuối và được gán một số ID tuần tự được gọi là offset.

Giống như một hàng đợi, các sự kiện được gửi theo thứ tự chúng được nhận.

Tuy nhiên, không giống như một hàng đợi, các tin nhắn không bị xóa khi đọc. Chúng vẫn nằm trên phân vùng có sẵn cho người tiêu dùng khác. Tin nhắn, một khi được công bố, là bất biến và có thể được giữ lại mãi mãi.

Không xóa tin nhắn khi chúng được đọc cho phép hiệu suất cao ở quy mô và cũng để xử lý cùng một tin nhắn của những người tiêu dùng khác nhau cho các mục đích khác nhau, chẳng hạn như nhiều chế độ xem với sự kiên trì polyglot .

Bộ dữ liệu Spark, Khung dữ liệu, SQL

Spark Dataset là một tập hợp phân tán các đối tượng được gõ được phân vùng trên nhiều nút trong một cụm. Một bộ dữ liệu có thể được thao tác bằng cách sử dụng các phép biến đổi chức năng (map, FlatMap, bộ lọc, v.v.) và / hoặc Spark SQL. DataFrame là Bộ dữ liệu của các đối tượng Hàng và biểu thị một bảng dữ liệu với các hàng và cột.

Truyền phát cấu trúc Spark

Structured Streaming là một công cụ xử lý luồng có khả năng mở rộng và chịu lỗi được xây dựng trên công cụ Spark SQL. Structured Streaming cho phép bạn xem dữ liệu được xuất bản lên Kafka dưới dạng DataFrame không giới hạn và xử lý dữ liệu này với cùng DataFrame, Dataset và SQL API được sử dụng để xử lý hàng loạt.

Khi dữ liệu truyền phát tiếp tục đến, công cụ Spark SQL tăng dần và liên tục xử lý nó và cập nhật kết quả cuối cùng.

Luồng xử lý các sự kiện rất hữu ích cho ETL thời gian thực, lọc, chuyển đổi, tạo bộ đếm và tổng hợp, giá trị tương quan, làm giàu với các nguồn Dữ liệu khác hoặc Học máy, duy trì các tệp hoặc Cơ sở dữ liệu và xuất bản sang một chủ đề khác cho các đường ống.

Mã ví dụ sử dụng truyền phát cấu trúc Spark

Dưới đây là đường ống xử lý dữ liệu cho trường hợp sử dụng phân tích cụm này trên dữ liệu sự kiện của Uber để phát hiện các địa điểm đón phổ biến.

  1. Dữ liệu chuyến đi của Uber được xuất bản theo chủ đề MapR-ES bằng API Kafka .
  2. Một ứng dụng phát trực tuyến Spark đã đăng ký chủ đề:
    1. Nhập dữ liệu chuyến đi của Uber
    2. Sử dụng mô hình Machine Learning được triển khai để làm phong phú dữ liệu chuyến đi với ID cụm và vị trí cụm
    3. Lưu trữ dữ liệu được chuyển đổi và làm giàu trong MapR-DB JSON

Ví dụ dữ liệu ca sử dụng

Tập hợp dữ liệu ví dụ là Uber dữ liệu chuyến đi, mà bạn có thể đọc thêm về bài viết này trên cụm một nalysis của dữ liệu sự kiện Uber để phát hiện các địa điểm đón phổ biến sử dụng Spark  máy l thu nhập . Dữ liệu đến ở định dạng CSV, một ví dụ được hiển thị bên dưới, với tiêu đề:

ngày / thời gian, vĩ độ, kinh độ, cơ sở, dấu thời gian đảo ngược

2014-08-06T05: 29: 00.000-07: 00, 40.7276, -74.0033, B02682, 9223370505593280605

Chúng tôi làm phong phú dữ liệu này bằng ID cụm và vị trí sau đó chuyển đổi nó thành đối tượng JSON sau đây:

{  
"_id":0_922337050559328,
"dt":"2014-08-01 08:51:00",
"lat":40.6858,
"lon":-73.9923,
"base":"B02682",
"cid":0,
"clat":40.67462874550765,
"clon":-73.98667466026531  
}

Đang tải Mô hình K-Means

Lớp Spark KMeansModel được sử dụng để tải mô hình k- mean , được trang bị trên dữ liệu chuyến đi Uber lịch sử và sau đó được lưu vào cụm MapR-XD . Tiếp theo, một bộ dữ liệu ID và vị trí của Trung tâm cụm được tạo để tham gia sau với các địa điểm chuyến đi của Uber.

Bên dưới các trung tâm cụm được hiển thị trên bản đồ google trong Notebook Zeppelin:

Đọc dữ liệu từ chủ đề Kafka

Để đọc từ Kafka, trước tiên chúng ta phải chỉ định định dạng luồng, chủ đề và các tùy chọn bù. Để biết thêm thông tin về các tham số cấu hình, xem tài liệu Luồng MapR.

Điều này trả về một DataFrame với lược đồ sau:

Bước tiếp theo là phân tích và chuyển đổi cột giá trị nhị phân thành Tập dữ liệu của các đối tượng Uber.

Phân tích các giá trị tin nhắn thành một bộ dữ liệu của các đối tượng Uber

Một lớp trường hợp Scala Uber xác định lược đồ tương ứng với các bản ghi CSV. Hàm parseUber phân tích chuỗi giá trị được phân tách bằng dấu phẩy thành đối tượng Uber.

Trong mã dưới đây, chúng tôi đăng ký một hàm do người dùng xác định (UDF) để giải tuần tự hóa các chuỗi giá trị thông báo bằng cách sử dụng hàm parseUber. Sau đó, chúng tôi sử dụng UDF trong một biểu thức được chọn với Chuỗi Cast của giá trị cột df1, trả về một DataFrame của các đối tượng Uber.

Làm phong phú bộ dữ liệu của các đối tượng Uber với ID và vị trí trung tâm cụm

VectorAssembler được sử dụng để chuyển đổi và trả về một DataFrame mới với các cột tính năng kinh độ và vĩ độ trong một cột vectơ.

Mô hình k-mean được sử dụng để lấy các cụm từ các tính năng bằng phương thức biến đổi mô hình, trả về một DataFrame với ID cụm (dự đoán được gắn nhãn). Bộ dữ liệu kết quả này được kết hợp với Bộ dữ liệu trung tâm cụm được tạo trước đó (ccdf) để tạo Bộ dữ liệu của các đối tượng UberC, chứa thông tin chuyến đi kết hợp với ID và vị trí trung tâm cụm.

Việc chuyển đổi Dataset cuối cùng là thêm một ID duy nhất vào các đối tượng của chúng ta để lưu trữ trong MapR-DB JSON. Hàm createdUberwId tạo ra một IDcons hiện tại duy nhất của ID cụm và dấu thời gian đảo ngược. Do phân vùng MapR-DB và sắp xếp các hàng theo id, các hàng sẽ được sắp xếp theo ID cụm với lần đầu tiên gần nhất. Chức năng này được sử dụng với bản đồ để tạo Bộ dữ liệu của các đối tượng UberwId.

Viết vào một bộ nhớ chìm

Bây giờ chúng tôi đã thiết lập các phần làm giàu và biến đổi trên dữ liệu phát trực tuyến. Tiếp theo, với mục đích gỡ lỗi, chúng ta có thể bắt đầu nhận dữ liệu và lưu trữ dữ liệu trong bộ nhớ dưới dạng bảng trong bộ nhớ, sau đó có thể được truy vấn.

Đây là ví dụ đầu ra từ %sqlselect * from uber limit 10:

Bây giờ chúng ta có thể truy vấn dữ liệu phát trực tuyến để đặt câu hỏi như giờ và cụm nào có số lượng người nhận nhiều nhất? (Đầu ra được hiển thị trong sổ ghi chép Zeppelin.)

%sql
SELECT hour(uber.dt) as hr,cid, count(cid) as ct FROM uber group By hour(uber.dt), cid

Spark Streaming Viết lên MapR-DB

Trình kết nối MapR-DB cho Apache Spark cho phép bạn sử dụng MapR-DB làm phần chìm cho phát trực tuyến Spark Structured hoặc Spark Streaming.

Một trong những thách thức, khi bạn đang xử lý nhiều dữ liệu phát trực tuyến, bạn muốn lưu trữ nó ở đâu? Đối với ứng dụng này, MapR-DB JSON , cơ sở dữ liệu NoQuery hiệu năng cao, đã được chọn vì khả năng mở rộng và dễ sử dụng linh hoạt với JSON.

Tính linh hoạt của lược đồ JSON

MapR-DB hỗ trợ các tài liệu JSON như một kho lưu trữ dữ liệu gốc. MapR-DB giúp dễ dàng lưu trữ, truy vấn và xây dựng các ứng dụng với các tài liệu JSON. Trình kết nối Spark giúp dễ dàng xây dựng các đường ống theo thời gian thực hoặc theo đợt giữa dữ liệu JSON của bạn và MapR-DB và tận dụng Spark trong đường ống.

Với MapR-DB , một bảng được tự động phân vùng thành các máy tính bảng trên một cụm theo phạm vi khóa, cung cấp khả năng đọc và ghi nhanh và có thể mở rộng bằng phím hàng. Trong trường hợp sử dụng này, khóa hàng, _id, bao gồm ID cụm và dấu thời gian đảo ngược, do đó bảng được tự động phân vùng và sắp xếp theo ID cụm với lần đầu tiên gần đây nhất.

Trình kết nối Spark MapR-DB sử dụng API Spark DataSource . Kiến trúc trình kết nối có một đối tượng kết nối trong mỗi Spark Executor, cho phép ghi, đọc hoặc quét song song phân tán với máy tính bảng MapR-DB (phân vùng).

Viết thư cho MapR-DB chìm

Để viết Spark Stream vào MapR-DB, hãy chỉ định định dạng với các tham số tablePath, idFieldPath, createdTable, BulkMode và sampleSize . Ví dụ sau viết ra DataFrame cho MapR-DB và bắt đầu luồng.

Truy vấn JSON MapR-DB bằng Spark SQL

Trình kết nối Spark MapR-DB cho phép người dùng thực hiện các truy vấn và cập nhật SQL phức tạp trên MapR-DB bằng cách sử dụng Bộ dữ liệu Spark trong khi áp dụng các kỹ thuật quan trọng như trình chiếu và đẩy xuống bộ lọc, phân vùng tùy chỉnh và định vị dữ liệu.

Đang tải dữ liệu từ MapR-DB vào bộ dữ liệu Spark

Để tải dữ liệu từ bảng JSON MapR-DB vào Bộ dữ liệu Spark Spark của Apache, chúng tôi gọi phương thức loadFromMapRDB trên một đối tượng SparkSession, cung cấp bảng tên, lược đồ và lớp trường hợp. Điều này trả về một Bộ dữ liệu của các đối tượng UberwId:

Khám phá và truy vấn dữ liệu Uber bằng Spark SQL

Bây giờ chúng ta có thể truy vấn dữ liệu được truyền liên tục vào MapR-DB để đặt câu hỏi với ngôn ngữ dành riêng cho miền Spark DataFrames hoặc với Spark SQL.

Hiển thị các hàng đầu tiên (lưu ý cách các hàng được phân vùng và sắp xếp theo _id, bao gồm id cụm và dấu thời gian đảo ngược, dấu thời gian đảo ngược sắp xếp gần nhất trước tiên).

df.show

Có bao nhiêu xe bán tải xảy ra trong mỗi cụm?

df.groupBy("cid").count().orderBy(desc( "count")).show

hoặc với Spark SQL:

%sql SELECT COUNT(cid), cid FROM uber GROUP BY cid ORDER BY COUNT(cid) DESC

Với tập lệnh Angular và Google Maps trong sổ ghi chép Zeppelin, chúng tôi có thể hiển thị các điểm đánh dấu trung tâm cụm và 5000 địa điểm chuyến đi mới nhất trên bản đồ, cho thấy các địa điểm phổ biến nhất - 0, 3 và 9 - đều ở Manhattan.

Những giờ nào có số lượng xe bán tải nhiều nhất cho cụm 0?

df.filter($"\_id" <= "1")
  .select(hour($"dt").alias("hour"), $"cid")
  .groupBy("hour","cid").agg(count("cid")
  .alias("count"))show

Những giờ nào trong ngày và cụm nào có số lượng xe bán tải nhiều nhất?

%sql SELECT hour(uber.dt), cid, count(cid) FROM uber GROUP BY hour(uber.dt), cid

Số lượng cụm hiển thị cho các chuyến đi Uber theo thời gian.

%sql select cid, dt, count(cid) as count from uber group by dt, cid order by dt, cid limit 100

Tóm lược

Trong bài đăng này, bạn đã học cách sử dụng như sau:

  • Mô hình Spark Machine Learning trong ứng dụng Spark Structured Streaming
  • Truyền phát có cấu trúc Spark với MapR-ES để nhập tin nhắn bằng API Kafka
  • Truyền phát có cấu trúc Spark để duy trì MapR-DB để phân tích SQL có sẵn liên tục nhanh chóng

Tất cả các thành phần của kiến ​​trúc ca sử dụng mà chúng ta vừa thảo luận có thể chạy trên cùng một cụm với Nền tảng dữ liệu MapR.

Chạy mã

Có một số cách bạn có thể bắt đầu với Nền tảng dữ liệu MapR :

Tài nguyên bổ sung

Hữu ích 4 Chia sẻ Viết bình luận 0
Đã xem 4986