in src/graph_notebook/neptune/client.py [0:0]
def opencypher_http(self, query: str, headers: dict = None, explain: str = None,
query_params: dict = None,
plan_cache: str = None,
query_timeout: int = None,
use_port: bool = False) -> requests.Response:
if headers is None:
headers = {}
url = f'{self._http_protocol}://{self.host}'
if self.is_neptune_domain():
data = {}
if self.is_analytics_domain():
if use_port:
url += f':{self.port}'
url += f'/queries'
data['language'] = 'opencypher'
else:
if 'content-type' not in headers:
headers['content-type'] = 'application/x-www-form-urlencoded'
url += f':{self.port}/openCypher'
if plan_cache:
if plan_cache not in OPENCYPHER_PLAN_CACHE_MODES:
print('Invalid --plan-cache mode specified, defaulting to auto.')
else:
if plan_cache != 'auto':
if self.is_analytics_domain():
data['planCache'] = plan_cache
else:
query = set_plan_cache_hint(query, plan_cache)
data['query'] = query
if explain:
if self.is_analytics_domain():
data['explain.mode'] = explain
data['explain-mode'] = explain
data['explain'] = explain
headers['Accept'] = "text/html"
if query_params:
data['parameters'] = str(query_params).replace("'", '"') # '{"AUS_code":"AUS","WLG_code":"WLG"}'
if query_timeout and self.is_analytics_domain():
data['queryTimeoutMilliseconds'] = str(query_timeout)
else:
url += 'db/neo4j/tx/commit'
headers['content-type'] = 'application/json'
headers['Accept'] = 'application/vnd.neo4j.jolt+json-seq'
data_dict = {
"statements": [
{
"statement": query
}
]
}
data = json.dumps(data_dict)
if self.neo4j_auth:
user_and_pass = self.neo4j_username + ":" + self.neo4j_password
user_and_pass_base64 = b64encode(user_and_pass.encode())
headers['authorization'] = user_and_pass_base64
req = self._prepare_request('POST', url, data=data, headers=headers)
res = self._http_session.send(req, verify=self.ssl_verify)
return res