RDD sử dụng Spark: Khối xây dựng của Apache Spark



Blog về RDD sử dụng Spark này sẽ cung cấp cho bạn kiến ​​thức chi tiết và toàn diện về RDD, là đơn vị cơ bản của Spark & ​​Mức độ hữu ích của nó.

, Bản thân từ này đủ để tạo ra một tia sáng trong tâm trí mỗi kỹ sư Hadoop. ĐẾN n trong bộ nhớ công cụ xử lý nhanh như chớp trong tính toán cụm. So với MapReduce, việc chia sẻ dữ liệu trong bộ nhớ làm cho RDDs 10-100x nhanh hơn hơn chia sẻ mạng và đĩa và tất cả điều này là có thể nhờ RDD (Tập dữ liệu phân tán có khả năng phục hồi). Các điểm chính mà chúng ta tập trung hôm nay trong bài viết RDD sử dụng Spark này là:

Cần cho RDD?

Tại sao chúng ta cần RDD? -RDD sử dụng Spark





Thế giới đang phát triển với Khoa học dữ liệu vì sự tiến bộ trong . Thuật toán dựa trên hồi quy , , và chạy trên Phân phối Tính toán lặp lại ation thời trang bao gồm Tái sử dụng và Chia sẻ dữ liệu giữa nhiều đơn vị máy tính.

Truyền thống các kỹ thuật cần thiết một bộ nhớ Trung gian ổn định và Phân tán HDFS bao gồm các phép tính lặp đi lặp lại với các bản sao dữ liệu và tuần tự hóa dữ liệu, khiến quá trình này chậm hơn rất nhiều. Tìm một giải pháp chưa bao giờ là dễ dàng.



Đây là đâu RDDs (Tập dữ liệu phân tán có khả năng phục hồi) đến với bức tranh lớn.

RDD s rất dễ sử dụng và dễ tạo vì dữ liệu được nhập từ các nguồn dữ liệu và được đưa vào RDD. Hơn nữa, các hoạt động được áp dụng để xử lý chúng. Họ là một bộ sưu tập phân tán của bộ nhớ với các quyền như Chỉ đọc và quan trọng nhất, họ Chịu được lỗi .



Nếu có phân vùng dữ liệu của RDD là mất đi , nó có thể được tái tạo bằng cách áp dụng cùng một sự biến đổi hoạt động trên phân vùng bị mất đó trong dòng dõi , thay vì xử lý tất cả dữ liệu từ đầu. Cách tiếp cận này trong các tình huống thời gian thực có thể làm nên điều kỳ diệu xảy ra trong các tình huống mất dữ liệu hoặc khi hệ thống gặp sự cố.

RDD là gì?

RDD hoặc là ( Tập dữ liệu phân tán có khả năng phục hồi ) là một điều cơ bản cấu trúc dữ liệu trong Spark. Thời hạn Đàn hồi xác định khả năng tạo dữ liệu tự động hoặc dữ liệu lăn trở lại đến trạng thái ban đầu khi một thiên tai bất ngờ xảy ra với xác suất mất dữ liệu.

Dữ liệu được ghi vào RDD là phân vùng và được lưu trữ vào nhiều nút thực thi . Nếu một nút đang thực thi thất bại trong thời gian chạy, sau đó nó ngay lập tức được sao lưu từ nút thực thi tiếp theo . Đây là lý do tại sao RDD được coi là một loại cấu trúc dữ liệu tiên tiến khi so sánh với các cấu trúc dữ liệu truyền thống khác. RDD có thể lưu trữ dữ liệu có cấu trúc, phi cấu trúc và bán cấu trúc.

Hãy tiếp tục với RDD của chúng tôi bằng blog Spark và tìm hiểu về các tính năng độc đáo của RDD giúp nó có lợi thế hơn các loại cấu trúc dữ liệu khác.

