Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update for floppsy rev b & add DOS floppy archiver #12

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 172 additions & 29 deletions adafruit_floppy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Author(s): Jeff Epler
"""

import struct
import floppyio
from digitalio import DigitalInOut, Pull
from micropython import const
Expand Down Expand Up @@ -55,7 +56,7 @@ class Floppy: # pylint: disable=too-many-instance-attributes

_track: typing.Optional[int]

def __init__(
def __init__( # pylint: disable=too-many-locals
self,
*,
densitypin: microcontroller.Pin,
Expand All @@ -72,6 +73,7 @@ def __init__(
wrdatapin: typing.Optional[microcontroller.Pin] = None,
wrgatepin: typing.Optional[microcontroller.Pin] = None,
floppydirectionpin: typing.Optional[microcontroller.Pin] = None,
floppyenablepin: typing.Optional[microcontroller.Pin] = None,
) -> None:
self._density = DigitalInOut(densitypin)
self._density.pull = Pull.UP
Expand Down Expand Up @@ -102,6 +104,10 @@ def __init__(
if self._floppydirection:
self._floppydirection.switch_to_output(True)

self._floppyenable = _optionaldigitalinout(floppyenablepin)
if self._floppyenable:
self._floppyenable.switch_to_output(False)

self._track = None

def _do_step(self, direction, count):
Expand Down Expand Up @@ -156,10 +162,12 @@ def track(self, track: int) -> None:
raise ValueError("Invalid seek to negative track number")

delta = track - self.track
if delta < 0:
self._do_step(_STEP_OUT, -delta)
elif delta > 0:
self._do_step(_STEP_IN, delta)
if delta:
if delta < 0:
self._do_step(_STEP_OUT, -delta)
elif delta > 0:
self._do_step(_STEP_IN, delta)
_sleep_ms(_STEP_DELAY_MS)

self._track = track
self._check_inpos()
Expand Down Expand Up @@ -205,7 +213,7 @@ def side(self) -> int:
def side(self, head: int) -> None:
self._side.value = head == 0

def flux_readinto(self, buf: "circuitpython_typing.WritableBuffer") -> int:
def flux_readinto(self, buf: "circuitpython_typing.WriteableBuffer") -> int:
"""Read flux transition information into the buffer.

The function returns when the buffer has filled, or when the index input
Expand All @@ -222,7 +230,8 @@ def flux_readinto(self, buf: "circuitpython_typing.WritableBuffer") -> int:
class FloppyBlockDevice: # pylint: disable=too-many-instance-attributes
"""Wrap an MFMFloppy object into a block device suitable for `storage.VfsFat`

The default heads/sectors/tracks setting are for 3.5", 1.44MB floppies.
The default is to autodetect the data rate and the geometry of an inserted
floppy using the floppy's "BIOS paramter block"

In the current implementation, the floppy is read-only.

Expand All @@ -243,30 +252,75 @@ class FloppyBlockDevice: # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
floppy,
heads=2,
sectors=18,
tracks=80,
flux_buffer=None,
t1_nom_ns: float = 1000,
*,
max_sectors=18,
autodetect: bool = True,
heads: int | None = None,
sectors: int | None = None,
tracks: int | None = None,
flux_buffer: circuitpython_typing.WriteableBuffer | None = None,
t1_nom_ns: float | None = None,
keep_selected: bool = False,
):
self.floppy = floppy
self.heads = heads
self.sectors = sectors
self.tracks = tracks
self.flux_buffer = flux_buffer or bytearray(sectors * 12 * 512)
self.track0side0_cache = memoryview(bytearray(sectors * 512))
self.track0side0_validity = bytearray(sectors)
self.track_cache = memoryview(bytearray(sectors * 512))
self.track_validity = bytearray(sectors)
self.flux_buffer = flux_buffer or bytearray(max_sectors * 12 * 512)
self.track0side0_cache = memoryview(bytearray(max_sectors * 512))
self.track_cache = memoryview(bytearray(max_sectors * 512))
self._keep_selected = keep_selected
self.cached_track = -1
self.cached_side = -1

self._t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
self._t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
if autodetect:
self.autodetect()
else:
self.setformat(heads, sectors, tracks, t1_nom_ns)

if keep_selected:
self.floppy.selected = True
self.floppy.spin = True

@property
def keep_selected(self) -> bool:
"""Whether to keep the drive selected & spinning between operations

