Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,35 @@ def __init__(self, parent: "RobotLanguageServerProtocol") -> None:
self.parent.diagnostics.collect.add(self.collect_unused_keyword_references)
self.parent.diagnostics.collect.add(self.collect_unused_variable_references)

self.parent.diagnostics.on_get_related_documents.add(self._on_get_related_documents)
self.parent.diagnostics.on_get_related_documents.add(
self._on_get_related_documents
)

def _on_initialized(self, sender: Any) -> None:
self.parent.diagnostics.analyze.add(self.analyze)
self.parent.documents_cache.namespace_initialized(self._on_namespace_initialized)
self.parent.documents_cache.namespace_initialized(
self._on_namespace_initialized
)
self.parent.documents_cache.libraries_changed.add(self._on_libraries_changed)
self.parent.documents_cache.variables_changed.add(self._on_variables_changed)

def _on_libraries_changed(self, sender: Any, libraries: List[LibraryDoc]) -> None:
for doc in self.parent.documents.documents:
namespace = self.parent.documents_cache.get_only_initialized_namespace(doc)
if namespace is not None:
lib_docs = (e.library_doc for e in namespace.get_libraries().values())
if any(lib_doc in lib_docs for lib_doc in libraries):
self.parent.diagnostics.force_refresh_document(doc)
docs_to_refresh: set[TextDocument] = set()
for lib_doc in libraries:
docs_to_refresh.update(
self.parent.documents_cache.get_library_users(lib_doc)
)
for doc in docs_to_refresh:
self.parent.diagnostics.force_refresh_document(doc)

def _on_variables_changed(self, sender: Any, variables: List[LibraryDoc]) -> None:
for doc in self.parent.documents.documents:
namespace = self.parent.documents_cache.get_only_initialized_namespace(doc)
if namespace is not None:
lib_docs = (e.library_doc for e in namespace.get_variables_imports().values())
if any(lib_doc in lib_docs for lib_doc in variables):
self.parent.diagnostics.force_refresh_document(doc)
docs_to_refresh: set[TextDocument] = set()
for var_doc in variables:
docs_to_refresh.update(
self.parent.documents_cache.get_variables_users(var_doc)
)
for doc in docs_to_refresh:
self.parent.diagnostics.force_refresh_document(doc)

@language_id("robotframework")
def analyze(self, sender: Any, document: TextDocument) -> None:
Expand All @@ -79,41 +85,35 @@ def _on_namespace_initialized(self, sender: Any, namespace: Namespace) -> None:
self.parent.diagnostics.force_refresh_document(namespace.document)

@language_id("robotframework")
def _on_get_related_documents(self, sender: Any, document: TextDocument) -> Optional[List[TextDocument]]:
def _on_get_related_documents(
self, sender: Any, document: TextDocument
) -> Optional[List[TextDocument]]:
namespace = self.parent.documents_cache.get_only_initialized_namespace(document)
if namespace is None:
return None
source = str(document.uri.to_path())
return self.parent.documents_cache.get_importers(source)

result = []

lib_doc = namespace.get_library_doc()
for doc in self.parent.documents.documents:
if doc.language_id != "robotframework":
continue

doc_namespace = self.parent.documents_cache.get_only_initialized_namespace(doc)
if doc_namespace is None:
continue

if doc_namespace.is_analyzed():
for ref in doc_namespace.get_namespace_references():
if ref.library_doc == lib_doc:
result.append(doc)

return result

def modify_diagnostics(self, document: TextDocument, diagnostics: List[Diagnostic]) -> List[Diagnostic]:
return self.parent.documents_cache.get_diagnostic_modifier(document).modify_diagnostics(diagnostics)
def modify_diagnostics(
self, document: TextDocument, diagnostics: List[Diagnostic]
) -> List[Diagnostic]:
return self.parent.documents_cache.get_diagnostic_modifier(
document
).modify_diagnostics(diagnostics)

@language_id("robotframework")
def collect_namespace_diagnostics(
self, sender: Any, document: TextDocument, diagnostics_type: DiagnosticsCollectType
self,
sender: Any,
document: TextDocument,
diagnostics_type: DiagnosticsCollectType,
) -> DiagnosticsResult:
try:
namespace = self.parent.documents_cache.get_namespace(document)

