diff --git a/samples/set_refresh_schedule.py b/samples/set_refresh_schedule.py
index 50fccaf2b..decdc223f 100644
--- a/samples/set_refresh_schedule.py
+++ b/samples/set_refresh_schedule.py
@@ -34,6 +34,7 @@ def usage(args):
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--workbook", "-w")
group.add_argument("--datasource", "-d")
+ group.add_argument("--flow", "-f")
parser.add_argument("schedule")
return parser.parse_args(args)
@@ -61,6 +62,13 @@ def get_datasource_by_name(server, name):
return datasources.pop()
+def get_flow_by_name(server, name):
+ request_filter = make_filter(Name=name)
+ flows, _ = server.flows.get(request_filter)
+ assert len(flows) == 1
+ return flows.pop()
+
+
def get_schedule_by_name(server, name):
schedules = [x for x in TSC.Pager(server.schedules) if x.name == name]
assert len(schedules) == 1
@@ -82,8 +90,13 @@ def run(args):
with server.auth.sign_in(tableau_auth):
if args.workbook:
item = get_workbook_by_name(server, args.workbook)
- else:
+ elif args.datasource:
item = get_datasource_by_name(server, args.datasource)
+ elif args.flow:
+ item = get_flow_by_name(server, args.flow)
+ else:
+ print("A scheduleable item must be included")
+ return
schedule = get_schedule_by_name(server, args.schedule)
assign_to_schedule(server, item, schedule)
diff --git a/tableauserverclient/models/task_item.py b/tableauserverclient/models/task_item.py
index 1a7940368..32299a853 100644
--- a/tableauserverclient/models/task_item.py
+++ b/tableauserverclient/models/task_item.py
@@ -9,6 +9,7 @@ class TaskItem(object):
class Type:
ExtractRefresh = "extractRefresh"
DataAcceleration = "dataAcceleration"
+ RunFlow = "runFlow"
# This mapping is used to convert task type returned from server
_TASK_TYPE_MAPPING = {
diff --git a/tableauserverclient/server/endpoint/datasources_endpoint.py b/tableauserverclient/server/endpoint/datasources_endpoint.py
index d597e905c..1f3ecd7b8 100644
--- a/tableauserverclient/server/endpoint/datasources_endpoint.py
+++ b/tableauserverclient/server/endpoint/datasources_endpoint.py
@@ -460,5 +460,10 @@ def delete_revision(self, datasource_id: str, revision_number: str) -> None:
self.delete_request(url)
logger.info(
- "Deleted single datasource revsision (ID: {0}) (Revision: {1})".format(datasource_id, revision_number)
+ "Deleted single datasource revision (ID: {0}) (Revision: {1})".format(datasource_id, revision_number)
)
+
+ # a convenience method
+ @api(version="2.8")
+ def schedule_extract_refresh(self, schedule_id: int, item: DatasourceItem) -> None: # actually should return a task
+ return self.parent_srv.schedules.add_to_schedule(schedule_id, datasource=item)
diff --git a/tableauserverclient/server/endpoint/flows_endpoint.py b/tableauserverclient/server/endpoint/flows_endpoint.py
index 98174954b..04685f9e7 100644
--- a/tableauserverclient/server/endpoint/flows_endpoint.py
+++ b/tableauserverclient/server/endpoint/flows_endpoint.py
@@ -237,3 +237,8 @@ def add_dqw(self, item: FlowItem, warning: "DQWItem") -> None:
@api(version="3.5")
def delete_dqw(self, item: FlowItem) -> None:
self._data_quality_warnings.clear(item)
+
+ # a convenience method
+ @api(version="3.3")
+ def schedule_flow_run(self, schedule_id: int, item: FlowItem) -> None: # actually should return a task
+ return self.parent_srv.schedules.add_to_schedule(schedule_id, flow=item)
diff --git a/tableauserverclient/server/endpoint/schedules_endpoint.py b/tableauserverclient/server/endpoint/schedules_endpoint.py
index f2bd4d489..21c828989 100644
--- a/tableauserverclient/server/endpoint/schedules_endpoint.py
+++ b/tableauserverclient/server/endpoint/schedules_endpoint.py
@@ -1,20 +1,20 @@
import copy
import logging
+import warnings
from collections import namedtuple
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, Union
-from .endpoint import Endpoint, api
+from .endpoint import Endpoint, api, parameter_added_in
from .exceptions import MissingRequiredFieldError
from .. import RequestFactory, PaginationItem, ScheduleItem, TaskItem
logger = logging.getLogger("tableau.endpoint.schedules")
-# Oh to have a first class Result concept in Python...
AddResponse = namedtuple("AddResponse", ("result", "error", "warnings", "task_created"))
OK = AddResponse(result=True, error=None, warnings=None, task_created=None)
if TYPE_CHECKING:
from ..request_options import RequestOptions
- from ...models import DatasourceItem, WorkbookItem
+ from ...models import DatasourceItem, WorkbookItem, FlowItem
class Schedules(Endpoint):
@@ -81,54 +81,71 @@ def create(self, schedule_item: ScheduleItem) -> ScheduleItem:
return new_schedule
@api(version="2.8")
+ @parameter_added_in(flow="3.3")
def add_to_schedule(
self,
schedule_id: str,
workbook: "WorkbookItem" = None,
datasource: "DatasourceItem" = None,
- task_type: str = TaskItem.Type.ExtractRefresh,
+ flow: "FlowItem" = None,
+ task_type: str = None,
) -> List[AddResponse]:
- def add_to(
- resource: Union["DatasourceItem", "WorkbookItem"],
- type_: str,
- req_factory: Callable[
- [
- str,
- str,
- ],
- bytes,
- ],
- ) -> AddResponse:
- id_ = resource.id
- url = "{0}/{1}/{2}s".format(self.siteurl, schedule_id, type_)
- add_req = req_factory(id_, task_type=task_type) # type: ignore[call-arg, arg-type]
- response = self.put_request(url, add_req)
-
- error, warnings, task_created = ScheduleItem.parse_add_to_schedule_response(
- response, self.parent_srv.namespace
- )
- if task_created:
- logger.info("Added {} to {} to schedule {}".format(type_, id_, schedule_id))
- if error is not None or warnings is not None:
- return AddResponse(
- result=False,
- error=error,
- warnings=warnings,
- task_created=task_created,
- )
- else:
- return OK
-
- items = []
+ # There doesn't seem to be a good reason to allow one item of each type?
+ if workbook and datasource:
+ warnings.warn("Passing in multiple items for add_to_schedule will be deprecated", PendingDeprecationWarning)
+ items: List[
+ Tuple[str, Union[WorkbookItem, FlowItem, DatasourceItem], str, Callable[[Optional[str], str], bytes], str]
+ ] = []
if workbook is not None:
- items.append((workbook, "workbook", RequestFactory.Schedule.add_workbook_req))
+ if not task_type:
+ task_type = TaskItem.Type.ExtractRefresh
+ items.append((schedule_id, workbook, "workbook", RequestFactory.Schedule.add_workbook_req, task_type))
if datasource is not None:
+ if not task_type:
+ task_type = TaskItem.Type.ExtractRefresh
+ items.append((schedule_id, datasource, "datasource", RequestFactory.Schedule.add_datasource_req, task_type))
+ if flow is not None and not (workbook or datasource): # Cannot pass a flow with any other type
+ if not task_type:
+ task_type = TaskItem.Type.RunFlow
items.append(
- (datasource, "datasource", RequestFactory.Schedule.add_datasource_req) # type:ignore[arg-type]
- )
+ (schedule_id, flow, "flow", RequestFactory.Schedule.add_flow_req, task_type)
+ ) # type:ignore[arg-type]
- results = (add_to(*x) for x in items)
+ results = (self._add_to(*x) for x in items)
# list() is needed for python 3.x compatibility
return list(filter(lambda x: not x.result, results)) # type:ignore[arg-type]
+
+ def _add_to(
+ self,
+ schedule_id,
+ resource: Union["DatasourceItem", "WorkbookItem", "FlowItem"],
+ type_: str,
+ req_factory: Callable[
+ [
+ str,
+ str,
+ ],
+ bytes,
+ ],
+ item_task_type,
+ ) -> AddResponse:
+ id_ = resource.id
+ url = "{0}/{1}/{2}s".format(self.siteurl, schedule_id, type_)
+ add_req = req_factory(id_, task_type=item_task_type) # type: ignore[call-arg, arg-type]
+ response = self.put_request(url, add_req)
+
+ error, warnings, task_created = ScheduleItem.parse_add_to_schedule_response(response, self.parent_srv.namespace)
+ if task_created:
+ logger.info("Added {} to {} to schedule {}".format(type_, id_, schedule_id))
+
+ if error is not None or warnings is not None:
+ return AddResponse(
+ result=False,
+ error=error,
+ warnings=warnings,
+ task_created=task_created,
+ )
+ else:
+ return OK
diff --git a/tableauserverclient/server/endpoint/workbooks_endpoint.py b/tableauserverclient/server/endpoint/workbooks_endpoint.py
index af395918a..811832708 100644
--- a/tableauserverclient/server/endpoint/workbooks_endpoint.py
+++ b/tableauserverclient/server/endpoint/workbooks_endpoint.py
@@ -518,4 +518,9 @@ def delete_revision(self, workbook_id: str, revision_number: str) -> None:
url = "/".join([self.baseurl, workbook_id, "revisions", revision_number])
self.delete_request(url)
- logger.info("Deleted single workbook revsision (ID: {0}) (Revision: {1})".format(workbook_id, revision_number))
+ logger.info("Deleted single workbook revision (ID: {0}) (Revision: {1})".format(workbook_id, revision_number))
+
+ # a convenience method
+ @api(version="2.8")
+ def schedule_extract_refresh(self, schedule_id: int, item: WorkbookItem) -> None: # actually should return a task
+ return self.parent_srv.schedules.add_to_schedule(schedule_id, workbook=item)
diff --git a/tableauserverclient/server/request_factory.py b/tableauserverclient/server/request_factory.py
index 4b75490e7..7e4038979 100644
--- a/tableauserverclient/server/request_factory.py
+++ b/tableauserverclient/server/request_factory.py
@@ -557,6 +557,9 @@ def add_workbook_req(self, id_: Optional[str], task_type: str = TaskItem.Type.Ex
def add_datasource_req(self, id_: Optional[str], task_type: str = TaskItem.Type.ExtractRefresh) -> bytes:
return self._add_to_req(id_, "datasource", task_type)
+ def add_flow_req(self, id_: Optional[str], task_type: str = TaskItem.Type.RunFlow) -> bytes:
+ return self._add_to_req(id_, "flow", task_type)
+
class SiteRequest(object):
def update_req(self, site_item: "SiteItem"):
diff --git a/test/assets/flow_get_by_id.xml b/test/assets/flow_get_by_id.xml
new file mode 100644
index 000000000..d1c626105
--- /dev/null
+++ b/test/assets/flow_get_by_id.xml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/test/assets/schedule_add_flow.xml b/test/assets/schedule_add_flow.xml
new file mode 100644
index 000000000..9934c38e5
--- /dev/null
+++ b/test/assets/schedule_add_flow.xml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
diff --git a/test/test_schedule.py b/test/test_schedule.py
index efdda5814..807467918 100644
--- a/test/test_schedule.py
+++ b/test/test_schedule.py
@@ -20,9 +20,11 @@
ADD_WORKBOOK_TO_SCHEDULE = os.path.join(TEST_ASSET_DIR, "schedule_add_workbook.xml")
ADD_WORKBOOK_TO_SCHEDULE_WITH_WARNINGS = os.path.join(TEST_ASSET_DIR, "schedule_add_workbook_with_warnings.xml")
ADD_DATASOURCE_TO_SCHEDULE = os.path.join(TEST_ASSET_DIR, "schedule_add_datasource.xml")
+ADD_FLOW_TO_SCHEDULE = os.path.join(TEST_ASSET_DIR, "schedule_add_flow.xml")
WORKBOOK_GET_BY_ID_XML = os.path.join(TEST_ASSET_DIR, "workbook_get_by_id.xml")
DATASOURCE_GET_BY_ID_XML = os.path.join(TEST_ASSET_DIR, "datasource_get_by_id.xml")
+FLOW_GET_BY_ID_XML = os.path.join(TEST_ASSET_DIR, "flow_get_by_id.xml")
class ScheduleTests(unittest.TestCase):
@@ -314,3 +316,18 @@ def test_add_datasource(self) -> None:
datasource = self.server.datasources.get_by_id("bar")
result = self.server.schedules.add_to_schedule("foo", datasource=datasource)
self.assertEqual(0, len(result), "Added properly")
+
+ def test_add_flow(self) -> None:
+ self.server.version = "3.3"
+ baseurl = "{}/sites/{}/schedules".format(self.server.baseurl, self.server.site_id)
+
+ with open(FLOW_GET_BY_ID_XML, "rb") as f:
+ flow_response = f.read().decode("utf-8")
+ with open(ADD_FLOW_TO_SCHEDULE, "rb") as f:
+ add_flow_response = f.read().decode("utf-8")
+ with requests_mock.mock() as m:
+ m.get(self.server.flows.baseurl + "/bar", text=flow_response)
+ m.put(baseurl + "/foo/flows", text=flow_response)
+ flow = self.server.flows.get_by_id("bar")
+ result = self.server.schedules.add_to_schedule("foo", flow=flow)
+ self.assertEqual(0, len(result), "Added properly")