Đóng góp bởi Prithviraj Bose
Trong blog trước của tôi, tôi đã thảo luận về các chuyển đổi trạng thái bằng cách sử dụng khái niệm cửa sổ của Apache Spark Streaming. Bạn có thể đọc nó đây .
Trong bài đăng này, tôi sẽ thảo luận về các hoạt động trạng thái tích lũy trong Apache Spark Streaming. Nếu bạn chưa quen với Spark Streaming thì tôi thực sự khuyên bạn nên đọc blog trước của tôi để hiểu cách hoạt động của windowing.
Các loại chuyển đổi trạng thái trong Spark Streaming (Tiếp theo…)
> Theo dõi tích lũy
Chúng tôi đã sử dụng ReduceByKeyAndWindow (…) API để theo dõi trạng thái của các khóa, tuy nhiên việc mở cửa sổ gây ra những hạn chế cho một số trường hợp sử dụng nhất định. Điều gì sẽ xảy ra nếu chúng ta muốn tích lũy trạng thái của các phím trong suốt hơn là giới hạn nó trong một cửa sổ thời gian? Trong trường hợp đó, chúng tôi sẽ cần sử dụng updateStateByKey (…) NGỌN LỬA.
API này đã được giới thiệu trong Spark 1.3.0 và đã rất phổ biến. Tuy nhiên, API này có một số chi phí về hiệu suất, hiệu suất của nó suy giảm khi kích thước của các trạng thái tăng lên theo thời gian. Tôi đã viết một mẫu để hiển thị cách sử dụng API này. Bạn có thể tìm thấy mã đây .
Spark 1.6.0 giới thiệu một API mới mapWithState (…) giải quyết chi phí hiệu suất do updateStateByKey (…) . Trong blog này, tôi sẽ thảo luận về API cụ thể này bằng cách sử dụng một chương trình mẫu mà tôi đã viết. Bạn có thể tìm thấy mã đây .
Trước khi tôi đi sâu vào phần hướng dẫn về mã, hãy dành vài lời về cách kiểm tra. Đối với bất kỳ chuyển đổi trạng thái nào, việc kiểm tra là bắt buộc. Checkpointing là cơ chế khôi phục trạng thái của các phím trong trường hợp chương trình điều khiển bị lỗi. Khi trình điều khiển khởi động lại, trạng thái của các phím được khôi phục từ các tệp điểm kiểm tra. Vị trí điểm kiểm tra thường là HDFS hoặc Amazon S3 hoặc bất kỳ bộ lưu trữ đáng tin cậy nào. Trong khi kiểm tra mã, người ta cũng có thể lưu trữ trong hệ thống tệp cục bộ.
sử dụng trình lặp trong java
Trong chương trình mẫu, chúng tôi lắng nghe luồng văn bản socket trên host = localhost và cổng = 9999. Nó mã hóa luồng đến thành (từ, số lần xuất hiện) và theo dõi số từ bằng API 1.6.0 mapWithState (…) . Ngoài ra, các khóa không có cập nhật sẽ bị xóa bằng cách sử dụng StateSpec.timeout API. Chúng tôi đang kiểm tra trong HDFS và tần suất kiểm tra là 20 giây một lần.
Đầu tiên, hãy tạo một phiên Phát trực tuyến Spark,

Chúng tôi tạo ra một checkpointDir trong HDFS và sau đó gọi phương thức đối tượng getOrCreate (…) . Các getOrCreate API kiểm tra checkpointDir để xem liệu có bất kỳ trạng thái nào trước đó cần khôi phục hay không, nếu tồn tại, thì nó sẽ tạo lại phiên Spark Streaming và cập nhật trạng thái của khóa từ dữ liệu được lưu trữ trong tệp trước khi chuyển sang dữ liệu mới. Nếu không, nó sẽ tạo một phiên Spark Streaming mới.
Các getOrCreate lấy tên thư mục điểm kiểm tra và một hàm (mà chúng tôi đã đặt tên createFunc ) chữ ký của ai nên là () => StreamingContext .
Hãy kiểm tra mã bên trong createFunc .
Dòng # 2: Chúng tôi tạo bối cảnh phát trực tuyến với tên công việc thành “TestMapWithStateJob” và khoảng thời gian hàng loạt = 5 giây.
Dòng # 5: Đặt thư mục điểm kiểm tra.
Dòng # 8: Đặt đặc tả trạng thái bằng cách sử dụng lớp org.apache.streaming.StateSpec vật. Đầu tiên chúng tôi đặt hàm sẽ theo dõi trạng thái, sau đó chúng tôi đặt số lượng phân vùng cho các DStream kết quả sẽ được tạo trong các lần biến đổi tiếp theo. Cuối cùng, chúng tôi đặt thời gian chờ (thành 30 giây), trong đó nếu không nhận được bất kỳ bản cập nhật nào cho khóa trong 30 giây thì trạng thái khóa sẽ bị xóa.
Dòng 12 #: Thiết lập luồng ổ cắm, làm phẳng dữ liệu hàng loạt đến, tạo cặp khóa-giá trị, gọi mapWithState , đặt khoảng thời gian kiểm tra là 20 giây và cuối cùng in kết quả.
Khung Spark gọi th e createFunc cho mọi khóa có giá trị trước đó và trạng thái hiện tại. Chúng tôi tính tổng và cập nhật trạng thái với tổng tích lũy và cuối cùng chúng tôi trả về tổng cho khóa.
Nguồn Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala
Có một câu hỏi cho chúng tôi? Vui lòng đề cập đến nó trong phần bình luận và chúng tôi sẽ liên hệ lại với bạn.
Bài viết liên quan:
Bắt đầu với Apache Spark & Scala