diff --git a/.gitignore b/.gitignore index a57c57a..745500e 100644 --- a/.gitignore +++ b/.gitignore @@ -149,3 +149,5 @@ cython_debug/ .vscode/ contracts/ + +testproj/ diff --git a/nitric/application.py b/nitric/application.py index 44ca9eb..38e2e6a 100644 --- a/nitric/application.py +++ b/nitric/application.py @@ -28,6 +28,8 @@ class Nitric: """Represents a nitric app.""" + _has_run = False + _workers: List[FunctionServer] = [] _cache: Dict[str, Dict[str, Any]] = { "api": {}, @@ -61,6 +63,16 @@ def _create_resource(cls, resource: Type[BT], name: str, *args: Any, **kwargs: A "The nitric server may not be running or the host/port is inaccessible" ) from cre + @classmethod + def has_run(cls) -> bool: + """ + Check if the Nitric application has been started. + + Returns: + bool: True if the Nitric application has been started, False otherwise. + """ + return cls._has_run + @classmethod def run(cls) -> None: """ @@ -68,6 +80,9 @@ def run(cls) -> None: This will execute in an existing event loop if there is one, otherwise it will attempt to create its own. """ + if cls._has_run: + print("The Nitric application has already been started, Nitric.run() should only be called once.") + cls._has_run = True try: try: loop = asyncio.get_running_loop() diff --git a/nitric/channel.py b/nitric/channel.py new file mode 100644 index 0000000..d2e0002 --- /dev/null +++ b/nitric/channel.py @@ -0,0 +1,52 @@ +import atexit +import re +from urllib.parse import urlparse +from grpclib.client import Channel + +from nitric.application import Nitric +from nitric.config import settings +from nitric.exception import NitricNotRunningException + + +def format_url(url: str): + """Add the default http scheme prefix to urls without one.""" + if not re.match("^((?:http|ftp|https):)?//", url.lower()): + return "http://{0}".format(url) + return url + + +class ChannelManager: + """A singleton class to manage the gRPC channel.""" + + channel = None + + @classmethod + def get_channel(cls) -> Channel: + """Return the channel instance.""" + + if cls.channel is None: + cls._create_channel() + return cls.channel # type: ignore + + @classmethod + def _create_channel(cls): + """Create a new channel instance.""" + + channel_url = urlparse(format_url(settings.SERVICE_ADDRESS)) + cls.channel = Channel(host=channel_url.hostname, port=channel_url.port) + atexit.register(cls._close_channel) + + @classmethod + def _close_channel(cls): + """Close the channel instance.""" + + if cls.channel is not None: + cls.channel.close() + cls.channel = None + + # If the program exits without calling Nitric.run(), it may have been a mistake. + if not Nitric.has_run(): + print( + "WARNING: The Nitric application was not started. " + "If you intended to start the application, call Nitric.run() before exiting." + ) diff --git a/nitric/exception.py b/nitric/exception.py index 62f8b1b..c89e3ea 100644 --- a/nitric/exception.py +++ b/nitric/exception.py @@ -157,6 +157,13 @@ def __init__(self, message: str): super().__init__("Unable to connect to nitric server." + (" " + message if message else "")) +class NitricNotRunningException(Exception): + """The Nitric application wasn't started before the program exited.""" + + def __init__(self): + super().__init__("The Nitric application was not started, call Nitric.run() to start the application.") + + def exception_from_grpc_error(error: GRPCError): """Translate a gRPC error to a nitric api exception.""" return exception_from_grpc_code(error.status.value, error.message or "") diff --git a/nitric/resources/apis.py b/nitric/resources/apis.py index 606ca76..ea40b4b 100644 --- a/nitric/resources/apis.py +++ b/nitric/resources/apis.py @@ -60,7 +60,7 @@ ResourceType, ) from nitric.resources.resource import Resource as BaseResource -from nitric.utils import new_default_channel +from nitric.channel import ChannelManager @dataclass @@ -479,7 +479,7 @@ async def _route_request_iterator(self): async def start(self) -> None: """Register this API route handler and handle http requests.""" - channel = new_default_channel() + channel = ChannelManager.get_channel() server = ApiStub(channel=channel) # Attach security rules for this route diff --git a/nitric/resources/buckets.py b/nitric/resources/buckets.py index 756475d..b2c82fa 100644 --- a/nitric/resources/buckets.py +++ b/nitric/resources/buckets.py @@ -52,7 +52,7 @@ StorageWriteRequest, ) from nitric.resources.resource import SecureResource -from nitric.utils import new_default_channel +from nitric.channel import ChannelManager class BucketNotifyRequest: @@ -142,7 +142,7 @@ class BucketRef(object): def __init__(self, name: str): """Construct a Nitric Storage Client.""" - self._channel: Union[Channel, None] = new_default_channel() + self._channel: Union[Channel, None] = ChannelManager.get_channel() self._storage_stub = StorageStub(channel=self._channel) self.name = name @@ -428,7 +428,7 @@ async def _listener_request_iterator(self): async def start(self) -> None: """Register this bucket listener and listen for events.""" - channel = new_default_channel() + channel = ChannelManager.get_channel() server = StorageListenerStub(channel=channel) try: diff --git a/nitric/resources/kv.py b/nitric/resources/kv.py index 05285e4..7be6cac 100644 --- a/nitric/resources/kv.py +++ b/nitric/resources/kv.py @@ -42,7 +42,8 @@ ResourceType, ) from nitric.resources.resource import SecureResource -from nitric.utils import dict_from_struct, new_default_channel, struct_from_dict +from nitric.utils import dict_from_struct, struct_from_dict +from nitric.channel import ChannelManager class KeyValueStoreRef: @@ -54,7 +55,7 @@ class KeyValueStoreRef: def __init__(self, name: str): """Construct a reference to a deployed key value store.""" - self._channel: Channel = new_default_channel() + self._channel: Channel = ChannelManager.get_channel() self._kv_stub = KvStoreStub(channel=self._channel) self.name = name diff --git a/nitric/resources/queues.py b/nitric/resources/queues.py index 503c97a..e4d078d 100644 --- a/nitric/resources/queues.py +++ b/nitric/resources/queues.py @@ -31,7 +31,8 @@ from nitric.proto.queues.v1 import QueuesStub as QueueServiceStub from nitric.proto.resources.v1 import Action, ResourceDeclareRequest, ResourceIdentifier, ResourceType from nitric.resources.resource import SecureResource -from nitric.utils import dict_from_struct, new_default_channel, struct_from_dict +from nitric.utils import dict_from_struct, struct_from_dict +from nitric.channel import ChannelManager @dataclass(frozen=True, order=True) @@ -91,7 +92,7 @@ class QueueRef(object): def __init__(self, name: str) -> None: """Construct a Nitric Queue Client.""" - self._channel: Channel = new_default_channel() + self._channel: Channel = ChannelManager.get_channel() self._queue_stub = QueueServiceStub(channel=self._channel) self.name = name diff --git a/nitric/resources/resource.py b/nitric/resources/resource.py index e391e48..63d255c 100644 --- a/nitric/resources/resource.py +++ b/nitric/resources/resource.py @@ -34,7 +34,7 @@ ResourcesStub, ResourceType, ) -from nitric.utils import new_default_channel +from nitric.channel import ChannelManager T = TypeVar("T", bound="Resource") @@ -48,7 +48,7 @@ def __init__(self, name: str): """Construct a new resource.""" self.name = name self._reg: Optional[Task[Any]] = None # type: ignore - self._channel = new_default_channel() + self._channel = ChannelManager.get_channel() self._resources_stub = ResourcesStub(channel=self._channel) @abstractmethod @@ -85,9 +85,6 @@ def _perms_to_actions(self, *args: Any) -> List[Action]: pass async def _register_policy_async(self, *args: str) -> None: - # if self._reg is not None: - # await asyncio.wait({self._reg}) - policy = PolicyResource( principals=[ResourceIdentifier(type=ResourceType.Service)], actions=self._perms_to_actions(*args), diff --git a/nitric/resources/schedules.py b/nitric/resources/schedules.py index 80915c0..ff2d2dd 100644 --- a/nitric/resources/schedules.py +++ b/nitric/resources/schedules.py @@ -37,7 +37,7 @@ ScheduleEvery, SchedulesStub, ) -from nitric.utils import new_default_channel +from nitric.channel import ChannelManager class ScheduleServer(FunctionServer): @@ -93,7 +93,7 @@ async def _schedule_request_iterator(self): async def start(self) -> None: """Register this schedule and start listening for requests.""" - channel = new_default_channel() + channel = ChannelManager.get_channel() schedules_stub = SchedulesStub(channel=channel) try: diff --git a/nitric/resources/secrets.py b/nitric/resources/secrets.py index e0d2919..8d3c0eb 100644 --- a/nitric/resources/secrets.py +++ b/nitric/resources/secrets.py @@ -31,7 +31,7 @@ from nitric.proto.secrets.v1 import SecretAccessRequest, SecretManagerStub, SecretPutRequest from nitric.proto.secrets.v1 import SecretVersion as VersionMessage from nitric.resources.resource import SecureResource -from nitric.utils import new_default_channel +from nitric.channel import ChannelManager class SecretRef: @@ -43,7 +43,7 @@ class SecretRef: def __init__(self, name: str) -> None: """Construct a Nitric Storage Client.""" - self._channel: Channel = new_default_channel() + self._channel: Channel = ChannelManager.get_channel() self._secrets_stub = SecretManagerStub(channel=self._channel) self.name = name diff --git a/nitric/resources/sql.py b/nitric/resources/sql.py index 633c03e..3ab7d2d 100644 --- a/nitric/resources/sql.py +++ b/nitric/resources/sql.py @@ -32,7 +32,7 @@ ResourceType, ) from nitric.resources.resource import Resource as BaseResource -from nitric.utils import new_default_channel +from nitric.channel import ChannelManager from nitric.application import Nitric from nitric.proto.sql.v1 import SqlStub, SqlConnectionStringRequest @@ -50,7 +50,7 @@ def __init__(self, name: str, migrations: Union[str, None] = None): """Construct a new SQL Database.""" super().__init__(name) - self._channel: Union[Channel, None] = new_default_channel() + self._channel: Union[Channel, None] = ChannelManager.get_channel() self._sql_stub = SqlStub(channel=self._channel) self.name = name self.migrations = migrations diff --git a/nitric/resources/topics.py b/nitric/resources/topics.py index c430244..bac5133 100644 --- a/nitric/resources/topics.py +++ b/nitric/resources/topics.py @@ -37,7 +37,8 @@ from nitric.proto.topics.v1 import RegistrationRequest, SubscriberStub from nitric.proto.topics.v1 import TopicPublishRequest, TopicsStub from nitric.resources.resource import SecureResource -from nitric.utils import dict_from_struct, new_default_channel, struct_from_dict +from nitric.utils import dict_from_struct, struct_from_dict +from nitric.channel import ChannelManager TopicPermission = Literal["publish"] @@ -51,7 +52,7 @@ class TopicRef: def __init__(self, name: str) -> None: """Construct a reference to a deployed Topic.""" - self._channel: Channel = new_default_channel() + self._channel: Channel = ChannelManager.get_channel() self._topics_stub = TopicsStub(channel=self._channel) self.name = name @@ -161,7 +162,7 @@ async def _message_request_iterator(self): async def start(self) -> None: """Register this subscriber and listen for messages.""" - channel = new_default_channel() + channel = ChannelManager.get_channel() server = SubscriberStub(channel=channel) try: diff --git a/nitric/resources/websockets.py b/nitric/resources/websockets.py index 6f3ea4a..2519950 100644 --- a/nitric/resources/websockets.py +++ b/nitric/resources/websockets.py @@ -51,7 +51,7 @@ WebsocketStub, ) from nitric.resources.resource import Resource as BaseResource -from nitric.utils import new_default_channel +from nitric.channel import ChannelManager class WebsocketRef: @@ -59,7 +59,7 @@ class WebsocketRef: def __init__(self) -> None: """Construct a Nitric Websocket Client.""" - self._channel: Channel = new_default_channel() + self._channel: Channel = ChannelManager.get_channel() self._websocket_stub = WebsocketStub(channel=self._channel) async def send(self, socket: str, connection_id: str, data: bytes): @@ -206,7 +206,7 @@ async def _ws_request_iterator(self): async def start(self) -> None: """Register this websocket handler and listen for messages.""" - channel = new_default_channel() + channel = ChannelManager.get_channel() server = WebsocketHandlerStub(channel=channel) try: diff --git a/nitric/utils.py b/nitric/utils.py index 2656bd9..1f5bb41 100644 --- a/nitric/utils.py +++ b/nitric/utils.py @@ -16,29 +16,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import re from typing import Any, Optional -from urllib.parse import urlparse from betterproto.lib.google.protobuf import Struct from google.protobuf.json_format import MessageToDict from google.protobuf.struct_pb2 import Struct as WorkingStruct -from grpclib.client import Channel - -from nitric.config import settings - - -def format_url(url: str): - """Add the default http scheme prefix to urls without one.""" - if not re.match("^((?:http|ftp|https):)?//", url.lower()): - return "http://{0}".format(url) - return url - - -def new_default_channel(): - """Create new gRPC channel from settings.""" - channel_url = urlparse(format_url(settings.SERVICE_ADDRESS)) - return Channel(host=channel_url.hostname, port=channel_url.port) # These functions convert to/from python dict <-> betterproto.lib.google.protobuf.Struct diff --git a/testproj/functions/hello.py b/testproj/functions/hello.py deleted file mode 100644 index 6eb79a7..0000000 --- a/testproj/functions/hello.py +++ /dev/null @@ -1,30 +0,0 @@ -from nitric.resources import api, kv -from nitric.application import Nitric -from nitric.context import HttpContext - - -users = kv("test").allow("get", "set") - -public = api("test") - -print("this should print") - - -# Run every 5 minutes -@public.get("/test/:thing") -async def process_transactions(ctx: HttpContext): - """Process transactions.""" - ctx.res.body = b"Hello, world!" - thing = ctx.req.params["thing"] - await users.set(thing, {"thing": ctx.req.params["thing"]}) - out = await users.get(thing) - - async_keys = users.keys() - - async for key in async_keys: - print("got key:", key) - - print(out) - - -Nitric.run()