jobs/v3/api_client/general_search_sample.py (185 lines of code) (raw):
#!/usr/bin/env python
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from googleapiclient.discovery import build
# Discovery-based client for the "jobs" API, version "v3"; the sample entry
# points at the bottom of this file pass it to every helper.
client_service = build("jobs", "v3")
# Parent resource path sent with every search request. Indexing os.environ
# raises KeyError at import time if GOOGLE_CLOUD_PROJECT is not set.
parent = "projects/{}".format(os.environ["GOOGLE_CLOUD_PROJECT"])
# [START job_discovery_basic_keyword_search]
def basic_keyword_search(client_service, company_name, keyword):
    """Run a keyword job search and print the raw response.

    Uses the module-level ``parent`` as the request's parent resource.

    Args:
        client_service: Jobs API client; this function calls
            ``projects().jobs().search(...).execute()`` on it.
        company_name: When not ``None``, sent as the single entry of the
            query's ``company_names`` list.
        keyword: Text sent as the query's ``query`` field.
    """
    query_filter = {"query": keyword}
    if company_name is not None:
        query_filter["company_names"] = [company_name]

    search_body = {
        "search_mode": "JOB_SEARCH",
        # Hard-coded sample metadata values.
        "request_metadata": {
            "user_id": "HashedUserId",
            "session_id": "HashedSessionId",
            "domain": "www.google.com",
        },
        "job_query": query_filter,
    }

    jobs_api = client_service.projects().jobs()
    result = jobs_api.search(parent=parent, body=search_body).execute()
    print(result)
# [END job_discovery_basic_keyword_search]
# [START job_discovery_category_filter_search]
def category_search(client_service, company_name, categories):
    """Search jobs filtered by job category and print the raw response.

    Uses the module-level ``parent`` as the request's parent resource.

    Args:
        client_service: Jobs API client; this function calls
            ``projects().jobs().search(...).execute()`` on it.
        company_name: When not ``None``, sent as the single entry of the
            query's ``company_names`` list.
        categories: Value sent as the query's ``job_categories`` field
            (the sample caller in this file passes a list of strings).
    """
    # Hard-coded sample metadata values.
    metadata = dict(
        user_id="HashedUserId",
        session_id="HashedSessionId",
        domain="www.google.com",
    )

    filters = {"job_categories": categories}
    if company_name is not None:
        filters["company_names"] = [company_name]

    body = dict(
        search_mode="JOB_SEARCH",
        request_metadata=metadata,
        job_query=filters,
    )

    search_call = client_service.projects().jobs().search(parent=parent, body=body)
    print(search_call.execute())
# [END job_discovery_category_filter_search]
# [START job_discovery_employment_types_filter_search]
def employment_types_search(client_service, company_name, employment_types):
    """Search jobs filtered by employment type and print the raw response.

    Uses the module-level ``parent`` as the request's parent resource.

    Args:
        client_service: Jobs API client; this function calls
            ``projects().jobs().search(...).execute()`` on it.
        company_name: When not ``None``, sent as the single entry of the
            query's ``company_names`` list.
        employment_types: Value sent as the query's ``employment_types``
            field (the sample caller in this file passes a list of strings).
    """
    payload = {
        "search_mode": "JOB_SEARCH",
        # Hard-coded sample metadata values.
        "request_metadata": {
            "user_id": "HashedUserId",
            "session_id": "HashedSessionId",
            "domain": "www.google.com",
        },
        "job_query": {"employment_types": employment_types},
    }
    # The company restriction is optional; add it only when one was given.
    if company_name is not None:
        payload["job_query"]["company_names"] = [company_name]

    found = (
        client_service.projects()
        .jobs()
        .search(parent=parent, body=payload)
        .execute()
    )
    print(found)
