RDD باستخدام Spark: The Building Block of Apache Spark



ستزودك هذه المدونة على RDD باستخدام Spark بمعرفة مفصلة وشاملة عن RDD ، وهي الوحدة الأساسية لـ Spark ومدى فائدتها.

و الكلمة نفسها كافية لتوليد شرارة في ذهن كل مهندس Hadoop. إلى ن في الذاكرة أداة المعالجة وهو سريع البرق في الحوسبة العنقودية. بالمقارنة مع MapReduce ، فإن مشاركة البيانات في الذاكرة تجعل RDDs 10-100 ضعف أسرع من مشاركة الشبكة والقرص وكل هذا ممكن بسبب RDDs (مجموعات البيانات الموزعة المرنة). النقاط الرئيسية التي نركز عليها اليوم في هذا RDD باستخدام مقالة Spark هي:

طريقة التحميل الزائد والتجاوز في جافا

تحتاج إلى RDDs؟

لماذا نحتاج RDD؟ -RDD باستخدام Spark





العالم يتطور مع و علم البيانات بسبب التقدم في . الخوارزميات مرتكز على تراجع و و و الذي يعمل على وزعت الحساب التكراري ation الموضة التي تتضمن إعادة استخدام البيانات ومشاركتها بين وحدات الحوسبة المتعددة.

التقليدية تقنيات تحتاج إلى تخزين مستقر وسيط وموزع مثل HDFS تتضمن عمليات حسابية متكررة مع تكرار البيانات وتسلسلها ، مما جعل العملية أبطأ كثيرًا. لم يكن إيجاد حل سهلاً أبدًا.



هذا هو المكان RDDs (مجموعات البيانات الموزعة المرنة) يأتي إلى الصورة الكبيرة.

RDD s سهلة الاستخدام وسهل الإنشاء حيث يتم استيراد البيانات من مصادر البيانات وإسقاطها في RDDs. علاوة على ذلك ، يتم تطبيق العمليات لمعالجتها. هم مجموعة موزعة من الذاكرة مع أذونات يقرأ فقط والأهم من ذلك ، هم مستحمل للخطأ .



لو اي قسم البيانات من RDD هو ضائع ، يمكن تجديدها بتطبيق نفس الشيء تحويل العملية على هذا القسم المفقود بتنسيق النسب ، بدلاً من معالجة جميع البيانات من البداية. هذا النوع من النهج في سيناريوهات الوقت الفعلي يمكن أن يجعل المعجزات تحدث في حالات فقدان البيانات أو عندما يكون النظام معطلاً.

ما هي RDDs؟

RDD أو ( مجموعة البيانات الموزعة المرنة ) أساسي هيكل البيانات في سبارك. المصطلح مرن يحدد القدرة على توليد البيانات أو البيانات تلقائيًا التراجع الى الحالة الأصلية عندما تحدث كارثة غير متوقعة مع احتمال فقدان البيانات.

البيانات المكتوبة في RDDs هي مقسمة وتخزينها في عدة عقد قابلة للتنفيذ . إذا كانت العقدة المنفذة فشل في وقت التشغيل ، ثم يحصل على الفور على النسخ الاحتياطي من ملف العقدة التالية القابلة للتنفيذ . هذا هو السبب في اعتبار RDDs نوعًا متقدمًا من هياكل البيانات عند مقارنتها بهياكل البيانات التقليدية الأخرى. يمكن لـ RDDs تخزين البيانات المهيكلة وغير المهيكلة وشبه المنظمة.

دعنا نمضي قدمًا في RDD باستخدام مدونة Spark والتعرف على الميزات الفريدة لـ RDDs التي تمنحها ميزة على الأنواع الأخرى من هياكل البيانات.

