Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 77 additions & 16 deletions india_compliance/gst_india/api_classes/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
from base64 import b64decode
from urllib.parse import urljoin

Expand All @@ -18,7 +19,28 @@
class BaseAPI:
API_NAME = "GST"
BASE_PATH = ""
SENSITIVE_INFO = ("x-api-key",)
PLACEHOLDER = "*****"
DEFAULT_MASK_MAP = {
"headers": [
"x-api-key",
"auth-token",
"auth_token",
"AuthToken",
"password",
"Password",
],
"output": [
"auth-token",
"auth_token",
"AuthToken",
"sek",
"Sek",
"rek",
"Rek",
],
"data": ["app_key", "AppKey", "password", "Password"],
"body": ["app_key", "AppKey", "password", "Password"],
}

def __init__(self, *args, **kwargs):
self.settings = frappe.get_cached_doc("GST Settings")
Expand All @@ -43,7 +65,6 @@ def __init__(self, *args, **kwargs):
self.setup(*args, **kwargs)

def setup(*args, **kwargs):
# Override in subclass
pass

def fetch_credentials(self, gstin, service, require_password=True):
Expand Down Expand Up @@ -74,7 +95,6 @@ def _fetch_credentials(self, row, require_password=True):
def get_url(self, *parts):
parts = list(parts)

# If the first part is a URL, return it as it is
if parts and parts[0].startswith("https"):
return parts[0]