Các tính năng của RDD

  • Trong trí nhớ (RAM) Tính toán : Khái niệm tính toán trong bộ nhớ đưa việc xử lý dữ liệu lên một giai đoạn nhanh hơn và hiệu quả hơn trong đó tổng thể hiệu suất của hệ thống là được nâng cấp.
  • L Đánh giá của anh ấy : Thuật ngữ Đánh giá lười biếng nói lên sự biến đổi được áp dụng cho dữ liệu trong RDD, nhưng đầu ra không được tạo. Thay vào đó, các phép biến đổi được áp dụng là đã đăng nhập.
  • Sự bền bỉ : Các RDD kết quả luôn có thể tái sử dụng.
  • Hoạt động hạt thô : Người dùng có thể áp dụng các phép biến đổi cho tất cả các phần tử trong tập dữ liệu thông qua bản đồ, bộ lọc hoặc là nhóm bởi các hoạt động.
  • Khả năng chịu lỗi : Nếu mất dữ liệu, hệ thống có thể cuộn lại đến nó trạng thái ban đầu bằng cách sử dụng sự biến đổi .
  • Bất biến : Dữ liệu được xác định, truy xuất hoặc tạo không thể được đã thay đổi khi nó được đăng nhập vào hệ thống. Trong trường hợp nếu bạn cần truy cập và sửa đổi RDD hiện có, bạn phải tạo một RDD mới bằng cách áp dụng một bộ Chuyển đổi hoạt động đối với RDD hiện tại hoặc trước đó.
  • Phân vùng : Nó là đơn vị quan trọng của chủ nghĩa song song trong Spark RDD. Theo mặc định, số lượng phân vùng được tạo dựa trên nguồn dữ liệu của bạn. Bạn thậm chí có thể quyết định số lượng phân vùng bạn muốn sử dụng phân vùng tùy chỉnh chức năng.

Tạo RDD bằng Spark

