in connectors/sources/mongo.py [0:0]
def get_client(self):
    """Build a Motor client from the connector settings and yield it.

    Connection options are assembled from ``self.configuration`` and the
    instance's credential/TLS attributes:

    * ``directConnection`` is set when ``direct_connection`` is truthy.
    * ``username``/``password`` are passed when either one is non-empty.
    * ``tls`` mirrors ``ssl_enabled``. When TLS is on and ``self.ssl_ca``
      is set, the CA is converted with ``get_pem_format`` and written to a
      temporary ``.pem`` file whose path is passed as ``tlsCAFile``;
      ``tlsInsecure`` is taken from ``self.tls_insecure``.

    Before yielding, ``self.collection`` is pointed at the configured
    collection of the configured database.

    Whatever happens (including an error while building the options), the
    temporary certificate path is handed to ``self.remove_temp_file`` when
    the generator finishes. The path is an empty string when no file was
    created.

    NOTE(review): presumably wrapped by a context-manager decorator that is
    outside this view -- confirm.

    Yields:
        The ``AsyncIOMotorClient`` created for ``self.host``.
    """
    certfile = ""
    try:
        client_params = {}

        if self.configuration["direct_connection"]:
            client_params["directConnection"] = True

        if len(self.user) > 0 or len(self.password) > 0:
            client_params["username"] = self.user
            client_params["password"] = self.password

        if self.configuration["ssl_enabled"]:
            client_params["tls"] = True
            if self.ssl_ca:
                pem_certificates = get_pem_format(key=self.ssl_ca)
                with NamedTemporaryFile(
                    mode="w", suffix=".pem", delete=False
                ) as cert:
                    # Record the path before writing: the file is created
                    # with delete=False, so if the write fails the
                    # ``finally`` below must still know what to remove.
                    certfile = cert.name
                    cert.write(pem_certificates)
                client_params["tlsCAFile"] = certfile
            client_params["tlsInsecure"] = self.tls_insecure
        else:
            client_params["tls"] = False

        client = AsyncIOMotorClient(self.host, **client_params)

        db = client[self.configuration["database"]]
        self.collection = db[self.configuration["collection"]]

        # NOTE(review): the client is not closed in this block; confirm
        # that its lifetime is managed by the caller.
        yield client
    finally:
        self.remove_temp_file(certfile)