subset
Sharded repodata subsets.
Traverse dependencies of installed and to-be-installed packages to generate a useful subset for the solver.
The algorithm developed here is a direct result of the following CEP:
https://conda.org/learn/ceps/cep-0016 (Sharded Repodata)
In this algorithm we treat each (channel, package name) pair as a node and its dependencies as edges. We then traverse all edges to discover all reachable (channel, package name) tuples. The solver should be able to find a solution with only this subset.
This subset is overgenerous since the user is unlikely to want to install very old packages and their dependencies. If this is too slow, we could deploy heuristics that automatically ignore older package versions. We could also allow the user to configure minimum versions of common packages and ignore older versions and their dependencies, falling back to a full solve if unsatisfiable.
We treat both sharded and monolithic repodata as if they were made up of per-package shards, computing a subset of both. This is because it is possible for the monolithic repodata to mention packages that exist in the true sharded repodata but would not be found by only traversing the shards.
We treat all repodata as sharded, even if no actual sharded repodata has been found.
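The traversal described above can be sketched on toy data. This is a minimal illustration, not the module's implementation: the `CHANNELS` mapping and the `reachable_nodes` function are hypothetical stand-ins, and real shards carry full package records rather than bare dependency names.

```python
from collections import deque

# Hypothetical toy data: channel -> package name -> names of its dependencies.
CHANNELS = {
    "noarch": {"pandas": {"python", "numpy"}, "pip": {"python"}},
    "linux-64": {
        "python": {"openssl"},
        "numpy": {"python"},
        "openssl": set(),
        "unrelated": set(),
    },
}


def reachable_nodes(root_packages, channels):
    """Return every (channel, package name) node reachable from the roots."""
    seen = set()
    # Each root package becomes one node per channel.
    queue = deque((channel, name) for name in root_packages for channel in channels)
    while queue:
        node = queue.popleft()
        if node in seen:
            continue
        seen.add(node)
        channel, name = node
        # A name that is absent from a channel simply has no outgoing edges.
        for dependency in channels[channel].get(name, ()):
            # A dependency is looked up in every channel, not only the parent's.
            queue.extend((other, dependency) for other in channels)
    return seen


subset = reachable_nodes(["pandas"], CHANNELS)
names = {name for _, name in subset}
```

In this sketch `pip` and `unrelated` are never visited because nothing reachable from `pandas` mentions them, which is the property that keeps the subset smaller than the full repodata.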
## Example usage
The following builds repodata subsets for several subdirs (noarch and linux-64) from a single channel name and a list of root packages:
```
from conda.models.channel import Channel
from _conda.shards_subset import build_repodata_subset

channel = Channel("conda-forge-sharded/linux-64")
channel_data = build_repodata_subset(["python", "pandas"], [channel.url()])

# Map each channel URL to its repodata.json-format subset.
repodata = {}
for url in channel_data:
    repodata[url] = channel_data[url].build_repodata()

# ... this is what's fed to the solver
```
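Classes

| Class | Description |
|---|---|
| Node | |
| RepodataSubset | Build a subset of repodata by traversing all packages that are dependencies and transitive dependencies of a root set of packages. |
| QueueCache | Implement insert() interface of .cache.ShardCache() as a queue, instead of giving network thread direct access to the database. |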
Functions

| Function | Description |
|---|---|
| _nodes_from_packages | Yield (NodeId, Node) for all root packages found in shardlikes. |
| build_repodata_subset | Retrieve all necessary information to build a repodata subset. |
| cache_fetch_thread | Fetch batches of shards from cache until in_queue sees None. |
| network_fetch_thread | Fetch shards from the network that are received on in_queue, until we see None. |
| offline_nofetch_thread | For offline mode, where network requests are not allowed. |
Attributes
- THREAD_WAIT_TIMEOUT = 5#
- REACHABLE_PIPELINED_MAX_TIMEOUTS = 10#
- QUEUE_TIMEOUT = 1#
- class Node#
- _nodes_from_packages(root_packages: list[str], shardlikes: collections.abc.Iterable[conda._private.shards.shards.ShardBase]) collections.abc.Iterator[tuple[NodeId, Node]]#
Yield (NodeId, Node) for all root packages found in shardlikes.
- class RepodataSubset(shardlikes: collections.abc.Iterable[conda._private.shards.shards.ShardBase], spec_to_package_name: collections.abc.Callable[[str], str] = spec_to_package_name, repodata_version: int = 1, depth: int = sys.maxsize)#
Build a subset of repodata by traversing all packages that are dependencies and transitive dependencies of a root set of packages.
- DEFAULT_STRATEGY = 'pipelined'#
- _spec_to_package_name: collections.abc.Callable[[str], str]#
- _repodata_version = 1#
- depth = 9223372036854775807#
- classmethod has_strategy(strategy: str) bool#
Return True if this class provides the named shard traversal strategy.
- _neighbors(node: Node) collections.abc.Iterator[Node]#
Retrieve all unvisited neighbors of a node.
Neighbors in this context are the dependencies of a package.
NOTE: This method assumes that the required shards have already been retrieved from the network via batch_retrieve_from_network() before _neighbors() is called. It uses visit_package() to access already-loaded shards.
- reachable(root_packages, *, strategy=DEFAULT_STRATEGY) None#
Run named reachability strategy or the default.
Update self.shardlikes with reachable package records. Later, [shardlike.build_repodata() for shardlike in shardlikes] can be used to generate repodata.json-format subsets of each channel.
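One way to dispatch on a strategy name is sketched below. It assumes that each strategy is a method named `reachable_<strategy>`, which matches the `reachable_bfs` and `reachable_pipelined` methods listed here but is not confirmed as the actual mechanism. The `Traversal` class, its `last_call` attribute, and the `ValueError` for unknown names are hypothetical.

```python
class Traversal:
    """Hypothetical stand-in that only records which strategy ran."""

    DEFAULT_STRATEGY = "pipelined"

    @classmethod
    def has_strategy(cls, strategy: str) -> bool:
        # Assumption: a strategy exists when a matching method is defined.
        return callable(getattr(cls, f"reachable_{strategy}", None))

    def reachable(self, root_packages, *, strategy=DEFAULT_STRATEGY) -> None:
        if not self.has_strategy(strategy):
            # Assumption: the real class may handle unknown names differently.
            raise ValueError(f"unknown strategy: {strategy!r}")
        getattr(self, f"reachable_{strategy}")(root_packages)

    def reachable_bfs(self, root_packages) -> None:
        self.last_call = ("bfs", list(root_packages))

    def reachable_pipelined(self, root_packages) -> None:
        self.last_call = ("pipelined", list(root_packages))


traversal = Traversal()
traversal.reachable(["python"])
default_call = traversal.last_call
traversal.reachable(["python"], strategy="bfs")
bfs_call = traversal.last_call
```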
- reachable_bfs(root_packages)#
Fetch all packages reachable from root_packages by following dependencies, using a breadth-first search.
Update associated self.shardlikes to contain enough data to build a repodata subset.
- _reachable_bfs(root_packages, shard_cache: conda._private.shards.cache.ShardCache)#
Inner reachable_bfs() implementation.
- reachable_pipelined(root_packages)#
Fetch all packages reachable from root_packages by following dependencies.
Build repodata subset using concurrent threads to follow dependencies, fetch from cache, and fetch from network.
- _reachable_pipelined(root_packages, network_worker: collections.abc.Callable[[queue.SimpleQueue[collections.abc.Sequence[NodeId] | None], queue.SimpleQueue[list[tuple[NodeId, conda._private.shards.typing.ShardDict] | Exception]], RepodataSubset._reachable_pipelined.cache, collections.abc.Sequence[conda._private.shards.shards.ShardBase]], None], cache: RepodataSubset._reachable_pipelined.cache)#
Set up queues and threads for shard traversal with a configurable network_worker. Called by reachable_pipelined().
- _pipelined_traversal(root_packages, cache_in_queue: queue.SimpleQueue[list[NodeId] | None], shard_out_queue: queue.SimpleQueue[list[tuple[NodeId, conda._private.shards.typing.ShardDict]] | Exception], cache_thread: threading.Thread, network_thread: threading.Thread)#
Run reachability algorithm given queues to submit and receive shards.
- _visit_node(parent_node: Node, mentioned_packages: collections.abc.Iterable[str]) collections.abc.Iterable[NodeId]#
Broadcast mentioned packages across channels. Yield pending NodeIds.
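The broadcast step can be illustrated as follows. This is a sketch under assumptions: `NodeId` here is a hypothetical two-field tuple, and `visit_node` takes the channel list and the visited set as explicit arguments, whereas the real method reads its state from the instance.

```python
from typing import Iterable, Iterator, NamedTuple


class NodeId(NamedTuple):
    """Hypothetical stand-in for the module's NodeId."""

    package: str
    channel: str


def visit_node(
    mentioned_packages: Iterable[str],
    channels: Iterable[str],
    visited: set,
) -> Iterator[NodeId]:
    """Yield one NodeId per (package, channel) pair not seen before."""
    channels = list(channels)
    for package in mentioned_packages:
        for channel in channels:
            node_id = NodeId(package, channel)
            if node_id not in visited:
                visited.add(node_id)
                yield node_id


visited = {NodeId("python", "a")}
pending = list(visit_node(["python", "numpy"], ["a", "b"], visited))
repeat = list(visit_node(["python", "numpy"], ["a", "b"], visited))
```

Marking a node as visited at the moment it is yielded means a second mention of the same package produces no duplicate fetch requests.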
- _drain_pending(pending: set[NodeId], shardlikes_by_url: dict[str, conda._private.shards.shards.ShardBase]) tuple[list[tuple[NodeId, conda._private.shards.typing.ShardDict]], list[NodeId]]#
Check pending for in-memory shards. Clear pending.
Return a list of shards we have and shards we need to fetch.
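A minimal sketch of this partitioning step, assuming node ids are plain strings and that already-loaded shards live in a dict named `loaded` (both hypothetical simplifications of the real `shardlikes_by_url` lookup):

```python
def drain_pending(pending: set, loaded: dict):
    """Split pending ids into shards already in memory and ids still to fetch."""
    have, need = [], []
    for node_id in sorted(pending):  # sorted only to make the order deterministic
        if node_id in loaded:
            have.append((node_id, loaded[node_id]))
        else:
            need.append(node_id)
    pending.clear()  # the caller's set is emptied in place
    return have, need


pending = {"numpy", "python", "openssl"}
loaded = {"python": {"packages": {}}}
have, need = drain_pending(pending, loaded)
```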
- build_repodata_subset(root_packages: collections.abc.Iterable[str], channels: dict[str, conda.models.channel.Channel], algorithm: Literal['bfs', 'pipelined'] = RepodataSubset.DEFAULT_STRATEGY, spec_to_package_name_func: collections.abc.Callable[[str], str] = spec_to_package_name, repodata_version: int = 1, depth: int = sys.maxsize) dict[str, conda._private.shards.shards.ShardBase] | None#
Retrieve all necessary information to build a repodata subset.
This function implements the conda.gateways.shards.BuildRepodataSubset protocol, allowing it to be passed to solvers that support sharded repodata optimization.
- Params:
root_packages -- iterable of installed and requested package names.
channels -- Channel objects; dict form preferred.
algorithm -- desired traversal algorithm ("bfs" or "pipelined").
spec_to_package_name_func -- callable to convert package specs to names. Defaults to the standard spec_to_package_name.
repodata_version -- repodata format version (1 = classic, 3 = v3).
depth -- the maximum depth of dependent packages to include in the repodata subset.
- Returns:
None if there are no shards available, or a mapping of channel URLs to ShardBase objects where build_repodata() returns the computed subset.
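The effect of a depth limit can be sketched on a single-channel toy graph. The exact meaning of `depth` in the real function is an assumption here: this sketch counts edges from the root packages, so a depth of 0 keeps only the roots. `DEPENDENCIES` and `reachable_within` are hypothetical names.

```python
import sys
from collections import deque

# Hypothetical dependency chain: app -> lib -> core -> libc.
DEPENDENCIES = {"app": ["lib"], "lib": ["core"], "core": ["libc"], "libc": []}


def reachable_within(root_packages, dependencies, depth=sys.maxsize):
    """Return {package name: distance from a root} for names within depth."""
    distance = {name: 0 for name in root_packages}
    queue = deque(root_packages)
    while queue:
        name = queue.popleft()
        if distance[name] >= depth:
            continue  # do not follow edges beyond the depth limit
        for dependency in dependencies.get(name, ()):
            if dependency not in distance:
                distance[dependency] = distance[name] + 1
                queue.append(dependency)
    return distance


shallow = reachable_within(["app"], DEPENDENCIES, depth=1)
full = reachable_within(["app"], DEPENDENCIES)
```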
- _T#
- class QueueCache(queue)#
Implement insert() interface of .cache.ShardCache() as a queue, instead of giving network thread direct access to the database.
- queue: queue.SimpleQueue#
- insert(shard: conda._private.shards.cache.AnnotatedRawShard)#
- copy()#
- __enter__()#
- __exit__(*_)#
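The idea of handing shards to a queue instead of writing to the database from the network thread can be sketched as below. `QueueBackedCache` is a hypothetical stand-in; in particular, `copy()` returning the same object and the no-op context manager are assumptions about how such an adapter might satisfy the cache interface.

```python
import queue
import threading


class QueueBackedCache:
    """Hypothetical adapter: insert() enqueues instead of touching a database."""

    def __init__(self, out_queue: queue.SimpleQueue):
        self.queue = out_queue

    def insert(self, shard) -> None:
        self.queue.put(shard)

    def copy(self):
        # Assumption: no per-thread connection is needed, so share the object.
        return self

    def __enter__(self):
        return self

    def __exit__(self, *_):
        return None


def network_side(cache, shards):
    # The worker thread sees only the insert() interface.
    with cache.copy() as thread_cache:
        for shard in shards:
            thread_cache.insert(shard)


pending_writes = queue.SimpleQueue()
worker = threading.Thread(
    target=network_side,
    args=(QueueBackedCache(pending_writes), ["shard-a", "shard-b"]),
)
worker.start()
worker.join()

# The main thread drains the queue and performs the real writes itself.
database = []
while not pending_writes.empty():
    database.append(pending_writes.get())
```

Keeping all database writes on one thread avoids sharing a database connection across threads, which is a plausible motivation for this design.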
- cache_fetch_thread(in_queue: queue.SimpleQueue[collections.abc.Sequence[NodeId] | None], shard_out_queue: queue.SimpleQueue[collections.abc.Sequence[tuple[NodeId, conda._private.shards.typing.ShardDict] | Exception]], network_out_queue: queue.SimpleQueue[collections.abc.Sequence[NodeId] | None], cache: conda._private.shards.cache.ShardCache)#
Fetch batches of shards from cache until in_queue sees None. Enqueue found shards to shard_out_queue, and not found shards to network_out_queue.
When we see None on in_queue, send None to both out queues and exit.
- Parameters:
in_queue -- NodeId (URLs) to fetch.
shard_out_queue -- fetched shards sent to queue.
network_out_queue -- cache misses forwarded to queue. Same queue is network_fetch_thread's in_queue.
cache -- used to retrieve shards.
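The queue protocol described above can be sketched with a plain dict standing in for the shard cache. `cache_worker` and the string node ids are hypothetical simplifications; the real thread works with NodeId objects and a ShardCache.

```python
import queue
import threading


def cache_worker(in_queue, shard_out_queue, network_out_queue, cache):
    """Route each batch: cache hits to shard_out_queue, misses to the network."""
    while (batch := in_queue.get()) is not None:
        hits = [(node_id, cache[node_id]) for node_id in batch if node_id in cache]
        misses = [node_id for node_id in batch if node_id not in cache]
        if hits:
            shard_out_queue.put(hits)
        if misses:
            network_out_queue.put(misses)
    # None is the shutdown sentinel; pass it on to both consumers.
    shard_out_queue.put(None)
    network_out_queue.put(None)


in_q = queue.SimpleQueue()
shard_q = queue.SimpleQueue()
network_q = queue.SimpleQueue()
cache = {"python": {"packages": {}}}  # hypothetical cache contents

worker = threading.Thread(target=cache_worker, args=(in_q, shard_q, network_q, cache))
worker.start()
in_q.put(["python", "numpy"])
in_q.put(None)
worker.join(timeout=5)

found = shard_q.get_nowait()
missing = network_q.get_nowait()
```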
- network_fetch_thread(in_queue: queue.SimpleQueue[collections.abc.Sequence[NodeId] | None], shard_out_queue: queue.SimpleQueue[list[tuple[NodeId, conda._private.shards.typing.ShardDict] | Exception]], cache: conda._private.shards.cache.ShardCache | QueueCache, shardlikes: collections.abc.Sequence[conda._private.shards.shards.ShardBase])#
Fetch shards from the network that are received on in_queue, until we see None.
Unhandled exceptions also go to shard_out_queue, and exit this thread.
- Parameters:
in_queue -- NodeId (URLs) to fetch.
shard_out_queue -- fetched shards sent to queue.
cache -- once shards are decoded they are stored in cache.
shardlikes -- list of (network-only) shard index objects.
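The exception-forwarding behavior can be sketched as follows. `network_worker` and `fake_fetch` are hypothetical: the real thread downloads and decodes shards through the shard index objects, while this sketch takes a fetch callable and uses a dict as the cache.

```python
import queue
import threading


def network_worker(in_queue, shard_out_queue, cache, fetch):
    """Fetch each requested id; forward any unhandled exception and exit."""
    try:
        while (batch := in_queue.get()) is not None:
            fetched = [(node_id, fetch(node_id)) for node_id in batch]
            for node_id, shard in fetched:
                cache[node_id] = shard  # store before handing the batch on
            shard_out_queue.put(fetched)
    except Exception as error:
        # The consumer sees the exception object in place of a batch.
        shard_out_queue.put(error)


def fake_fetch(node_id):
    """Hypothetical fetcher that fails for one id."""
    if node_id == "broken":
        raise OSError(f"cannot fetch {node_id}")
    return {"packages": {}}


requests = queue.SimpleQueue()
results = queue.SimpleQueue()
cache = {}
worker = threading.Thread(
    target=network_worker, args=(requests, results, cache, fake_fetch)
)
worker.start()
requests.put(["python"])
requests.put(["broken"])
worker.join(timeout=5)

first = results.get_nowait()
second = results.get_nowait()
```

Because the error travels on the same queue as the shards, the traversal thread can check each item with `isinstance(item, Exception)` rather than polling the worker.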
- offline_nofetch_thread(in_queue: queue.SimpleQueue[collections.abc.Sequence[NodeId] | None], shard_out_queue: queue.SimpleQueue[list[tuple[NodeId, conda._private.shards.typing.ShardDict] | Exception]], cache: conda._private.shards.cache.ShardCache, shardlikes: collections.abc.Sequence[conda._private.shards.shards.ShardBase])#
For offline mode, where network requests are not allowed. Pretend that every network request is an empty shard. Don't save those to the cache.
Depending on how many shards are in sqlite3 and which packages were requested, the user may or may not get enough repodata for a solution.
- Parameters:
in_queue -- NodeId (URLs) to fetch.
shard_out_queue -- fetched shards sent to queue.
cache -- once shards are decoded they are stored in cache.
shardlikes -- list of (network-only) shard index objects.
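A sketch of the offline behavior, assuming an empty shard is a dict with empty `packages` and `packages.conda` mappings (an assumption about the shard layout) and using the hypothetical names `offline_worker` and `empty_shard`:

```python
import queue
import threading


def empty_shard():
    # Assumption: a shard with no records has these two empty mappings.
    return {"packages": {}, "packages.conda": {}}


def offline_worker(in_queue, shard_out_queue):
    """Answer every request with an empty shard; never touch the cache."""
    while (batch := in_queue.get()) is not None:
        shard_out_queue.put([(node_id, empty_shard()) for node_id in batch])


requests = queue.SimpleQueue()
results = queue.SimpleQueue()
worker = threading.Thread(target=offline_worker, args=(requests, results))
worker.start()
requests.put(["python", "numpy"])
requests.put(None)
worker.join(timeout=5)

answered = results.get_nowait()
```

Since the placeholder shards are not written to the cache in this sketch, a later online run would still fetch the real shards for those packages.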