pdfに設定したパスワードの総当たり解析
お世話になっております、スログラミングです。
今回はパチンコ・スロットに全く関係ありませんのでご承知おきください。
ちょっと個人的に困ったことがあってpythonで解決できたのでそのコードを紹介したいと思います。
最近はテレワークや、ハンコレス等の影響で、今まで紙でやりとりしていた資料をpdfで送るケースが増えてきていると思います。
そうすると、pdfにパスワードをつけて送付したりしますが、「パスワード何にしたかって忘れますよね。」
ということで、今回は、総当たり(Brute Force Attack:ブルートフォースアタック、ブルートフォース攻撃)でパスワードを解析する方法をPythonでプログラミングしてみました。
※他人のパスワードを解析する等の行為は絶対にやめてください。
本日のアジェンダはこちら。
1. 今回作成したコード
おそらくこんな僻地のブログに辿りついた方は相当切羽詰まっていると思いますので、いきなり最終コードからどうぞ。
※並列処理等も組み込んでいますが、凄い時間がかかります。
(PCスペックにもよりますが、2並列処理で、4文字英数字が30分強、5文字英数字が丸1日くらい。)
# coding: UTF-8 # PDF読み込み用 from pikepdf import Pdf # 総当たりの組み合わせ作成用 import itertools # 並列処理用 from multiprocessing import Pool # 処理時間の計測用 import datetime # パスワードを入力してファイルを開いてみる処理 def FileOpen(pw, file_pass): # pdfファイルの場所を指定 pdf_file = open(file_pass, 'rb') # ファイルが開けた時のパスワードを表示 pw = ''.join(pw) try: Pdf.open(pdf_file, password=pw) # ファイルが開けた時のパスワードを表示 print('パスワードは「{}」です'.format(pw)) # ファイルが開けたらTrueを返す return True except: # ファイルが開けなかったらスルー pass # 総当たりを作成する処理 def BruteForceAttack(file_pass, num_p, alphanumeric, pass_num): # プロセスを立ち上げる p = Pool(num_p) # 処理開始時間の表示 dt_now = datetime.datetime.now() print(dt_now, ", 処理開始") # 指定された文字数で処理を行う for i in pass_num: # 途中経過の表示 dt_now = datetime.datetime.now() print(dt_now, ", ", i, "文字開始") # 処理回数のカウント用 cnt = 1 # 総当たりの組み合わせを渡す for passlist in itertools.product(alphanumeric, repeat=i): # 1~100回目まで単発処理して処理時間を記録 if cnt <= 1: # 1回目の処理開始時間を記録 start = datetime.datetime.now() result = p.apply_async(FileOpen, (passlist, file_pass)) result.wait() elif 1 < cnt <= 100: result = p.apply_async(FileOpen, (passlist, file_pass)) result.wait() # 101回目以降は並列処理 else: result = p.apply_async(FileOpen, (passlist, file_pass)) if cnt == 100: # 100回目の処理終了時間を記録 end = datetime.datetime.now() # 1~100回目の処理にかかった時間を算出 start2end = end - start # 1回の処理にかかる時間を算出 wait = start2end.total_seconds() / 100 # 1秒分の処理時間が溜まる処理回数を算出 waitspan = 1 // wait * num_p # 1秒分の処理時間が溜まる度に処理終了を待つ if cnt > 100 and cnt % waitspan == 0: result.wait() # File Openに成功したら処理を終了する if result.get() == True: break # 処理回数をカウント cnt = cnt + 1 # 途中経過の表示 result.wait() dt_now = datetime.datetime.now() print(dt_now, ", ", i, "文字終了") # File Openに成功したら処理を終了する if result.get() == True: break # プロセスを閉じる p.close() p.join() # 処理終了時間の表示 dt_now = datetime.datetime.now() print(dt_now, ", 処理終了") if __name__ == "__main__": # pdfファイルの場所を指定 file_pass = r"D:\Test\test.pdf" # 並列処理のプロセス数 num_p = 2 # パスワードに使う文字 alphanumeric = "abcdefghijklmnopqrstuvwxyz0123456789" # 数字だけの場合(alphanumeric = "0123456789") # 大文字小文字も使っている場合(alphanumeric = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") # パスワードの文字数 pass_num = [1, 2, 3, 4, 5, 6, 7, 8] # 総当たりでパスワードを試す BruteForceAttack(file_pass, num_p, alphanumeric, pass_num)
・使い方
「if __name__ == "__main__":」の以下の部分を修正してください。
1) 「file_pass」の部分を解析したいpdfファイルの場所に変える
2) 「num_p」の部分を実施したい並列処理の個数に変更
使用する環境によって異なると思いますので、CPUやメモリの使用率を見ながら適当に設定して下さい。
3) 「alphanumeric」の部分を解析したい組み合わせにする
ここに入れた文字を総当たりで処理します。
コメントで数字だけの例や、大文字小文字が含まれる例も記載しているので参考にしてください。
当然ですが、数字だけなら、数字だけ総当たりした方が処理が早く終わります。
4) 「pass_num」の部分を解析したいパスワードの文字数に変更
もし文字数を覚えている場合は、指定してください。
例えば、4文字or8文字なら、pass_num = [4, 8]
・結果の表示
本コードを実行すると、下記のような結果が表示されます。
「パスワードは「????」です」と表示されたカッコ内の文言がパスワードとなります。
各文字の総当たり終了後にはその時の時間が表示されるので、あとどのくらい処理がかかりそうかの目安にしてください。
yyyy-mm-dd hh:mm:ss.ssssss 処理開始 yyyy-mm-dd hh:mm:ss.ssssss 1 文字開始 yyyy-mm-dd hh:mm:ss.ssssss 1 文字終了 yyyy-mm-dd hh:mm:ss.ssssss 2 文字開始 yyyy-mm-dd hh:mm:ss.ssssss 2 文字終了 yyyy-mm-dd hh:mm:ss.ssssss 3 文字開始 パスワードは「a1b」です yyyy-mm-dd hh:mm:ss.ssssss 処理終了
2. 試行錯誤の記録
コードの作成にあたり、色々つまづいたことがあるので、試行錯誤した内容を記載しておきます。
同じような壁にぶつかった方の参考になれば幸いです。
3.1. PDFの取り扱い
今回は、pikepdfを用いています。
この手のコードを検索したところ、PyPDF2を使うケースが多い(?)ようですが、手持ちのファイルを読み込ませたところ、以下のエラーが発生してしまいました。
raise NotImplementedError("only algorithm code 1 and 2 are supported") NotImplementedError: only algorithm code 1 and 2 are supported
詳細は全く理解できていませんが、対応していない暗号アルゴリズムの場合に発生するということなので、ファイルが問題なく読み込めたpikepdfを使用しています。
使い方はこんな感じでシンプルです。
変数pwに総当たりのパスワードをガンガン入力していって、pdfを開いてみます。
pdfが開けた時の文字列が正解のパスワードとなります。
# pikepdfのインポート from pikepdf import Pdf # pdfファイルの場所を指定 pdf_file = open(r"D:\Test\test.pdf", 'rb') # ファイルが開けたらその時のパスワードを表示 pw = ''.join(pw) try: Pdf.open(pdf_file, password=pw) print('パスワードは「{}」です'.format(pw)) except: pass
3.2. 並列処理
少しでも処理時間を減らすために、並列処理を組み込んでいます。
が、これ結構手間取りました。
(Pythonで並列処理といえば、multiprocessingライブラリのpoolらしいので、今回もこれを使ってます。)
まずは、一番手軽そうなp.mapを使ってみました。
基本はこんな感じで、「p.map(並列処理したい関数, 関数に投げる組み合わせ)」でOKです。
# 並列処理ライブラリのインポート from multiprocessing import Pool # パスワードを入力してファイルを開いてみる処理 def FileOpen(pw): ・・・ if __name__ == "__main__": # 並列処理のプロセス数 p = Pool(2) # パスワードの組み合わせ alphanumeric = ["aaaa", ・・・, "9999"] # 総当たりの組み合わせを渡す p.map(FileOpen, alphanumeric)
あとは総当たりの組み合わせを作成するだけですが、手動で総当たりの組み合わせを作成するのは無理なので、イテレータ生成関数のitertools.productなるものを使用してみました。
例えば、こんな感じで文字列を渡してあげるだけで勝手に総当たりの組み合わせを作成してくれます。
(厳密には組み合わせを全部作ってるかというとちょっと違うらしい?イテレータの意味はいまいち理解しきれなかったので、この表現でお許しを。)
repeat=8は、8文字の文字列の組み合わせを作成するという意味です。
# 総当たりの組み合わせ作成用 import itertools # パスワードに使う文字 alphanumeric = "abcdefghijklmnopqrstuvwxyz0123456789" # 総当たりの組み合わせを作成 itertools.product(alphanumeric, repeat=8)
この時は思いましたよね、「はいはい、OK。あとは簡単!」と。
で、最初に作ったコードがこちら。
※動かすとメモリ使用率100%になると思いますので、ご注意を。
# coding: UTF-8 # PDF読み込み用 from pikepdf import Pdf # 総当たりの組み合わせ作成用 import itertools # 並列処理用 from multiprocessing import Pool # パスワードを入力してファイルを開いてみる処理 def FileOpen(pw): # pdfファイルの場所を指定 pdf_file = open(r"D:\Test\test.pdf", 'rb') # ファイルが開けた時のパスワードを表示 pw = ''.join(pw) try: Pdf.open(pdf_file, password=pw) except: pass if __name__ == "__main__": # 並列処理のプロセス数 p = Pool(2) # パスワードの組み合わせ alphanumeric = "abcdefghijklmnopqrstuvwxyz0123456789" # 総当たりの組み合わせを渡す p.map(FileOpen, itertools.product(alphanumeric, repeat=8))
一見問題なさそうですが、どうやらイテレータの使い方が効率的でなかった模様。
↑のイテレータの使い方だと、全ての組み合わせを作ってから並列処理のp.mapに投げてたようです。
で、8文字の英数字の組み合わせなんていう膨大な量の組み合わせを作成したもんだから、メモリがいっぱいになってしまった。ということですね。
じゃあどうすればいいのかというと、答えは簡単で、for文とイテレータを組み合わせれば良いようです。
ちょっと用語がわからないのでコードにしちゃいますが、このようにfor文のinの後ろにイテレータ生成関数を置けばOKです。
こうすることで、組み合わせを1個作る→並列処理に投げるを繰り返すので、先ほどのようなメモリを食い尽くしてしまうことはありません。
for i in イテレータ生成関数: # 並列処理を記載
で、ぶち当たったのが、次の問題です。
for文で並列処理ってどうするのよ?ですね。
色々調べてみたところ、p.apply_asyncを使えばfor文で並列処理ができる模様。
早速作ってみたコードがこちら。
※これも動かすとメモリ使用率100%になると思いますので、ご注意を。
# coding: UTF-8 # PDF読み込み用 from pikepdf import Pdf # 総当たりの組み合わせ作成用 import itertools # 並列処理用 from multiprocessing import Pool # パスワードを入力してファイルを開いてみる処理 def FileOpen(pw): # pdfファイルの場所を指定 pdf_file = open(r"D:\Test\test.pdf", 'rb') # ファイルが開けた時のパスワードを表示 pw = ''.join(pw) try: Pdf.open(pdf_file, password=pw) except: pass if __name__ == "__main__": # 並列処理のプロセス数 p = Pool(2) # パスワードの組み合わせ alphanumeric = "abcdefghijklmnopqrstuvwxyz0123456789" # 総当たりの組み合わせを渡す for i in itertools.product(alphanumeric, repeat=8): result = p.apply_async(FileOpen, (i, )) # プロセスを閉じる p.close() p.join()
p.apply_asyncも使い方はシンプルで、並列にしたい数だけp.apply_asyncを作成すればOKです。
つまり、for文で繰り返しp.apply_asyncすればOKかと早とちりしてしまって↑のコードを作成してしまったわけですが、当然ながらp.apply_asyncの中身の処理よりもfor文を回すスピードが速いと並列処理の処理数が増え続けていってしまいます。
で、メモリが増え続けていった結果100%になってしまう、というわけですね。
ということで、並列処理の中身の処理時間をある程度見積もって定期的にfor文を回すのを待ってあげることで、メモリが100%になってしまうことを防ぐことにしました。
具体的には、以下のような流れを組み込んでいます。
・頭の100回は逐次処理(1個ずつ処理)
・100回の処理にかかった時間を計測
・1回の処理時間を算出(並列処理することも考慮)
計算式としては、「100回の処理時間 / 100回 / 並列処理数」で算出してます。
・定期的に処理を待つ
余りに高頻度でwaitすると並列処理の旨味が薄れそうな気がしたので、今回はテキトーに1秒分の処理が溜まったらwaitするようにしました。
で、できたコードがこちら。
# coding: UTF-8 # PDF読み込み用 from pikepdf import Pdf # 総当たりの組み合わせ作成用 import itertools # 並列処理用 from multiprocessing import Pool # 処理時間の計測用 import datetime # パスワードを入力してファイルを開いてみる処理 def FileOpen(pw): # pdfファイルの場所を指定 pdf_file = open(r"D:\Test\test.pdf", 'rb') # ファイルが開けた時のパスワードを表示 pw = ''.join(pw) try: Pdf.open(pdf_file, password=pw) except: pass if __name__ == "__main__": # 並列処理のプロセス数 num_p = 2 p = Pool(num_p) # パスワードの組み合わせ alphanumeric = "abcdefghijklmnopqrstuvwxyz0123456789" # 総当たりの組み合わせを渡す # 処理回数のカウント用 cnt = 1 # 総当たりの組み合わせを渡す for i in itertools.product(alphanumeric, repeat=8): # 1~100回目まで単発処理して処理時間を記録 if cnt <= 1: # 1回目の処理開始時間を記録 start = datetime.datetime.now() result = p.apply_async(FileOpen, (i, )) result.wait() elif 1 < cnt <= 100: result = p.apply_async(FileOpen, (i, )) result.wait() # 101回目以降は並列処理 else: result = p.apply_async(FileOpen, (i, )) if cnt == 100: # 100回目の処理終了時間を記録 end = datetime.datetime.now() # 1~100回目の処理にかかった時間を算出 start2end = end - start # 1回の処理にかかる時間を算出 wait = start2end.total_seconds() / 100 # 1秒分の処理時間が溜まる処理回数を算出 waitspan = 1 // wait * num_p # 1秒分の処理時間が溜まる度に処理終了を待つ if cnt > 100 and cnt % waitspan == 0: result.wait() # 処理回数をカウント cnt = cnt + 1 # プロセスを閉じる p.close() p.join()
このコードであれば、メモリが増え続けるようなことはないはずです。(多分。。)
これを色んな文字数に対応させたり、並列処理数等の設定がし易いように修正したものが「1. 今回作成したコード」に貼り付けてあるコードです。
素人丸出しのつまづきもありそうですが、似たような壁にぶつかった方もいるかもしれないので、お役に立てれば幸いです。
(初歩的なミスって意外と解決策が見つからないこともありますし。)
コメント
コメントを投稿