in src/pydolphinscheduler/core/task.py [0:0]
def get_content(self):
    """Resolve the task's content attribute, reading it from a file if needed.

    The attribute named by ``self.ext_attr`` is looked up on the task. When
    its value is a string ending with one of the suffixes in ``self.ext``,
    the value is passed to ``read_file`` of the object returned by
    ``self.get_plugin()`` and the returned content is used; otherwise the
    value itself is used. The result is stored on the attribute whose name
    is ``self.ext_attr`` with leading underscores stripped.

    Nothing is done when ``ext_attr`` or ``ext`` is unset, or when the
    source attribute is ``None``.

    :raises ValueError: if a resource plugin is set on the task or on its
        workflow, and the string value contains a dot but does not end with
        one of the supported suffixes.
    """
    # Both settings are required below: ``ext_attr`` names the attribute to
    # read and ``ext`` lists the accepted suffixes. Skipping when either is
    # missing avoids a TypeError from ``getattr(self, None)`` / ``tuple(None)``.
    if self.ext_attr is None or self.ext is None:
        return

    value = getattr(self, self.ext_attr)
    if value is None:
        return

    target = self.ext_attr.lstrip(Symbol.UNDERLINE)
    is_text = isinstance(value, str)

    if is_text and value.endswith(tuple(self.ext)):
        # Supported suffix: let the plugin read the referenced file.
        setattr(self, target, self.get_plugin().read_file(value))
        return

    has_plugin = self.resource_plugin is not None or (
        self.workflow is not None
        and self.workflow.resource_plugin is not None
    )
    # Only strings can carry a file suffix; non-string values are stored
    # unchanged instead of failing on a missing ``rfind`` method.
    if has_plugin and is_text:
        index = value.rfind(Symbol.POINT)
        if index != -1:
            raise ValueError(
                f"This task does not support files with suffix {value[index:]},"
                f"only supports {Symbol.COMMA.join(str(suf) for suf in self.ext)}"
            )
    setattr(self, target, value)