التحول التراكمي ذي الحالة في تدفق أباتشي سبارك



يناقش منشور المدونة هذا التحولات ذات الحالة في Spark Streaming. تعرف على كل شيء عن التتبع التراكمي والمهارات الأعلى لمهنة Hadoop Spark.

بمساهمة بريثفيراج بوس

ما هو شارات في جافا

لقد ناقشت في مدونتي السابقة التحولات ذات الحالة باستخدام مفهوم النوافذ في Apache Spark Streaming. يمكنك قراءتها هنا .





في هذا المنشور ، سأناقش العمليات التراكمية ذات الحالة في Apache Spark Streaming. إذا كنت جديدًا على Spark Streaming ، فإنني أوصيك بشدة بقراءة مدونتي السابقة لفهم كيفية عمل النوافذ.

أنواع التحول الحاصل في Spark Streaming (تابع ...)

> التتبع التراكمي

لقد استخدمنا ملف تقليل ByKeyAndWindow (...) واجهة برمجة التطبيقات لتتبع حالات المفاتيح ، إلا أن النوافذ تفرض قيودًا على حالات استخدام معينة. ماذا لو أردنا تجميع حالات المفاتيح طوال الوقت بدلاً من قصرها على نافذة زمنية؟ في هذه الحالة سنحتاج إلى استخدام updateStateByKey (...) نار.



تم تقديم واجهة برمجة التطبيقات هذه في Spark 1.3.0 ولاقت شهرة كبيرة. على الرغم من أن واجهة برمجة التطبيقات هذه لديها بعض الحمل الزائد في الأداء ، فإن أدائها يتدهور مع زيادة حجم الحالات بمرور الوقت. لقد كتبت نموذجًا لإظهار استخدام واجهة برمجة التطبيقات هذه. يمكنك العثور على الرمز هنا .

قدم Spark 1.6.0 واجهة برمجة تطبيقات جديدة mapWithState (...) الذي يحل النفقات العامة التي يطرحها الأداء updateStateByKey (...) . في هذه المدونة ، سأناقش واجهة برمجة التطبيقات المحددة هذه باستخدام نموذج برنامج كتبته. يمكنك العثور على الرمز هنا .

قبل أن أتعمق في شرح التعليمات البرمجية ، دعنا نحتفظ ببضع كلمات عند نقاط التفتيش. بالنسبة لأي تحول ذي حالة ، فإن نقاط التفتيش إلزامية. نقطة التفتيش هي آلية لاستعادة حالة المفاتيح في حالة فشل برنامج التشغيل. عند إعادة تشغيل برنامج التشغيل ، تتم استعادة حالة المفاتيح من ملفات نقاط التفتيش. عادةً ما تكون مواقع نقاط التفتيش هي HDFS أو Amazon S3 أو أي تخزين موثوق. أثناء اختبار الكود ، يمكن للمرء أيضًا تخزينه في نظام الملفات المحلي.



في نموذج البرنامج ، نستمع إلى دفق نص المقبس على host = localhost و port = 9999. يقوم بترميز الدفق الوارد إلى (الكلمات ، عدد التكرارات) ويتتبع عدد الكلمات باستخدام 1.6.0 API mapWithState (...) . بالإضافة إلى ذلك ، تتم إزالة المفاتيح بدون تحديثات باستخدام مهلة StateSpec API. نحن نحقق نقاط التفتيش في HDFS وتكرار نقاط التفتيش كل 20 ثانية.

دعونا أولاً ننشئ جلسة Spark Streaming ،

Spark-streaming-session

نقوم بإنشاء ملف نقطة تفتيش في HDFS ثم قم باستدعاء طريقة الكائن getOrCreate (...) . ال getOrCreate يتحقق API من ملف نقطة تفتيش لمعرفة ما إذا كانت هناك أي حالات سابقة لاستعادتها ، إن وجدت ، تقوم بإعادة إنشاء جلسة Spark Streaming وتحديث حالات المفاتيح من البيانات المخزنة في الملفات قبل الانتقال إلى بيانات جديدة. وإلا فإنه ينشئ جلسة Spark Streaming جديدة.

ال getOrCreate يأخذ اسم دليل نقطة التفتيش ووظيفة (التي قمنا بتسميتها إنشاء ) الذي يجب أن يكون توقيعه () => StreamingContext .

خوارزميات جافا وهياكل البيانات

دعونا نفحص الشفرة بالداخل إنشاء .

السطر رقم 2: نقوم بإنشاء سياق دفق مع اسم الوظيفة إلى 'TestMapWithStateJob' والفاصل الزمني للدفعة = 5 ثوان.

السطر رقم 5: قم بتعيين دليل نقاط التفتيش.

السطر رقم 8: قم بتعيين مواصفات الحالة باستخدام الفئة org.apache.streaming.StateSpec موضوع. نقوم أولاً بتعيين الوظيفة التي ستتبع الحالة ، ثم نقوم بتعيين عدد الأقسام لـ DStreams الناتجة التي سيتم إنشاؤها أثناء عمليات التحويل اللاحقة. أخيرًا ، قمنا بتعيين المهلة (على 30 ثانية) حيث إذا لم يتم تلقي أي تحديث لمفتاح في 30 ثانية ، فستتم إزالة حالة المفتاح.

السطر 12 #: قم بإعداد دفق المقبس ، وقم بتسوية بيانات الدُفعات الواردة ، وقم بإنشاء زوج قيم مفتاح ، واتصل mapWithState ، اضبط الفاصل الزمني للتسجيل على 20 ثانية وأخيرًا اطبع النتائج.

يستدعي إطار Spark th e createFunc لكل مفتاح بالقيمة السابقة والحالة الحالية. نحسب المجموع ونحدّث الحالة بالمجموع التراكمي وأخيرًا نعيد مجموع المفتاح.

ماذا تفعل تقليم في جافا

مصادر جيثب -> TestMapStateWithKey.scala و TestUpdateStateByKey.scala

لديك سؤال لنا؟ يرجى ذكر ذلك في قسم التعليقات وسنعاود الاتصال بك.

المنشورات ذات الصلة:

ابدأ مع Apache Spark & ​​Scala

التحولات ذات الحالة مع الانغماس في تدفق شرارة