# [END job_discovery_employment_types_filter_search]
# [START job_discovery_date_range_filter_search]
def date_range_search(client_service, company_name, date_range):
    """Search jobs filtered by publish time range and print the raw response.

    Uses the module-level ``parent`` as the request's parent resource.

    Args:
        client_service: Jobs API client; this function calls
            ``projects().jobs().search(...).execute()`` on it.
        company_name: When not ``None``, sent as the single entry of the
            query's ``company_names`` list.
        date_range: Value sent as the query's ``publish_time_range`` field
            (the sample caller in this file passes a dict with a
            ``start_time`` timestamp string).
    """
    time_filter = {"publish_time_range": date_range}
    if company_name is not None:
        time_filter["company_names"] = [company_name]

    # Hard-coded sample metadata values.
    caller_info = {
        "user_id": "HashedUserId",
        "session_id": "HashedSessionId",
        "domain": "www.google.com",
    }

    search_request = {
        "search_mode": "JOB_SEARCH",
        "request_metadata": caller_info,
        "job_query": time_filter,
    }

    jobs_resource = client_service.projects().jobs()
    outcome = jobs_resource.search(parent=parent, body=search_request).execute()
    print(outcome)
# [END job_discovery_date_range_filter_search]
# [START job_discovery_language_code_filter_search]
def language_code_search(client_service, company_name, language_codes):
    """Search jobs filtered by language code and print the raw response.

    Uses the module-level ``parent`` as the request's parent resource.

    Args:
        client_service: Jobs API client; this function calls
            ``projects().jobs().search(...).execute()`` on it.
        company_name: When not ``None``, sent as the single entry of the
            query's ``company_names`` list.
        language_codes: Value sent as the query's ``language_codes`` field
            (the sample caller in this file passes a list of strings).
    """
    language_filter = {"language_codes": language_codes}
    if company_name is not None:
        language_filter["company_names"] = [company_name]

    body = {
        "search_mode": "JOB_SEARCH",
        # Hard-coded sample metadata values.
        "request_metadata": {
            "user_id": "HashedUserId",
            "session_id": "HashedSessionId",
            "domain": "www.google.com",
        },
        "job_query": language_filter,
    }

    pending = client_service.projects().jobs().search(parent=parent, body=body)
    reply = pending.execute()
    print(reply)
# [END job_discovery_language_code_filter_search]
# [START job_discovery_company_display_name_search]
def company_display_name_search(client_service, company_name, company_display_names):
    """Search jobs filtered by company display name and print the raw response.

    Uses the module-level ``parent`` as the request's parent resource.

    Args:
        client_service: Jobs API client; this function calls
            ``projects().jobs().search(...).execute()`` on it.
        company_name: When not ``None``, sent as the single entry of the
            query's ``company_names`` list.
        company_display_names: Value sent as the query's
            ``company_display_names`` field (the sample caller in this file
            passes a list of strings).
    """
    # Hard-coded sample metadata values.
    metadata = {
        "user_id": "HashedUserId",
        "session_id": "HashedSessionId",
        "domain": "www.google.com",
    }

    display_name_filter = {"company_display_names": company_display_names}
    if company_name is not None:
        display_name_filter["company_names"] = [company_name]

    search_args = {
        "parent": parent,
        "body": {
            "search_mode": "JOB_SEARCH",
            "request_metadata": metadata,
            "job_query": display_name_filter,
        },
    }
    matches = client_service.projects().jobs().search(**search_args).execute()
    print(matches)
# [END job_discovery_company_display_name_search]
# [START job_discovery_compensation_search]
def compensation_search(client_service, company_name):
    """Search jobs with a fixed hourly compensation filter and print the response.

    The filter is hard-coded: type ``UNIT_AND_AMOUNT``, unit ``HOURLY``, and a
    USD range built from the literals below. Uses the module-level ``parent``
    as the request's parent resource.

    Args:
        client_service: Jobs API client; this function calls
            ``projects().jobs().search(...).execute()`` on it.
        company_name: When not ``None``, sent as the single entry of the
            query's ``company_names`` list.
    """
    pay_filter = {
        "type": "UNIT_AND_AMOUNT",
        "units": ["HOURLY"],
        "range": {
            "max_compensation": {"currency_code": "USD", "units": 15},
            # NOTE(review): presumably 10.50 USD (units plus nanos / 1e9) —
            # confirm against the API's money type.
            "min_compensation": {
                "currency_code": "USD",
                "units": 10,
                "nanos": 500000000,
            },
        },
    }

    query = {"compensation_filter": pay_filter}
    if company_name is not None:
        query["company_names"] = [company_name]

    body = {
        "search_mode": "JOB_SEARCH",
        # Hard-coded sample metadata values.
        "request_metadata": {
            "user_id": "HashedUserId",
            "session_id": "HashedSessionId",
            "domain": "www.google.com",
        },
        "job_query": query,
    }

    jobs_api = client_service.projects().jobs()
    print(jobs_api.search(parent=parent, body=body).execute())
