pluggable URI resolvers for file_as_data_uri (avoid monkey-patching for custom storage schemes) #3791

@konstkol-amzn

Context

inspect_ai._util.images.file_as_data_uri resolves URIs in ContentImage / ContentAudio / ContentVideo / ContentDocument to their byte contents by delegating to file_as_data, which routes non-HTTP paths through fsspec.open(...). For most schemes this works well, but s3:// URIs get handled by s3fs, which reads credentials from the process-wide AWS_PROFILE (and the ambient boto3 default chain).

This becomes a problem when different buckets in the same eval require different AWS profiles (e.g. a dataset bucket and a restricted bucket), or when a single process needs to combine s3:// dataset access with other boto3 consumers (model providers, scorers) that use a different credential scope.

The problem

There's currently no clean way to inject a custom URI resolver for a specific scheme. The only working approach we've found is module-attribute monkey-patching.

Proposed extension point

A tiny scheme-registered resolver hook in inspect_ai._util.images (or a public re-export), along these lines:

# inspect_ai/_util/images.py
# (is_data_uri, file_as_data and as_data_uri already exist in this module)

import base64
from typing import Awaitable, Callable

# Maps a URI scheme (without "://") to the async resolver registered for it.
_SCHEME_RESOLVERS: dict[str, Callable[[str], Awaitable[str]]] = {}


def register_uri_resolver(
    scheme: str, resolver: Callable[[str], Awaitable[str]]
) -> None:
    """Register an async resolver for the given URI scheme.

    When file_as_data_uri encounters a URI starting with ``{scheme}://``,
    it calls the resolver to rewrite it into something file_as_data can
    handle: a local path, an https URL, or a data: URI.
    """
    _SCHEME_RESOLVERS[scheme] = resolver


def unregister_uri_resolver(scheme: str) -> None:
    """Remove a previously-registered resolver. No-op if the scheme is absent."""
    _SCHEME_RESOLVERS.pop(scheme, None)


async def file_as_data_uri(file: str) -> str:
    if is_data_uri(file):
        return file

    # Give the first resolver whose scheme matches a chance to rewrite the URI.
    for scheme, resolver in _SCHEME_RESOLVERS.items():
        if isinstance(file, str) and file.startswith(f"{scheme}://"):
            file = await resolver(file)
            if is_data_uri(file):
                return file
            break

    # Existing code path: load the bytes and encode them as a data: URI.
    bytes_, mime_type = await file_as_data(file)
    return as_data_uri(mime_type, base64.b64encode(bytes_).decode("utf-8"))

Properties:

  • Zero change to the public signature of file_as_data_uri.
  • Default behavior unchanged when no resolver is registered (empty dict → for loop is a no-op → existing code path runs).
  • Async — works naturally with inspect-ai's pre-solver base64 pass, which is already async.
  • Scheme-keyed — not a global hook that has to match every URI.
  • Resolver returns a URI/path; file_as_data handles bytes-loading. Keeps the resolver's responsibility narrow.
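
The properties above can be checked against a stand-alone model of the hook. This is only a sketch: fake_file_as_data is a stand-in for inspect_ai's file_as_data that does no I/O, the "demo" scheme and resolve_demo are hypothetical, data: URIs are detected with a plain prefix check rather than is_data_uri, and the scheme is looked up directly in the dict instead of looping over it (a small variation on the snippet above, not part of the proposal).

```python
import asyncio
import base64
from typing import Awaitable, Callable

Resolver = Callable[[str], Awaitable[str]]
_SCHEME_RESOLVERS: dict[str, Resolver] = {}


def register_uri_resolver(scheme: str, resolver: Resolver) -> None:
    _SCHEME_RESOLVERS[scheme] = resolver


def unregister_uri_resolver(scheme: str) -> None:
    _SCHEME_RESOLVERS.pop(scheme, None)


async def fake_file_as_data(file: str) -> tuple[bytes, str]:
    # Stand-in for file_as_data: returns the path itself as the "contents".
    return file.encode("utf-8"), "text/plain"


async def file_as_data_uri(file: str) -> str:
    if file.startswith("data:"):
        return file
    # Exact scheme lookup; sep is empty when the string has no "://".
    scheme, sep, _ = file.partition("://")
    resolver = _SCHEME_RESOLVERS.get(scheme) if sep else None
    if resolver is not None:
        file = await resolver(file)
        if file.startswith("data:"):
            return file
    data, mime_type = await fake_file_as_data(file)
    return f"data:{mime_type};base64,{base64.b64encode(data).decode('utf-8')}"


async def resolve_demo(uri: str) -> str:
    # Hypothetical resolver: rewrites demo://name to a local-looking path.
    return "/tmp/" + uri.removeprefix("demo://")


async def main() -> tuple[str, str, str]:
    before = await file_as_data_uri("demo://a.txt")  # no resolver registered
    register_uri_resolver("demo", resolve_demo)
    during = await file_as_data_uri("demo://a.txt")  # resolver rewrites the URI
    unregister_uri_resolver("demo")
    after = await file_as_data_uri("demo://a.txt")   # default path again
    return before, during, after


before, during, after = asyncio.run(main())
```

With no resolver registered the input reaches the loader untouched, and unregistering restores that behavior, so before and after are identical while during encodes the rewritten path.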

Caller side

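from anyio import to_thread

from inspect_ai._util.images import register_uri_resolver


async def _resolve_s3(uri: str) -> str:
    # my_scoped_s3_download is our own blocking helper: it uses a scoped
    # boto3.Session and returns something file_as_data can handle.
    # The blocking GET is offloaded to a worker thread.
    return await to_thread.run_sync(my_scoped_s3_download, uri)


register_uri_resolver("s3", _resolve_s3)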

One line at package-import time; no monkey-patching.
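
For illustration, my_scoped_s3_download could look roughly like the following. This is a sketch under stated assumptions, not the helper used in the POC: the profile name "restricted-profile" and the split_s3_uri helper are hypothetical, the object is downloaded to a temporary file whose local path is returned, and the file extension is preserved on the assumption that the downstream loader infers the MIME type from it.

```python
import tempfile
from pathlib import PurePosixPath
from urllib.parse import urlsplit

PROFILE_NAME = "restricted-profile"  # hypothetical AWS profile name


def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split s3://bucket/key into (bucket, key); raise ValueError otherwise."""
    parts = urlsplit(uri)
    key = parts.path.lstrip("/")
    if parts.scheme != "s3" or not parts.netloc or not key:
        raise ValueError(f"not an s3://bucket/key URI: {uri!r}")
    return parts.netloc, key


def my_scoped_s3_download(uri: str) -> str:
    """Blocking download with a session scoped to PROFILE_NAME.

    Returns the local path of the downloaded copy.
    """
    import boto3  # imported here so the parsing helper works without boto3

    bucket, key = split_s3_uri(uri)
    # A dedicated Session carries its own credentials and leaves the
    # process-wide AWS_PROFILE and default chain untouched.
    session = boto3.Session(profile_name=PROFILE_NAME)
    client = session.client("s3")

    # Reserve a temp file name that keeps the key's extension.
    suffix = PurePosixPath(key).suffix
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        local_path = tmp.name

    client.download_file(bucket, key, local_path)
    return local_path
```

The temporary file is not cleaned up here; a real helper would probably cache by key or delete the file once the eval has consumed it.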

POC validation

We've prototyped this against inspect_ai==0.3.210 on a real multimodal eval:

  • Task: mmmu_pro_fs_pt (3340 samples, multimodal MCQ).
  • Limit: 5 samples, --max-connections=1.
  • Endpoint: SageMaker vLLM deployment.
  • Result: all 5 samples completed, scorer returned 3 Correct / 2 Incorrect, no credential errors, no ProfileNotFound, no file_as_data_uri crashes. Per-sample downloads fired just before inspect-ai's pre-solver base64 pass — exactly the timing the pre-existing https path uses.

The POC patched the installed images.py locally with ~20 lines (the snippet above).

Open questions

Public vs. internal namespace. Should register_uri_resolver live at inspect_ai._util.images (internal) or inspect_ai.util (public)? Our preference is public — this is a user-facing extension point.