This can make operations faster by avoiding spin up time"""
return self._keep_selected

@keep_selected.setter
def keep_selected(self, value: bool):
self.floppy.selected = value
self.floppy.spin = value

def _select_and_spin(self, value: bool):
if self.keep_selected:
return
self.floppy.selected = value
self.floppy.spin = value

def on_disk_change(self):
"""This function (or autodetect or setformat) must be called after a disk is changed

Flushes the cached floppy data"""

self._track_read(self.track0side0_cache, self.track0side0_validity, 0, 0)

self.cached_track = -1
self.cached_side = -1

def setformat(self, heads, sectors, tracks, t1_nom_ns):
"""Set the floppy format details

This also calls on_disk_change to flush cached floppy data."""
self.heads = heads
self.sectors = sectors
self.tracks = tracks
self._t1_nom_ns = t1_nom_ns
self._t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
self._t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
self.track0side0_validity = bytearray(sectors)
self.track_validity = bytearray(sectors)
self.on_disk_change()

def deinit(self):
"""Deinitialize this object"""
self.floppy.deinit()
Expand Down Expand Up @@ -311,22 +365,25 @@ def _get_track_data(self, track, side):
return self.track_cache, self.track_validity

def _track_read(self, track_data, validity, track, side):
self.floppy.selected = True
self.floppy.spin = True
self._select_and_spin(True)
self.floppy.track = track
self.floppy.side = side
self._mfm_readinto(track_data, validity)
self.floppy.spin = False
self.floppy.selected = False
self._select_and_spin(False)
self.cached_track = track
self.cached_side = side

def _mfm_readinto(self, track_data, validity):
n = 0
exc = None
for i in range(5):
self.floppy.flux_readinto(self.flux_buffer)
print("timing bins", self._t2_5_max, self._t3_5_max)
try:
self.floppy.flux_readinto(self.flux_buffer)
except RuntimeError as error:
exc = error
continue
n = floppyio.mfm_readinto(
track_data,
track_data[: 512 * self.sectors],
self.flux_buffer,
self._t2_5_max,
self._t3_5_max,
Expand All @@ -335,3 +392,89 @@ def _mfm_readinto(self, track_data, validity):
)
if n == self.sectors:
break
if n == 0 and exc is not None:
raise exc

def _detect_diskformat_from_flux(self):
sector = self.track_cache[:512]
# The first two numbers are HD and DD rates. The next two are the bit
# rates for 300RPM media read in a 360RPM drive.
for t1_nom_ns in [1_000, 2_000, 8_33, 1_667]:
t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9)

n = floppyio.mfm_readinto(
sector,
self.flux_buffer,
t2_5_max,
t3_5_max,
)

if n == 0:
continue

if sector[510] != 0x55 or sector[511] != 0xAA:
print("did not find boot signature 55 AA")
print(
"First 16 bytes in sector:",
" ".join("%02x" % c for c in sector[:16]),
)
print(
"Final 16 bytes in sector:",
" ".join("%02x" % c for c in sector[-16:]),
)
continue

n_sectors_track = sector[0x18]
n_heads = sector[0x1A]
if n_heads != 2:
print(f"unsupported head count {n_heads=}")
continue
n_sectors_total = struct.unpack("<H", sector[0x13:0x15])[0]
n_tracks = n_sectors_total // (n_heads * n_sectors_track)
f_tracks = n_sectors_total % (n_heads * n_sectors_track)
if f_tracks != 0:
# pylint: disable=line-too-long
print(
f"Dubious geometry! {n_sectors_total=} {n_sectors_track=} {n_heads=} is {n_tracks=}+{f_tracks=}"
)
n_tracks += 1

return {
"heads": n_heads,
"sectors": n_sectors_track,
"tracks": n_tracks,
"t1_nom_ns": t1_nom_ns,
}

def autodetect(self):
"""Detect an inserted DOS floppy