ميزات RDD

  • في الذاكرة (الرامات 'الذاكرة العشوائية في الهواتف والحواسيب) الحسابات : مفهوم الحساب داخل الذاكرة يأخذ معالجة البيانات إلى مرحلة أسرع وأكثر كفاءة حيث بشكل عام أداء من النظام ترقية.
  • إل تقييمه : مصطلح التقييم الكسول يقول التحولات يتم تطبيقها على البيانات الموجودة في RDD ، ولكن لم يتم إنشاء الإخراج. بدلا من ذلك ، فإن التحولات المطبقة هي تسجيل.
  • إصرار : RDDs الناتجة دائمًا قابلة لإعادة الاستخدام.
  • عمليات الحبيبات الخشنة : يمكن للمستخدم تطبيق التحويلات على جميع العناصر في مجموعات البيانات من خلال خريطة، منقي أو مجموعة من عمليات.
  • مستحمل للخطأ : إذا كان هناك فقدان للبيانات ، يمكن للنظام دحر لها الحالة الأصلية باستخدام ملف التحولات .
  • ثبات : لا يمكن تحديد البيانات أو استرجاعها أو إنشاؤها تغير بمجرد تسجيل الدخول إلى النظام. في حالة ما إذا كنت بحاجة إلى الوصول إلى RDD الحالي وتعديله ، يجب عليك إنشاء RDD جديد من خلال تطبيق مجموعة من تحويل يعمل على RDD الحالي أو السابق.
  • التقسيم : انها وحدة حاسمة من التوازي في Spark RDD. بشكل افتراضي ، يعتمد عدد الأقسام التي تم إنشاؤها على مصدر البيانات. يمكنك حتى تحديد عدد الأقسام التي ترغب في استخدامها قسم مخصص المهام.

إنشاء RDD باستخدام Spark

يمكن إنشاء RDDs بتنسيق ثلاث طرق:

  1. قراءة البيانات من مجموعات متوازية
val PCRDD = spark.sparkContext.parallelize (Array ('Mon'، 'Tue'، 'Wed'، 'Thu'، 'Fri'، 'Sat')، 2) نتيجة ValRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. التقديم تحويل على RDDs السابقة
كلمات val = spark.sparkContext.parallelize (Seq ('Spark'، 'is'، 'a'، 'very'، 'strong'، 'language')) val wordpair = words.map (w = (w.charAt ( 0)، w)) wordpair.collect (). foreach (println)
  1. قراءة البيانات من تخزين خارجي أو مسارات الملفات مثل HDFS أو HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

العمليات التي أجريت على RDDs:

هناك نوعان رئيسيان من العمليات التي يتم إجراؤها على أجهزة RDD ، وهما:

  • التحولات
  • أجراءات

التحولات : ال عمليات نطبق على RDDs ل التصفية والوصول و تعديل البيانات الموجودة في RDD الأصل لإنشاء ملف RDD المتتالية يسمى تحويل . يقوم RDD الجديد بإرجاع مؤشر إلى RDD السابق لضمان التبعية بينهما.

التحولات التقييمات الكسولة ، بمعنى آخر ، سيتم تسجيل العمليات المطبقة على RDD التي تعمل بها ولكن لن يتم تسجيلها أعدم. يقوم النظام بإلقاء نتيجة أو استثناء بعد تشغيل ملف عمل .

يمكننا تقسيم التحولات إلى نوعين على النحو التالي:

  • تحولات ضيقة
  • تحولات واسعة

تحولات ضيقة نطبق تحولات ضيقة على أ قسم واحد من RDD الأصل لإنشاء RDD جديد حيث أن البيانات المطلوبة لمعالجة RDD متوفرة على قسم واحد من الوالد ASD . أمثلة التحولات الضيقة هي:

  • خريطة()
  • منقي()
  • خريطة مسطحة()
  • تقسيم()
  • mapPartitions ()

تحولات واسعة: نطبق التحول الواسع على أقسام متعددة لتوليد RDD جديد. البيانات المطلوبة لمعالجة RDD متاحة على أقسام متعددة من الوالد ASD . أمثلة التحولات الواسعة هي:

  • تقليل بواسطة ()
  • اتحاد()

أجراءات : الإجراءات توجه Apache Spark لتطبيقها حساب وتمرير النتيجة أو استثناء إلى برنامج التشغيل RDD. قليل من الإجراءات تشمل:

  • تجميع()
  • عدد ()
  • يأخذ()
  • أول()

دعونا نطبق عمليًا العمليات على RDDs:

IPL (الدوري الهندي الممتاز) هي بطولة كريكيت حيث تصل إلى أعلى مستوياتها. لذلك ، دعونا اليوم نضع أيدينا على مجموعة بيانات IPL وننفذ RDD الخاص بنا باستخدام Spark.

  • أولا، فلنقم بتنزيل بيانات مطابقة CSV لـ IPL. بعد تنزيله ، يبدأ في الظهور كملف EXCEL به صفوف وأعمدة.

في الخطوة التالية ، نقوم بتشغيل الشرارة وتحميل ملف match.csv من موقعه ، في حالتيcsvموقع الملف هو '/user/edureka_566977/test/matches.csv'

الآن دعونا نبدأ مع تحويل الجزء الأول:

  • خريطة():

نحن نستخدم تحويل الخريطة لتطبيق عملية تحويل محددة على كل عنصر من عناصر RDD. هنا نقوم بإنشاء RDD بالاسم CKfile حيث يتم تخزين ملفاتcsvملف. سننشئ RDD آخر يسمى الدول تخزين تفاصيل المدينة .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val States = CKfile.map (_. split ('،') (2)) States.collect (). foreach (println)

  • منقي():

تحويل مرشح ، يصف الاسم نفسه استخدامه. نستخدم عملية التحويل هذه لتصفية البيانات الانتقائية من مجموعة البيانات المعطاة. نطبق عملية التصفية هنا للحصول على سجلات مباريات IPL لهذا العام 2017 وتخزينه في ملف RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • خريطة مسطحة():

نحن نطبق flatMap وهي عملية تحويل لكل عنصر من عناصر RDD لإنشاء RDD جديد. إنه مشابه لتحويل الخريطة. هنا نطبقخريطة مسطحةإلى بصق مباريات مدينة حيدر أباد وتخزين البيانات فيfilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). اجمع ()

  • تقسيم():

