[PATCH v8 10/28] gunyah: rsc_mgr: Add resource manager RPC core
Elliot Berman
quic_eberman at quicinc.com
Fri Jan 20 15:18:16 PST 2023
On 1/17/2023 2:33 PM, Alex Elder wrote:
> On 12/19/22 4:58 PM, Elliot Berman wrote:
>> The resource manager is a special virtual machine which is always
>> running on a Gunyah system. It provides APIs for creating and destroying
>> VMs, secure memory management, sharing/lending of memory between VMs,
>> and setup of inter-VM communication. Calls to the resource manager are
>> made via message queues.
>
> I mentioned in an earlier message that I did a really thorough
> review of this. Actually I haven't reviewed the interrupt part
> yet, but I'll get to that. I want to get my comments out now.
>
> I found one bug, but I'd guess it's not likely to occur (though
> user space can trigger it so it's serious). You'll see it below.
>
>> This patch implements the basic probing and RPC mechanism to make those
>> API calls. Request/response calls can be made with gh_rm_call.
>> Drivers can also register to notifications pushed by RM via
>> gh_rm_register_notifier
>
> Is there a human-readable document that explains the RPC
> protocol and formats? (It would provide a way to compare
> the implementation with the intended design.)
>
> Here's what I see:
>
> It appears that each sent message is delivered exactly once,
> and messages arrive in order and are assumed to be intact.
>
> A Gunyah message holds a 240 byte maximum payload, while a
> resource manager RPC message (carried in a Gunyah message)
> has a 232 byte maximum payload.
>
> An RPC "connection" is either a notification message (only
> *from* the resource manager) or a request/response message
> pair sharing the same sequence ID (with the request only
> going *to* the resource manager). An RPC "connection"
> consists of a single RPC message (of any of the three types)
> followed by 0-62 "fragment" messages, where the payload for
> each message is concatenated by the receiver into a single
> buffer (up to 63 * 232 = 14616 bytes in size).
>
> An RPC header contains a sequence ID that's unique for
> each active "connection" to the resource manager (and
> ignored for notifications). It also contains a message
> ID supplied by the user of the resource manager interface
> (e.g., a VM); the message ID in a response must match the
> message ID in its matching request.
>
This is the correct understanding. There is a document; I will work with
the right folks to see how we make it available.
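For reference, the numbers above fall straight out of the header layout used
in this patch; restated as a sketch (GH_MSGQ_MAX_MSG_SIZE is 240 bytes):

    /* 8-byte header at the start of every message queue message */
    struct gh_rm_rpc_hdr {
            u8 api;         /* bits 3:0: API version, bits 7:4: header words */
            u8 type;        /* bits 1:0: message type, bits 7:2: fragment count */
            __le16 seq;     /* matches a reply to its request; ignored for notifications */
            __le32 msg_id;  /* message ID supplied by the caller */
    } __packed;

    #define GH_RM_MAX_MSG_SIZE      (GH_MSGQ_MAX_MSG_SIZE - sizeof(struct gh_rm_rpc_hdr))
    #define GH_RM_MAX_NUM_FRAGMENTS 62

    /* 240 - 8 = 232 bytes of payload per message; a first message plus up
     * to 62 fragments gives the 63 * 232 = 14616 byte maximum you computed. */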
>> Specific API calls that resource manager supports will be implemented in
>> subsequent patches.
>>
>> Signed-off-by: Elliot Berman <quic_eberman at quicinc.com>
>> ---
>> MAINTAINERS | 2 +-
>> drivers/virt/gunyah/Kconfig | 14 +
>> drivers/virt/gunyah/Makefile | 3 +
>> drivers/virt/gunyah/rsc_mgr.c | 584 +++++++++++++++++++++++++++++++++
>> drivers/virt/gunyah/rsc_mgr.h | 36 ++
>> include/linux/gunyah_rsc_mgr.h | 18 +
>> 6 files changed, 656 insertions(+), 1 deletion(-)
>> create mode 100644 drivers/virt/gunyah/rsc_mgr.c
>> create mode 100644 drivers/virt/gunyah/rsc_mgr.h
>> create mode 100644 include/linux/gunyah_rsc_mgr.h
>>
>> diff --git a/MAINTAINERS b/MAINTAINERS
>> index 667480bfd387..bb315385e155 100644
>> --- a/MAINTAINERS
>> +++ b/MAINTAINERS
>> @@ -8944,7 +8944,7 @@ F: Documentation/virt/gunyah/
>> F: arch/arm64/gunyah/
>> F: drivers/mailbox/gunyah-msgq.c
>> F: drivers/virt/gunyah/
>> -F: include/linux/gunyah.h
>> +F: include/linux/gunyah*.h
>> HABANALABS PCI DRIVER
>> M: Oded Gabbay <ogabbay at kernel.org>
>> diff --git a/drivers/virt/gunyah/Kconfig b/drivers/virt/gunyah/Kconfig
>> index 127156a678a6..4a6189dedab2 100644
>> --- a/drivers/virt/gunyah/Kconfig
>> +++ b/drivers/virt/gunyah/Kconfig
>> @@ -10,3 +10,17 @@ config GUNYAH
>> Say Y/M here to enable the drivers needed to interact in a Gunyah
>> virtual environment.
>> +
>> +if GUNYAH
>> +config GUNYAH_RESOURCE_MANAGER
>
> Is Gunyah useful without the resource manager?
>
> There might be good reasons to have different kernel config
> options for parts of Gunyah (like, perhaps you want to allow
> a *different* resource manager to be used). But if, overall,
> a component is fundamentally required for Gunyah to operate,
> I don't think it's helpful to configure it separately.
>
> This comment applies to all your config options. I won't
> mention it again, but maybe you can consider it, and for
> anything that's pretty fundamental, don't make it an option.
>
I've removed GUNYAH_MESSAGE_QUEUES, GUNYAH_RESOURCE_MANAGER, and
GUNYAH_VM_MANAGER.
These config options remain:
- GUNYAH (Patch 5)
- GUNYAH_PLATFORM_HOOKS (Patch 17)
- SAMPLE_GUNYAH (Patch 16)
From the RFC patches, I'm planning on keeping all the options
introduced there since they are not fundamental to launching a VM.
- GUNYAH_VCPU (Patch 25)
- GUNYAH_IRQFD (Patch 27)
- GUNYAH_IOEVENTFD (Patch 28)
>> + tristate "Gunyah Resource Manager"
>> + depends on MAILBOX
>> + select GUNYAH_MESSAGE_QUEUES
>> + help
>> + The resource manager (RM) is a privileged application VM
>> supporting
>> + the Gunyah Hypervisor. Enable this driver to support communicating
>> + with Gunyah RM. This is typically required for a VM running under
>> + Gunyah wanting to have Gunyah-awareness.
>> +
>> + Say Y/M here if unsure.
>> +endif
>> diff --git a/drivers/virt/gunyah/Makefile b/drivers/virt/gunyah/Makefile
>> index 2ac4ee64b89d..e7cf59b9e64e 100644
>> --- a/drivers/virt/gunyah/Makefile
>> +++ b/drivers/virt/gunyah/Makefile
>> @@ -1 +1,4 @@
>> obj-$(CONFIG_GUNYAH) += gunyah.o
>> +
>> +gunyah_rsc_mgr-y += rsc_mgr.o
>> +obj-$(CONFIG_GUNYAH_RESOURCE_MANAGER) += gunyah_rsc_mgr.o
>> diff --git a/drivers/virt/gunyah/rsc_mgr.c
>> b/drivers/virt/gunyah/rsc_mgr.c
>> new file mode 100644
>> index 000000000000..6b281576f5af
>> --- /dev/null
>> +++ b/drivers/virt/gunyah/rsc_mgr.c
>> @@ -0,0 +1,584 @@
>> +// SPDX-License-Identifier: GPL-2.0-only
>> +/*
>> + * Copyright (c) 2022 Qualcomm Innovation Center, Inc. All rights
>> reserved.
>> + */
>> +
>> +#include <linux/of.h>
>> +#include <linux/slab.h>
>> +#include <linux/mutex.h>
>> +#include <linux/sched.h>
>> +#include <linux/gunyah.h>
>> +#include <linux/module.h>
>> +#include <linux/of_irq.h>
>> +#include <linux/kthread.h>
>> +#include <linux/notifier.h>
>> +#include <linux/workqueue.h>
>> +#include <linux/completion.h>
>> +#include <linux/gunyah_rsc_mgr.h>
>> +#include <linux/platform_device.h>
>> +
>> +#include "rsc_mgr.h"
>> +
>> +/* Resource Manager Header */
>> +struct gh_rm_rpc_hdr {
>
> This is a style thing, but I prefer *not* having struct
> definitions with related value definitions interleaved.
> My preference would be to define the values above the
> struct definitions. A comment can indicate which field
> a given group of definitions is associated with. And
> if it's ambiguous, the symbol names can further tie
> them together.
>
Done.
>> +#define RM_RPC_HDR_VERSION_ONE 0x1
>
> I don't think HEADER_VERSION_ONE is any more understandable
> than just "1" if that's its value. Unless you're envisioning
> a more complex versioning scheme, using 1, 2, 3 (etc.) for
> the versions is cleaner than VERSION_ONE, VERSION_TWO,
> VERSION_THREE, etc.
>> +#define RM_RPC_API_VERSION_MASK GENMASK(3, 0)
>> +
>> +#define RM_RPC_HDR_WORDS 0x2
>
> Here's another case where it might just be simpler to open-code
> the numeric value in the code. Even better, I believe this
> represents the number of words in an RPC header. That is better
> expressed as:
> (sizeof(struct gh_rm_rpc_hdr) / sizeof(u32))
>
> At this point it seems there *is* only one version. Even so,
> the point of this (currently anyway) is to define the Gunyah
> RPC API that *this driver* supports. That doesn't change for
> any message header, so maybe you could just define something
> like this:
>
> #define RM_RPC_API_VERSION FIELD_PREP(RM_RPC_API_VERSION_MASK, 1)
> #define RM_RPC_HDR_WORDS	FIELD_PREP(RM_RPC_HEADER_WORDS_MASK, \
> 				(sizeof(struct gh_rm_rpc_hdr) / sizeof(u32)))
> #define RM_RPC_API (RM_RPC_API_VERSION | RM_RPC_HDR_WORDS)
>
> Then you can use RM_RPC_API in gh_rm_send_request() to fill in
> the api field of the RPC message header. You could also check
> it in gh_rm_msgq_rx_data() (something you don't currently do).
> This might look a little ugly, but it simplifies the code where
> it's used.
>
I like this approach, it doesn't seem uglier to me.
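Roughly what I expect this to look like in the next revision (sketch, names
not final):

    #define RM_RPC_API_VERSION_MASK		GENMASK(3, 0)
    #define RM_RPC_HEADER_WORDS_MASK	GENMASK(7, 4)
    #define RM_RPC_API (FIELD_PREP(RM_RPC_API_VERSION_MASK, 1) | \
                        FIELD_PREP(RM_RPC_HEADER_WORDS_MASK, \
                                   sizeof(struct gh_rm_rpc_hdr) / sizeof(u32)))

    /* transmit path fills it in directly ... */
    hdr->api = RM_RPC_API;

and gh_rm_msgq_rx_data() can compare incoming headers against the same
constant instead of decoding the fields separately.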
>> +#define RM_RPC_HEADER_WORDS_MASK GENMASK(7, 4)
>> + u8 api;
>> +
>> +#define RM_RPC_TYPE_CONT 0x0
>> +#define RM_RPC_TYPE_REQ 0x1
>
> s/REQ/REQUEST/ (maybe)
>
> Also there is a missing tab between the name and its value.
>
>> +#define RM_RPC_TYPE_RPLY 0x2
>
> s/RPLY/REPLY/
>
>> +#define RM_RPC_TYPE_NOTIF 0x3
>> +#define RM_RPC_TYPE_MASK GENMASK(1, 0)
>> +
>> +#define GH_RM_MAX_NUM_FRAGMENTS 62
>> +#define RM_RPC_FRAGMENTS_MASK GENMASK(7, 2)
>> + u8 type;
>> + __le16 seq;
>> + __le32 msg_id;
>> +} __packed;
>> +
>> +/* Standard reply header */
>> +struct gh_rm_rpc_reply_hdr {
>> + struct gh_rm_rpc_hdr rpc_hdr;
>> + u32 err_code;
>
> Why is endianness represented in the sequence number and
> message id in the header, but not in the error code here?
> Is the error code set by local software or something?
>
I had missed adding this to the explicit le32_to_cpu conversions; it's
added now.
> The values that can land in err_code are defined in
> "rsc_mgr.h" defined at the end of this patch. Some
> examples are GH_RM_ERROR_OK and GH_RM_ERROR_NOMEM.
> Please indicate that somehow, for example:
>
> u32 err_code; /* GH_RM_ERROR_* */
>
Done.
> These values are (almost) always positive. But I see
> two problems:
> - If non-zero, this value is returned by gh_rm_call(), and
> that function is expected to return a *negative* value
> on error.
> - RM_ERROR_UNIMPLEMENTED is defined as 0xffffffff; that
> value, when interpreted as a signed int, is -1, which
> if interpreted as an errno is EPERM. That is at best
> a misleading result if returned by gh_rm_call().
>
Done. I've added retry logic for GH_ERROR_RETRY (so far never returned
by resource manager, but it's in the spec).
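One way I'm handling the sign/overlap problem is to translate the RM error
into an errno before gh_rm_call() returns, along these lines (sketch; the
exact errno picked for each RM code isn't final):

    static int gh_rm_remap_error(u32 rm_error)
    {
            switch (rm_error) {
            case GH_RM_ERROR_OK:
                    return 0;
            case GH_RM_ERROR_UNIMPLEMENTED:
                    return -EOPNOTSUPP;
            case GH_RM_ERROR_NOMEM:
                    return -ENOMEM;
            case GH_RM_ERROR_DENIED:
                    return -EPERM;
            case GH_RM_ERROR_BUSY:
                    return -EBUSY;
            case GH_RM_ERROR_INVALID:
            case GH_RM_ERROR_ARGUMENT_INVALID:
                    return -EINVAL;
            default:
                    return -EBADMSG;
            }
    }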
>> +} __packed;
>> +
>> +#define GH_RM_MAX_MSG_SIZE (GH_MSGQ_MAX_MSG_SIZE - sizeof(struct
>> gh_rm_rpc_hdr))
>> +
>> +/**
>> + * struct gh_rm_connection - Represents a complete message from
>> resource manager
>> + * @payload: Combined payload of all the fragments (msg headers
>> stripped off).
>> + * @size: Size of the payload.
>
> The size is the number of bytes of payload received *so far* (in
> case there are fragments). It is not the size of the payload
> until it has been completely received.
>
I've updated the comment to mention this.
>> + * @ret: Linux return code, set in case there was an error processing
>> connection
>> + * @msg_id: Message ID from the header.
>> + * @type: RM_RPC_TYPE_RPLY or RM_RPC_TYPE_NOTIF.
>> + * @num_fragments: total number of fragments expected to be received.
>> + * @fragments_received: fragments received so far.
>> + * @rm_error: For request/reply sequences with standard replies.
>> + * @seq: Sequence ID for the main message.
>> + * @seq_done: Signals caller that the RM reply has been received
>> + */
>> +struct gh_rm_connection {
>> + void *payload;
>> + size_t size;
>> + int ret;
>> + u32 msg_id;
>
> The msg_id field is actually opaque from this driver's perspective.
> The only thing that matters is if a response message ID *matches*
> a request message ID. It could be maintained in __le32 byte order
> (as received).
>
Done.
>> + u8 type;
>> +
>> + u8 num_fragments;
>> + u8 fragments_received;
>> +
>
> The next three fields are (as the comment says) only used for
> request/reply sequences. You could put them in a structure,
> then have that structure share space in a union with a work
> structure used for notification messages. That could replace
> struct gh_rm_notif_complete (which would become unnecessary).
>
Done.
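With that, struct gh_rm_connection ends up roughly as below (sketch; naming
and field order may still move around):

    struct gh_rm_connection {
            void *payload;
            size_t size;
            __le32 msg_id;
            u8 type;

            u8 num_fragments;
            u8 fragments_received;

            union {
                    /* RM_RPC_TYPE_RPLY */
                    struct {
                            struct completion seq_done;
                            int ret;
                            u32 rm_error;
                            u16 seq;
                    } reply;
                    /* RM_RPC_TYPE_NOTIF */
                    struct {
                            struct work_struct work;
                    } notification;
            };
    };

so gh_rm_complete_connection() can schedule notification.work directly
instead of allocating a separate gh_rm_notif_complete.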
>> + /* only for req/reply sequence */
>> + u32 rm_error;
>> + u16 seq;
>> + struct completion seq_done;
>> +};
>> +
>> +struct gh_rm_notif_complete {
>> + struct gh_rm_connection *conn;
>> + struct work_struct work;
>> +};
>> +
>> +struct gh_rm_rpc {
>
> I actually think this structure should just be called "gh_rm".
> Yes, it's the RPC interface for the resource manager, but this
> interface (which happens to use RPC) abstractly represents the
> resource manager *itself*.
>
Done.
>> + struct device *dev;
>> + struct gunyah_resource tx_ghrsc, rx_ghrsc;
>> + struct gh_msgq msgq;
>> + struct mbox_client msgq_client;
>> + struct gh_rm_connection *active_rx_connection;
>> + int last_tx_ret;
>> +
>> + struct idr call_idr;
>> + struct mutex call_idr_lock;
>> +
>> + struct mutex send_lock;
>> +
>> + struct work_struct recv_work;
>> +};
>> +
>> +static struct gh_rm_connection *gh_rm_alloc_connection(u32 msg_id, u8
>> type)
>> +{
>> + struct gh_rm_connection *connection;
>> +
>> + connection = kzalloc(sizeof(*connection), GFP_KERNEL);
>> + if (!connection)
>> + return NULL;
>> +
>> + connection->type = type;
>> + connection->msg_id = msg_id;
>> +
>> + return connection;
>> +}
>> +
>> +static int gh_rm_init_connection_payload(struct gh_rm_connection
>> *connection, void *msg,
>> + size_t hdr_size, size_t msg_size)
>> +{
>> + struct gh_rm_rpc_hdr *hdr = msg;
>> + size_t max_buf_size, payload_size;
>> +
>> + if (hdr_size > msg_size)
>> + return -EINVAL;
>> +
>> + payload_size = msg_size - hdr_size;
>
> The maximum payload size is 14616 bytes. The msg_size comes
> from the resource manager--which I guess is trustworthy. But
> you're already checking for a short message above, so you
> might as well check for a message that's too big as well.
> (Or just decide not to check either...)
>
Done.
>> + connection->num_fragments = FIELD_GET(RM_RPC_FRAGMENTS_MASK,
>> hdr->type);
>
> Here too, num_fragments had better not be more than
> GH_RM_MAX_NUM_FRAGMENTS (= 62).
>
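Both bounds get checked when the connection payload is set up; roughly
(sketch; the overall msg_size cap is enforced where the message is first
received, see gh_rm_msgq_rx_data below):

    if (hdr_size > msg_size)
            return -EINVAL;

    connection->num_fragments = FIELD_GET(RM_RPC_FRAGMENTS_MASK, hdr->type);
    if (connection->num_fragments > GH_RM_MAX_NUM_FRAGMENTS)
            return -EINVAL;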
>> + connection->fragments_received = 0;
>> +
>> + /* There's not going to be any payload, no need to allocate
>> buffer. */
>> + if (!payload_size && !connection->num_fragments)
>> + return 0;
>> +
>> + /*
>> + * maximum payload size is GH_MSGQ_MAX_MSG_SIZE - hdr_size
>> + * and can received (hdr->fragments + 1) of those
>> + */
>> + max_buf_size = (GH_MSGQ_MAX_MSG_SIZE - hdr_size) *
>> (connection->num_fragments + 1);
>> +
>> + connection->payload = kzalloc(max_buf_size, GFP_KERNEL);
>> + if (!connection->payload)
>> + return -ENOMEM;
>> +
>> + memcpy(connection->payload, msg + hdr_size, payload_size);
>> + connection->size = payload_size;
>> + return 0;
>> +}
>> +
>> +static void gh_rm_notif_work(struct work_struct *work)
>> +{
>> + struct gh_rm_notif_complete *notif = container_of(work, struct
>> gh_rm_notif_complete, work);
>> + struct gh_rm_connection *connection = notif->conn;
>> +
>> + /* No users of notifications, yet. */
>> +
>> + kfree(connection->payload);
>> + kfree(connection);
>> + kfree(notif);
>> +}
>> +
>> +static struct gh_rm_connection *gh_rm_process_notif(struct gh_rm_rpc
>> *rsc_mgr,
>
> Here you use "rsc_mgr" as the symbol having this type, while
> elsewhere you use "rm". Pick one, and use it consistently (I
> suggest using "rm" to represent a "struct gh_rm").
>
Done
>> + void *msg, size_t msg_size)
>> +{
>> + struct gh_rm_rpc_hdr *hdr = msg;
>> + struct gh_rm_connection *connection;
>> +
>> + connection = gh_rm_alloc_connection(le32_to_cpu(hdr->msg_id),
>
> We already know the header type is RM_RPC_TYPE_NOTIF. Better
> to state that explicitly here rather than decoding what's in
> the header (again).
>
Done.
>> + FIELD_GET(RM_RPC_TYPE_MASK, hdr->type));
>> + if (!connection) {
>> + dev_err(rsc_mgr->dev, "Failed to alloc connection for
>> notification, dropping.\n");
>> + return NULL;
>> + }
>> +
>> + if (gh_rm_init_connection_payload(connection, msg, sizeof(*hdr),
>> msg_size)) {
>
> This could conceivably return -EINVAL as well (which is not an
> allocation failure, despite what the message says). The same
> thing is done in gh_rm_process_rply(). (If the resource manager
> is as trusted as it should be, and the hardware is not broken,
> -EINVAL should never get returned...)
>
Updated the message to allow for -EINVAL.
>> + dev_err(rsc_mgr->dev, "Failed to alloc connection buffer for
>> notification, dropping.\n");
>> + kfree(connection);
>> + return NULL;
>> + }
>> +
>> + return connection;
>> +}
>> +
>> +static struct gh_rm_connection *gh_rm_process_rply(struct gh_rm_rpc
>> *rsc_mgr,
>> + void *msg, size_t msg_size)
>> +{
>> + struct gh_rm_rpc_reply_hdr *reply_hdr = msg;
>> + struct gh_rm_rpc_hdr *hdr = msg;
>
> Please use:
>
> struct gh_rm_rpc_hdr *hdr = &msg->rpc_hdr;
>
Done.
>> + struct gh_rm_connection *connection;
>> + u16 seq_id = le16_to_cpu(hdr->seq);
>> +
>> + if (mutex_lock_interruptible(&rsc_mgr->call_idr_lock))
>> + return ERR_PTR(-ERESTARTSYS);
>
> THIS IS A BUG.
>
> If this call gets interrupted, we'll return the ERESTARTSYS pointer.
> That's non-null.
>
> But the return value from gh_rm_process_rply() is blindly assigned
> to the rsc_mgr->active_rx_connection pointer in gh_rm_msgq_rx_data()
> if the RPC message type is RM_RPC_TYPE_RPLY.
>
> Thereafter, if that pointer is non-null, it's assumed to refer
> to a valid memory location.
>
> I'm not sure what the right handling is. You could save the
> return value and check it with IS_ERR_OR_NULL(). Perhaps you
> could return NULL here too, but it's actually not clear to me
> you can simply ignore this situation and have the result be
> correct.
>
I don't need an interruptible lock here anyway as this function is now only
called from threaded irq context.
>> +
>> + connection = idr_find(&rsc_mgr->call_idr, seq_id);
>> + mutex_unlock(&rsc_mgr->call_idr_lock);
>> +
>> + if (!connection) {
>> + dev_err(rsc_mgr->dev, "Failed to find connection for sequence
>> %u\n", seq_id);
>> + return NULL;
>> + }
>> + if (connection->msg_id != le32_to_cpu(hdr->msg_id)) {
>> + dev_err(rsc_mgr->dev, "Replyfor sequence %u expected msg_id:
>> %x but got %x\n",
>> + seq_id, connection->msg_id, le32_to_cpu(hdr->msg_id));
>> + /*
>> + * Don't complete connection and error the client, maybe
>> + * resource manager will send us the expected reply sequence
>> soon.
>> + */
>> + return NULL;
>> + }
>> +
>> + if (gh_rm_init_connection_payload(connection, msg,
>> sizeof(*reply_hdr), msg_size)) {
>> + dev_err(rsc_mgr->dev, "Failed to alloc connection buffer for
>> sequence %d\n",
>> + seq_id);
>> + /* Send connection complete and error the client. */
>> + connection->ret = -ENOMEM;
>> + complete(&connection->seq_done);
>> + return NULL;
>> + }
>> +
>> + connection->rm_error = reply_hdr->err_code;
>> + return connection;
>
> Since you're already passing in the resource manager as an argument,
> you could take care of assigning its ->connection pointer here as well.
> Doing it this way *might* clean up the caller a bit (but sometimes
> you have to try it to see).
>
In the current implementation, all of the handling of
"active_rx_connection" is done just in gh_rm_msgq_rx_data, which I think
makes it easier to track how it's being used. If it's spread out to the
gh_rm_process_* functions, I think it gets harder to follow for someone who
is new to the code.
>> +}
>> +
>> +static void gh_rm_process_cont(struct gh_rm_rpc *rsc_mgr, struct
>> gh_rm_connection *connection,
>> + void *msg, size_t msg_size)
>> +{
>> + struct gh_rm_rpc_hdr *hdr = msg;
>> + size_t payload_size = msg_size - sizeof(*hdr);
>
> Maybe:
> void *payload = hdr + 1;
>
> Here too, the message size is coming from the resource manager.
> But it seems prudent to ensure msg_size is >= sizeof(*hdr)
> and <= GH_MSGQ_MAX_MSG_SIZE.
>
The max msg_size comes from the hypervisor, but I will add a check that all
the messages we receive are <= GH_MSGQ_MAX_MSG_SIZE.
>> +
>> + /*
>> + * hdr->fragments and hdr->msg_id preserves the value from first
>> reply
>> + * or notif message. To detect mishandling, check it's still intact.
>> + */
>> + if (connection->msg_id != le32_to_cpu(hdr->msg_id))
>> + dev_err(rsc_mgr->dev, "Appending mismatched continuation with
>> id %d to connection with id %d\n",
>> + le32_to_cpu(hdr->msg_id), connection->msg_id);
>> + if (connection->num_fragments != FIELD_GET(RM_RPC_FRAGMENTS_MASK,
>> hdr->type))
>> + dev_err(rsc_mgr->dev, "Number of fragments mismatch for seq:
>> %d\n",
>> + le16_to_cpu(hdr->seq));
>
> The sequencer ID is irrelevant on a notification "connection".
> Maybe you should report the message type too?
>
> You copy the contents into the receive buffer anyway?
>
> I think you should drop the remainder of the "connection"
> if you see *any* corruption like this. (I would argue
> this is a bug as well, though maybe it "never happens.")
>
This is currently a case of "never happens". I think dropping the remainder
of the connection when this corruption happens is a fine approach, too.
I'll do that.
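Roughly what I have in mind (sketch; whether the abort happens here or back
in gh_rm_msgq_rx_data is an implementation detail I'm still working out):

    if (connection->msg_id != hdr->msg_id ||
        connection->num_fragments != FIELD_GET(RM_RPC_FRAGMENTS_MASK, hdr->type)) {
            dev_err(rm->dev, "Malformed continuation for message %#x, dropping connection\n",
                    le32_to_cpu(hdr->msg_id));
            gh_rm_abort_connection(connection);
            rm->active_rx_connection = NULL;
            return;
    }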
>> +
>> + memcpy(connection->payload + connection->size, msg +
>> sizeof(*hdr), payload_size);
>> + connection->size += payload_size;
>> + connection->fragments_received++;
>> +}
>> +
>> +static bool gh_rm_complete_connection(struct gh_rm_rpc *rsc_mgr,
>> + struct gh_rm_connection *connection)
>
> This is called in one place, and it is always passed the active RX
> connection pointer. If true is returned, that pointer is nulled.
> You could drop the second argument and just do the assignment
> here instead. (With the above-mentioned bug, this is also a
> place where the non-null ERESTARTSYS pointer gets dereferenced.)
>
Same as above comment about tracking active_rx_connection.
>> +{
>> + struct gh_rm_notif_complete *notif_work;
>> +
>> + if (!connection)
>> + return false;
>> +
>> + if (connection->fragments_received != connection->num_fragments)
>> + return false;
>> +
>> + switch (connection->type) {
>> + case RM_RPC_TYPE_RPLY:
>> + complete(&connection->seq_done);
>> + break;
>> + case RM_RPC_TYPE_NOTIF:
>> + notif_work = kzalloc(sizeof(*notif_work), GFP_KERNEL);
>> + if (notif_work == NULL)
>> + break;
>
> Is there any significant consequence possible from quietly
> ignoring a notification from the resource manager? Should
> this situation be reported at least?
>
> If you embedded the work structure in the connection structure
> as I mentioned earlier, you could avoid this ever being a
> problem.
>
Done by embedding the work structure in connection structure.
>> +
>> + notif_work->conn = connection;
>> + INIT_WORK(&notif_work->work, gh_rm_notif_work);
>> +
>> + schedule_work(&notif_work->work);
>> + break;
>> + default:
>> + dev_err(rsc_mgr->dev, "Invalid message type (%d) received\n",
>> connection->type);
>> + break;
>> + }
>> +
>> + return true;
>> +}
>> +
>> +static void gh_rm_abort_connection(struct gh_rm_connection *connection)
>
> Although both callers immediately overwrite the rm->active_rx_connection
> pointer after this call, you could pass the resource manager pointer
> and make it NULL after the abort, so the old pointer is *gone*.
>
>> +{
>> + switch (connection->type) {
>> + case RM_RPC_TYPE_RPLY:
>> + connection->ret = -EIO;
>> + complete(&connection->seq_done);
>> + break;
>> + case RM_RPC_TYPE_NOTIF:
>> + fallthrough;
>
> The case statement isn't really valuable here. You could
> just do:
>
> if (connection->type == RM_RPC_TYPE_RPLY) {
> ... complete
> } else {
> ... free
> }
>
> (Fine as-is though.)
>
>> + default:
>> + kfree(connection->payload);
>> + kfree(connection);
>> + }
>> +}
>> +
>> +static void gh_rm_msgq_rx_data(struct mbox_client *cl, void *mssg)
>> +{
>> + struct gh_rm_rpc *rsc_mgr = container_of(cl, struct gh_rm_rpc,
>> msgq_client);
>> + struct gh_msgq_rx_data *rx_data = mssg;
>> + void *msg = rx_data->data;
>> + size_t msg_size = rx_data->length;
>> + struct gh_rm_rpc_hdr *hdr;
>> +
>> + if (msg_size <= sizeof(struct gh_rm_rpc_hdr)) {
>> + dev_err(rsc_mgr->dev, "Incomplete message size: %ld is too
>> small\n", msg_size);
>> + return;
>> + }
>
> This is received data, so as long as you're verifying the message
> size you should also ensure it's within the maximum allowed. You
> should also verify the header api byte is correct.
>
Done.
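With the combined header constant from earlier, the entry check becomes
something like (sketch):

    if (msg_size <= sizeof(*hdr) || msg_size > GH_MSGQ_MAX_MSG_SIZE) {
            dev_err(rm->dev, "Invalid message size: %zu\n", msg_size);
            return;
    }

    hdr = msg;
    if (hdr->api != RM_RPC_API) {
            dev_err(rm->dev, "Unsupported RM RPC api/header: %#x\n", hdr->api);
            return;
    }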
>> +
>> + hdr = msg;
>> + switch (FIELD_GET(RM_RPC_TYPE_MASK, hdr->type)) {
>> + case RM_RPC_TYPE_NOTIF:
>
> Here and in the other cases below, I think it's cleaner if you
> move the check of the active_rx_connection pointer (whether it's
> NULL or not) into the called functions. That way they're all
> checking based on their own requirements rather than having
> that logic be here.
>
>> + if (rsc_mgr->active_rx_connection) {
>> + /* Not possible per protocol. Do something better than
>> BUG_ON */
>> + dev_err(rsc_mgr->dev, "Received start of new notification
>> without finishing existing message series.\n");
>> + gh_rm_abort_connection(rsc_mgr->active_rx_connection);
>> + }
>> + rsc_mgr->active_rx_connection = gh_rm_process_notif(rsc_mgr,
>> msg, msg_size);
>> + break;
>> + case RM_RPC_TYPE_RPLY:
>> + if (rsc_mgr->active_rx_connection) {
>> + /* Not possible per protocol. Do something better than
>> BUG_ON */
>> + dev_err(rsc_mgr->dev, "Received start of new reply
>> without finishing existing message series.\n");
>> + gh_rm_abort_connection(rsc_mgr->active_rx_connection);
>> + }
>> + rsc_mgr->active_rx_connection = gh_rm_process_rply(rsc_mgr,
>> msg, msg_size);
>> + break;
>> + case RM_RPC_TYPE_CONT:
>> + if (!rsc_mgr->active_rx_connection) {
>> + dev_err(rsc_mgr->dev, "Received a continuation message
>> without receiving initial message\n");
>> + break;
>> + }
>> + gh_rm_process_cont(rsc_mgr, rsc_mgr->active_rx_connection,
>> msg, msg_size);
>> + break;
>> + default:
>> + dev_err(rsc_mgr->dev, "Invalid message type (%lu) received\n",
>> + FIELD_GET(RM_RPC_TYPE_MASK, hdr->type));
>> + return;
>> + }
>> +
>> + if (gh_rm_complete_connection(rsc_mgr,
>> rsc_mgr->active_rx_connection))
>> + rsc_mgr->active_rx_connection = NULL;
>> +}
>> +
>> +static void gh_rm_msgq_tx_done(struct mbox_client *cl, void *mssg,
>> int r)
>> +{
>> + struct gh_rm_rpc *rsc_mgr = container_of(cl, struct gh_rm_rpc,
>> msgq_client);
>> +
>> + kfree(mssg);
>> + rsc_mgr->last_tx_ret = r;
>> +}
>> +
>
> It is allowable to pass a null req_buff pointer here. The handling in
> that case is a little different, and I think it's important to both
> state that a null pointer is OK, and to explain what it means (or
> how its processing is different).
>
Done.
>> +static int gh_rm_send_request(struct gh_rm_rpc *rsc_mgr, u32 message_id,
>> + const void *req_buff, size_t req_buff_size,
>> + struct gh_rm_connection *connection)
>> +{
>> + size_t buff_size_remaining = req_buff_size;
>> + const void *req_buff_curr = req_buff;
>> + struct gh_rm_rpc_hdr *hdr;
>> + u32 cont_fragments = 0;
>> + size_t payload_size;
>> + struct gh_msgq_tx_data *msg;
>> + int i, ret;
>> +
>> + if (req_buff_size)
>> + cont_fragments = (req_buff_size - 1) / GH_RM_MAX_MSG_SIZE;
>> +
>> + if (WARN(cont_fragments > GH_RM_MAX_NUM_FRAGMENTS,
>> + "Limit exceeded for the number of fragments: %u\n",
>> cont_fragments))
>> + return -E2BIG;
>
> You could just do:
>
> if (WARN(req_buff_size > GH_RM_MAX_MSG_SIZE * GH_RM_MAX_NUM_FRAGMENTS,
> ...
>
> And then your loop below could just do:
>
> u8 type = RM_RPC_TYPE_REQ;
>
> while (req_buff_size) {
> payload_size = min(req_buff_size, GH_RM_MAX_MSG_SIZE);
>
> req_buff_size -= payload_size;
> ...
> msg->push = !req_buff_size;
> ...
> type = RM_RPC_TYPE_CONT;
> }
>
Done.
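The send loop now looks roughly like this (sketch; I kept it as a do/while
so a request with no payload still sends the initial packet):

    size_t remaining = req_buff_size;
    const void *cur = req_buff;
    u8 type = RM_RPC_TYPE_REQ;

    do {
            payload_size = min_t(size_t, remaining, GH_RM_MAX_MSG_SIZE);
            remaining -= payload_size;

            /* ... allocate msg, fill hdr->api/seq/msg_id as before ... */
            hdr->type = FIELD_PREP(RM_RPC_TYPE_MASK, type) |
                        FIELD_PREP(RM_RPC_FRAGMENTS_MASK, cont_fragments);

            memcpy(hdr + 1, cur, payload_size);
            cur += payload_size;

            /* only the last message pushes/alerts the receiver */
            msg->push = !remaining;
            msg->length = sizeof(*hdr) + payload_size;

            /* ... mbox_send_message() and error handling as before ... */
            type = RM_RPC_TYPE_CONT;
    } while (remaining);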
>> +
>> + ret = mutex_lock_interruptible(&rsc_mgr->send_lock);
>> + if (ret)
>> + return ret;
>> +
>> + /* Consider also the 'request' packet for the loop count */
>> + for (i = 0; i <= cont_fragments; i++) {
>> + if (buff_size_remaining > GH_RM_MAX_MSG_SIZE) {
>> + payload_size = GH_RM_MAX_MSG_SIZE;
>> + buff_size_remaining -= payload_size;
>> + } else {
>> + payload_size = buff_size_remaining;
>> + }
>> +
>> + msg = kzalloc(sizeof(*msg) + GH_MSGQ_MAX_MSG_SIZE, GFP_KERNEL);
>
> Use:
> msg = kzalloc(sizeof(*msg) + payload_size, GFP_KERNEL);
>
> But even better:
> msg = kzalloc(struct_size(msg, data, payload_size), GFP_KERNEL);
>
> OR... since these buffers are always 256 bytes apiece, you could
> create a slab cache so they're allocated efficiently.
>
Will switch to kmem_cache.
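Something along these lines (sketch; the cache name is a placeholder):

    static struct kmem_cache *gh_msgq_tx_data_cache;

    /* at module init */
    gh_msgq_tx_data_cache = kmem_cache_create("gh_msgq_tx_data",
                            sizeof(struct gh_msgq_tx_data) + GH_MSGQ_MAX_MSG_SIZE,
                            0, 0, NULL);

    /* in gh_rm_send_request() */
    msg = kmem_cache_zalloc(gh_msgq_tx_data_cache, GFP_KERNEL);

    /* and gh_rm_msgq_tx_done() frees with kmem_cache_free() instead of kfree() */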
>> + if (!msg) {
>> + ret = -ENOMEM;
>> + goto out;
>> + }
>> +
>> + /* Fill header */
>> + hdr = (struct gh_rm_rpc_hdr *)msg->data;
>> + hdr->api = FIELD_PREP(RM_RPC_API_VERSION_MASK,
>> RM_RPC_HDR_VERSION_ONE) |
>> + FIELD_PREP(RM_RPC_HEADER_WORDS_MASK, RM_RPC_HDR_WORDS);
>
> I suggested defining this api value at the top, since it's
> constant for a given build.
>
Ack.
>> + hdr->type = FIELD_PREP(RM_RPC_TYPE_MASK,
>> + i == 0 ? RM_RPC_TYPE_REQ : RM_RPC_TYPE_CONT) |
>> + FIELD_PREP(RM_RPC_FRAGMENTS_MASK, cont_fragments);
>
> I think FIELD_PREP() (and u32_encode_bits()) can look a
> little cumbersome. I think this would be more readable:
>
> u8 type = i ? RM_RPC_TYPE_CONT : RM_RPC_TYPE_REQ;
>
> hdr->type = FIELD_PREP(RM_RPC_TYPE_MASK, type);
> hdr->type |= FIELD_PREP(RM_RPC_FRAGMENTS_MASK, cont_fragments);
>
>
>> + hdr->seq = cpu_to_le16(connection->seq);
>> + hdr->msg_id = cpu_to_le32(message_id);
>> +
>> + /* Copy payload */
>
> Maybe:
> void *payload = hdr + 1;
> ...
> memcpy(payload, req_buff_curr, payload_size);
>
>> + memcpy(msg->data + sizeof(*hdr), req_buff_curr, payload_size);
>> + req_buff_curr += payload_size;
>> +
>> + /* Force the last fragment to immediately alert the receiver */
>> + msg->push = i == cont_fragments;
>> + msg->length = sizeof(*hdr) + payload_size;
>> +
>> + ret = mbox_send_message(gh_msgq_chan(&rsc_mgr->msgq), msg);
>> + if (ret < 0) {
>> + kfree(msg);
>> + break;
>> + }
>> +
>> + if (rsc_mgr->last_tx_ret) {
>> + ret = rsc_mgr->last_tx_ret;
>> + break;
>> + }
>> + }
>> +
>> +out:
>> + mutex_unlock(&rsc_mgr->send_lock);
>> + return ret < 0 ? ret : 0;
>> +}
>> +
>> +/**
>> + * gh_rm_call: Achieve request-response type communication with RPC
>
> * @rsc_mgr: ...
>
>> + * @message_id: The RM RPC message-id
>> + * @req_buff: Request buffer that contains the payload
>> + * @req_buff_size: Total size of the payload
>> + * @resp_buf: Pointer to a response buffer
>> + * @resp_buff_size: Size of the response buffer
>> + *
>> + * Make a request to the RM-VM and wait for reply back. For a successful
>> + * response, the function returns the payload. The size of the
>> payload is set in
>> + * resp_buff_size. The resp_buf should be freed by the caller.
>> + *
>> + * Context: Process context. Will sleep waiting for reply.
>> + * Return: >0 is standard reply error from RM. <0 on internal error.
>> + */
>> +int gh_rm_call(struct gh_rm_rpc *rsc_mgr, u32 message_id, void
>> *req_buff, size_t req_buff_size,
>> + void **resp_buf, size_t *resp_buff_size)
>> +{
>> + struct gh_rm_connection *connection;
>> + int ret;
>> +
>> + /* messaged_id 0 is reserved */
>> + if (!message_id)
>> + return -EINVAL;
>> +
>
> Maybe check for null rsc_mgr first.
>
Every caller is guaranteed to have a valid rm pointer.
>> + if (!rsc_mgr)
>> + return -EPROBE_DEFER;
>> +
>> + connection = gh_rm_alloc_connection(message_id, RM_RPC_TYPE_RPLY);
>> + if (!connection)
>> + return -ENOMEM;
>> +
>> + init_completion(&connection->seq_done);
>> +
>> + /* Allocate a new seq number for this connection */
>> + if (mutex_lock_interruptible(&rsc_mgr->call_idr_lock)) {
>
> Maybe use the goto ... pattern here.
>
>> + kfree(connection);
>> + return -ERESTARTSYS;
>> + }
>> + connection->seq = idr_alloc_cyclic(&rsc_mgr->call_idr,
>> connection, 0, U16_MAX, GFP_KERNEL);
>
> Note that idr_alloc_cyclic() *can* return an error.
>
Done.
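Sketch of what that looks like with the error check (and the
no-longer-interruptible lock):

    mutex_lock(&rm->call_idr_lock);
    ret = idr_alloc_cyclic(&rm->call_idr, connection, 0, U16_MAX, GFP_KERNEL);
    mutex_unlock(&rm->call_idr_lock);
    if (ret < 0) {
            kfree(connection);
            return ret;
    }
    connection->seq = ret;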
>> + mutex_unlock(&rsc_mgr->call_idr_lock);
>> +
>> + /* Send the request to the Resource Manager */
>> + ret = gh_rm_send_request(rsc_mgr, message_id, req_buff,
>> req_buff_size, connection);
>> + if (ret < 0)
>> + goto out;
>> +
>> + /* Wait for response */
>> + ret = wait_for_completion_interruptible(&connection->seq_done);
>> + if (ret)
>> + goto out;
>> +
>> + if (connection->ret) {
>> + ret = connection->ret;
>
> I note that this is a negative errno. (You do explain
> that non-zero return means error above.)
>
>> + kfree(connection->payload);
>> + goto out;
>> + }
>> +
>> + if (connection->rm_error) {
>> + ret = connection->rm_error;
>
> While this is positive resource manager error (with one exception
> I mentioned earlier).
>
>> + kfree(connection->payload);
>> + goto out;
>> + }
>> +
>> + *resp_buf = connection->payload;
>> + *resp_buff_size = connection->size;
>> +
>> +out:
>> + mutex_lock(&rsc_mgr->call_idr_lock);
>
> Why is the mutex_lock() here not interruptible? Or rather, if
> you can do a non-interruptible wait here, why not above?
>
I've removed the interruptible call_idr_lock because all the sections
which use it are quick.
>> + idr_remove(&rsc_mgr->call_idr, connection->seq);
>> + mutex_unlock(&rsc_mgr->call_idr_lock);
>> +
>> + kfree(connection);
>> + return ret;
>> +}
>> +
>> +static int gh_msgq_platform_probe_direction(struct platform_device
>> *pdev,
>> + u8 gh_type, int idx, struct gunyah_resource *ghrsc)
>
> I think I might call this gh_msgq_platform_probe_msgq() and pass a
> Boolean indicating whether it's being called for the RX (versus TX)
> message queue. Then deduce the gh_type and the index from that
> argument.
>
Done.
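For reference, roughly what the helper becomes (sketch; I've kept the
_direction name for now):

    static int gh_msgq_platform_probe_direction(struct platform_device *pdev, bool tx,
                                                struct gunyah_resource *ghrsc)
    {
            struct device_node *node = pdev->dev.of_node;
            int idx = tx ? 0 : 1;
            int ret;

            ghrsc->type = tx ? GUNYAH_RESOURCE_TYPE_MSGQ_TX : GUNYAH_RESOURCE_TYPE_MSGQ_RX;

            ghrsc->irq = platform_get_irq(pdev, idx);
            if (ghrsc->irq < 0) {
                    dev_err(&pdev->dev, "Failed to get irq%d: %d\n", idx, ghrsc->irq);
                    return ghrsc->irq;
            }

            ret = of_property_read_u64_index(node, "reg", idx, &ghrsc->capid);
            if (ret) {
                    dev_err(&pdev->dev, "Failed to get capid%d: %d\n", idx, ret);
                    return ret;
            }

            return 0;
    }

and the probe path calls it as gh_msgq_platform_probe_direction(pdev, true,
&rm->tx_ghrsc) and (pdev, false, &rm->rx_ghrsc).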
> -Alex
>
>> +{
>> + int ret;
>> + struct device_node *node = pdev->dev.of_node;
>> +
>> + ghrsc->type = gh_type;
>> +
>> + ghrsc->irq = platform_get_irq(pdev, idx);
>> + if (ghrsc->irq < 0) {
>> + dev_err(&pdev->dev, "Failed to get irq%d: %d\n", idx,
>> ghrsc->irq);
>> + return ghrsc->irq;
>> + }
>> +
>> + ret = of_property_read_u64_index(node, "reg", idx,&ghrsc->capid);
>> + if (ret) {
>> + dev_err(&pdev->dev, "Failed to get capid%d: %d\n", idx, ret);
>> + return ret;
>> + }
>> +
>> + return 0;
>> +}
>> +
>> +static int gh_rm_drv_probe(struct platform_device *pdev)
>> +{
>> + struct gh_rm_rpc *rsc_mgr;
>> + int ret;
>> +
>> + rsc_mgr = devm_kzalloc(&pdev->dev, sizeof(*rsc_mgr), GFP_KERNEL);
>> + if (!rsc_mgr)
>> + return -ENOMEM;
>> +
>> + platform_set_drvdata(pdev, rsc_mgr);
>> + rsc_mgr->dev = &pdev->dev;
>> +
>> + mutex_init(&rsc_mgr->call_idr_lock);
>> + idr_init(&rsc_mgr->call_idr);
>> + mutex_init(&rsc_mgr->send_lock);
>> +
>> + ret = gh_msgq_platform_probe_direction(pdev,
>> GUNYAH_RESOURCE_TYPE_MSGQ_TX, 0,
>> + &rsc_mgr->tx_ghrsc);
>> + if (ret)
>> + return ret;
>> +
>> + ret = gh_msgq_platform_probe_direction(pdev,
>> GUNYAH_RESOURCE_TYPE_MSGQ_RX, 1,
>> + &rsc_mgr->rx_ghrsc);
>> + if (ret)
>> + return ret;
>> +
>> + rsc_mgr->msgq_client.dev = &pdev->dev;
>> + rsc_mgr->msgq_client.tx_block = true;
>> + rsc_mgr->msgq_client.rx_callback = gh_rm_msgq_rx_data;
>> + rsc_mgr->msgq_client.tx_done = gh_rm_msgq_tx_done;
>> +
>> + return gh_msgq_init(&pdev->dev, &rsc_mgr->msgq,
>> &rsc_mgr->msgq_client,
>> + &rsc_mgr->tx_ghrsc, &rsc_mgr->rx_ghrsc);
>> +}
>> +
>> +static int gh_rm_drv_remove(struct platform_device *pdev)
>> +{
>> + struct gh_rm_rpc *rm = platform_get_drvdata(pdev);
>> +
>> + mbox_free_channel(gh_msgq_chan(&rm->msgq));
>> + gh_msgq_remove(&rm->msgq);
>> +
>> + return 0;
>> +}
>> +
>> +static const struct of_device_id gh_rm_of_match[] = {
>> + { .compatible = "gunyah-resource-manager" },
>> + {}
>> +};
>> +MODULE_DEVICE_TABLE(of, gh_rm_of_match);
>> +
>> +static struct platform_driver gh_rm_driver = {
>> + .probe = gh_rm_drv_probe,
>> + .remove = gh_rm_drv_remove,
>> + .driver = {
>> + .name = "gh_rsc_mgr",
>> + .of_match_table = gh_rm_of_match,
>> + },
>> +};
>> +module_platform_driver(gh_rm_driver);
>> +
>> +MODULE_LICENSE("GPL");
>> +MODULE_DESCRIPTION("Gunyah Resource Manager Driver");
>> diff --git a/drivers/virt/gunyah/rsc_mgr.h
>> b/drivers/virt/gunyah/rsc_mgr.h
>> new file mode 100644
>> index 000000000000..b5bb36a7a4cc
>> --- /dev/null
>> +++ b/drivers/virt/gunyah/rsc_mgr.h
>> @@ -0,0 +1,36 @@
>> +/* SPDX-License-Identifier: GPL-2.0-only */
>> +/*
>> + * Copyright (c) 2022 Qualcomm Innovation Center, Inc. All rights
>> reserved.
>> + */
>> +#ifndef __GH_RSC_MGR_PRIV_H
>> +#define __GH_RSC_MGR_PRIV_H
>> +
>> +#include <linux/gunyah.h>
>> +#include <linux/types.h>
>> +
>> +/* RM Error codes */
>> +#define GH_RM_ERROR_OK 0x0
>> +#define GH_RM_ERROR_UNIMPLEMENTED 0xFFFFFFFF
>> +#define GH_RM_ERROR_NOMEM 0x1
>> +#define GH_RM_ERROR_NORESOURCE 0x2
>> +#define GH_RM_ERROR_DENIED 0x3
>> +#define GH_RM_ERROR_INVALID 0x4
>> +#define GH_RM_ERROR_BUSY 0x5
>> +#define GH_RM_ERROR_ARGUMENT_INVALID 0x6
>> +#define GH_RM_ERROR_HANDLE_INVALID 0x7
>> +#define GH_RM_ERROR_VALIDATE_FAILED 0x8
>> +#define GH_RM_ERROR_MAP_FAILED 0x9
>> +#define GH_RM_ERROR_MEM_INVALID 0xA
>> +#define GH_RM_ERROR_MEM_INUSE 0xB
>> +#define GH_RM_ERROR_MEM_RELEASED 0xC
>> +#define GH_RM_ERROR_VMID_INVALID 0xD
>> +#define GH_RM_ERROR_LOOKUP_FAILED 0xE
>> +#define GH_RM_ERROR_IRQ_INVALID 0xF
>> +#define GH_RM_ERROR_IRQ_INUSE 0x10
>> +#define GH_RM_ERROR_IRQ_RELEASED 0x11
>> +
>> +struct gh_rm_rpc;
>> +int gh_rm_call(struct gh_rm_rpc *rsc_mgr, u32 message_id, void
>> *req_buff, size_t req_buff_size,
>> + void **resp_buf, size_t *resp_buff_size);
>> +
>> +#endif
>> diff --git a/include/linux/gunyah_rsc_mgr.h
>> b/include/linux/gunyah_rsc_mgr.h
>> new file mode 100644
>> index 000000000000..b4f55c19954b
>> --- /dev/null
>> +++ b/include/linux/gunyah_rsc_mgr.h
>> @@ -0,0 +1,18 @@
>> +/* SPDX-License-Identifier: GPL-2.0-only */
>> +/*
>> + * Copyright (c) 2022 Qualcomm Innovation Center, Inc. All rights
>> reserved.
>> + */
>> +
>> +#ifndef _GUNYAH_RSC_MGR_H
>> +#define _GUNYAH_RSC_MGR_H
>> +
>> +#include <linux/list.h>
>> +#include <linux/notifier.h>
>> +#include <linux/gunyah.h>
>> +
>> +#define GH_VMID_INVAL U16_MAX
>> +
>> +/* Gunyah recognizes VMID0 as an alias to the current VM's ID */
>> +#define GH_VMID_SELF 0
>> +
>> +#endif
>