Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions bugwarrior/collect.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
import copy
from functools import cache
from importlib.metadata import entry_points
import logging
import multiprocessing
import time
from typing import TYPE_CHECKING

from jinja2 import Template
from taskw.task import Task

if TYPE_CHECKING:
from bugwarrior.config.validation import Config
from bugwarrior.services import Service

log = logging.getLogger(__name__)

# Sentinels for process completion status
SERVICE_FINISHED_OK = 0
SERVICE_FINISHED_ERROR = 1


def get_service(service_name: str):
@cache
def get_service(service_name: str) -> type["Service"]:
try:
(service,) = entry_points(group='bugwarrior.service', name=service_name)
except ValueError as e:
Expand All @@ -33,16 +40,22 @@ def get_service(service_name: str):
return service.load()


def _aggregate_issues(conf, main_section, target, queue):
def get_service_instances(conf: "Config") -> list["Service"]:
return [
get_service(service_config.service)(service_config, conf.main)
for service_config in conf.service_configs
]


def _aggregate_issues(service: "Service", queue: multiprocessing.Queue):
"""This worker function is separated out from the main
:func:`aggregate_issues` func only so that we can use multiprocessing
on it for speed reasons.
"""

start = time.time()

target = service.config.target
try:
service = get_service(conf[target].service)(conf[target], conf[main_section])
issue_count = 0
for issue in service.issues():
queue.put(issue)
Expand All @@ -67,24 +80,23 @@ def _aggregate_issues(conf, main_section, target, queue):
log.info(f"Done with [{target}] in {duration}.")


def aggregate_issues(conf, main_section, debug):
def aggregate_issues(conf: "Config", debug: bool):
"""Return all issues from every target."""
log.info("Starting to aggregate remote issues.")

# Create and call service objects for every target in the config
targets = conf[main_section].targets

queue = multiprocessing.Queue()

log.info("Spawning %i workers." % len(targets))
services = get_service_instances(conf)

log.info("Spawning %i workers." % len(services))

if debug:
for target in targets:
_aggregate_issues(conf, main_section, target, queue)
for service in services:
_aggregate_issues(service, queue)
else:
for target in targets:
for service in services:
proc = multiprocessing.Process(
target=_aggregate_issues, args=(conf, main_section, target, queue)
target=_aggregate_issues, args=(service, queue)
)
proc.start()

Expand All @@ -94,7 +106,7 @@ def aggregate_issues(conf, main_section, debug):
# and tell some of our workers some incomplete things.
time.sleep(1)

currently_running = len(targets)
currently_running = len(services)
while currently_running > 0:
issue = queue.get(True)
try:
Expand Down
28 changes: 16 additions & 12 deletions bugwarrior/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import sys
from typing import TYPE_CHECKING

import click
from lockfile import LockTimeout
Expand All @@ -12,6 +13,8 @@
from bugwarrior.config import get_config_path, get_keyring, load_config
from bugwarrior.db import get_defined_udas_as_strings, synchronize

if TYPE_CHECKING:
from bugwarrior.config.validation import Config
log = logging.getLogger(__name__)


Expand All @@ -25,7 +28,9 @@ def _get_section_name(flavor):
return 'general'


def _try_load_config(main_section, interactive=False, quiet=False):
def _try_load_config(
main_section: str, interactive: bool = False, quiet: bool = False
) -> "Config":
try:
return load_config(main_section, interactive, quiet)
except OSError:
Expand Down Expand Up @@ -99,17 +104,15 @@ def pull(dry_run, flavor, interactive, debug, quiet):
main_section = _get_section_name(flavor)
config = _try_load_config(main_section, interactive, quiet)

lockfile_path = os.path.join(
config[main_section].data.path, 'bugwarrior.lockfile'
)
lockfile_path = os.path.join(config.main.data.path, 'bugwarrior.lockfile')
lockfile = PIDLockFile(lockfile_path)
lockfile.acquire(timeout=10)
try:
# Get all the issues. This can take a while.
issue_generator = aggregate_issues(config, main_section, debug)
issue_generator = aggregate_issues(config, debug)

# Stuff them in the taskwarrior db as necessary
synchronize(issue_generator, config, main_section, dry_run)
synchronize(issue_generator, config, dry_run)
finally:
lockfile.release()
except LockTimeout:
Expand Down Expand Up @@ -138,11 +141,12 @@ def vault():

def targets():
config = _try_load_config('general')
for target in config['general'].targets:
service_class = get_service(config[target].service)
for value in [v for v in dict(config[target]).values() if isinstance(v, str)]:
if '@oracle:use_keyring' in value:
yield service_class.get_keyring_service(config[target])
for service_config in config.service_configs:
for value in dict(service_config).values():
if isinstance(value, str) and '@oracle:use_keyring' in value:
yield get_service(service_config.service).get_keyring_service(
service_config
)