# [END job_discovery_compensation_search]
def set_up():
    """Create the company and job that the search samples query against.

    Builds a company via ``base_company_sample`` with its display name set to
    "Google", then a job for that company via ``base_job_sample`` with a
    title, employment type, language code and hourly base compensation.
    Both are created through the module-level ``client_service``.

    Returns:
        A ``(company_name, job_name)`` tuple holding the ``"name"`` values
        read from the created company and the created job.
    """
    # Imported here rather than at module level, as in the original sample,
    # so the search helpers above stay importable without these modules.
    import base_company_sample
    import base_job_sample

    company_to_be_created = base_company_sample.generate_company()
    company_to_be_created.update({"display_name": "Google"})
    company_created = base_company_sample.create_company(
        client_service, company_to_be_created
    )
    company_name = company_created.get("name")

    job_to_be_created = base_job_sample.generate_job_with_required_fields(company_name)
    amount = {"currency_code": "USD", "units": 12}
    compensation_info = {
        "entries": [{"type": "BASE", "unit": "HOURLY", "amount": amount}]
    }
    job_to_be_created.update(
        {
            "title": "Systems Administrator",
            # The Job resource declares employment types as a repeated field,
            # so send a list rather than a bare string.
            "employment_types": ["FULL_TIME"],
            "language_code": "en-US",
            "compensation_info": compensation_info,
        }
    )
    job_name = base_job_sample.create_job(client_service, job_to_be_created).get("name")
    return company_name, job_name
def tear_down(company_name, job_name):
    """Delete the job and then the company created for the sample run.

    Both deletions go through the module-level ``client_service``.

    Args:
        company_name: Passed to ``base_company_sample.delete_company``.
        job_name: Passed to ``base_job_sample.delete_job``.
    """
    import base_company_sample as company_helpers
    import base_job_sample as job_helpers

    # The job is removed first, then the company it was created under.
    job_helpers.delete_job(client_service, job_name)
    company_helpers.delete_company(client_service, company_name)
def run_sample(company_name, job_name):
    """Run every search sample in this file once against one company.

    Each search is called with the module-level ``client_service`` and
    prints its own response.

    Args:
        company_name: Forwarded to every search function.
        job_name: Accepted but not used by this function.
    """
    published_since = {"start_time": "2018-07-01T00:00:00Z"}

    # (search function, extra positional arguments after company_name),
    # listed in the order the searches are executed.
    searches = (
        (basic_keyword_search, ("Systems Administrator",)),
        (category_search, (["COMPUTER_AND_IT"],)),
        (date_range_search, (published_since,)),
        (employment_types_search, (["FULL_TIME", "CONTRACTOR", "PER_DIEM"],)),
        (company_display_name_search, (["Google"],)),
        (compensation_search, ()),
        (language_code_search, (["pt-BR", "en-US"],)),
    )
    for search, extra_args in searches:
        search(client_service, company_name, *extra_args)
# Script entry point: create the sample company and job, run every search,
# then delete what was created.
if __name__ == "__main__":
    company_name, job_name = set_up()
    try:
        # Wait several seconds for post processing
        time.sleep(10)
        run_sample(company_name, job_name)
    finally:
        # Clean up even when the wait is interrupted or a search raises, so
        # the created company and job are not left behind.
        tear_down(company_name, job_name)