Skip to content

Changed observer module from local to remote in Python Client #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
.idea
learning_orchestra_client/.idea
learning_orchestra_client/__pycache__
learning_orchestra_client/dataset/__pycache__
learning_orchestra_client/transform/__pycache__
learning_orchestra_client/main.py
learning_orchestra_client/explore/__pycache__
learning_orchestra_client/builder/__pycache__
__pycache__
docs
venv
sentiment_analysis_output.py
mnist_output.py
mnist_treatment.py
164 changes: 101 additions & 63 deletions learning_orchestra_client/observe/observe.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,122 @@
from pymongo import MongoClient, change_stream

import requests
from learning_orchestra_client._util._response_treat import ResponseTreat

class Observer:
debug = True

__TIMEOUT_TIME_MULTIPLICATION = 1000
__INPUT_NAME = "filename"

__FILENAME_REQUEST_FIELD = 'filename'
__OBSERVER_TYPE_REQUEST_FIELD = 'observe_type'
__TIMEOUT_REQUEST_FIELD = 'timeout'
__OBSERVER_NAME_REQUEST_FIELD = 'observer_name'
__OBSERVER_PIPELINE_REQUEST_FIELD = 'pipeline'
__MICROSERVICE_PORT = '5010'


def __init__(self, cluster_ip: str):
cluster_ip = cluster_ip.replace("http://", "")
mongo_url = f'mongodb://root:owl45%2321@{cluster_ip}'
mongo_client = MongoClient(
mongo_url
)
self.__api_path = "/api/learningOrchestra/v1/observer"
self.__service_base_url = f'{cluster_ip}:{self.__MICROSERVICE_PORT}'
self.__service_url = f'{self.__service_base_url}{self.__api_path}'
self.cluster_ip = cluster_ip.replace("http://", "")
self.__response_treat = ResponseTreat()

self.__database = mongo_client.database
def wait(self, name: str, timeout: int=0,
observer_name:str='', pretty_response: bool = False) -> dict:

def wait(self, name: str, timeout: int = None) -> dict:
"""
:description: Observe the end of a pipe for a timeout seconds or
Observe the end of a pipe for a timeout seconds or
until the pipe finishes its execution.

name: Represents the pipe name. Any tune, train, predict service can
wait its finish with a
wait method call.
timeout: the maximum time to wait the observed step, in seconds.
:param name: Represents the pipe name. Any tune, train, predict service can wait its finish with a wait method call.
:param timeout: the maximum time to wait the observed step, in seconds. If set to 0, there will be no timeout
:param observer_name: the name of the observer (default: observer_)

:return: Returns a dictionary with the content of a mongo collection, representing any pipe result
"""

return self.watch(name=name,
timeout=timeout,
type="wait",
observer_name=observer_name,
pretty_response=pretty_response)

def watch(self, name: str, timeout: int=0, type:str="wait",
observer_name:str='',pipeline:[]=None,
pretty_response: bool = False) -> dict:

"""
Observe the pipe for a timeout seconds or
until the pipe finishes its execution. It is a more complete method,
you can use it to configure your own pipelines if you wish. For more
simplistic uses, try the methods "wait" and "start_observing_pipe"

:param name: the name of the pipe to be observed. A train, predict, explore, transform or any other pipe can be observed.
:param timeout: the maximum time to wait the observed step, in seconds. If set to 0, there will be no timeout
:param type: type of the observation, it can be "wait" to observe the end of the pipe, "observe" to observe until the pipe change it's content or "custom" if you wish to provide your own mongo pipeline
:param observer_name: the name of the observer (default observer_)
:param pipeline: the custom pipeline that you wish to use on the observer. It is only used if type is set to "custom"

:return: If True it returns a String. Otherwise, it returns
a dictionary with the content of a mongo collection, representing
any pipe result
:return: Returns a dictionary with the content of a mongo collection, representing any pipe result
"""

dataset_collection = self.__database[name]
metadata_query = {"_id": 0}
dataset_metadata = dataset_collection.find_one(metadata_query)

if dataset_metadata["finished"]:
return dataset_metadata

observer_query = [
{'$match': {
'$and':
[
{'operationType': 'update'},
{'fullDocument.finished': {'$eq': True}}
]
}}
]
return dataset_collection.watch(
observer_query,
full_document='updateLookup',
max_await_time_ms=timeout * self.__TIMEOUT_TIME_MULTIPLICATION
).next()['fullDocument']