يتم تقسيم كل البيانات التي نكتبها في RDD إلى عدد معين من الأقسام. نستخدم هذا التحول للعثور على عدد الأقسام يتم تقسيم البيانات فعليًا إلى.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

نحن نعتبر MapPatitions كبديل لـ Map () وforeach() سويا. نستخدم mapPartitions هنا للعثور على ملف عدد الصفوف لدينا في ملف RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator).

  • تقليل بواسطة ():

نحن نستخدمتقليل بواسطة() على أزواج مفتاح القيمة . استخدمنا هذا التحول فيcsvملف للعثور على المشغل بامتداد أعلى رجل في المباريات .

val ManOfTheMatch = CKfile.map (_. split ('،') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount، 1)) val ManOTH = MOTMcount.reduceByKey ((x، y) => x + y). الخريطة (tup => (tup._2، tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

ما هو الفرق بين التحميل الزائد والتجاوز
  • اتحاد():

يشرح الاسم كل شيء ، نستخدم تحويل النقابات إلى نادي اثنين من RDDs معًا . هنا نقوم بإنشاء اثنين من RDDs وهما fil و fil2. يحتوي fil RDD على سجلات 2017 مباريات IPL ويحتوي fil2 RDD على سجل مطابقة IPL لعام 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

دعونا نبدأ مع عمل الجزء حيث نظهر الناتج الفعلي:

  • تجميع():

التجميع هو العمل الذي نستخدمه عرض المحتويات في RDD.

ما هو مطور iOS
val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • عدد ():

العدهو إجراء نستخدمه لحساب عدد السجلات موجود في RDD.هنانحن نستخدم هذه العملية لحساب العدد الإجمالي للسجلات في ملف match.csv الخاص بنا.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.count ()

  • يأخذ():

Take هي عملية إجراء مشابهة للتجميع ولكن الاختلاف الوحيد هو أنها تستطيع طباعة أي منها عدد انتقائي من الصفوف حسب طلب المستخدم. هنا نطبق الكود التالي لطباعة ملف أهم عشرة تقارير رائدة.

val statecountm = Scount.reduceByKey ((x، y) => x + y) .map (tup => (tup._2، tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. تأخذ (10) .foreach (println)

  • أول():

First () هي عملية إجراء مشابهة للتجميع () واتخاذ ()عليهتُستخدم لطباعة التقرير الأعلى الإخراج هنا نستخدم العملية () الأولى للعثور على ملف أقصى عدد من المباريات التي تم لعبها في مدينة معينة ونحصل على مومباي كناتج.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val States = CKfile.map (_. split ('،') (2)) val Scount = States.map (Scount => ( Scount، 1)) scala & gt val statecount = Scount.reduceByKey ((x، y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x، y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x، y) => x + y) .map (tup => (tup._2، tup._1)) sortByKey (false) statecountm.first ()

لجعل عمليتنا التعلم RDD باستخدام Spark ، أكثر إثارة للاهتمام ، توصلت إلى حالة استخدام مثيرة للاهتمام.

RDD باستخدام Spark: Pokemon Use Case

  • أولا، دعنا ننزّل ملف Pokemon.csv ونحمّله إلى شرارة الصدفة كما فعلنا مع ملف Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

تتوفر البوكيمون في الواقع في مجموعة كبيرة ومتنوعة ، دعونا نجد بعض الأصناف.

  • إزالة المخطط من ملف Pokemon.csv

قد لا نحتاج إلى مخطط من ملف Pokemon.csv. ومن ثم نقوم بإزالته.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • إيجاد عدد أقسام يتم توزيع pokemon.csv إلى ملفات.
println ('عدد الأقسام =' + NoHeader.partitions.size)

  • بوكيمون الماء

العثور على عدد بوكيمون الماء

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • بوكيمون النار

العثور على عدد بوكيمون النار

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • يمكننا أيضًا الكشف عن تعداد السكان من نوع مختلف من البوكيمون باستخدام وظيفة العد
WaterRDD.count () FireRDD.count ()

  • منذ أن أحب لعبة استراتيجية دفاعية دعونا نجد البوكيمون مع أقصى دفاع.
val defenceList = NoHeader.map {x => x.split ('،')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • نحن نعلم الحد الأقصى قيمة قوة الدفاع لكننا لا نعرف أي بوكيمون هو. لذلك ، دعونا نجد ما هو ذلك بوكيمون.
val defWithPokemonName = NoHeader.map {x => x.split ('،')}. map {x => (x (6) .toDouble، x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (ترتيب [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • الآن دعونا نفرز البوكيمون أقل دفاع
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble، true، 1) minDefencePokemon.take (5) .foreach (println)

  • الآن دعونا نرى البوكيمون مع a استراتيجية أقل دفاعية.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPokemonName2 = NoHeader .map {x => x.split ('،')}. map {x => (x (6) .toDouble، x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (ترتيب [مزدوج ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

لذلك ، بهذا ، نصل إلى نهاية هذا RDD باستخدام مقالة Spark. أتمنى أن نكون قد ألقينا القليل من الضوء على معرفتك بأجهزة RDD وميزاتها وأنواع العمليات المختلفة التي يمكن إجراؤها عليها.

هذه المقالة على أساس تم تصميمه لإعدادك لامتحان شهادة Cloudera Hadoop و Spark Developer (CCA175). ستحصل على معرفة متعمقة حول Apache Spark و Spark Ecosystem ، والذي يتضمن Spark RDD و Spark SQL و Spark MLlib و Spark Streaming. سوف تحصل على معرفة شاملة بلغة Scala Programming و HDFS و Sqoop و Flume و Spark GraphX ​​ونظام المراسلة مثل كافكا.