Expand Down Expand Up @@ -111,7 +131,6 @@ def _make_request(
url=self.get_url(endpoint),
params=params,
headers={
# auto-generated hash, required by some endpoints
**self.default_headers,
**(headers or {}),
},
Expand Down Expand Up @@ -280,23 +299,65 @@ def mask_sensitive_info(self, log):
output = log.output
data = log.data
request_body = data and data.get("body")
placeholder = "*****"

for key in self.SENSITIVE_INFO:
if key in request_headers:
request_headers[key] = placeholder
# Define specific locations where each type of sensitive info should be masked
sensitive_info_mapping = self._get_sensitive_info_mapping()

if output and key in output:
output[key] = placeholder
self._mask_sensitive_info(
request_headers, sensitive_info_mapping.get("headers")
)

self._mask_sensitive_info(output, sensitive_info_mapping.get("output"))
self._mask_sensitive_info(data, sensitive_info_mapping.get("data"))
self._mask_sensitive_info(request_body, sensitive_info_mapping.get("body"))

def _get_sensitive_info_mapping(self):
"""
Define specific locations where different types of sensitive information should be masked.
Override _get_sensitive_info_overrides() in subclasses to customize specific locations.

if not data:
continue
Returns:
dict: Mapping of data location to list of sensitive keys
"""

if key in data:
data[key] = placeholder
default_mapping = copy.deepcopy(self.DEFAULT_MASK_MAP)

if request_body and key in request_body:
request_body[key] = placeholder
# Get subclass-specific overrides
overrides = self._get_sensitive_info_overrides()

# Merge overrides with default mapping
if overrides:
for location, keys in overrides.items():
if location in default_mapping:
# Replace the entire list for this location
default_mapping[location] = keys if keys is not None else []
else:
# Add new location
default_mapping[location] = keys if keys is not None else []

return default_mapping

def _get_sensitive_info_overrides(self):
"""
Override this method in subclasses to customize sensitive info mapping for specific locations.
Only specify the locations you want to change - others will use the default mapping.

Returns:
dict: Mapping of data location to list of sensitive keys (only for locations to override)
"""
return {}

def _mask_sensitive_info(self, target, sensitive_keys):
"""
Mask sensitive information in the target dictionary based on the provided keys.
This method is called by mask_sensitive_info to apply masking based on the mapping.
"""
if not (target and sensitive_keys):
return

for key in sensitive_keys:
if key in target:
target[key] = self.PLACEHOLDER

def generate_app_key(self, service):
app_key = frappe.generate_hash(length=32)
Expand Down
2 changes: 1 addition & 1 deletion india_compliance/gst_india/api_classes/nic/e_invoice.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

class EInvoiceAPI(BaseAPI):
API_NAME = "e-Invoice"
SENSITIVE_INFO = BaseAPI.SENSITIVE_INFO + ("password", "Password", "AppKey")

IGNORED_ERROR_CODES = {
"1005": "Invalid Token",
# Generate IRN errors
Expand Down
2 changes: 1 addition & 1 deletion india_compliance/gst_india/api_classes/nic/e_waybill.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

class EWaybillAPI(BaseAPI):
API_NAME = "e-Waybill"
SENSITIVE_INFO = BaseAPI.SENSITIVE_INFO + ("password", "app_key")

IGNORED_ERROR_CODES = {
"238": "Invalid auth token",
# Cancel e-waybill errors
Expand Down
7 changes: 0 additions & 7 deletions india_compliance/gst_india/api_classes/taxpayer_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,6 @@ def decrypt_data(self, encrypted_json):


class TaxpayerAuthenticate(BaseAPI):
SENSITIVE_INFO = BaseAPI.SENSITIVE_INFO + (
"auth-token",
"auth_token",
"app_key",
"sek",
"rek",
)

IGNORED_ERROR_CODES = {
"RETOTPREQUEST": "otp_requested",
Expand Down
175 changes: 175 additions & 0 deletions india_compliance/gst_india/api_classes/test_mask_sensitive_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import frappe
from frappe.tests import IntegrationTestCase

from india_compliance.gst_india.api_classes.base import BaseAPI


class TestMaskSensitiveInfo(IntegrationTestCase):
def test_base_api_mask_sensitive_info_mapping(self):
"""Test that BaseAPI correctly maps sensitive info to appropriate locations"""
api = BaseAPI()
mapping = api._get_sensitive_info_mapping()

self.assertIn("headers", mapping)
self.assertIn("output", mapping)
self.assertIn("data", mapping)
self.assertIn("body", mapping)

self.assertIn("x-api-key", mapping["headers"])
self.assertNotIn("x-api-key", mapping["output"])
self.assertNotIn("x-api-key", mapping["data"])
self.assertNotIn("x-api-key", mapping["body"])

def test_mask_sensitive_info_headers_only(self):
"""Test that sensitive headers are masked only in headers"""
api = BaseAPI()

log = frappe._dict(
{
"request_headers": {
"x-api-key": "secret_key",
"content-type": "application/json",
},
"output": {"x-api-key": "should_not_be_masked", "result": "success"},
"data": {"x-api-key": "should_not_be_masked", "param": "value"},
"body": None,
}
)

api.mask_sensitive_info(log)

self.assertEqual(log.request_headers["x-api-key"], api.PLACEHOLDER)
self.assertEqual(log.request_headers["content-type"], "application/json")

self.assertEqual(log.output["x-api-key"], "should_not_be_masked")
self.assertEqual(log.data["x-api-key"], "should_not_be_masked")

def test_mask_sensitive_info_with_request_body(self):
"""Test that sensitive info in request body is handled correctly"""
api = BaseAPI()

log = frappe._dict(
{
"request_headers": {"content-type": "application/json"},
"output": {"result": "success"},
"data": {
"params": {"test": "value"},
"body": {"password": "secret_password", "AppKey": "secret_app_key"},
},
}
)

api.mask_sensitive_info(log)

self.assertEqual(log.data["body"]["password"], BaseAPI.PLACEHOLDER)
self.assertEqual(log.data["body"]["AppKey"], BaseAPI.PLACEHOLDER)
self.assertEqual(log.data["params"]["test"], "value")

def test_mask_sensitive_info_comprehensive(self):
"""Test complete masking with all data structures"""
api = BaseAPI.__new__(BaseAPI)

log = frappe._dict(
{
"request_headers": {
"x-api-key": "secret_api_key",
"auth-token": "secret_auth_token",
"content-type": "application/json",
},
"output": {
"auth_token": "response_auth_token",
"sek": "session_encryption_key",
"rek": "response_encryption_key",
"result": "success",
},
"data": {
"app_key": "data_app_key",
"body": {"app_key": "body_app_key", "username": "test_user"},
},
}
)

api.mask_sensitive_info(log)

self.assertEqual(log.request_headers["x-api-key"], BaseAPI.PLACEHOLDER)
self.assertEqual(log.request_headers["auth-token"], BaseAPI.PLACEHOLDER)
self.assertEqual(log.request_headers["content-type"], "application/json")

self.assertEqual(log.output["auth_token"], BaseAPI.PLACEHOLDER)
self.assertEqual(log.output["sek"], BaseAPI.PLACEHOLDER)
self.assertEqual(log.output["rek"], BaseAPI.PLACEHOLDER)
self.assertEqual(log.output["result"], "success")

self.assertEqual(log.data["app_key"], BaseAPI.PLACEHOLDER)
self.assertEqual(log.data["body"]["app_key"], BaseAPI.PLACEHOLDER)
self.assertEqual(log.data["body"]["username"], "test_user")

def test_mask_sensitive_info_handles_missing_data(self):
"""Test that masking works when some data structures are missing"""
api = BaseAPI()

log = frappe._dict(
{
"request_headers": {"x-api-key": "secret_key"},
"output": None,
"data": None,
}
)

api.mask_sensitive_info(log)

self.assertEqual(log.request_headers["x-api-key"], BaseAPI.PLACEHOLDER)

def test_mask_sensitive_info_no_false_positives(self):
"""Test that legitimate data with similar keys is not masked inappropriately"""
api = BaseAPI.__new__(BaseAPI)

log = frappe._dict(
{
"request_headers": {"content-type": "application/json"},
"output": {
"password_reset_link": "https://example.com/reset",
"result": "success",
},
"data": {
"user_password_policy": "strong",
"body": {"password": "actual_secret"},
},
}
)

api.mask_sensitive_info(log)

self.assertEqual(log.output["password_reset_link"], "https://example.com/reset")
self.assertEqual(log.data["user_password_policy"], "strong")
self.assertEqual(log.data["body"]["password"], BaseAPI.PLACEHOLDER)

def test_sensitive_info_overrides_functionality(self):
"""Test that the override system works correctly - only specified locations are overridden"""

class CustomAPI(BaseAPI):
def _get_sensitive_info_overrides(self):
return {
"headers": ["custom-header", "x-api-key"],
}

api = CustomAPI.__new__(CustomAPI)
mapping = api._get_sensitive_info_mapping()

self.assertEqual(mapping["headers"], ["custom-header", "x-api-key"])

self.assertEqual(mapping["output"], BaseAPI.DEFAULT_MASK_MAP["output"])
self.assertEqual(mapping["data"], BaseAPI.DEFAULT_MASK_MAP["data"])
self.assertEqual(mapping["body"], BaseAPI.DEFAULT_MASK_MAP["body"])

def test_empty_overrides_uses_defaults(self):
"""Test that empty overrides still use default mapping"""

class NoOverrideAPI(BaseAPI):
def _get_sensitive_info_overrides(self):
return {}

api = NoOverrideAPI.__new__(NoOverrideAPI)
mapping = api._get_sensitive_info_mapping()

self.assertEqual(mapping, BaseAPI.DEFAULT_MASK_MAP)
Loading