return DiagnosticsResult(
self.collect_namespace_diagnostics, self.modify_diagnostics(document, namespace.get_diagnostics())
self.collect_namespace_diagnostics,
self.modify_diagnostics(document, namespace.get_diagnostics()),
)
except (CancelledError, SystemExit, KeyboardInterrupt):
raise
Expand Down Expand Up @@ -141,7 +141,10 @@ def collect_namespace_diagnostics(
@language_id("robotframework")
@_logger.call
def collect_unused_keyword_references(
self, sender: Any, document: TextDocument, diagnostics_type: DiagnosticsCollectType
self,
sender: Any,
document: TextDocument,
diagnostics_type: DiagnosticsCollectType,
) -> DiagnosticsResult:
config = self.parent.workspace.get_configuration(AnalysisConfig, document.uri)

Expand All @@ -153,15 +156,19 @@ def collect_unused_keyword_references(

return self._collect_unused_keyword_references(document)

def _collect_unused_keyword_references(self, document: TextDocument) -> DiagnosticsResult:
def _collect_unused_keyword_references(
self, document: TextDocument
) -> DiagnosticsResult:
try:
namespace = self.parent.documents_cache.get_namespace(document)

result: List[Diagnostic] = []
for kw in (namespace.get_library_doc()).keywords.values():
check_current_task_canceled()

references = self.parent.robot_references.find_keyword_references(document, kw, False, True)
references = self.parent.robot_references.find_keyword_references(
document, kw, False, True
)
if not references:
result.append(
Diagnostic(
Expand All @@ -174,7 +181,10 @@ def _collect_unused_keyword_references(self, document: TextDocument) -> Diagnost
)
)

return DiagnosticsResult(self.collect_unused_keyword_references, self.modify_diagnostics(document, result))
return DiagnosticsResult(
self.collect_unused_keyword_references,
self.modify_diagnostics(document, result),
)
except (CancelledError, SystemExit, KeyboardInterrupt):
raise
except BaseException as e:
Expand All @@ -200,19 +210,26 @@ def _collect_unused_keyword_references(self, document: TextDocument) -> Diagnost
@language_id("robotframework")
@_logger.call
def collect_unused_variable_references(
self, sender: Any, document: TextDocument, diagnostics_type: DiagnosticsCollectType
self,
sender: Any,
document: TextDocument,
diagnostics_type: DiagnosticsCollectType,
) -> DiagnosticsResult:
config = self.parent.workspace.get_configuration(AnalysisConfig, document.uri)

if not config.find_unused_references:
return DiagnosticsResult(self.collect_unused_variable_references, [])

if diagnostics_type != DiagnosticsCollectType.SLOW:
return DiagnosticsResult(self.collect_unused_variable_references, None, True)
return DiagnosticsResult(
self.collect_unused_variable_references, None, True
)

return self._collect_unused_variable_references(document)

def _collect_unused_variable_references(self, document: TextDocument) -> DiagnosticsResult:
def _collect_unused_variable_references(
self, document: TextDocument
) -> DiagnosticsResult:
try:
namespace = self.parent.documents_cache.get_namespace(document)

Expand All @@ -222,14 +239,25 @@ def _collect_unused_variable_references(self, document: TextDocument) -> Diagnos
check_current_task_canceled()

if isinstance(
var, (LibraryArgumentDefinition, EnvironmentVariableDefinition, GlobalVariableDefinition)
var,
(
LibraryArgumentDefinition,
EnvironmentVariableDefinition,
GlobalVariableDefinition,
),
):
continue

if var.name_token is not None and var.name_token.value and var.name_token.value.startswith("_"):
if (
var.name_token is not None
and var.name_token.value
and var.name_token.value.startswith("_")
):
continue

references = self.parent.robot_references.find_variable_references(document, var, False, True)
references = self.parent.robot_references.find_variable_references(
document, var, False, True
)
if not references:
result.append(
Diagnostic(
Expand All @@ -243,7 +271,10 @@ def _collect_unused_variable_references(self, document: TextDocument) -> Diagnos
)
)

return DiagnosticsResult(self.collect_unused_variable_references, self.modify_diagnostics(document, result))
return DiagnosticsResult(
self.collect_unused_variable_references,
self.modify_diagnostics(document, result),
)
except (CancelledError, SystemExit, KeyboardInterrupt):
raise
except BaseException as e:
Expand Down
26 changes: 24 additions & 2 deletions packages/robot/src/robotcode/robot/diagnostics/data_cache.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import pickle
import tempfile
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
Expand All @@ -12,6 +14,8 @@
class CacheSection(Enum):
LIBRARY = "libdoc"
VARIABLES = "variables"
RESOURCE = "resource"
NAMESPACE = "namespace"


class DataCache(ABC):
Expand Down Expand Up @@ -85,5 +89,23 @@ def save_cache_data(self, section: CacheSection, entry_name: str, data: Any) ->
cached_file = self.build_cache_data_filename(section, entry_name)

cached_file.parent.mkdir(parents=True, exist_ok=True)
with cached_file.open("wb") as f:
pickle.dump(data, f)

# Atomic write: write to temp file, then rename
# This ensures readers never see partial/corrupt data
temp_fd, temp_path = tempfile.mkstemp(
dir=cached_file.parent,
prefix=cached_file.stem + "_",
suffix=".tmp",
)
try:
with os.fdopen(temp_fd, "wb") as f:
pickle.dump(data, f)
# Atomic rename (POSIX guarantees atomicity; Windows may fail if target exists)
Path(temp_path).replace(cached_file)
except Exception:
# Clean up temp file on failure (temp file may be left behind on SystemExit/KeyboardInterrupt)
try:
os.unlink(temp_path)
except OSError:
pass
raise
Loading