The floppy must have a standard MFM data rate & DOS 2.0 compatible Bios
Parameter Block (BPB). Almost all FAT formatted floppies for DOS & Windows
should autodetect in this way.

This also flushes the cached data.
"""
self._select_and_spin(True)
self.floppy.track = 1
self.floppy.track = 0
self.floppy.side = 0
exc = None
try:
for _ in range(5): # try repeatedly to read track 0 side 0 sector 0
try:
self.floppy.flux_readinto(self.flux_buffer)
except RuntimeError as error:
exc = error
continue
diskformat = self._detect_diskformat_from_flux()
if diskformat is not None:
break
finally:
self._select_and_spin(False)

if diskformat is not None:
self.setformat(**diskformat)
else:
raise OSError("Failed to detect floppy format") from exc
2 changes: 2 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
API Documentation
-----------------

.. If you created a package, create one automodule per module in the package.

Expand Down
104 changes: 104 additions & 0 deletions examples/dos_archiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 Jeff Epler for Adafruit Industries
#
# SPDX-License-Identifier: Unlicense

import os
import sdcardio
import board
import storage
import adafruit_floppy

"""DOS floppy archiver for Adafruit Floppsy

Insert an SD card & hook up your floppy drive.
Open the REPL / serial connection
Insert a floppy and press Enter to archive it
Do this for as many floppies as you like."""

floppy = adafruit_floppy.Floppy(
densitypin=board.DENSITY,
indexpin=board.INDEX,
selectpin=board.SELECT,
motorpin=board.MOTOR,
directionpin=board.DIRECTION,
steppin=board.STEP,
track0pin=board.TRACK0,
protectpin=board.WRPROT,
rddatapin=board.RDDATA,
sidepin=board.SIDE,
readypin=board.READY,
wrdatapin=board.WRDATA,
wrgatepin=board.WRGATE,
floppydirectionpin=board.FLOPPY_DIRECTION,
floppyenablepin=board.FLOPPY_ENABLE,
)

_image_counter = 0


def open_next_image(extension="img"):
"""Return an opened numbered file on the sdcard, such as "img01234.jpg"."""
global _image_counter # pylint: disable=global-statement
try:
os.stat("/sd")
except OSError as exc: # no SD card!
raise RuntimeError("No SD card mounted") from exc
while True:
filename = "/sd/dsk%04d.%s" % (_image_counter, extension)
_image_counter += 1
try:
os.stat(filename)
except OSError:
break
print("Writing to", filename)
return open(filename, "wb")


sdcard = sdcardio.SDCard(board.SPI(), board.SD_CS)
vfs = storage.VfsFat(sdcard)
storage.mount(vfs, "/sd")

dev = None
blockdata = bytearray(512)
baddata = b"BADDATA0" * 64

while True:
if dev is not None:
dev.floppy.keep_selected = False
input("Insert disk and press ENTER")

try:
if dev is None:
dev = adafruit_floppy.FloppyBlockDevice(floppy, keep_selected=True)
else:
dev.floppy.keep_selected = True
dev.autodetect()
except OSError as e:
print(e)
continue

bad_blocks = good_blocks = 0
total_blocks = dev.count()
pertrack = dev.sectors * dev.heads
with open_next_image() as f:
for i in range(total_blocks):
if i % pertrack == 0:
print(end=f"{i//pertrack:02d}")
try:
dev.readblocks(i, blockdata)
print(end=".")
f.write(blockdata)
good_blocks += 1
except Exception as e: # pylint: disable=broad-exception-caught
bad_blocks += 1
print(end="!")
f.write(baddata)
if i % pertrack == (pertrack // 2 - 1):
print(end="|")
if i % pertrack == (pertrack - 1):
print()

print(
f"{good_blocks} good + {bad_blocks} bad blocks",
f"out of {total_blocks} ({total_blocks//2}KiB)",
)