Source code for omxware.config.Configuration
# -*- coding: utf-8 -*-
import urllib3
from keycloak import KeycloakOpenID as KeyCloak
from keycloak.exceptions import KeycloakAuthenticationError
from omxware.config.OmxConfig import OmxConfig
from omxware.utils.AESCipher import AESCipher
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
"""OMXWware Configuration class"""
[docs]class Configuration:
__omx_token = ''
__keycloak_token = ''
__server = ''
__userinfo = {}
__env = None
def __init__(self, omx_token, env="public"):
self.__env = env
server_url = OmxConfig.get_server(env)
self.__omx_token = omx_token
self.__server = server_url
if self.is_valid_token() != True:
raise ConnectionError
[docs] def env(self):
return self.__env
[docs] def server_url(self):
return self.__server
[docs] def token(self):
return self.__omx_token
[docs] def user_info(self):
return self.__userinfo
[docs] def auth_token(self):
if self.is_valid_token():
return self.__keycloak_token
else:
return None
def __parse_token(self):
aes = AESCipher()
token_decrypted = aes.decrypt(self.token())
return token_decrypted.split('::::')
[docs] def is_valid_token(self):
credentials = self.__parse_token();
username = credentials[1]
pwd = credentials[2]
keycloak = KeyCloak(server_url="https://auth.s2s-omxware.us-south.containers.appdomain.cloud/auth/",
client_id="omx-zeppelin",
realm_name="omxware",
client_secret_key="c05b7553-cf21-4f0c-ab81-a38aca3ba172",
verify=False)
if self.__env != 'public':
keycloak = KeyCloak(server_url="https://omx-auth.sl.cloud9.ibm.com/auth/",
client_id="omx-zeppelin",
realm_name="omxware",
client_secret_key="1320e78d-025d-48eb-ad3e-451281786932",
verify=False)
try:
# Get Token
token = keycloak.token(username, pwd)
self.__userinfo = keycloak.userinfo(token['access_token'])
self.__keycloak_token = token['access_token']
return True
except KeycloakAuthenticationError as auth_error:
# Exception object looks like this
# keycloak.exceptions.KeycloakAuthenticationError:
# 401: b'{"error":"invalid_grant","error_description":"Invalid user credentials"}'
error_msg = ''
if auth_error['error_description'] != None:
error_msg = auth_error['error_description']
if error_msg.strip() != None:
print(error_msg)
else:
print(auth_error['error_description'])
return False