diff --git a/queue_job/jobrunner/__init__.py b/queue_job/jobrunner/__init__.py index c48065c1d5..90c048f410 100644 --- a/queue_job/jobrunner/__init__.py +++ b/queue_job/jobrunner/__init__.py @@ -28,8 +28,25 @@ class QueueJobRunnerThread(Thread): def __init__(self): Thread.__init__(self) self.daemon = True - port = os.environ.get('ODOO_QUEUE_JOB_PORT') or config['xmlrpc_port'] - self.runner = QueueJobRunner(port or 8069) + scheme = (os.environ.get('ODOO_QUEUE_JOB_SCHEME') or + config.misc.get("queue_job", {}).get('scheme')) + host = (os.environ.get('ODOO_QUEUE_JOB_HOST') or + config.misc.get("queue_job", {}).get('host') or + config['xmlrpc_interface']) + port = (os.environ.get('ODOO_QUEUE_JOB_PORT') or + config.misc.get("queue_job", {}).get('port') or + config['xmlrpc_port']) + user = (os.environ.get('ODOO_QUEUE_JOB_HTTP_AUTH_USER') or + config.misc.get("queue_job", {}). + get('http_auth_user')) + password = (os.environ.get('ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD') or + config.misc.get("queue_job", {}). + get('http_auth_password')) + self.runner = QueueJobRunner(scheme or 'http', + host or 'localhost', + port or 8069, + user, + password) def run(self): # sleep a bit to let the workers start at ease diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index ac1c6afd9b..e60a872fa7 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -22,23 +22,41 @@ How to use it? -------------- -* Optionally adjust your configuration through environment variables: +* By default, the job runner will - - set ``ODOO_QUEUE_JOB_CHANNELS=root:4`` (or any other channels - configuration) if you don't want the default ``root:1``. + - use ``root:1`` as channels configuration (one single channel) + - connect to Odoo via + - host ``xmlrpc_interface`` or ``localhost`` if unset + - port ``xmlrpc_port``, or ``8069`` if unset + - connect to the database via ``db_host`` and ``db_port`` - - if ``xmlrpc-port`` is not set, you can set it for the jobrunner only with: - ``ODOO_QUEUE_JOB_PORT=8069``. +* To adjust these values, you can either use environment variables: -* Alternatively, configure the channels through the Odoo configuration - file, like: + - ``ODOO_QUEUE_JOB_CHANNELS=root:4`` + - ``ODOO_QUEUE_JOB_SCHEME=https`` + - ``ODOO_QUEUE_JOB_HOST=load-balancer`` + - ``ODOO_QUEUE_JOB_PORT=443`` + - ``ODOO_QUEUE_JOB_HTTP_AUTH_USER=connector`` + - ``ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD=s3cr3t`` + - ``ODOO_QUEUE_JOB_JOBRUNNER_DB_HOST=master-db`` + - ``ODOO_QUEUE_JOB_JOBRUNNER_DB_PORT=5432`` + +* Or, alternatively, you can add a ``[queue_job]`` section + in Odoo's configuration file, like this: .. code-block:: ini [queue_job] channels = root:4 + scheme = https + host = load-balancer + port = 443 + http_auth_user = connector + http_auth_password = s3cr3t + jobrunner_db_host = master-db + jobrunner_db_port = 5432 -* Or, if using ``anybox.recipe.odoo``, add this to your buildout configuration: +* If using ``anybox.recipe.odoo``, add this to your buildout configuration: .. code-block:: ini @@ -46,6 +64,11 @@ recipe = anybox.recipe.odoo (...) queue_job.channels = root:4 + queue_job.scheme = https + queue_job.host = load-balancer + queue_job.port = 443 + queue_job.http_auth_user = jobrunner + queue_job.http_auth_password = s3cr3t * Start Odoo with ``--load=web,web_kanban,queue_job`` and ``--workers`` greater than 1 [2]_, or set the ``server_wide_modules`` @@ -168,19 +191,36 @@ def _odoo_now(): return _datetime_to_epoch(dt) -def _async_http_get(port, db_name, job_uuid): +def _connection_info_for(db_name): + db_or_uri, connection_info = odoo.sql_db.connection_info_for(db_name) + + for p in ('host', 'port'): + cfg = (os.environ.get('ODOO_QUEUE_JOB_JOBRUNNER_DB_%s' % p.upper()) or + config.misc + .get("queue_job", {}).get('jobrunner_db_' + p)) + + if cfg: + connection_info[p] = cfg + + return connection_info + + +def _async_http_get(scheme, host, port, user, password, db_name, job_uuid): if not session.cookies: # obtain an anonymous session _logger.info("obtaining an anonymous session for the job runner") - url = ('http://localhost:%s/queue_job/session' % (port,)) - response = session.get(url, timeout=30) + url = ('%s://%s:%s/queue_job/session' % (scheme, host, port)) + auth = None + if user: + auth = (user, password) + response = session.get(url, timeout=30, auth=auth) response.raise_for_status() # Method to set failed job (due to timeout, etc) as pending, # to avoid keeping it as enqueued. def set_job_pending(): - connection_info = odoo.sql_db.connection_info_for(db_name)[1] + connection_info = _connection_info_for(db_name) conn = psycopg2.connect(**connection_info) conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) with closing(conn.cursor()) as cr: @@ -194,12 +234,15 @@ def set_job_pending(): # if this was python3 I would be doing this with # asyncio, aiohttp and aiopg def urlopen(): - url = ('http://localhost:%s/queue_job/runjob?db=%s&job_uuid=%s' % - (port, db_name, job_uuid)) + url = ('%s://%s:%s/queue_job/runjob?db=%s&job_uuid=%s' % + (scheme, host, port, db_name, job_uuid)) try: + auth = None + if user: + auth = (user, password) # we are not interested in the result, so we set a short timeout # but not too short so we trap and log hard configuration errors - response = session.get(url, timeout=1) + response = session.get(url, timeout=1, auth=auth) # raise_for_status will result in either nothing, a Client Error # for HTTP Response codes between 400 and 500 or a Server Error @@ -220,7 +263,7 @@ class Database(object): def __init__(self, db_name): self.db_name = db_name - connection_info = odoo.sql_db.connection_info_for(db_name)[1] + connection_info = _connection_info_for(db_name) self.conn = psycopg2.connect(**connection_info) self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) self.has_queue_job = self._has_queue_job() @@ -293,8 +336,18 @@ def set_job_enqueued(self, uuid): class QueueJobRunner(object): - def __init__(self, port=8069, channel_config_string=None): + def __init__(self, + scheme='http', + host='localhost', + port=8069, + user=None, + password=None, + channel_config_string=None): + self.scheme = scheme + self.host = host self.port = port + self.user = user + self.password = password self.channel_manager = ChannelManager() if channel_config_string is None: channel_config_string = _channels() @@ -340,7 +393,13 @@ def run_jobs(self): _logger.info("asking Odoo to run job %s on db %s", job.uuid, job.db_name) self.db_by_name[job.db_name].set_job_enqueued(job.uuid) - _async_http_get(self.port, job.db_name, job.uuid) + _async_http_get(self.scheme, + self.host, + self.port, + self.user, + self.password, + job.db_name, + job.uuid) def process_notifications(self): for db in self.db_by_name.values():