facebook_business/adobjects/serverside/event_request_async.py (84 lines of code) (raw):

# Copyright 2014 Facebook, Inc. # You are hereby granted a non-exclusive, worldwide, royalty-free license to # use, copy, modify, and distribute this software in source code or binary # form for use in connection with the web services and APIs provided by # Facebook. # As with any software that integrates with the Facebook platform, your use # of this software is subject to the Facebook Developer Principles and # Policies [http://developers.facebook.com/policy/]. This copyright notice # shall be included in all copies or substantial portions of the software. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. from facebook_business.adobjects.serverside.util import Util if not Util.async_requests_available(): raise Exception('EventRequestAsync requires Python >= 3.5.3') from facebook_business.adobjects.serverside.event_request import EventRequest from facebook_business.adobjects.serverside.event_response import EventResponse from facebook_business.api import FacebookAdsApi from facebook_business.session import FacebookSession import aiohttp import json import os import ssl class EventRequestAsync(EventRequest): def __init__( self, pixel_id=None, events=None, test_event_code=None, namespace_id=None, upload_id=None, upload_tag=None, upload_source=None, partner_agent=None ): super().__init__( pixel_id, events, test_event_code, namespace_id, upload_id, upload_tag, upload_source, partner_agent ) self.pixel_id = pixel_id async def create_event(self, session): api_version = FacebookAdsApi.get_default_api().API_VERSION url = '/'.join([ FacebookSession.GRAPH, api_version, self.pixel_id, 'events' ]) facebook_session = FacebookAdsApi.get_default_api()._session access_token = facebook_session.access_token params = self.get_params() params['access_token'] = access_token if facebook_session.app_secret: params['appsecret_proof'] = facebook_session._gen_appsecret_proof() cafile = os.path.join( os.path.dirname(__file__), '..', '..', 'fb_ca_chain_bundle.crt' ) sslcontext = ssl.create_default_context(cafile=cafile) async with session.post( url=url, data=params, ssl=sslcontext, headers=FacebookAdsApi.get_default_api().HTTP_DEFAULT_HEADERS ) as response: return await response.json() async def execute(self): async with aiohttp.ClientSession() as session: response = await self.create_event(session) return EventResponse(events_received=response['events_received'], fbtrace_id=response['fbtrace_id'], messages=response['messages']) def normalize(self): normalized_events = [] for event in self.events: normalized_event = event.normalize() normalized_events.append(normalized_event) return json.dumps(normalized_events) def clone_without_events(self): return EventRequestAsync( pixel_id=self.pixel_id, events=[], test_event_code=self.test_event_code, namespace_id=self.namespace_id, upload_id=self.upload_id, upload_tag=self.upload_tag, upload_source=self.upload_source, )