RDD có thể được tạo trong ba cách:

  1. Đọc dữ liệu từ bộ sưu tập song song
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Đang áp dụng sự biến đổi trên RDD trước đó
val words = spark.sparkContext.parallelize (Seq ('Spark', 'là', 'a', 'rất', 'mạnh mẽ', 'ngôn ngữ')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Đọc dữ liệu từ lưu trữ ngoài hoặc các đường dẫn tệp như HDFS hoặc là HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Các hoạt động được thực hiện trên RDD:

Chủ yếu có hai loại hoạt động được thực hiện trên RDD, đó là:

  • Sự biến đổi
  • Hành động

Sự biến đổi : Các hoạt động chúng tôi áp dụng trên RDD cho bộ lọc, truy cập sửa đổi dữ liệu trong RDD mẹ để tạo RDD kế tiếp được gọi là sự biến đổi . RDD mới trả về một con trỏ đến RDD trước đó đảm bảo sự phụ thuộc giữa chúng.

Biến đổi là Đánh giá lười biếng, nói cách khác, các hoạt động được áp dụng trên RDD mà bạn đang làm việc sẽ được ghi lại nhưng không Thực thi. Hệ thống đưa ra một kết quả hoặc ngoại lệ sau khi kích hoạt Hoạt động .

Chúng ta có thể chia các phép biến đổi thành hai loại như sau:

làm thế nào để chuyển đổi double sang int
  • Biến đổi hẹp
  • Biến đổi rộng

Biến đổi hẹp Chúng tôi áp dụng các phép biến đổi hẹp đối với phân vùng đơn của RDD mẹ để tạo RDD mới vì dữ liệu cần thiết để xử lý RDD có sẵn trên một phân vùng duy nhất của ASD cha mẹ . Các ví dụ cho các phép biến đổi hẹp là:

  • bản đồ()
  • bộ lọc ()
  • Bản đồ phẳng()
  • vách ngăn()
  • mapPartitions ()

Biến đổi rộng: Chúng tôi áp dụng sự chuyển đổi rộng rãi trên nhiều phân vùng để tạo RDD mới. Dữ liệu cần thiết để xử lý RDD có sẵn trên nhiều phân vùng của ASD cha mẹ . Các ví dụ cho các phép biến đổi rộng là:

  • Giảm bằng()
  • liên hiệp()

Hành động : Các hành động hướng dẫn Apache Spark áp dụng tính toán và chuyển kết quả hoặc một ngoại lệ trở lại trình điều khiển RDD. Một số hành động bao gồm:

  • sưu tầm()
  • đếm()
  • lấy()
  • Đầu tiên()

Hãy để chúng tôi áp dụng thực tế các hoạt động trên RDD:

IPL (Giải Ngoại hạng Ấn Độ) là một giải đấu cricket với sự phát triển ở cấp độ đỉnh cao. Vì vậy, hôm nay chúng ta hãy bắt tay vào tập dữ liệu IPL và thực thi RDD của chúng ta bằng Spark.

  • Thứ nhất, hãy tải xuống dữ liệu đối sánh CSV của IPL. Sau khi tải xuống, nó bắt đầu giống như một tệp EXCEL với các hàng và cột.

Trong bước tiếp theo, chúng tôi kích hoạt tia lửa và tải tệp match.csv từ vị trí của nó, trong trường hợp của tôicsvvị trí tệp là “/User/edureka_566977/test/matches.csv”

Bây giờ chúng ta hãy bắt đầu với Chuyển đổi phần đầu tiên:

  • bản đồ():

Chúng tôi sử dụng Chuyển đổi bản đồ để áp dụng một hoạt động chuyển đổi cụ thể trên mọi phần tử của RDD. Ở đây chúng tôi tạo một RDD theo tên CKfile nơi lưu trữcsvtập tin. Chúng tôi sẽ tạo một RDD khác được gọi là Các tiểu bang để lưu trữ chi tiết thành phố .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val state = CKfile.map (_. split (',') (2)) state.collect (). foreach (println)

  • bộ lọc ():

Chuyển đổi bộ lọc, chính cái tên mô tả công dụng của nó. Chúng tôi sử dụng thao tác chuyển đổi này để lọc ra dữ liệu có chọn lọc từ tập hợp dữ liệu đã cho. Chúng tôi áp dụng hoạt động lọc tại đây để nhận hồ sơ về các trận đấu IPL trong năm 2017 và lưu trữ nó trong RDD fil.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • Bản đồ phẳng():

Chúng tôi áp dụng flatMap là một hoạt động chuyển đổi cho từng phần tử của RDD để tạo ra một RDD mới. Nó tương tự như chuyển đổi Bản đồ. ở đây chúng tôi áp dụngBản đồ phẳngđến nhổ các trận đấu của thành phố Hyderabad và lưu trữ dữ liệu vàofilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). collect ()

  • vách ngăn():

Mọi dữ liệu chúng ta ghi vào RDD được chia thành một số phân vùng nhất định. Chúng tôi sử dụng phép biến đổi này để tìm số lượng phân vùng dữ liệu thực sự được chia thành.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Chúng tôi coi MapPatitions là một giải pháp thay thế cho Map () vàcho mỗi() cùng với nhau. Chúng tôi sử dụng mapPartitions ở đây để tìm số hàng chúng tôi có trong RDD fil của chúng tôi.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • Giảm bằng():

Chúng tôi sử dụngGiảm bằng() trên Các cặp Khóa-Giá trị . Chúng tôi đã sử dụng chuyển đổi này trêncsvtệp để tìm trình phát với Man cao nhất trong các trận đấu .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • liên hiệp():

Cái tên đã giải thích tất cả, Chúng tôi sử dụng phép chuyển đổi liên hợp là để câu lạc bộ hai RDD với nhau . Ở đây chúng tôi đang tạo hai RDD cụ thể là fil và fil2. fil RDD chứa các bản ghi của các trận đấu IPL 2017 và fil2 RDD chứa bản ghi trận đấu IPL 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Hãy để chúng tôi bắt đầu với Hoạt động phần nơi chúng tôi hiển thị sản lượng thực tế:

  • sưu tầm():