@vault.command()
Expand Down Expand Up @@ -214,7 +218,7 @@ def uda(flavor):
main_section = _get_section_name(flavor)
conf = _try_load_config(main_section)
print("# Bugwarrior UDAs")
for uda in get_defined_udas_as_strings(conf, main_section):
for uda in get_defined_udas_as_strings(conf):
print(uda)
print("# END Bugwarrior UDAs")

Expand Down
113 changes: 66 additions & 47 deletions bugwarrior/config/load.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import codecs
import configparser
import logging
import os
from pathlib import Path
from typing import Any

try:
import tomllib # python>=3.11
except ImportError:
import tomli as tomllib # backport

from . import schema
from .validation import Config, validate_config

# The name of the environment variable that can be used to ovewrite the path
# to the bugwarriorrc file
Expand Down Expand Up @@ -52,58 +53,76 @@ def get_config_path():
return paths[0]


def format_config(config: dict) -> dict[str, Any]:
if "general" in config:
config.setdefault("flavor", {})["general"] = config.pop("general")

config["services"] = [
{**config.pop(section), "target": section}
for section in list(config)
if section not in {"hooks", "notifications", "flavor"}
]
return config


def parse_toml_file(configpath: str) -> dict:
with open(configpath, 'rb') as file:
return tomllib.load(file)


def parse_ini_file(configpath: str) -> dict:
rawconfig = BugwarriorConfigParser()
with open(configpath, encoding="utf-8") as buff:
rawconfig.read_file(buff)

config = {"flavor": {}}
for section in rawconfig.sections():
if section in ['hooks', 'notifications']:
config[section] = dict(rawconfig[section])
elif section == 'general' or section.startswith('flavor.'):
Comment thread
ryneeverett marked this conversation as resolved.
name = section.removeprefix('flavor.')
config["flavor"][name] = {
key.replace('.', '_'): value
for key, value in rawconfig[section].items()
}

# All other sections are assumed to be services
else:
service = rawconfig[section].pop('service')
service_prefix = 'ado' if service == 'azuredevops' else service
config[section] = {'service': service}
for key, value in rawconfig[section].items():
try:
prefix, unprefixed_key = key.split('.')
except ValueError: # missing prefix
prefix = None
unprefixed_key = key
if prefix != service_prefix:
raise SystemExit(
f"[{section}]\n{key} <-expected prefix "
f"'{service_prefix}': did you mean "
f"'{service_prefix}.{unprefixed_key}'?"
)
config[section][unprefixed_key] = value

return config


def parse_file(configpath: str) -> dict:
if os.path.splitext(configpath)[-1] == '.toml':
with open(configpath, 'rb') as f:
config = tomllib.load(f)
# Flatten flavors into top-level sections (if they're unquoted).
for k, v in config.get('flavor', {}).items():
config[f'flavor.{k}'] = v
config.pop('flavor', None)
if Path(configpath).suffix == '.toml':
config = parse_toml_file(configpath)
else:
rawconfig = BugwarriorConfigParser()
with codecs.open(configpath, "r", "utf-8") as buff:
rawconfig.read_file(buff)
config = {}
for section in rawconfig.sections():
if section in ['hooks', 'notifications']:
config[section] = dict(rawconfig[section])
elif section == 'general':
config[section] = {
k.replace('log.', 'log_'): v for k, v in rawconfig[section].items()
}
elif section.startswith('flavor.'):
config[section] = {
k.replace('.', '_'): v for k, v in rawconfig[section].items()
}
else:
service = rawconfig[section].pop('service')
service_prefix = 'ado' if service == 'azuredevops' else service
config[section] = {'service': service}
for k, v in rawconfig[section].items():
try:
prefix, key = k.split('.')
except ValueError: # missing prefix
prefix = None
key = k
if prefix != service_prefix:
raise SystemExit(
f"[{section}]\n{k} <-expected prefix "
f"'{service_prefix}': did you mean "
f"'{service_prefix}.{key}'?"
)
config[section][key] = v
return config
config = parse_ini_file(configpath)
return format_config(config)


def load_config(main_section, interactive, quiet) -> dict:
def load_config(main_section, interactive, quiet) -> Config:
configpath = get_config_path()
rawconfig = parse_file(configpath)
rawconfig[main_section]['interactive'] = interactive
config = schema.validate_config(rawconfig, main_section, configpath)
rawconfig['flavor'][main_section]['interactive'] = interactive
config = validate_config(rawconfig, main_section, configpath)
configure_logging(
config[main_section].log_file,
'WARNING' if quiet else config[main_section].log_level,
config.main.log_file, 'WARNING' if quiet else config.main.log_level
)
return config

Expand Down
Loading
Loading