Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 274 additions & 12 deletions src/Runner.Worker/ActionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public sealed class ActionManager : RunnerService, IActionManager
PreStepTracker = new Dictionary<Guid, IActionRunner>()
};
var containerSetupSteps = new List<JobExtensionRunner>();
// Stack-local cache: same action (owner/repo@ref) is resolved only once,
// even if it appears at multiple depths in a composite tree.
var resolvedDownloadInfos = new Dictionary<string, WebApi.ActionDownloadInfo>(StringComparer.OrdinalIgnoreCase);
var depth = 0;
// We are running at the start of a job
if (rootStepId == default(Guid))
Expand All @@ -102,10 +105,19 @@ public sealed class ActionManager : RunnerService, IActionManager
}
IEnumerable<Pipelines.ActionStep> actions = steps.OfType<Pipelines.ActionStep>();
executionContext.Output("Prepare all required actions");

// Eagerly discover, resolve, and download all reachable actions
// via BFS before the recursive walk. This collapses all network
// I/O into a tight loop so the recursion is purely local.
if (rootStepId == default(Guid))
{
await EagerlyResolveAllReachableActionsAsync(executionContext, actions.ToList(), resolvedDownloadInfos);
}

PrepareActionsState result = new PrepareActionsState();
try
{
result = await PrepareActionsRecursiveAsync(executionContext, state, actions, depth, rootStepId);
result = await PrepareActionsRecursiveAsync(executionContext, state, actions, resolvedDownloadInfos, depth, rootStepId);
}
catch (FailedToResolveActionDownloadInfoException ex)
{
Expand Down Expand Up @@ -161,13 +173,14 @@ public sealed class ActionManager : RunnerService, IActionManager
return new PrepareResult(containerSetupSteps, result.PreStepTracker);
}