Thu thập là hành động mà chúng tôi sử dụng để hiển thị nội dung trong RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • đếm():

Đếmlà một hành động mà chúng tôi sử dụng để đếm số lượng hồ sơ hiện diện trong RDD.Đâychúng tôi đang sử dụng thao tác này để đếm tổng số bản ghi trong tệp match.csv của chúng tôi.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • lấy():

Thực hiện là một hoạt động Hành động tương tự như thu thập nhưng sự khác biệt duy nhất là nó có thể in bất kỳ số lượng hàng có chọn lọc theo yêu cầu của người dùng. Ở đây chúng tôi áp dụng mã sau để in mười báo cáo hàng đầu.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. lấy (10) .foreach (println)

  • Đầu tiên():

First () là một thao tác hành động tương tự như collect () và take ()được sử dụng để in kết quả đầu ra của báo cáo trên cùng Ở đây chúng tôi sử dụng phép toán đầu tiên () để tìm số trận đấu tối đa được chơi ở một thành phố cụ thể và chúng tôi lấy Mumbai làm đầu ra.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val state = CKfile.map (_. split (',') (2)) val Scount = state.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Để làm cho quá trình học RDD của chúng tôi bằng cách sử dụng Spark, thú vị hơn nữa, tôi đã đưa ra một trường hợp sử dụng thú vị.

RDD sử dụng Spark: Trường hợp sử dụng Pokemon

  • Thứ nhất, Hãy để chúng tôi tải xuống tệp Pokemon.csv và tải nó vào spark-shell như chúng tôi đã làm với tệp Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemons thực sự có sẵn với rất nhiều loại, Hãy để chúng tôi tìm một vài loại.

  • Xóa giản đồ khỏi tệp Pokemon.csv

Chúng tôi có thể không cần Lược đồ của tệp Pokemon.csv. Do đó, chúng tôi loại bỏ nó.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Tìm số của vách ngăn pokemon.csv của chúng tôi được phân phối thành.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Pokemon nước

Tìm kiếm số lượng pokemon nước

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Pokemon lửa

Tìm kiếm số lượng pokemon lửa

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Chúng tôi cũng có thể phát hiện dân số của một loại pokemon khác bằng cách sử dụng chức năng đếm
WaterRDD.count () FireRDD.count ()

  • Vì tôi thích trò chơi của chiến lược phòng thủ hãy để chúng tôi tìm pokemon với phòng thủ tối đa.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Chúng tôi biết tối đa giá trị sức mạnh quốc phòng nhưng chúng tôi không biết đó là pokemon nào. vì vậy, hãy để chúng tôi tìm thấy đó là pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Đặt hàng [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Bây giờ chúng ta hãy phân loại các pokemon với Phòng thủ ít nhất
val minDefencePokemon = defenceList.distinction.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Bây giờ chúng ta hãy xem Pokemon với một chiến lược phòng thủ ít hơn.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPokeHeaderame2 = NoHeaderame2 = NoHeaderame2 = NoHeaderame2 .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Đặt hàng [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Vì vậy, với điều này, chúng ta sẽ kết thúc phần RDD này bằng cách sử dụng bài viết Spark. Tôi hy vọng chúng tôi đã thắp sáng một chút kiến ​​thức của bạn về RDD, các tính năng của chúng và các loại hoạt động khác nhau có thể được thực hiện trên chúng.

Bài viết này dựa trên được thiết kế để chuẩn bị cho bạn cho Kỳ thi cấp chứng chỉ nhà phát triển Cloudera Hadoop và Spark (CCA175). Bạn sẽ có được kiến ​​thức chuyên sâu về Apache Spark và Spark Ecosystem, bao gồm Spark RDD, Spark SQL, Spark MLlib và Spark Streaming. Bạn sẽ có được kiến ​​thức toàn diện về ngôn ngữ Lập trình Scala, HDFS, Sqoop, Flume, Spark GraphX ​​và Hệ thống nhắn tin như Kafka.