Skip to content

Commit

Permalink
Merge pull request softlayer#2139 from allmightyspiff/internalAuth
Browse files Browse the repository at this point in the history
Adding support for internal styles of authentication
  • Loading branch information
allmightyspiff authored Apr 30, 2024
2 parents 401ab9f + e16e0f9 commit 6d7d5a4
Show file tree
Hide file tree
Showing 22 changed files with 646 additions and 52 deletions.
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"files": "^.secrets.baseline$",
"lines": null
},
"generated_at": "2024-04-18T01:09:09Z",
"generated_at": "2024-04-25T01:18:20Z",
"plugins_used": [
{
"name": "AWSKeyDetector"
Expand Down Expand Up @@ -554,7 +554,7 @@
"hashed_secret": "a4c805a62a0387010cd172cfed6f6772eb92a5d6",
"is_secret": false,
"is_verified": false,
"line_number": 76,
"line_number": 81,
"type": "Secret Keyword",
"verified_result": null
}
Expand Down
40 changes: 40 additions & 0 deletions README-internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
This document is for internal users wanting to use this library to interact with the internal API. It will not work for `api.softlayer.com`.


## Certificate Example

For use with a utility certificate. In your config file (usually `~/.softlayer`), you need to set the following:

```
[softlayer]
endpoint_url = https://<internal api endpoint>/v3/internal/rest/
timeout = 0
theme = dark
auth_cert = /etc/ssl/certs/my_utility_cert-dev.pem
server_cert = /etc/ssl/certs/allCAbundle.pem
```

`auth_cert`: is your utility user certificate
`server_cert`: is the CA certificate bundle to validate the internal API ssl chain. Otherwise you get self-signed ssl errors without this.


```
import SoftLayer
import logging
import click
@click.command()
def testAuthentication():
client = SoftLayer.CertificateClient()
result = client.call('SoftLayer_Account', 'getObject', id=12345, mask="mask[id,companyName]")
print(result)
if __name__ == "__main__":
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
testAuthentication()
```

## Employee Example
221 changes: 206 additions & 15 deletions SoftLayer/API.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"""
# pylint: disable=invalid-name
import time
import warnings

import concurrent.futures as cf
import json
Expand All @@ -28,11 +27,13 @@

__all__ = [
'create_client_from_env',
'employee_client',
'Client',
'BaseClient',
'API_PUBLIC_ENDPOINT',
'API_PRIVATE_ENDPOINT',
'IAMClient',
'CertificateClient'
]

VALID_CALL_ARGS = set((
Expand Down Expand Up @@ -143,33 +144,112 @@ def create_client_from_env(username=None,
return BaseClient(auth=auth, transport=transport, config_file=config_file)


def Client(**kwargs):
"""Get a SoftLayer API Client using environmental settings.
def employee_client(username=None,
access_token=None,
endpoint_url=None,
timeout=None,
auth=None,
config_file=None,
proxy=None,
user_agent=None,
transport=None,
verify=True):
"""Creates an INTERNAL SoftLayer API client using your environment.
Settings are loaded via keyword arguments, environemtal variables and config file.
Deprecated in favor of create_client_from_env()
:param username: your user ID
:param access_token: hash from SoftLayer_User_Employee::performExternalAuthentication(username, password, token)
:param password: password to use for employee authentication
:param endpoint_url: the API endpoint base URL you wish to connect to.
Set this to API_PRIVATE_ENDPOINT to connect via SoftLayer's private network.
:param proxy: proxy to be used to make API calls
:param integer timeout: timeout for API requests
:param auth: an object which responds to get_headers() to be inserted into the xml-rpc headers.
Example: `BasicAuthentication`
:param config_file: A path to a configuration file used to load settings
:param user_agent: an optional User Agent to report when making API
calls if you wish to bypass the packages built in User Agent string
:param transport: An object that's callable with this signature: transport(SoftLayer.transports.Request)
:param bool verify: decide to verify the server's SSL/TLS cert.
"""
warnings.warn("use SoftLayer.create_client_from_env() instead",
DeprecationWarning)
settings = config.get_client_settings(username=username,
api_key=None,
endpoint_url=endpoint_url,
timeout=timeout,
proxy=proxy,
verify=None,
config_file=config_file)

url = settings.get('endpoint_url')
verify = settings.get('verify', True)

if 'internal' not in url:
raise exceptions.SoftLayerError(f"{url} does not look like an Internal Employee url.")

if transport is None:
if url is not None and '/rest' in url:
# If this looks like a rest endpoint, use the rest transport
transport = transports.RestTransport(
endpoint_url=settings.get('endpoint_url'),
proxy=settings.get('proxy'),
timeout=settings.get('timeout'),
user_agent=user_agent,
verify=verify,
)
else:
# Default the transport to use XMLRPC
transport = transports.XmlRpcTransport(
endpoint_url=settings.get('endpoint_url'),
proxy=settings.get('proxy'),
timeout=settings.get('timeout'),
user_agent=user_agent,
verify=verify,
)

if access_token is None:
access_token = settings.get('access_token')

user_id = settings.get('userid')

# Assume access_token is valid for now, user has logged in before at least.
if access_token and user_id:
auth = slauth.EmployeeAuthentication(user_id, access_token)
return EmployeeClient(auth=auth, transport=transport)
else:
# This is for logging in mostly.
LOGGER.info("No access_token or userid found in settings, creating a No Auth client for now.")
return EmployeeClient(auth=None, transport=transport)


def Client(**kwargs):
"""Get a SoftLayer API Client using environmental settings."""
return create_client_from_env(**kwargs)


class BaseClient(object):
"""Base SoftLayer API client.
:param auth: auth driver that looks like SoftLayer.auth.AuthenticationBase
:param transport: An object that's callable with this signature:
transport(SoftLayer.transports.Request)
:param transport: An object that's callable with this signature: transport(SoftLayer.transports.Request)
"""

_prefix = "SoftLayer_"
auth: slauth.AuthenticationBase

def __init__(self, auth=None, transport=None, config_file=None):
if config_file is None:
config_file = CONFIG_FILE
self.auth = auth
self.config_file = config_file
self.settings = config.get_config(self.config_file)
self.__setAuth(auth)
self.__setTransport(transport)

def __setAuth(self, auth=None):
"""Prepares the authentication property"""
self.auth = auth

def __setTransport(self, transport=None):
"""Prepares the transport property"""
if transport is None:
url = self.settings['softlayer'].get('endpoint_url')
if url is not None and '/rest' in url:
Expand All @@ -194,9 +274,7 @@ def __init__(self, auth=None, transport=None, config_file=None):

self.transport = transport

def authenticate_with_password(self, username, password,
security_question_id=None,
security_question_answer=None):
def authenticate_with_password(self, username, password, security_question_id=None, security_question_answer=None):
"""Performs Username/Password Authentication
:param string username: your SoftLayer username
Expand Down Expand Up @@ -259,8 +337,7 @@ def call(self, service, method, *args, **kwargs):

invalid_kwargs = set(kwargs.keys()) - VALID_CALL_ARGS
if invalid_kwargs:
raise TypeError(
'Invalid keyword arguments: %s' % ','.join(invalid_kwargs))
raise TypeError('Invalid keyword arguments: %s' % ','.join(invalid_kwargs))

prefixes = (self._prefix, 'BluePages_Search', 'IntegratedOfferingTeam_Region')
if self._prefix and not service.startswith(prefixes):
Expand All @@ -286,6 +363,7 @@ def call(self, service, method, *args, **kwargs):
request.filter = kwargs.get('filter')
request.limit = kwargs.get('limit')
request.offset = kwargs.get('offset')
request.url = self.settings['softlayer'].get('endpoint_url')
if kwargs.get('verify') is not None:
request.verify = kwargs.get('verify')

Expand Down Expand Up @@ -391,6 +469,31 @@ def __len__(self):
return 0


class CertificateClient(BaseClient):
"""Client that works with a X509 Certificate for authentication.
Will read the certificate file from the config file (~/.softlayer usually).
> auth_cert = /path/to/authentication/cert.pm
> server_cert = /path/to/CAcert.pem
Set auth to a SoftLayer.auth.Authentication class to manually set authentication
"""

def __init__(self, auth=None, transport=None, config_file=None):
BaseClient.__init__(self, auth, transport, config_file)
self.__setAuth(auth)

def __setAuth(self, auth=None):
"""Prepares the authentication property"""
if auth is None:
auth_cert = self.settings['softlayer'].get('auth_cert')
serv_cert = self.settings['softlayer'].get('server_cert', None)
auth = slauth.X509Authentication(auth_cert, serv_cert)
self.auth = auth

def __repr__(self):
return "CertificateClient(transport=%r, auth=%r)" % (self.transport, self.auth)


class IAMClient(BaseClient):
"""IBM ID Client for using IAM authentication
Expand Down Expand Up @@ -575,6 +678,94 @@ def __repr__(self):
return "IAMClient(transport=%r, auth=%r)" % (self.transport, self.auth)


class EmployeeClient(BaseClient):
"""Internal SoftLayer Client
:param auth: auth driver that looks like SoftLayer.auth.AuthenticationBase
:param transport: An object that's callable with this signature: transport(SoftLayer.transports.Request)
"""

def __init__(self, auth=None, transport=None, config_file=None, account_id=None):
BaseClient.__init__(self, auth, transport, config_file)
self.account_id = account_id

def authenticate_with_internal(self, username, password, security_token=None):
"""Performs internal authentication
:param string username: your softlayer username
:param string password: your softlayer password
:param int security_token: your 2FA token, prompt if None
"""

self.auth = None
if security_token is None:
security_token = input("Enter your 2FA Token now: ")
if len(security_token) != 6:
raise exceptions.SoftLayerAPIError("Invalid security token: {}".format(security_token))

auth_result = self.call('SoftLayer_User_Employee', 'performExternalAuthentication',
username, password, security_token)

self.settings['softlayer']['access_token'] = auth_result['hash']
self.settings['softlayer']['userid'] = str(auth_result['userId'])
# self.settings['softlayer']['refresh_token'] = tokens['refresh_token']

config.write_config(self.settings, self.config_file)
self.auth = slauth.EmployeeAuthentication(auth_result['userId'], auth_result['hash'])

return auth_result

def authenticate_with_hash(self, userId, access_token):
"""Authenticates to the Internal SL API with an employee userid + token
:param string userId: Employee UserId
:param string access_token: Employee Hash Token
"""
self.auth = slauth.EmployeeAuthentication(userId, access_token)

def refresh_token(self, userId, auth_token):
"""Refreshes the login token"""

# Go directly to base client, to avoid infite loop if the token is super expired.
auth_result = BaseClient.call(self, 'SoftLayer_User_Employee', 'refreshEncryptedToken', auth_token, id=userId)
if len(auth_result) > 1:
for returned_data in auth_result:
# Access tokens should be 188 characters, but just incase its longer or something.
if len(returned_data) > 180:
self.settings['softlayer']['access_token'] = returned_data
else:
message = "Excepted 2 properties from refreshEncryptedToken, got {}|".format(auth_result)
raise exceptions.SoftLayerAPIError(message)

config.write_config(self.settings, self.config_file)
self.auth = slauth.EmployeeAuthentication(userId, auth_result[0])
return auth_result

def call(self, service, method, *args, **kwargs):
"""Handles refreshing Employee tokens in case of a HTTP 401 error"""
if (service == 'SoftLayer_Account' or service == 'Account') and not kwargs.get('id'):
if not self.account_id:
raise exceptions.SoftLayerError("SoftLayer_Account service requires an ID")
kwargs['id'] = self.account_id

try:
return BaseClient.call(self, service, method, *args, **kwargs)
except exceptions.SoftLayerAPIError as ex:
if ex.faultCode == "SoftLayer_Exception_EncryptedToken_Expired":
userId = self.settings['softlayer'].get('userid')
access_token = self.settings['softlayer'].get('access_token')
LOGGER.warning("Token has expired, trying to refresh. %s", ex.faultString)
self.refresh_token(userId, access_token)
# Try the Call again this time....
return BaseClient.call(self, service, method, *args, **kwargs)

else:
raise ex

def __repr__(self):
return "EmployeeClient(transport=%r, auth=%r)" % (self.transport, self.auth)


class Service(object):
"""A SoftLayer Service.
Expand Down
Loading

0 comments on commit 6d7d5a4

Please sign in to comment.