def observe_pipe(self, name: str, timeout: int = None) -> \
change_stream.CollectionChangeStream:
if type == "all" or type == "wait" or type == '1':
type = "wait"
elif type == "finish" or type == "observe" or type == '2':
type = "observe"
else:
raise NameError("Invalid type parameter: " + type)

request_url = f'{self.__service_url}'
request_body = {
self.__FILENAME_REQUEST_FIELD: name,
self.__OBSERVER_TYPE_REQUEST_FIELD: type,
self.__TIMEOUT_REQUEST_FIELD: timeout,
self.__OBSERVER_NAME_REQUEST_FIELD: observer_name,
self.__OBSERVER_PIPELINE_REQUEST_FIELD: pipeline
}

observer_uri = requests.post(url=f'{request_url}',
json=request_body)

if(observer_uri.status_code >= 200 and observer_uri.status_code < 400):
url = f"{self.__service_base_url}{observer_uri.json()['result']}"

response = requests.get(url=url)
else:
raise Exception(observer_uri.json()['result'])

if response.status_code >= 200 and response.status_code < 400:
response = self.__response_treat.treatment(response,pretty_response)
else:
if response.status_code == 408:
raise TimeoutError(response.json()['result'])

raise Exception(response.json()['result'])

delete_resp = requests.delete(url=url)
return response


def start_observing_pipe(self, name: str, timeout: int=0,
observer_name:str='',
pretty_response: bool = False) -> dict:
"""
:description: It waits until a pipe change its content
It waits until a pipe change its content
(replace, insert, update and delete mongoDB collection operation
types), so it is a bit different
from wait method with a timeout and a finish explicit condition.

:name: the name of the pipe to be observed. A train, predict, explore,
transform or any
other pipe can be observed.
timeout: the maximum time to wait the observed step, in milliseconds.
:param name: the name of the pipe to be observed. A train, predict, explore, transform or any other pipe can be observed.
:param timeout: the maximum time to wait the observed step, in seconds. If set to 0, there will be no timeout
:param observer_name: the name of the observer (default observer_)

:return: A pymongo CollectionChangeStream object. You must use the
builtin next() method to iterate over changes.
:returns: a dictionary with the content of a mongo collection, representing any pipe result
"""

observer_query = [
{'$match': {
'$or': [
{'operationType': 'replace'},
{'operationType': 'insert'},
{'operationType': 'update'},
{'operationType': 'delete'}

]
}}
]
return self.__database[name].watch(
observer_query,
max_await_time_ms=timeout * self.__TIMEOUT_TIME_MULTIPLICATION,
full_document='updateLookup')
return self.watch(name=name,
timeout=timeout,
type="observe",
observer_name=observer_name,
pretty_response=pretty_response)
2 changes: 1 addition & 1 deletion pipeline/imdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from learning_orchestra_client.train.scikitlearn import TrainScikitLearn
from learning_orchestra_client.predict.scikitlearn import PredictScikitLearn

CLUSTER_IP = "http://34.123.167.241"
CLUSTER_IP = "http://35.247.203.13"

dataset_csv = DatasetCsv(CLUSTER_IP)

Expand Down
3 changes: 2 additions & 1 deletion pipeline/mnist.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from learning_orchestra_client.predict.tensorflow import PredictTensorflow
from learning_orchestra_client.evaluate.tensorflow import EvaluateTensorflow

CLUSTER_IP = "http://35.224.50.116"
CLUSTER_IP = "http://35.247.197.191"

dataset_generic = DatasetGeneric(CLUSTER_IP)

dataset_generic.insert_dataset_async(
dataset_name="mnist_train_images",
url="https://drive.google.com/u/0/uc?"
Expand Down
6 changes: 3 additions & 3 deletions pipeline/titanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
from learning_orchestra_client.transform.data_type import TransformDataType
from learning_orchestra_client.builder.builder import BuilderSparkMl

CLUSTER_IP = "http://35.193.116.104"
CLUSTER_IP = "http://35.247.197.191"

dataset_csv = DatasetCsv(CLUSTER_IP)

dataset_csv.insert_dataset_async(
url="https://filebin.net/boniydu54k710l54/train.csv?t=s350xryf",
url="https://filebin.net/48b0fwidk4amp7fa/train.csv",
dataset_name="titanic_training",
)
dataset_csv.insert_dataset_async(
url="https://filebin.net/udtf7eogfgasqnx5/test.csv?t=h79pcy0l",
url="https://filebin.net/1ewibio2rziv6lrm/test.csv",
dataset_name="titanic_testing"
)

Expand Down