facebook_business/adobjects/serverside/batch_processor.py (39 lines of code) (raw):
# Copyright 2014 Facebook, Inc.
# You are hereby granted a non-exclusive, worldwide, royalty-free license to
# use, copy, modify, and distribute this software in source code or binary
# form for use in connection with the web services and APIs provided by
# Facebook.
# As with any software that integrates with the Facebook platform, your use
# of this software is subject to the Facebook Developer Principles and
# Policies [http://developers.facebook.com/policy/]. This copyright notice
# shall be included in all copies or substantial portions of the software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
from facebook_business.adobjects.serverside.util import Util
if not Util.async_requests_available():
raise Exception('BatchProcessor requires Python >= 3.5.3')
import asyncio
class BatchProcessor:
def __init__(self, batch_size=50, concurrent_requests=4):
self.batch_size = batch_size
self.concurrent_requests = concurrent_requests
def process_event_requests(self, event_requests_async):
async def process():
async for _ in self.process_event_requests_generator(event_requests_async):
pass
asyncio.run(process())
async def process_event_requests_generator(self, event_requests_async):
index = 0
while index < len(event_requests_async):
batch = event_requests_async[index:(index + self.concurrent_requests)]
responses = []
for request in batch:
response = await request.execute()
responses.append(response)
yield responses
index += self.concurrent_requests
def process_events(self, event_request_async_to_clone, events):
async def process():
async for _ in self.process_events_generator(event_request_async_to_clone, events):
pass
asyncio.run(process())
async def process_events_generator(self, event_request_async_to_clone, events):
index = 0
while index < len(events):
responses = []
while index < len(events) and len(responses) < self.concurrent_requests:
event_request = event_request_async_to_clone.clone_without_events()
event_request.events = events[index:(index + self.batch_size)]
response = await event_request.execute()
responses.append(response)
index += self.batch_size
yield responses