Skip to content

Application de certification PCI DSS 11.6.1 permettant la vérification de skimmer sur une page de paiement

License

Notifications You must be signed in to change notification settings

LaCapitainerie/PCI-Scanner

Repository files navigation

PCI-Scanner

Presentation

This script is about to resolve the PCI certification 11.6.1, the main goal is to detect and prevent any skimmer that would be in a payement page

How it works ?

the application is separate in 4 distincts docker container

  • API , a python uvicorn easily maintanable server is deploy and allow user to make scan, retrieve report and add client and perimeter
  • Reverse Proxy , an nginx server is there to allow dynamic redirection and paths
  • DB , a PostgreSQL is deploy with a volume to keep informations even when rebooting
  • CRON , a simple crontab server allow automatic scan of perimeter and mail sending

API

How it work

SQLAlchemy

It's a python ORM that link to the DB to avoid doublons of work, with it SQL request are really simple, on top of the file app.py there are some Class like this one :

class Client(Base):
    __tablename__ = "clients"
    client_id = Column(Integer, primary_key=True, index=True)
    name = Column(Text, nullable=False)

Theses are the table of the SQL DB, so to use it correctly

Note

always be sure your class match the db There, you can do any request simply by calling the db and the classes.

Fastapi

Is the route orchestrator that is mainly used in python because of it's simplicity and speed. Route are made like this :

@app.get("/clients", response_class=HTMLResponse)
def get_clients(db: Session = Depends(get_db)):
    clients = db.query(Client).all()
    return HTMLResponse(content=str(dumps([client.__dict__ for client in clients])), status_code=200)

How to maintain the code and add some

for SQLAlchemy it's simple you just have to take what the SQL table look like and copy it to python, nothing more, nothing less

for Fastapi, a route is composed of 3 subparts :

  • route data
  • arguments
  • SQL query

Route data

@app.get("/clients", response_class=HTMLResponse)

in this example the http method is the function used in the dataclasses. @app.get, then the route used is "/clients", and the response an HTMLResponse, you can put other thing for the response but you will always use this one as you need to seed infos to your clients even just a confirmation toast.

Arguments

def get_clients(db: Session = Depends(get_db)):

this one a bit tricky, cause there is plenty of method to add arguments, first the one you see above, have to always be there a it's the link to the DB, others args you could use are from dynamic path or query, or body.

for all of theses, you can simply add them like it's an arg of a function

Important

args from dynamic paths needs to be in the route name

@app.get("/scans/{scan_id}", response_class=HTMLResponse)
def get_scan(scan_id: int, db: Session = Depends(get_db), page: str = "0", perpage: str = "10"):

There, 2 types of args exists :

  • Dynamic path arg, which is scan_id pass in the route definition and in the arguemnt function
  • Query arg, which are page and perpage, which i choose to defined by default in case a request don't use them, other thing would be to defined the as Optional[str], to make them str | None, but you would have to make some cases where each are None, which can be long if you have some

Tip

Define query arg with a default value avoid making test and case if the value is None

Last type of Arg is the request body :

@app.post("/scan", response_class=HTMLResponse)
async def create_scan(request: Request, db: Session = Depends(get_db)):

this one can actually store more of them as it's a json that is pass, that's why the function is asynchronous, you have to unjson before getting data :

body:dict[str, Any] = await request.json()

SQL Query

as you see in every route there is an argument db: Session = Depends(get_db), this one is used to get the connection to the DB, from there you can just call some function and the return will be a tuple of the query as so :

scans_perimeters = db.query(Scan, Perimeter)\
    .join(Perimeter, Scan.perimeter == Perimeter.perimeter_id)\
    .join(Client, Perimeter.client_id == Client.client_id)\
    .filter(Client.name == client)\
    .order_by(Scan.ran.desc())\
    .offset(int(perpage)*int(page))\
    .limit(int(perpage)).all()

In this one we want to retrieve Scan and Perimeter table linked up, so we query them, then we make the key join to query Perimeter, join another table for the filter, order it to make appear report from the recent to the old one, and then we use the query args page and perpage to get only data we want and give them to the client.

DB

DB is actually a single file inside the folder, you can update it as you wan't but will have to make an export of data first as you will need to reinit the db from scratch which will delete all data, Attention don't forget to update the ORM File to match up python code

Caution

When updating the database table you will have to delete the database to make another one so save your data before

CRON

Finally Cron scanner is 2 files, a Dockerfile that you don't have to touch and a bash script that check every day the DB to see if some scan have to run today, if they will be run else nothing will happen

About

Application de certification PCI DSS 11.6.1 permettant la vérification de skimmer sur une page de paiement

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published