private async Task<PrepareActionsState> PrepareActionsRecursiveAsync(IExecutionContext executionContext, PrepareActionsState state, IEnumerable<Pipelines.ActionStep> actions, Int32 depth = 0, Guid parentStepId = default(Guid))
private async Task<PrepareActionsState> PrepareActionsRecursiveAsync(IExecutionContext executionContext, PrepareActionsState state, IEnumerable<Pipelines.ActionStep> actions, Dictionary<string, WebApi.ActionDownloadInfo> resolvedDownloadInfos, Int32 depth = 0, Guid parentStepId = default(Guid))
{
ArgUtil.NotNull(executionContext, nameof(executionContext));
if (depth > Constants.CompositeActionsMaxDepth)
{
throw new Exception($"Composite action depth exceeded max depth {Constants.CompositeActionsMaxDepth}");
}

var repositoryActions = new List<Pipelines.ActionStep>();

foreach (var action in actions)
Expand Down Expand Up @@ -195,27 +208,32 @@ public sealed class ActionManager : RunnerService, IActionManager

if (repositoryActions.Count > 0)
{
// Get the download info
var downloadInfos = await GetDownloadInfoAsync(executionContext, repositoryActions);
// Resolve download info, skipping any actions already cached.
await ResolveNewActionsAsync(executionContext, repositoryActions, resolvedDownloadInfos);

// Download each action
// Download each action (in parallel when multiple unique actions exist).
var downloadKeys = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var downloadTasks = new List<Task>();
foreach (var action in repositoryActions)
{
var lookupKey = GetDownloadInfoLookupKey(action);
if (string.IsNullOrEmpty(lookupKey))
if (string.IsNullOrEmpty(lookupKey) || !downloadKeys.Add(lookupKey))
{
continue;
}

if (!downloadInfos.TryGetValue(lookupKey, out var downloadInfo))
if (!resolvedDownloadInfos.TryGetValue(lookupKey, out var downloadInfo))
{
throw new Exception($"Missing download info for {lookupKey}");
}

await DownloadRepositoryActionAsync(executionContext, downloadInfo);
downloadTasks.Add(DownloadRepositoryActionAsync(executionContext, downloadInfo));
}
await Task.WhenAll(downloadTasks);

// Parse action.yml and collect composite sub-actions for batched
// resolution below. Pre/post step registration is deferred until
// after recursion so that HasPre/HasPost reflect the full subtree.
var nextLevel = new List<(Pipelines.ActionStep action, Guid parentId)>();

// More preparation based on content in the repository (action.yml)
foreach (var action in repositoryActions)
{
var setupInfo = PrepareRepositoryActionAsync(executionContext, action);
Expand Down Expand Up @@ -247,8 +265,50 @@ public sealed class ActionManager : RunnerService, IActionManager
}
else if (setupInfo != null && setupInfo.Steps != null && setupInfo.Steps.Count > 0)
{
state = await PrepareActionsRecursiveAsync(executionContext, state, setupInfo.Steps, depth + 1, action.Id);
foreach (var step in setupInfo.Steps)
{
nextLevel.Add((step, action.Id));
}
}
}

// Resolve all next-level sub-actions in one batch API call,
// then recurse per parent (which hits the cache, not the API).
if (nextLevel.Count > 0)
{
var nextLevelRepoActions = nextLevel
.Where(x => x.action.Reference.Type == Pipelines.ActionSourceType.Repository)
.Select(x => x.action)
.ToList();
await ResolveNewActionsAsync(executionContext, nextLevelRepoActions, resolvedDownloadInfos);

// Pre-download next-level actions in parallel so that the
// recursive calls below find watermarks on disk and skip
// redundant downloads.
var preDownloadKeys = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var preDownloadTasks = new List<Task>();
foreach (var action in nextLevelRepoActions)
{
var lookupKey = GetDownloadInfoLookupKey(action);
if (!string.IsNullOrEmpty(lookupKey) && preDownloadKeys.Add(lookupKey) && resolvedDownloadInfos.TryGetValue(lookupKey, out var downloadInfo))
{
preDownloadTasks.Add(DownloadRepositoryActionAsync(executionContext, downloadInfo));
}
}
await Task.WhenAll(preDownloadTasks);

foreach (var group in nextLevel.GroupBy(x => x.parentId))
{
var groupActions = group.Select(x => x.action).ToList();
state = await PrepareActionsRecursiveAsync(executionContext, state, groupActions, resolvedDownloadInfos, depth + 1, group.Key);
}
}

// Register pre/post steps after recursion so that HasPre/HasPost
// are correct (they depend on _cachedEmbeddedPreSteps/PostSteps
// being populated by the recursive calls above).
foreach (var action in repositoryActions)
{
var repoAction = action.Reference as Pipelines.RepositoryPathReference;
if (repoAction.RepositoryType != Pipelines.PipelineConstants.SelfAlias)
{
Expand Down Expand Up @@ -754,6 +814,208 @@ private async Task BuildActionContainerAsync(IExecutionContext executionContext,
return actionDownloadInfos.Actions;
}

/// <summary>
/// Only resolves actions not already in resolvedDownloadInfos.
/// Results are cached for reuse at deeper recursion levels.
/// </summary>
private async Task ResolveNewActionsAsync(IExecutionContext executionContext, List<Pipelines.ActionStep> actions, Dictionary<string, WebApi.ActionDownloadInfo> resolvedDownloadInfos)
{
var actionsToResolve = new List<Pipelines.ActionStep>();
var pendingKeys = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
foreach (var action in actions)
{
var lookupKey = GetDownloadInfoLookupKey(action);
if (!string.IsNullOrEmpty(lookupKey) && !resolvedDownloadInfos.ContainsKey(lookupKey) && pendingKeys.Add(lookupKey))
{
actionsToResolve.Add(action);
}
}

if (actionsToResolve.Count > 0)
{
var downloadInfos = await GetDownloadInfoAsync(executionContext, actionsToResolve);
foreach (var kvp in downloadInfos)
{
resolvedDownloadInfos[kvp.Key] = kvp.Value;
}
}
}

/// <summary>
/// BFS discovery loop: resolve and download all reachable actions upfront
/// so the recursive walk makes zero network calls.
/// </summary>
private async Task EagerlyResolveAllReachableActionsAsync(
IExecutionContext executionContext,
List<Pipelines.ActionStep> initialActions,
Dictionary<string, WebApi.ActionDownloadInfo> resolvedDownloadInfos)
{
// downloadedKeys tracks repo downloads (owner/repo@ref — no path).
// scannedKeys tracks which sub-path action.ymls have been scanned
// (owner/repo/path@ref) so we don't re-scan the same composite.
var downloadedKeys = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var scannedKeys = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var pending = new List<Pipelines.ActionStep>(initialActions);

while (pending.Count > 0)
{
// Collect repo actions we haven't scanned yet
var repoActions = new List<Pipelines.ActionStep>();
foreach (var action in pending)
{
if (action.Reference.Type != Pipelines.ActionSourceType.Repository)
{
continue;
}
var scanKey = GetScanKey(action);
if (!string.IsNullOrEmpty(scanKey) && scannedKeys.Add(scanKey))
{
repoActions.Add(action);
}
}
pending.Clear();

if (repoActions.Count == 0)
{
break;
}

// Resolve only repos not yet in the cache
var toResolve = repoActions
.Where(a =>
{
var key = GetDownloadInfoLookupKey(a);
return !string.IsNullOrEmpty(key) && !downloadedKeys.Contains(key);
})
.ToList();

if (toResolve.Count > 0)
{
await ResolveNewActionsAsync(executionContext, toResolve, resolvedDownloadInfos);

// Download in parallel
var downloadTasks = new List<Task>();
foreach (var action in toResolve)
{
var key = GetDownloadInfoLookupKey(action);
if (downloadedKeys.Add(key) && resolvedDownloadInfos.TryGetValue(key, out var info))
{
downloadTasks.Add(DownloadRepositoryActionAsync(executionContext, info));
}
}
await Task.WhenAll(downloadTasks);
}

// Scan ALL actions for composite sub-references (including
// sub-paths of already-downloaded repos).
foreach (var action in repoActions)
{
var subRefs = ScanForRepoReferences(executionContext, action);
if (subRefs != null)
{
pending.AddRange(subRefs);
}
}
}
}

/// <summary>
/// Returns a scan key that includes the sub-path, so that different
/// sub-paths within the same repo are scanned independently.
/// Format: "owner/repo/path@ref" or "owner/repo@ref" when no path.
/// </summary>
private static string GetScanKey(Pipelines.ActionStep action)
{
if (action.Reference.Type != Pipelines.ActionSourceType.Repository)
{
return null;
}

var repoRef = action.Reference as Pipelines.RepositoryPathReference;
if (repoRef == null ||
string.Equals(repoRef.RepositoryType, Pipelines.PipelineConstants.SelfAlias, StringComparison.OrdinalIgnoreCase))
{
return null;
}

if (!string.IsNullOrEmpty(repoRef.Path))
{
return $"{repoRef.Name}/{repoRef.Path}@{repoRef.Ref}";
}
return $"{repoRef.Name}@{repoRef.Ref}";
}

/// <summary>
/// Lightweight scan: load an action's manifest and return any repository
/// references from composite steps, without side effects (no GUID
/// assignment, no step caching).
/// </summary>
private List<Pipelines.ActionStep> ScanForRepoReferences(
IExecutionContext executionContext,
Pipelines.ActionStep action)
{
var repoRef = action.Reference as Pipelines.RepositoryPathReference;
if (repoRef == null ||
string.Equals(repoRef.RepositoryType, Pipelines.PipelineConstants.SelfAlias, StringComparison.OrdinalIgnoreCase))
{
return null;
}

string destDirectory = Path.Combine(
HostContext.GetDirectory(WellKnownDirectory.Actions),
repoRef.Name.Replace(Path.AltDirectorySeparatorChar, Path.DirectorySeparatorChar),
repoRef.Ref);
string actionEntryDirectory = destDirectory;
if (!string.IsNullOrEmpty(repoRef.Path))
{
actionEntryDirectory = Path.Combine(destDirectory, repoRef.Path);
}

var actionManifest = Path.Combine(actionEntryDirectory, Constants.Path.ActionManifestYmlFile);
var actionManifestYaml = Path.Combine(actionEntryDirectory, Constants.Path.ActionManifestYamlFile);

ActionDefinitionData actionDef = null;
try
{
var manifestManager = HostContext.GetService<IActionManifestManagerWrapper>();
if (File.Exists(actionManifest))
{
actionDef = manifestManager.Load(executionContext, actionManifest);
}
else if (File.Exists(actionManifestYaml))
{
actionDef = manifestManager.Load(executionContext, actionManifestYaml);
}
}
catch (Exception ex)
{
Trace.Warning($"Failed to scan action manifest for {repoRef.Name}: {ex.Message}");
return null;
}

if (actionDef?.Execution?.ExecutionType != ActionExecutionType.Composite)
{
return null;
}

var compositeAction = actionDef.Execution as CompositeActionExecutionData;
if (compositeAction?.Steps == null)
{
return null;
}

var result = new List<Pipelines.ActionStep>();
foreach (var step in compositeAction.Steps)
{
if (step is Pipelines.ActionStep actionStep &&
actionStep.Reference?.Type == Pipelines.ActionSourceType.Repository)
{
result.Add(actionStep);
}
}
return result.Count > 0 ? result : null;
}

private async Task DownloadRepositoryActionAsync(IExecutionContext executionContext, WebApi.ActionDownloadInfo downloadInfo)
{
Trace.Entering();
Expand Down
Loading