Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions queue_job/jobrunner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,25 @@ class QueueJobRunnerThread(Thread):
def __init__(self):
Thread.__init__(self)
self.daemon = True
port = os.environ.get('ODOO_QUEUE_JOB_PORT') or config['xmlrpc_port']
self.runner = QueueJobRunner(port or 8069)
scheme = (os.environ.get('ODOO_QUEUE_JOB_SCHEME') or
config.misc.get("queue_job", {}).get('scheme'))
host = (os.environ.get('ODOO_QUEUE_JOB_HOST') or
config.misc.get("queue_job", {}).get('host') or
config['xmlrpc_interface'])
port = (os.environ.get('ODOO_QUEUE_JOB_PORT') or
config.misc.get("queue_job", {}).get('port') or
config['xmlrpc_port'])
user = (os.environ.get('ODOO_QUEUE_JOB_HTTP_AUTH_USER') or
config.misc.get("queue_job", {}).
get('http_auth_user'))
password = (os.environ.get('ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD') or
config.misc.get("queue_job", {}).
get('http_auth_password'))
self.runner = QueueJobRunner(scheme or 'http',
host or 'localhost',
port or 8069,
user,
password)

def run(self):
# sleep a bit to let the workers start at ease
Expand Down
95 changes: 77 additions & 18 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,53 @@
How to use it?
--------------

* Optionally adjust your configuration through environment variables:
* By default, the job runner will

- set ``ODOO_QUEUE_JOB_CHANNELS=root:4`` (or any other channels
configuration) if you don't want the default ``root:1``.
- use ``root:1`` as channels configuration (one single channel)
- connect to Odoo via
- host ``xmlrpc_interface`` or ``localhost`` if unset
- port ``xmlrpc_port``, or ``8069`` if unset
- connect to the database via ``db_host`` and ``db_port``

- if ``xmlrpc-port`` is not set, you can set it for the jobrunner only with:
``ODOO_QUEUE_JOB_PORT=8069``.
* To adjust these values, you can either use environment variables:

* Alternatively, configure the channels through the Odoo configuration
file, like:
- ``ODOO_QUEUE_JOB_CHANNELS=root:4``
- ``ODOO_QUEUE_JOB_SCHEME=https``
- ``ODOO_QUEUE_JOB_HOST=load-balancer``
- ``ODOO_QUEUE_JOB_PORT=443``
- ``ODOO_QUEUE_JOB_HTTP_AUTH_USER=connector``
- ``ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD=s3cr3t``
- ``ODOO_QUEUE_JOB_JOBRUNNER_DB_HOST=master-db``
- ``ODOO_QUEUE_JOB_JOBRUNNER_DB_PORT=5432``

* Or, alternatively, you can add a ``[queue_job]`` section
in Odoo's configuration file, like this:

.. code-block:: ini

[queue_job]
channels = root:4
scheme = https
host = load-balancer
port = 443
http_auth_user = connector
http_auth_password = s3cr3t
jobrunner_db_host = master-db
jobrunner_db_port = 5432

* Or, if using ``anybox.recipe.odoo``, add this to your buildout configuration:
* If using ``anybox.recipe.odoo``, add this to your buildout configuration:

.. code-block:: ini

[odoo]
recipe = anybox.recipe.odoo
(...)
queue_job.channels = root:4
queue_job.scheme = https
queue_job.host = load-balancer
queue_job.port = 443
queue_job.http_auth_user = jobrunner
queue_job.http_auth_password = s3cr3t

* Start Odoo with ``--load=web,web_kanban,queue_job``
and ``--workers`` greater than 1 [2]_, or set the ``server_wide_modules``
Expand Down Expand Up @@ -168,19 +191,36 @@ def _odoo_now():
return _datetime_to_epoch(dt)


def _async_http_get(port, db_name, job_uuid):
def _connection_info_for(db_name):
db_or_uri, connection_info = odoo.sql_db.connection_info_for(db_name)

for p in ('host', 'port'):
cfg = (os.environ.get('ODOO_QUEUE_JOB_JOBRUNNER_DB_%s' % p.upper()) or
config.misc
.get("queue_job", {}).get('jobrunner_db_' + p))

if cfg:
connection_info[p] = cfg

return connection_info


def _async_http_get(scheme, host, port, user, password, db_name, job_uuid):

if not session.cookies:
# obtain an anonymous session
_logger.info("obtaining an anonymous session for the job runner")
url = ('http://localhost:%s/queue_job/session' % (port,))
response = session.get(url, timeout=30)
url = ('%s://%s:%s/queue_job/session' % (scheme, host, port))
auth = None
if user:
auth = (user, password)
response = session.get(url, timeout=30, auth=auth)
response.raise_for_status()

# Method to set failed job (due to timeout, etc) as pending,
# to avoid keeping it as enqueued.
def set_job_pending():
connection_info = odoo.sql_db.connection_info_for(db_name)[1]
connection_info = _connection_info_for(db_name)
conn = psycopg2.connect(**connection_info)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
with closing(conn.cursor()) as cr:
Expand All @@ -194,12 +234,15 @@ def set_job_pending():
# if this was python3 I would be doing this with
# asyncio, aiohttp and aiopg
def urlopen():
url = ('http://localhost:%s/queue_job/runjob?db=%s&job_uuid=%s' %
(port, db_name, job_uuid))
url = ('%s://%s:%s/queue_job/runjob?db=%s&job_uuid=%s' %
(scheme, host, port, db_name, job_uuid))
try:
auth = None
if user:
auth = (user, password)
# we are not interested in the result, so we set a short timeout
# but not too short so we trap and log hard configuration errors
response = session.get(url, timeout=1)
response = session.get(url, timeout=1, auth=auth)

# raise_for_status will result in either nothing, a Client Error
# for HTTP Response codes between 400 and 500 or a Server Error
Expand All @@ -220,7 +263,7 @@ class Database(object):

def __init__(self, db_name):
self.db_name = db_name
connection_info = odoo.sql_db.connection_info_for(db_name)[1]
connection_info = _connection_info_for(db_name)
self.conn = psycopg2.connect(**connection_info)
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.has_queue_job = self._has_queue_job()
Expand Down Expand Up @@ -293,8 +336,18 @@ def set_job_enqueued(self, uuid):

class QueueJobRunner(object):

def __init__(self, port=8069, channel_config_string=None):
def __init__(self,
scheme='http',
host='localhost',
port=8069,
user=None,
password=None,
channel_config_string=None):
self.scheme = scheme
self.host = host
self.port = port
self.user = user
self.password = password
self.channel_manager = ChannelManager()
if channel_config_string is None:
channel_config_string = _channels()
Expand Down Expand Up @@ -340,7 +393,13 @@ def run_jobs(self):
_logger.info("asking Odoo to run job %s on db %s",
job.uuid, job.db_name)
self.db_by_name[job.db_name].set_job_enqueued(job.uuid)
_async_http_get(self.port, job.db_name, job.uuid)
_async_http_get(self.scheme,
self.host,
self.port,
self.user,
self.password,
job.db_name,
job.uuid)

def process_notifications(self):
for db in self.db_by_name.values():
Expand Down