[Zoom 14-02-2025] Chuẩn bị dữ liệu pre-trained cho mô hình ngôn ngữ
Cách bố trí dữ liệu hiểu quả
Các công cụ tối ưu bố trí dữ liệu
1. Nội dung
1.1. Slide - Data Pipeline
1.2. Slide - Đào tạo phân tán
1.3. Các bộ dữ liệu dùng để Pretrained nổi tiếng
Các paper quan trọng:
LLAMA paper: https://arxiv.org/pdf/2302.13971
Datasets for Large Language Models: A Comprehensive Survey: https://arxiv.org/pdf/2402.18041
1.4. Chuẩn bị dữ liệu cho LLMs
Sau khi đã Crawl xong dữ liệu từ tuần trước, team bổ sung Notebook chi tiết chuẩn bị dữ liệu sẵn sàng để train cho mô hình ngôn ngữ lớn. Chú ý trong notebook này để tăng tốc load dữ liệu lớn cần code song song trên cả CPU và GPU.
Đường dẫn: https://colab.research.google.com/drive/1_5ba9MbA8MUnLoQuTraojwLOy2AXSvBF?usp=sharing
Quy trình xây dựng tự điền thì làm song song trên CPU.
Quy trình khi có từ điển thì chuyển các câu thành chuỗi số thì làm song song trên GPU.
Các notebook tiếp theo sẽ là quá trình training mô hình và đánh giá.
1.5. [Nâng cao] Thư viện deduplication
2. Demo train phân tán
2.1. Thực hành train GPT
Data: https://drive.google.com/drive/folders/1hIN7zSwHg1rKc_R4-J5BEhJyy5tSITQt
Code load dữ liệu:
https://drive.google.com/file/d/1t4glTDgRBd08SeChSOXlplGpnIJvUh0S/view?usp=drive_link
Code đào tạo:
https://drive.google.com/file/d/1p2AsCWHA9HWKjAmzE0kQtp8z8Gr5E4fZ/view?usp=drive_link
Script training:
python -m torch.distributed.launch --nproc_per_node 1 train.py
3. Các kỹ thuật liên quan tới Common Crawl
3.1. Một số cript chạy PySpark trên GGDataproc
Cài đặt Cloud SDK
Clean Common Crawl với PySpark + GG Dataproc
Setup phân quyền cho Service Account hiện tại của bạn
Quyền truy cập Storage
gcloud projects add-iam-policy-binding <PROJECT_ID> \
--member serviceAccount:<YOUR_SERVICE_ACCOUNT_EMAIL> \
--role roles/storage.admin
Quyền dùng Dataproc
gcloud projects add-iam-policy-binding <PROJECT_ID> \
--member serviceAccount:<YOUR_SERVICE_ACCOUNT_EMAIL> \
--role roles/dataproc.worker
Quyền IAM
gcloud projects add-iam-policy-binding <PROJECT_ID> \
--member serviceAccount:<YOUR_SERVICE_ACCOUNT_EMAIL> \
--role roles/iam.serviceAccountUser
Kiểm tra quyền
gcloud projects get-iam-policy <PROJECT_ID> --flatten="bindings[].members" --format='table(bindings.role)' --filter="bindings.members:<YOUR_SERVICE_ACCOUNT_EMAIL>"
Đảm bảo các quyền sau:
roles/storage.admin
roles/dataproc.worker
roles/iam.serviceAccountUser
Setup Cluster
Master + Workers Node (Thông thường)
gcloud dataproc clusters create my-cluster \
--region us-central1 \
--zone us-central1-a \
--num-workers 2 \
--master-machine-type n1-standard-2 \
--worker-machine-type n1-standard-2 \
--master-boot-disk-size 100GB \
--worker-boot-disk-size 50GB \
--project vertical-set-272909 \
--project <PROJECT_ID> \
--initialization-actions gs://protonx-my-dataproc-bucket/scripts/install_requirements.sh
Tải Warc Data lên GCS
gsutil cp warc_data/*.warc gs://my-dataproc-bucket/warc_data/
Submit code preprocess
gcloud dataproc jobs submit pyspark gs://my-dataproc-bucket/scripts/warc_processor.py \
--cluster my-cluster \
--region us-central1 \
-- \
gs://my-dataproc-bucket/warc_data/
Script warc_processor.py này như sau:
Ý tưởng là chia data từ GCS thành các phần nhỏ gọi là partitions
Xử lý dữ liệu trên các partitions này và hợp lại
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField
from warcio.archiveiterator import ArchiveIterator
from io import BytesIO
from bs4 import BeautifulSoup
# 🚀 Initialize Spark Session
spark = SparkSession.builder.appName("WARC Processor & Cleaner").getOrCreate()
# 🔹 Read WARC files from Google Cloud Storage
warc_rdd = spark.sparkContext.binaryFiles("gs://protonx-my-dataproc-bucket/small_warc/*.warc") \
.repartition(10) # Prevents memory overload on a single executor
# ✅ Allowed character set for Vietnamese text cleaning
letters = set('aáàảãạăắằẳẵặâấầẩẫậbcdđeéèẻẽẹêếềểễệfghiíìỉĩịjklmnoóòỏõọôốồổỗộơớờởỡợpqrstuúùủũụưứừửữựvwxyýỳỷỹỵz0123456789')
# 🔹 Function to clean individual words
def clean_word(w):
return "".join(letter.lower() for letter in w if letter.lower() in letters or letter == '.')
# 🔥 Function to process WARC records and extract raw HTML content
def process_warc_partition(iterator):
extracted_data = []
for filename, warc_bytes in iterator:
try:
stream = BytesIO(warc_bytes) # Convert binary content into a readable stream
for record in ArchiveIterator(stream):
if record.rec_type == "response":
url = record.rec_headers.get_header("WARC-Target-URI")
html_content = record.content_stream().read().decode("utf-8", errors="ignore")
extracted_data.append((url, html_content))
except Exception as e:
print(f"❌ Error processing {filename}: {e}")
return extracted_data
# ⚡ Extract and convert RDD to DataFrame
schema = StructType([
StructField("url", StringType(), True),
StructField("content", StringType(), True)
])
warc_extracted_rdd = warc_rdd.mapPartitions(process_warc_partition)
df = spark.createDataFrame(warc_extracted_rdd, schema)
# 🔥 Function to clean HTML content
def clean_html_content(content):
if not content:
return ""
soup = BeautifulSoup(content, "html.parser")
text = soup.get_text(separator=" ") # Extract plain text
# Clean each word in the extracted text
cleaned_words = [clean_word(word) for word in text.split()]
return " ".join(cleaned_words)
# 🔹 Register function as a Spark UDF
clean_html_udf = udf(clean_html_content, StringType())
# ✅ Apply cleaning function to the 'content' column
df_cleaned = df.withColumn("cleaned_content", clean_html_udf(df["content"]))
# 🔄 Save the cleaned data to Google Cloud Storage
output_path = "gs://protonx-my-dataproc-bucket/processed_warc/cleaned_data.parquet"
df_cleaned.write.mode("overwrite").parquet(output_path)
print(f"🔥 Cleaned WARC content saved to: {output_path}")
# 🚀 Stop Spark Session
spark.stop()
3.2. Thực hành
Đọc WARC file dùng Python
https://colab.research.google.com/drive/1MtoL5GCXzHzcGQlWYTlrlAWDa0Ek1bkl
Sinh một file WARC và xử lý WARC phân tán
https://drive.google.com/file/d/1E6e7bm93IAJVA0x5DQmj6Ux_kvOUczNi/view?usp=drive_link