병렬 스크래핑 가이드
이 가이드 작성 시점(2023년 12월)에서 요청 잠금(request locking) 기능은 아직 실험 단계입니다. 자세한 내용은 요청 잠금 실험 페이지를 참고해 주세요.
이 가이드에서는 단일 스크래퍼를 여러 인스턴스에서 병렬로 실행할 수 있도록 변환하는 방법을 설명합니다. 입문 가이드를 이미 읽었거나 스크래퍼를 구현해 본 경험이 있다고 가정합니다. 아직 입문 가이드를 읽지 않으셨다면, 먼저 그 내용을 숙지하신 후 돌아와 주세요.
자, 이제 스크래퍼를 병렬화해 볼까요!
병렬화 전 고려사항
스크래퍼를 병렬화하기 전에 다음 사항들을 고려해 보시기 바랍니다:
- 병렬화가 필요할 만큼 많은 페이지를 스크래핑하시나요?
- 몇 페이지만 스크래핑한다면 병렬화는 불필요할 수 있습니다
- 하지만 많은 페이지를 처리하거나 로딩이 오래 걸리는 페이지를 스크래핑한다면 병렬화를 고려해볼 만합니다
- 대상 웹사이트에 과부하를 주지 않고 병렬화할 수 있나요?
- 트래픽이 많은 웹사이트의 경우, 병렬 스크래핑으로 인해 서비스에 영향을 줄 수 있습니다
- 병렬 실행에 필요한 리소스가 충분한가요?
- 로컬 실행 시: CPU와 RAM이 충분한지 확인하세요
- 클라우드 실행 시: 병렬화로 인한 속도 향상이 추가 비용 대비 효율적인지 검토하세요
위 질문들에 모두 "예"라고 답하셨나요? 좋습니다! 본격적인 가이드로 들어가기 전에 Apify의 윤리적인 웹 스크래핑 블로그 포스트도 한번 읽어보시기 바랍니다.
이 가이드는 크게 두 부분으로 구성됩니다: 입문 가이드에서 만든 스크래퍼를 병렬 처리가 가능하도록 변환하는 과정과, 실제로 병렬 실행하는 방법을 다룹니다.
Crawlee 병렬 스크래핑 예제 저장소에서 확인하실 수 있습니다! 입문 가이드에서 만든 스크래퍼를 TypeScript로 작성하고 병렬화한 버전입니다.
Crawlee의 동시성과 병렬화의 차이점
잠깐만요! Crawlee에는 이미
maxConcurrency
옵션이 있지 않나요?
네, 말씀하신 대로 Crawlee는 이미 "병렬"(정확히는 동시성) 스크래핑을 지원합니다. 이는 하나의 프로세스에서 여러 작업을 동시에 백그라운드로 실행할 수 있게 해줍니다. 하지만 스크래핑 작업의 규모가 커질수록 병목 현상이 발생할 수 있습니다. 런타임 환경이 더 이상 동시 요청을 처리하지 못하거나, RAM과 CPU가 한계에 도달하는 등의 문제가 생길 수 있죠. 단순히 리소스를 늘리는 것만으로는 한계가 있습니다.
이것이 바로 수직적 확장과 수평적 확장의 차이점입니다. 수직적 확장은 단일 프로세스나 머신의 리소스를 증가시키는 것이고, 수평적 확장은 프로세스나 머신의 수를 늘리는 것입니다. 이 가이드에서 다루는 "병렬화"는 바로 이 수평적 확장을 의미합니다!
병렬화를 위한 스크래퍼 준비하기
Crawlee의 장점 중 하나는 병렬화를 위해 많은 수정이 필요하지 않다는 것입니다! 잠금을 지원하는 큐를 만들고, 초기 스크래퍼에서 링크를 해당 큐에 추가한 다음, 이 큐를 사용하는 병렬 스크래퍼를 구축하면 됩니다.
잠금 지원 요청 큐 만들기
먼저 공통으로 사용할 파일(requestQueue.mjs
)을 만들어 잠금을 지원하는 요청 큐를 저장합니다.
import { RequestQueueV2 } from 'crawlee';
// 병렬 처리를 지원하는 요청 대기열 생성
let queue;
/**
* @param {boolean} makeFresh 대기열을 반환하기 전에 초기화할지 여부
* @returns 요청 대기열 인스턴스
*/
export async function getOrInitQueue(makeFresh = false) {
if (queue) {
return queue;
}
queue = await RequestQueueV2.open('shop-urls');
if (makeFresh) {
await queue.drop();
queue = await RequestQueueV2.open('shop-urls');
}
return queue;
}
내보내는 함수 getOrInitQueue
는 복잡해 보일 수 있지만, 본질적으로는 요청 큐가 초기화되어 있는지 확인하고, 필요한 경우 빈 상태로 시작하도록 합니다.
기존 스크래퍼 수정하기
이전에 만든 스크래퍼의 src/routes.mjs
파일에서 CATEGORY
라벨에 대한 핸들러를 수정하여 상품 URL들을 새로 만든 큐에 추가하도록 해보겠습니다.
먼저 requestQueue.mjs
파일에서 getOrInitQueue
함수를 가져옵니다:
import { getOrInitQueue } from './requestQueue.mjs';
그리고 CATEGORY
핸들러를 다음과 같이 수정합니다:
router.addHandler('CATEGORY', async ({ page, enqueueLinks, request, log }) => {
log.debug(`다음 페이지네이션 대기열에 추가: ${request.url}`);
// 현재 카테고리 페이지에 있습니다. 이를 통해 모든 제품을 페이지네이션하고 대기열에 추가할 수 있으며,
// 발견되는 후속 페이지들도 추가할 수 있습니다
await page.waitForSelector('.product-item > a');
await enqueueLinks({
selector: '.product-item > a',
label: 'DETAIL', // <= 다른 레이블 참고
requestQueue: await getOrInitQueue(), // <= 다른 요청 대기열 참고
});
// 이제 "다음" 버튼을 찾아 다음 결과 페이지를 대기열에 추가해야 합니다 (존재하는 경우)
const nextButton = await page.$('a.pagination__next');
if (nextButton) {
await enqueueLinks({
selector: 'a.pagination__next',
label: 'CATEGORY', // <= 동일한 레이블 참고
});
}
});
이제 진입점 파일 src/main.mjs
의 이름을 src/initial-scraper.mjs
로 변경하고 실행해보세요. 상세 페이지는 스크래핑하지 않고 URL만 잠금 지원 큐에 추가되는 것을 확인할 수 있습니다.
마지막으로 crawler.run()
전에 다음 코드를 추가합니다:
import { getOrInitQueue } from './requestQueue.mjs';
// 크롤러가 채울 빈 큐를 미리 초기화합니다
await getOrInitQueue(true);
이는 스크래퍼를 실행할 때마다 큐가 빈 상태에서 시작하도록 하기 위함입니다. 하지만 사용 사례에 따라 이 과정이 필요하지 않을 수도 있으니, 실험을 통해 최적의 방법을 찾아보세요.
이것으로 초기 스크래퍼가 스크래핑할 모든 URL을 잠금 지원 큐에 저장하도록 준비가 완료되었습니다!
병렬 스크래퍼 만들기
이제 큐에 있는 URL들을 병렬로 스크래핑할 스크래퍼를 만들어보겠습니다! 여기서는 Node.js의 자식 프로세스를 사용하지만, 다른 방법으로도 병렬 실행이 가능합니다. 다른 방법을 사용하실 경우 코드를 적절히 수정하셔야 합니다.
이 스크래퍼는 자신을 두 번 포크하고(이 숫자는 실험해보세요), 각 포크는 앞서 만든 큐를 재사용합니다. 가장 좋은 점은 초기 스크래퍼에서 만든 라우터를 그대로 재사용할 수 있다는 것입니다!
import { fork } from 'node:child_process';
import { Configuration, Dataset, PlaywrightCrawler, log } from 'crawlee';
import { router } from './routes.mjs';
import { getOrInitQueue } from './shared.mjs';
// 이 예제에서는 스토어를 병렬로 스크래핑하기 위해 2개의 별도 프로세스를 생성합니다.
if (!process.env.IN_WORKER_THREAD) {
// 메인 프로세스입니다. 워커 스레드를 생성하는 데 사용됩니다.
log.info('워커 스레드 설정 중.');
const currentFile = new URL(import.meta.url).pathname;
// 모든 워커가 완료될 때까지 기다리기 위해 워커별로 프로미스를 저장합니다
const promises = [];
// 원하는 만큼 워커를 생성할 수 있지만, 시스템 과부하를 피하기 위해 적절한 수를 유지해야 합니다
for (let i = 0; i < 2; i++) {
const proc = fork(currentFile, {
env: {
// 현재 프로세스의 환경 변수를 새로 생성된 프로세스와 공유
...process.env,
// 이 프로세스가 워커 프로세스임을 알림
IN_WORKER_THREAD: 'true',
// 워커 인덱스도 지정
WORKER_INDEX: String(i),
},
});
proc.on('online', () => {
log.info(`프로세스 ${i}가 온라인 상태입니다.`);
// 크롤러의 작업 내용을 로깅
// 참고: 크롤러에서 이미 형식이 지정된 출력을 받기 때문에 log.info 대신 console.log를 사용합니다
proc.stdout.on('data', (data) => {
// eslint-disable-next-line no-console
console.log(data.toString());
});
proc.stderr.on('data', (data) => {
// eslint-disable-next-line no-console
console.error(data.toString());
});
});
proc.on('message', async (data) => {
log.debug(`프로세스 ${i}에서 데이터를 전송했습니다.`, data);
await Dataset.pushData(data);
});
promises.push(
new Promise((resolve) => {
proc.once('exit', (code, signal) => {
log.info(`프로세스 ${i}가 코드 ${code}와 시그널 ${signal}로 종료되었습니다`);
resolve();
});
}),
);
}
await Promise.all(promises);
log.info('크롤링이 완료되었습니다!');
} else {
// 워커 프로세스입니다. 스토어 스크래핑에 사용됩니다.
// 워커 인덱스를 접두사로 사용하는 로거를 생성합니다
const workerLogger = log.child({ prefix: `[워커 ${process.env.WORKER_INDEX}]` });
// CRAWLEE_LOG_LEVEL 환경 변수나 설정 옵션으로 지정하는 것이 더 좋습니다
// 이것은 예시를 위한 것입니다 😈
workerLogger.setLevel(log.LEVELS.DEBUG);
// 시작 시 자동 제거 기능을 비활성화
// 로컬에서 실행할 때 필요합니다. 여러 프로세스가 기본 저장소를 지우려고 하면 충돌이 발생할 수 있기 때문입니다
Configuration.set('purgeOnStart', false);
// 요청 큐 가져오기
const requestQueue = await getOrInitQueue(false);
// 워커별 데이터를 별도 디렉터리에 저장하도록 Crawlee 설정
// (로컬에서 실행할 때는 큐 초기화 후에 수행해야 함)
const config = new Configuration({
storageClientOptions: {
localDataDirectory: `./storage/worker-${process.env.WORKER_INDEX}`,
},
});
workerLogger.debug('크롤러 설정 중.');
const crawler = new PlaywrightCrawler(
{
log: workerLogger,
// if 절이 있는 긴 requestHandler 대신
// 라우터 인스턴스를 제공합니다
requestHandler: router,
// 큐를 사용할 수 있도록 요청 잠금 실험 기능을 활성화합니다
experiments: {
requestLocking: true,
},
// 이전 단계에서 미리 채운 요청 큐를 제공
requestQueue,
// 단일 프로세스에 과부하가 걸리지 않도록 크롤러의 동시성을 제한합니다 🐌
maxConcurrency: 5,
},
config,
);
await crawler.run();
}
DETAIL
라우트 핸들러도 약간 수정해야 합니다. context.pushData
대신 process.send
를 사용하도록 변경하겠습니다.
자식 프로세스를 사용하면 각 워커 프로세스가 자체 저장 공간을 가지기 때문에 context.pushData
가 원하는 대로 동작하지 않습니다.
대신 데이터를 부모 프로세스로 보내서 중앙에서 데이터를 저장해야 합니다.
사용 사례에 따라 이 변경이 필요하지 않을 수도 있습니다! 실험을 통해 최적의 방법을 찾아보세요.
// if 절의 request.label === DETAIL 부분을 대체합니다.
router.addHandler('DETAIL', async ({ request, page, log }) => {
log.debug(`데이터 추출 중: ${request.url}`);
const urlPart = request.url.split('/').slice(-1); // ['sennheiser-mke-440-professional-stereo-shotgun-microphone-mke-440']
const manufacturer = urlPart[0].split('-')[0]; // 'sennheiser'
const title = await page.locator('.product-meta h1').textContent();
const sku = await page.locator('span.product-meta__sku-number').textContent();
const priceElement = page
.locator('span.price')
.filter({
hasText: '$',
})
.first();
const currentPriceString = await priceElement.textContent();
const rawPrice = currentPriceString.split('$')[1];
const price = Number(rawPrice.replaceAll(',', ''));
const inStockElement = page
.locator('span.product-form__inventory')
.filter({
hasText: 'In stock',
})
.first();
const inStock = (await inStockElement.count()) > 0;
const results = {
url: request.url,
manufacturer,
title,
sku,
currentPrice: price,
availableInStock: inStock,
};
log.debug(`데이터 저장 중: ${request.url}`);
// 부모 프로세스로 데이터 전송
// 크롤러 구현 방식에 따라 이 부분은 `context.pushData()`와 같은 다른 방식으로 구현할 수 있습니다.
// 여러 방법을 시도해보면서 최적의 방법을 찾아보세요.
process.send(results);
});
코드가 많으니 하나씩 살펴보겠습니다:
process.env.IS_WORKER_THREAD
확인
이 값을 통해 스크립트가 어떤 모드로 실행되는지 확인합니다. 이 값이 설정되어 있으면 스크래핑을 시작하는 워커 프로세스로 간주하고, 그렇지 않으면 워커 프로세스를 생성할 부모 프로세스로 간주합니다.
워커 프로세스마다 Promise를 만드는 이유
부모 프로세스가 모든 워커 프로세스가 종료될 때까지 살아있도록 하기 위함입니다. 그렇지 않으면 워커 프로세스들이 생성된 후 부모와의 통신이 불가능해집니다. 사용 사례에 따라 이 기능이 필요하지 않을 수도 있습니다(워커만 실행하고 끝내도 되는 경우).
Configuration
호출이 여러 번 있는 이유
워커 프로세스에 대해 세 가지 설정이 필요합니다:
- 시작할 때 기본 저장소가 초기화되지 않도록 합니다
- 부모 프로세스와 같은 위치에서 잠금 지원 큐를 가져옵니다
- 워커 프로세스별로 충돌하지 않는 특별한 저장소를 초기화합니다
다음 코드가 각각의 역할을 수행합니다:
// 자동 초기화 비활성화 (1단계)
// 로컬에서 실행할 때 여러 프로세스가 기본 저장소를 지우려고 하면 충돌이 발생할 수 있어 필요합니다
Configuration.set('purgeOnStart', false);
// 부모 프로세스의 요청 큐 가져오기 (2단계)
const requestQueue = await getOrInitQueue(false);
// 워커별 데이터를 별도 디렉토리에 저장하도록 설정 (3단계)
// 로컬 실행 시 큐 초기화 후에 해야 합니다
const config = new Configuration({
storageClientOptions: {
localDataDirectory: `./storage/worker-${process.env.WORKER_INDEX}`,
},
});
요청 잠금 실험 활성화와 워커 설정 적용
코드에서 강조 표시된 부분들은 요청 잠금 실험을 활성화하고 크롤러에 요청 큐를 제공하는 방법을 보여줍니다. 자세한 내용은 요청 잠금 실험 페이지를 참고하세요.
크롤러 생성자의 두 번째 매개변수로 config
변수를 전달한 것도 보셨을 겁니다. 이는 크롤러가 워커별 저장소를 내부 상태용으로 사용하고 서로 충돌하지 않도록 하기 위함입니다.
context.pushData
대신 process.send
를 사용하는 이유
자식 프로세스를 사용하면 각 워커가 자체 저장 공간을 가지므로 context.pushData
를 사용하면 각자의 "기본" 데이터셋에만 저장됩니다. 대신 데이터를 부모 프로세스로 보내서 중앙의 데이터셋에 저장해야 합니다.
좋은 질문입니다! 로컬에서 실행할 때는 각 프로세스가 데이터셋의 상태를 개별적으로 추적하기 때문에 데이터가 누락되거나 덮어쓰여질 수 있습니다. 그래서 부모 프로세스의 중앙 데이터셋으로 데이터를 보내야 합니다.
크롤러에 따라 이 문제가 발생하지 않을 수도 있습니다! 각 사용 사례마다 특성이 다르니 스크래퍼를 만들 때 이 점을 고려하세요.
최대 동시성을 5
로 제한한 이유
두 가지 이유가 있습니다:
- 대상 웹사이트에 과부하를 주지 않기 위해 워커 프로세스당 동시 요청 수를 적절한 수준으로 제한합니다
- 스크래퍼를 실행하는 머신에 과부하를 주지 않습니다
이는 앞서 언급한 병렬화 고려사항과 연결됩니다.
자주 묻는 질문
initial-scraper
를 parallel-scraper
에 통합할 수 있나요?
기술적으로는 가능합니다! 부모 프로세스에서 먼저 URL을 큐에 추가한 다음 워커 프로세스로 스크래핑하도록 할 수 있습니다. 각 부분의 역할을 명확히 하기 위해 분리했지만, 필요하다면 통합하셔도 됩니다.
제가 만드는 스크래퍼/스크래핑하려는 사이트에 도움이 될까요?
단정 지어 말씀드리기는 어렵습니다! 🤷
먼저 단일 스크래퍼로 만들어서 성능을 모니터링해보세요. 너무 느린가요? 많은 페이지를 스크래핑하거나, 적은 페이지라도 로딩이 오래 걸리나요? 그렇다면 병렬화가 도움될 수 있습니다. 확신이 서지 않는다면 가이드 시작 부분의 고려사항 목록을 다시 한번 확인해보세요.